Datagen fixes for better ingestions/higher throughput #5747

Merged

gz merged 2 commits into main from datagen-fix on Mar 4, 2026
Conversation


@gz gz commented Mar 3, 2026

Describe Manual Test Plan

I ran datagen; it now reaches higher throughput and needs many more datagen workers before buffering kicks in. That said, I have low confidence in the changes because I'm not very familiar with this code.

gz added 2 commits March 3, 2026 14:49
This fixes an issue where the datagen main thread flushed
unstaged batches, which took too long and hence starved the workers.

Another unfortunate problem is that these records were considered
buffered by the pipeline, so the stall wasn't obvious. I haven't fixed
that problem because I wouldn't know where to fix it.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
@gz gz requested review from blp and ryzhyk and removed request for ryzhyk March 3, 2026 23:58
@gz gz changed the title Datagen fix Datagen fixes for better ingestions/higher throughput Mar 4, 2026
@mihaibudiu (Contributor)
Didn't you write this code?
Is it still parsing the config for every record emitted?

A contributor commented on the diff, near `impl Hasher for HashBytesRecorder`:

so the trick is to hash and stage in one single pass?

@gz (Contributor, Author) replied:

From what I can tell, the trick is to move the parsing into the worker threads... I just did what Leonid told me is the solution.
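The one-pass idea behind `HashBytesRecorder` can be illustrated with a sketch. This is not the PR's actual code; the generic wrapper below (its names and API are my assumptions) shows how a `std::hash::Hasher` can record the bytes it is fed, so each record is hashed and staged for output in a single pass rather than being traversed twice:

```rust
use std::hash::Hasher;

// Hypothetical sketch: a Hasher wrapper that captures the bytes it hashes,
// so hashing and staging happen in one pass over the record.
struct HashBytesRecorder<H: Hasher> {
    inner: H,        // the real hasher
    staged: Vec<u8>, // bytes captured while hashing, ready to flush
}

impl<H: Hasher> HashBytesRecorder<H> {
    fn new(inner: H) -> Self {
        Self { inner, staged: Vec::new() }
    }

    // Finish hashing and hand back both the hash and the staged bytes.
    fn into_parts(self) -> (u64, Vec<u8>) {
        (self.inner.finish(), self.staged)
    }
}

impl<H: Hasher> Hasher for HashBytesRecorder<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.staged.extend_from_slice(bytes); // stage
        self.inner.write(bytes);              // hash
    }

    fn finish(&self) -> u64 {
        self.inner.finish()
    }
}

fn main() {
    use std::collections::hash_map::DefaultHasher;
    let mut h = HashBytesRecorder::new(DefaultHasher::new());
    h.write(b"record-1");
    let (hash, staged) = h.into_parts();
    println!("hash={hash:016x}, staged {} bytes", staged.len());
}
```

Because the wrapper is itself a `Hasher`, worker threads can use it anywhere a plain hasher is expected and collect the staged bytes afterwards.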


gz commented Mar 4, 2026

Didn't you write this code?

not this piece

@ryzhyk (Contributor) left a comment:

Another unfortunate problem is that these records were considered buffered by the pipeline

The semantics of buffered are that this data has been parsed by the input connector and is ready for ingestion, so this is the intended behavior. We just don't distinguish between staged and unstaged buffers, and there's probably no reason not to implement staging in all connectors.

@mythical-fred (Collaborator) left a comment:

LGTM — moving staging and hashing into worker threads is a clean win. One thing worth verifying given your low-confidence note:

The old code used take_some to split a completion at the transaction boundary. The new code avoids splitting entirely: if total.records + flushed.records > n it pushes the whole completion back. But the total.records > 0 guard means the first completion in a transaction is always taken whole, even if it exceeds n. So with transaction_size = k and batch sizes > k, the first transaction will be oversized. For datagen/benchmarking this is almost certainly fine, just worth knowing.
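The take-whole-or-push-back behavior described above can be sketched as follows. This is a hypothetical reconstruction, not the PR's code; the names `Completion`, `take_up_to`, and the field `records` are my assumptions, chosen to mirror the comment's description:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
struct Completion {
    records: usize,
}

// Hypothetical sketch of the batching logic described in the review comment:
// completions are taken whole (no take_some-style splitting). A completion
// that would push the running total past the budget `n` is pushed back for
// the next transaction -- except that the `total > 0` guard means the FIRST
// completion is always taken, so it alone may exceed `n`.
fn take_up_to(queue: &mut VecDeque<Completion>, n: usize) -> Vec<Completion> {
    let mut total = 0usize;
    let mut taken = Vec::new();
    while let Some(flushed) = queue.pop_front() {
        // Only a non-first completion is ever deferred.
        if total > 0 && total + flushed.records > n {
            queue.push_front(flushed); // push the whole completion back
            break;
        }
        total += flushed.records;
        taken.push(flushed);
    }
    taken
}

fn main() {
    // With n = 10 and a first batch of 25 records, the whole batch is taken:
    // the transaction is oversized, exactly the edge case noted above.
    let mut q = VecDeque::from(vec![
        Completion { records: 25 },
        Completion { records: 4 },
    ]);
    let taken = take_up_to(&mut q, 10);
    let total: usize = taken.iter().map(|c| c.records).sum();
    println!("took {} completions, {} records", taken.len(), total);
}
```

Running the example shows the first oversized completion being taken whole while the second is deferred, which is the behavior the comment flags as acceptable for datagen/benchmarking.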

@gz gz added this pull request to the merge queue Mar 4, 2026
Merged via the queue into main with commit 772ac9b Mar 4, 2026
7 checks passed
@gz gz deleted the datagen-fix branch March 4, 2026 05:09
4 participants