Skip to content

[datagen] Support connector-requested transactions.#5679

Merged
blp merged 2 commits intomainfrom
datagen-transactions
Feb 24, 2026
Merged

[datagen] Support connector-requested transactions.#5679
blp merged 2 commits intomainfrom
datagen-transactions

Conversation

@blp
Copy link
Member

@blp blp commented Feb 23, 2026

I used this for testing transactions.

Describe Manual Test Plan

I tested this with a simple pipeline in both single- and multi-host scenarios.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

@blp blp requested a review from gz February 23, 2026 22:51
@blp blp self-assigned this Feb 23, 2026
@blp blp added documentation Improvements or additions to documentation connectors Issues related to the adapters/connectors crate QA Testing and quality assurance rust Pull requests that update Rust code multihost Related to multihost or distributed pipelines labels Feb 23, 2026
Copy link
Contributor

@mihaibudiu mihaibudiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you set a transaction of 100 and a limit of 2?

@blp
Copy link
Member Author

blp commented Feb 23, 2026

What if you set a transaction of 100 and a limit of 2?

You end up with a transaction of 2.

@blp blp enabled auto-merge February 23, 2026 23:00
@blp blp added this pull request to the merge queue Feb 23, 2026
serde_json::to_value(metadata).unwrap(),
hasher.map(|h| h.finish()),
);
if !started_transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have expected to read if started_transaction here to commit when something is outstanding?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens when datagen is done but there is a transaction is ongoing (it would be good to call commit then) it might already do that, I just can't tell from the logic that's whats happening

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the same question that I asked

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is true if a transaction was started in the same iteration through the loop. I don't want it to both start and finish a transaction in a single iteration because the adapters code treats that as a no-op (no transaction really ever gets initiated) so I use this variable to keep that from happening.

I'll add a comment.

@blp blp removed this pull request from the merge queue due to a manual request Feb 23, 2026
blp added 2 commits February 23, 2026 16:09
I used this for testing transactions.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp force-pushed the datagen-transactions branch from ad2da93 to a100d30 Compare February 24, 2026 00:09
@blp blp enabled auto-merge February 24, 2026 00:10
@blp blp added this pull request to the merge queue Feb 24, 2026
Copy link
Collaborator

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One hard block: the new test_transaction test uses naked multi-second thread::sleep calls to time record generation, which introduces a flakiness risk. See inline comments. The rest of the logic looks sound—I traced the started_transaction / transaction_size invariants, and the EOI commit path is correct.

mk_pipeline::<TestStruct2, TestStruct2>(config, TestStruct2::schema()).unwrap();

// Sleep for 2 seconds, then queue, then verify that a transaction started.
thread::sleep(Duration::from_secs(2));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flaky sleep. thread::sleep(Duration::from_secs(2)) is a naked timing-dependent wait. The test then asserts transaction_in_progress is true at line 170, which requires at least one record to have been generated and flushed during that sleep. On a loaded CI runner where the datagen worker thread hasn't had enough CPU time, 0 records may have been generated and the assertion will fail (no transaction started → transaction_in_progress == false).

The rate: 100 + 2-second window gives a comfortable margin in normal conditions, but bare sleeps don't survive adversarial scheduling—this is exactly the pattern the zero-tolerance flakiness rule exists to catch.

Suggested fix: restructure the test to avoid timing dependence. One clean option is to drop rate and use a small transaction_size with a small limit, so all records are generated instantly and polling loops replace timed sleeps:

// e.g. limit: 5, transaction_size: 10
// → guaranteed no mid-stream commit; all data ready before the first queue call;
//   EOI commit happens cleanly without any bare sleep.

If the intent is specifically to test mid-stream queuing while data is still arriving, replace the bare sleep with a poll loop that waits until completed is non-empty (or some other observable state), so the test advances as soon as the condition is true rather than relying on wall time.

}
assert!(consumer.state().transaction_in_progress);

// Sleep for 10 seconds, then queue, then verify that the transaction committed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading comment. "verify that the transaction committed" but the assertion on line 179 checks that transaction_in_progress is still true—the transaction has not committed yet, which is the correct expected behaviour since transaction_size < goal. Should read something like: // verify that the transaction is still in progress (goal not yet reached).

}
assert!(consumer.state().transaction_in_progress);

wait_for_data(endpoint.as_ref(), &consumer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing assertion for the EOI commit path. After wait_for_data returns, consumer.eoi() has already been called, which means the EOI commit branch (if transaction_size > 0 { consumer.commit_transaction(); }) has run. This is the only path that commits the transaction under this test config (transaction_size: 2000 > limit: 1000), yet there is no assertion that the transaction was actually committed. Consider adding:

assert!(!consumer.state().transaction_in_progress, "transaction should have been committed at EOI");

This turns the test into a genuine end-to-end check of the EOI commit path rather than leaving it implicit.

Merged via the queue into main with commit 3532660 Feb 24, 2026
5 checks passed
@blp blp deleted the datagen-transactions branch February 24, 2026 04:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Issues related to the adapters/connectors crate documentation Improvements or additions to documentation multihost Related to multihost or distributed pipelines QA Testing and quality assurance rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants