
adapters: postgres-output: support parallel worker threads#5712

Merged
abhizer merged 4 commits into main from postgres-parallel-output
Mar 3, 2026

abhizer commented Feb 27, 2026

Describe Manual Test Plan

Needs some more testing before I am comfortable merging this.

I also added tests and benchmarks. One of the simpler manual tests is to run the following pipeline:

create table t0 (id int, s varchar) with (
  'connectors' = '[{
    "transport": {
      "name": "datagen",
      "config": {
        "plan": [{
          "rate": 1
        }]
      }
    }
  }]'
);

create materialized view v1 with (
    'connectors' = '[{
        "index": "v1_idx",
        "transport": {
            "name": "postgres_output",
            "config": {
                "uri": "postgres://postgres:password@localhost:5432/postgres",
                "table": "feldera_out"
            }
        }
    }]'
) as select * from t0;

create index v1_idx on v1(id);

Then, while the pipeline is running, occasionally stop and start the Postgres container.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Adds support for multiple worker threads in the Postgres output connector.
The Postgres output endpoint spawns n PostgresWorkers (default: 4). Each
worker is equivalent to the old PostgresOutputEndpoint and is passed a
SplitCursor to process.

Each worker has its own retry mechanism and performs its own transaction
on the table, so if a worker fails, only that worker's fraction of the
batch is left uncommitted in Postgres.

Workers are created once at startup and are then sent BatchStart, BatchEnd,
Encode, and Shutdown messages.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
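
The worker protocol described in the commit message above can be sketched with standard-library channels. This is only an illustration under stated assumptions, not the connector's code: the four message names come from the commit message, while `WorkerCommand`, `spawn_worker`, `run_batch`, the `Row` type, and the contiguous chunking that stands in for SplitCursor are hypothetical, and the "commit" merely counts rows instead of talking to Postgres.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical row type; the real connector encodes view updates.
type Row = (i32, String);

// Message names follow the commit message; the payload is an assumption.
enum WorkerCommand {
    BatchStart,
    Encode(Vec<Row>),
    BatchEnd,
    Shutdown,
}

// Spawns one worker that buffers rows between BatchStart and BatchEnd.
// The thread returns how many rows it "committed" over its lifetime.
fn spawn_worker() -> (Sender<WorkerCommand>, JoinHandle<usize>) {
    let (tx, rx) = channel::<WorkerCommand>();
    let handle = thread::spawn(move || {
        let mut buffered: Vec<Row> = Vec::new();
        let mut committed = 0usize;
        for cmd in rx {
            match cmd {
                WorkerCommand::BatchStart => buffered.clear(),
                WorkerCommand::Encode(rows) => buffered.extend(rows),
                WorkerCommand::BatchEnd => {
                    // A real worker would commit its own transaction here.
                    committed += buffered.len();
                    buffered.clear();
                }
                WorkerCommand::Shutdown => break,
            }
        }
        committed
    });
    (tx, handle)
}

// Sends one batch through `threads` workers and returns per-worker counts.
fn run_batch(rows: Vec<Row>, threads: usize) -> Vec<usize> {
    assert!(threads > 0, "at least one worker is required");
    let workers: Vec<_> = (0..threads).map(|_| spawn_worker()).collect();
    // Stand-in for SplitCursor: contiguous chunks, at most one per worker.
    let chunk_len = ((rows.len() + threads - 1) / threads).max(1);
    let mut chunks = rows.chunks(chunk_len);
    for (tx, _) in &workers {
        tx.send(WorkerCommand::BatchStart).unwrap();
        if let Some(chunk) = chunks.next() {
            tx.send(WorkerCommand::Encode(chunk.to_vec())).unwrap();
        }
        tx.send(WorkerCommand::BatchEnd).unwrap();
        tx.send(WorkerCommand::Shutdown).unwrap();
    }
    workers
        .into_iter()
        .map(|(_, handle)| handle.join().unwrap())
        .collect()
}
```

Calling `run_batch(rows, 4)` returns one count per worker. Because each worker commits independently, a failure in one worker would leave the other workers' counts (and rows) in place.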
abhizer commented Feb 27, 2026

Screenshot 2026-02-27 at 13 54 43

mythical-fred (Collaborator) left a comment

Three issues before this merges.


fn default_writer_threads() -> usize {
    4
}
Collaborator

A default of 4 is a breaking behavioral change for existing users. A pipeline that worked fine with 1 connection now silently opens 4, which could exceed Postgres connection limits or surprise operators.

The default should be 1 (the existing behavior). If you want 4 as the recommended value, put it in the docs and let users opt in, or add a changelog entry and call it a deliberate default change.
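
The opt-in behavior suggested above might look like the following sketch. It is an assumption about shape only: `WriterConfig`, its `threads` field, and `from_user` are hypothetical names, and the real connector derives its default through serde rather than an `Option`. Only `default_writer_threads` appears in the diff.

```rust
// Hypothetical slice of the connector config; only the field discussed here.
#[derive(Debug, PartialEq)]
struct WriterConfig {
    threads: usize,
}

// Proposed default: one writer, matching the pre-existing single-connection behavior.
fn default_writer_threads() -> usize {
    1
}

impl WriterConfig {
    // `None` models a config that omits the setting entirely.
    fn from_user(threads: Option<usize>) -> Result<Self, String> {
        match threads {
            None => Ok(Self { threads: default_writer_threads() }),
            Some(0) => Err("threads must be at least 1".to_string()),
            Some(n) => Ok(Self { threads: n }),
        }
    }
}
```

With this shape, existing pipelines that never mention the setting keep a single connection, and a user who wants four writers asks for them explicitly.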

Contributor

This is a good point.

}
self.retry_connecting_with_backoff();
}
match self.broadcast_and_collect(BroadcastCommand::BatchEnd) {
Collaborator

In cdc mode, each worker runs an independent transaction. If some workers commit and one fails, the committed rows are already permanent. On retry, the plain INSERTs re-run with no conflict resolution, producing duplicates or errors. Materialized mode is safe because upserts are idempotent. Either restrict threads > 1 to materialized mode only, or document that cdc mode requires threads = 1.
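
The failure mode described above can be reproduced in a small in-memory model. This is a sketch, not the connector: `InsertOnlyTable` stands in for plain INSERTs without conflict handling, `UpsertTable` for keyed upserts, and it assumes the retry re-sends the whole batch to every worker. All type and function names are hypothetical.

```rust
use std::collections::BTreeMap;

type Row = (i32, &'static str);

trait Sink {
    fn commit(&mut self, part: &[Row]);
}

// Models plain INSERTs: every committed row is appended.
#[derive(Default)]
struct InsertOnlyTable {
    rows: Vec<Row>,
}

// Models upserts keyed on the first column: re-writing a row is a no-op.
#[derive(Default)]
struct UpsertTable {
    rows: BTreeMap<i32, &'static str>,
}

impl Sink for InsertOnlyTable {
    fn commit(&mut self, part: &[Row]) {
        self.rows.extend_from_slice(part);
    }
}

impl Sink for UpsertTable {
    fn commit(&mut self, part: &[Row]) {
        for (key, value) in part {
            self.rows.insert(*key, *value);
        }
    }
}

// One attempt: each partition is its own transaction. `failing` is the
// index of the partition whose transaction fails (None = all succeed).
// Returns true only if every partition committed.
fn attempt<S: Sink>(sink: &mut S, parts: &[Vec<Row>], failing: Option<usize>) -> bool {
    let mut all_ok = true;
    for (i, part) in parts.iter().enumerate() {
        if Some(i) == failing {
            all_ok = false; // this worker's rows are not committed
        } else {
            sink.commit(part); // the other workers' rows are now permanent
        }
    }
    all_ok
}

// First attempt loses partition 1; the retry re-sends the whole batch.
fn write_with_retry<S: Sink>(sink: &mut S, parts: &[Vec<Row>]) {
    if !attempt(sink, parts, Some(1)) {
        attempt(sink, parts, None);
    }
}
```

Running `write_with_retry` on a three-row batch split into two partitions leaves the insert-only table with the first partition's rows written twice, while the upsert table ends with exactly one row per key.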

///
/// Default: 4
#[serde(default = "default_writer_threads")]
#[schema(default = default_writer_threads)]
Contributor

I thought utoipa inherited serde default annotations. Could you please check?

Contributor Author

I rechecked: the default field in the OpenAPI spec isn't populated without the schema annotation.


abhizer and others added 2 commits March 3, 2026 13:00
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@abhizer abhizer added this pull request to the merge queue Mar 3, 2026
@abhizer abhizer removed this pull request from the merge queue due to a manual request Mar 3, 2026
@abhizer abhizer added this pull request to the merge queue Mar 3, 2026
Merged via the queue into main with commit 225111c Mar 3, 2026
1 check passed
@abhizer abhizer deleted the postgres-parallel-output branch March 3, 2026 11:13