Datagen fixes for better ingestion / higher throughput #5747
Conversation
This fixes an issue where the datagen main thread flushed unstaged batches, which took too long and hence starved the workers. Another unfortunate problem is that these records were considered buffered by the pipeline, so the stall wasn't obvious. I haven't fixed that problem because I wouldn't know where to fix it.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
Didn't you write this code?
impl Hasher for HashBytesRecorder {
so the trick is to hash and stage in one single pass?
From what I can tell the trick is to move the parsing into the worker threads... I just did what Leonid told me is the solution.
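For what the question above seems to be asking: a minimal sketch of hash-and-stage in one pass, assuming `HashBytesRecorder` wraps an inner `Hasher` and captures the bytes it hashes. Only the name `HashBytesRecorder` comes from the diff; the fields and staging target here are assumptions, not the PR's actual implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Hypothetical sketch: a Hasher wrapper that records every byte it
/// hashes, so a record can be hashed and staged in a single pass
/// instead of being serialized once for hashing and again for staging.
struct HashBytesRecorder<H: Hasher> {
    inner: H,        // the real hasher
    staged: Vec<u8>, // bytes captured for the staging buffer (assumed)
}

impl<H: Hasher> Hasher for HashBytesRecorder<H> {
    fn finish(&self) -> u64 {
        self.inner.finish()
    }

    fn write(&mut self, bytes: &[u8]) {
        self.staged.extend_from_slice(bytes); // stage...
        self.inner.write(bytes);              // ...and hash, same pass
    }
}

fn main() {
    let mut rec = HashBytesRecorder {
        inner: DefaultHasher::new(),
        staged: Vec::new(),
    };
    rec.write(b"record payload");
    let _hash = rec.finish();
    // The same bytes that were hashed are now available for staging.
    assert_eq!(rec.staged, b"record payload");
}
```

The point of the wrapper is that the record is traversed once: whatever the `Hasher` sees is exactly what ends up staged, so the two can never disagree.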
not this piece
ryzhyk left a comment
> Another unfortunate problem is that these records were considered buffered by the pipeline
The semantics of "buffered" is that this data has been parsed by the input connector and is ready for ingestion, so this is the intended behavior. We just don't distinguish between staged and unstaged buffers, and there's probably no reason not to implement staging in all connectors.
mythical-fred left a comment
LGTM — moving staging and hashing into worker threads is a clean win. One thing worth verifying given your low-confidence note:
The old code used take_some to split a completion at the transaction boundary. The new code avoids splitting entirely: if total.records + flushed.records > n, it pushes the whole completion back. But the total.records > 0 guard means the first completion in a transaction is always taken whole, even if it exceeds n. So with transaction_size = k and batch sizes > k, the first transaction will be oversized. For datagen/benchmarking this is almost certainly fine, just worth knowing.
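A minimal sketch of the take-whole-or-push-back boundary logic described in the comment above. The names (`Batch`, `records`, `take_up_to`) are illustrative, not the PR's actual types; the guard condition mirrors the `total.records > 0 && total.records + flushed.records > n` check being discussed:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
struct Batch {
    records: usize,
}

/// Hypothetical sketch: pop whole completions until adding the next one
/// would exceed `n`, pushing that one back unsplit. Because the guard
/// only fires when something has already been taken, the first
/// completion is always taken whole -- even if it alone exceeds `n`.
fn take_up_to(queue: &mut VecDeque<Batch>, n: usize) -> Vec<Batch> {
    let mut taken = Vec::new();
    let mut total = 0;
    while let Some(batch) = queue.pop_front() {
        if total > 0 && total + batch.records > n {
            queue.push_front(batch); // no splitting at the boundary
            break;
        }
        total += batch.records;
        taken.push(batch);
    }
    taken
}

fn main() {
    // n = 5 with an oversized first batch: the whole batch is taken,
    // producing an oversized first transaction, as the comment notes.
    let mut q = VecDeque::from(vec![Batch { records: 8 }, Batch { records: 2 }]);
    let taken = take_up_to(&mut q, 5);
    assert_eq!(taken.len(), 1);
    assert_eq!(taken[0].records, 8);
    assert_eq!(q.len(), 1); // second completion pushed back intact
}
```

This trades exact transaction sizing for simplicity: no completion is ever split, at the cost of the first transaction overshooting `n` when batches are large.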
Describe Manual Test Plan
I ran datagen and it reaches higher throughput and needs many more datagen workers to cause buffering. But I have low confidence in the changes because I'm not very familiar with this code.