adapters: postgres-output: support parallel worker threads #5712
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Adds support for multiple worker threads in the postgres output connector. The postgres output endpoint spawns n (default: 4) PostgresWorkers (a PostgresWorker is equivalent to the old PostgresOutputEndpoint) and passes each worker a SplitCursor to process. Each worker has its own retry mechanism and runs its own transaction on the table, so if a worker fails, only that worker's fraction of the batch is not committed to Postgres. Workers are created at startup and are then sent BatchStart, BatchEnd, Encode and Shutdown messages.
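To make the message flow above concrete, here is a minimal sketch using only std threads and channels. It is not the code in this PR: `Command`, `Pool`, `spawn_worker` and `write_batch` are hypothetical names, a `Vec<String>` stands in for the SplitCursor, and the Postgres transaction and retry logic are replaced by a row count.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Commands sent to each worker (variant names follow the PR description).
enum Command {
    BatchStart,
    /// Stand-in for the worker's share of the batch (the PR passes a SplitCursor).
    Encode(Vec<String>),
    BatchEnd,
    Shutdown,
}

/// A worker's reply to BatchEnd: rows "committed", or an error message.
type BatchResult = Result<usize, String>;

struct Worker {
    tx: Sender<Command>,
    rx: Receiver<BatchResult>,
    handle: JoinHandle<()>,
}

fn spawn_worker() -> Worker {
    let (cmd_tx, cmd_rx) = channel::<Command>();
    let (res_tx, res_rx) = channel::<BatchResult>();
    let handle = thread::spawn(move || {
        let mut pending: Vec<String> = Vec::new();
        for cmd in cmd_rx {
            match cmd {
                Command::BatchStart => pending.clear(),
                Command::Encode(rows) => pending.extend(rows),
                Command::BatchEnd => {
                    // A real worker would commit its own transaction here.
                    let committed = pending.len();
                    pending.clear();
                    if res_tx.send(Ok(committed)).is_err() {
                        break;
                    }
                }
                Command::Shutdown => break,
            }
        }
    });
    Worker { tx: cmd_tx, rx: res_rx, handle }
}

struct Pool {
    workers: Vec<Worker>,
}

impl Pool {
    /// Workers are created once, up front, and reused for every batch.
    fn new(n: usize) -> Self {
        Pool { workers: (0..n.max(1)).map(|_| spawn_worker()).collect() }
    }

    /// Splits `rows` into contiguous chunks, one per worker, and returns
    /// one result per worker so a failure is confined to that worker's chunk.
    fn write_batch(&self, rows: Vec<String>) -> Vec<BatchResult> {
        let n = self.workers.len();
        let chunk = ((rows.len() + n - 1) / n).max(1);
        let mut parts = rows.chunks(chunk).map(|c| c.to_vec());
        for w in &self.workers {
            let part = parts.next().unwrap_or_default();
            let _ = w.tx.send(Command::BatchStart);
            let _ = w.tx.send(Command::Encode(part));
            let _ = w.tx.send(Command::BatchEnd);
        }
        self.workers
            .iter()
            .map(|w| w.rx.recv().unwrap_or_else(|_| Err("worker exited".to_string())))
            .collect()
    }

    fn shutdown(self) {
        for w in self.workers {
            let _ = w.tx.send(Command::Shutdown);
            let _ = w.handle.join();
        }
    }
}
```

Calling `Pool::new(4)` followed by `write_batch(rows)` yields four independent results, which mirrors the per-worker commit behavior described above: one `Err` does not undo what the other workers reported.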
mythical-fred left a comment:
Three issues before this merges.
fn default_writer_threads() -> usize {
    4
}
Default of 4 is a behavioral breaking change for existing users. A pipeline that worked fine with 1 connection now silently opens 4. That could trip Postgres connection limits or surprise operators.
Default should be 1 (existing behavior). If you want 4 as the recommended value, put it in the docs and let users opt in — or add a changelog entry and call it a deliberate default change.
}
self.retry_connecting_with_backoff();
}
match self.broadcast_and_collect(BroadcastCommand::BatchEnd) {
In cdc mode, each worker runs an independent transaction. If some commit and one fails, those rows are permanent. On retry, the plain INSERTs re-run with no conflict resolution, producing duplicates or errors. Materialized mode is safe (upserts are idempotent). Either restrict threads > 1 to materialized mode only, or document that cdc mode requires threads = 1.
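One way to enforce the first option at config-validation time is sketched below. The names `WriteMode` and `validate_threads` are hypothetical and not taken from the connector; the sketch only assumes that the connector knows whether it is in cdc or materialized mode and how many threads were requested.

```rust
/// Hypothetical stand-in for the connector's write mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteMode {
    /// Upserts: retrying an already committed chunk is idempotent.
    Materialized,
    /// Plain INSERTs: retrying an already committed chunk duplicates rows or errors.
    Cdc,
}

/// Rejects configurations where a partial commit followed by a retry is unsafe.
/// Returns the accepted thread count, or a message describing the problem.
fn validate_threads(mode: WriteMode, threads: usize) -> Result<usize, String> {
    if threads == 0 {
        return Err("threads must be at least 1".to_string());
    }
    if mode == WriteMode::Cdc && threads > 1 {
        return Err(format!(
            "cdc mode requires threads = 1 (got {threads}): workers commit independently, \
             so retrying after a partial failure would re-run INSERTs"
        ));
    }
    Ok(threads)
}
```

With this check, `validate_threads(WriteMode::Cdc, 4)` is rejected before any worker is spawned, while materialized mode accepts any positive thread count.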
///
/// Default: 4
#[serde(default = "default_writer_threads")]
#[schema(default = default_writer_threads)]
I thought utoipa inherited serde default annotations. Could you please check?
Rechecked: the default field in the OpenAPI spec doesn't get populated without the schema annotation.
Signed-off-by: feldera-bot <feldera-bot@feldera.com>

Describe Manual Test Plan
Needs some more testing before I am comfortable merging this.
Added some tests and benchmarks as well. One of the simpler tests is to run a pipeline as follows:
While it runs, stop and restart the Postgres container from time to time.