[datagen] Support connector-requested transactions.#5679
Conversation
mihaibudiu
left a comment
There was a problem hiding this comment.
What if you set a transaction of 100 and a limit of 2?
You end up with a transaction of 2. |
| serde_json::to_value(metadata).unwrap(), | ||
| hasher.map(|h| h.finish()), | ||
| ); | ||
| if !started_transaction |
There was a problem hiding this comment.
I'd have expected to read if started_transaction here to commit when something is outstanding?
There was a problem hiding this comment.
what happens when datagen is done but there is a transaction is ongoing (it would be good to call commit then) it might already do that, I just can't tell from the logic that's whats happening
There was a problem hiding this comment.
I think this is the same question that I asked
There was a problem hiding this comment.
This variable is true if a transaction was started in the same iteration through the loop. I don't want it to both start and finish a transaction in a single iteration because the adapters code treats that as a no-op (no transaction really ever gets initiated) so I use this variable to keep that from happening.
I'll add a comment.
I used this for testing transactions. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
ad2da93 to
a100d30
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
One hard block: the new test_transaction test uses naked multi-second thread::sleep calls to time record generation, which introduces a flakiness risk. See inline comments. The rest of the logic looks sound—I traced the started_transaction / transaction_size invariants, and the EOI commit path is correct.
| mk_pipeline::<TestStruct2, TestStruct2>(config, TestStruct2::schema()).unwrap(); | ||
|
|
||
| // Sleep for 2 seconds, then queue, then verify that a transaction started. | ||
| thread::sleep(Duration::from_secs(2)); |
There was a problem hiding this comment.
Flaky sleep. thread::sleep(Duration::from_secs(2)) is a naked timing-dependent wait. The test then asserts transaction_in_progress is true at line 170, which requires at least one record to have been generated and flushed during that sleep. On a loaded CI runner where the datagen worker thread hasn't had enough CPU time, 0 records may have been generated and the assertion will fail (no transaction started → transaction_in_progress == false).
The rate: 100 + 2-second window gives a comfortable margin in normal conditions, but bare sleeps don't survive adversarial scheduling—this is exactly the pattern the zero-tolerance flakiness rule exists to catch.
Suggested fix: restructure the test to avoid timing dependence. One clean option is to drop rate and use a small transaction_size with a small limit, so all records are generated instantly and polling loops replace timed sleeps:
// e.g. limit: 5, transaction_size: 10
// → guaranteed no mid-stream commit; all data ready before the first queue call;
// EOI commit happens cleanly without any bare sleep.If the intent is specifically to test mid-stream queuing while data is still arriving, replace the bare sleep with a poll loop that waits until completed is non-empty (or some other observable state), so the test advances as soon as the condition is true rather than relying on wall time.
| } | ||
| assert!(consumer.state().transaction_in_progress); | ||
|
|
||
| // Sleep for 10 seconds, then queue, then verify that the transaction committed. |
There was a problem hiding this comment.
Misleading comment. "verify that the transaction committed" but the assertion on line 179 checks that transaction_in_progress is still true—the transaction has not committed yet, which is the correct expected behaviour since transaction_size < goal. Should read something like: // verify that the transaction is still in progress (goal not yet reached).
| } | ||
| assert!(consumer.state().transaction_in_progress); | ||
|
|
||
| wait_for_data(endpoint.as_ref(), &consumer); |
There was a problem hiding this comment.
Missing assertion for the EOI commit path. After wait_for_data returns, consumer.eoi() has already been called, which means the EOI commit branch (if transaction_size > 0 { consumer.commit_transaction(); }) has run. This is the only path that commits the transaction under this test config (transaction_size: 2000 > limit: 1000), yet there is no assertion that the transaction was actually committed. Consider adding:
assert!(!consumer.state().transaction_in_progress, "transaction should have been committed at EOI");This turns the test into a genuine end-to-end check of the EOI commit path rather than leaving it implicit.
I used this for testing transactions.
Describe Manual Test Plan
I tested this with a simple pipeline in both single- and multi-host scenarios.
Checklist