[SQL][connectors] Support for preprocessors in connectors #5681
mihaibudiu wants to merge 6 commits into main
Conversation
Here are some log snippets from the pipeline manager logs:
Pull request overview
This pull request adds experimental support for user-defined preprocessors in SQL connectors. Preprocessors are Rust components that transform raw data bytes before they reach the format parser, enabling use cases like decryption, decompression, and protocol-specific framing removal.
Changes:
- Introduces new `Preprocessor` and `PreprocessorFactory` traits in the adapterlib crate with a global registry mechanism
- Adds a `preprocessor` field to connector configuration with validation in the SQL compiler
- Implements preprocessor registration code generation in the Rust backend that auto-registers user-defined preprocessors
- Removes deprecated `--jdbcSource` compiler option and fixes several documentation typos
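For orientation, a rough sketch of what such a trait pair could look like. Only the names `Preprocessor` and `PreprocessorFactory` come from this PR; every signature below is an assumption, not the actual adapterlib API.

```rust
/// Illustrative sketch only: trait names are from the PR, signatures are
/// assumptions and may differ from the real adapterlib API.

/// Transforms raw input bytes before they reach the format parser.
pub trait Preprocessor: Send {
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String>;
}

/// Creates preprocessor instances; registered once per preprocessor name.
pub trait PreprocessorFactory: Send + Sync {
    fn name(&self) -> &'static str;
    fn create(&self) -> Box<dyn Preprocessor>;
}

/// A trivial passthrough implementation, useful for testing the plumbing.
pub struct Passthrough;

impl Preprocessor for Passthrough {
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String> {
        Ok(data.to_vec())
    }
}
```

In this model the transport connector would call `process` on each incoming byte chunk and hand the result to the parser.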
Reviewed changes
Copilot reviewed 33 out of 34 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/adapterlib/src/preprocess.rs | New module defining preprocessor traits, error types, and global registry |
| crates/adapterlib/src/format.rs | Implements PreprocessedParser that chains preprocessor with parser |
| crates/adapters/src/preprocess.rs | Defines PassthroughPreprocessor for testing (not registered) |
| crates/adapters/src/controller.rs | Creates and integrates preprocessors into input pipelines |
| crates/feldera-types/src/config.rs | Adds preprocessor field to ConnectorConfig |
| sql-to-dbsp-compiler/.../SqlToRelCompiler.java | Validates preprocessor configuration at compile time |
| sql-to-dbsp-compiler/.../ToRustVisitor.java | Generates preprocessor registration code in compiled Rust output |
| sql-to-dbsp-compiler/.../ProgramMetadata.java | Collects preprocessor names from connector metadata |
| sql-to-dbsp-compiler/.../CompilerOptions.java | Removes deprecated --jdbcSource option |
| python/tests/runtime/test_udp.py | Adds integration test for user-defined preprocessors (with bug) |
| docs.feldera.com/docs/sql/udf.md | Documents preprocessor API, traits, and usage examples |
| sql-to-dbsp-compiler/.../MetadataTests.java | Adds validation and integration tests for preprocessor config |
| crates/adapters/src/controller/test.rs | Adds unit test for passthrough preprocessor |
docs.feldera.com/docs/sql/udf.md
Outdated
### Example: passthrough preprocessor

The following example shows a minimal preprocessor that returns its
input unchanged but logs a message for every 1M data bytes processd.
Typo in comment: "processd" should be "processed".
- input unchanged but logs a message for every 1M data bytes processd.
+ input unchanged but logs a message for every 1M data bytes processed.
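A self-contained sketch of what such a logging passthrough might look like. The real example lives in udf.md; the struct name `LoggerPreprocessor` appears later in this review, but the methods below are illustrative assumptions.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical sketch of the documented example: returns input bytes
/// unchanged while counting them, reporting once per 1M bytes.
pub struct LoggerPreprocessor {
    count: Arc<Mutex<u64>>,
}

impl LoggerPreprocessor {
    const REPORT_EVERY: u64 = 1_000_000;

    pub fn new() -> Self {
        Self { count: Arc::new(Mutex::new(0)) }
    }

    /// Returns the input unchanged; logs once per 1M bytes processed.
    pub fn process(&mut self, data: &[u8]) -> Vec<u8> {
        let mut count = self.count.lock().unwrap();
        let before = *count / Self::REPORT_EVERY;
        *count += data.len() as u64;
        if *count / Self::REPORT_EVERY > before {
            // The real example would presumably use the `log` crate here.
            eprintln!("processed {} bytes so far", *count);
        }
        data.to_vec()
    }

    pub fn total(&self) -> u64 {
        *self.count.lock().unwrap()
    }
}
```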
...ler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java
eb78a3f to
66b52d7
Compare
mythical-fred
left a comment
Two blocking issues; see inline comments.
}
}
} catch (JsonProcessingException e) {
    // Ignore
Silent // Ignore on JsonProcessingException is a broken window. If the connector JSON is malformed, preprocessors silently fail to register — the pipeline starts, runs without the preprocessor, and the user gets no diagnostic.
At minimum log a warning. Better: propagate the error so the compiler can reject malformed connector config at compile time rather than silently producing wrong runtime behaviour.
...ler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java
Outdated
gz
left a comment
why is this the right design? why does this need to be part of the pipeline?
python/tests/runtime/test_udp.py
Outdated
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS

# TEst user-defined preprocessor
- # TEst user-defined preprocessor
+ # Test user-defined preprocessor
ryzhyk
left a comment
Thanks! Looks nice and simple. I'm not sure this covers all use cases, e.g., can a preprocessor have checkpointable state? But starting simple is probably a good idea.
Important question: will secret resolution work for this new config field? cc @snkas
} => {
    write!(
        f,
        "Error creating preprocessor for endpoint {endpoint_name}: {error}"
Does the preprocessor itself have a name too?
I don't really see the need for a preprocessor name, unless somehow you could stack multiple copies of the same preprocessor.
crates/adapterlib/src/preprocess.rs
Outdated
/// Get the unique instance of the PreprocessorRegistry
fn get_instance() -> &'static Mutex<PreprocessorRegistry> {
    static INSTANCE: OnceLock<Mutex<PreprocessorRegistry>> = OnceLock::new();
That's a good point, but this would require creating the registry inside the runtime. I haven't read the rest of the PR yet, so not sure how it's done yet.
There are folks who don't want to build sidecars or have any other kind of a middle man between the data stream and the pipeline.
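The process-wide singleton in the quoted snippet follows the standard `OnceLock` lazy-initialization pattern. A self-contained sketch of how it fits together, with the registry simplified to a list of names since the real factory type is not shown in this excerpt:

```rust
use std::sync::{Mutex, OnceLock};

/// Simplified model of the global registry from the quoted snippet: the
/// real one presumably stores factory objects, here reduced to bare names.
#[derive(Default)]
pub struct PreprocessorRegistry {
    names: Vec<String>,
}

impl PreprocessorRegistry {
    /// Get the unique instance of the PreprocessorRegistry.
    fn get_instance() -> &'static Mutex<PreprocessorRegistry> {
        static INSTANCE: OnceLock<Mutex<PreprocessorRegistry>> = OnceLock::new();
        INSTANCE.get_or_init(|| Mutex::new(PreprocessorRegistry::default()))
    }

    /// Record a preprocessor name (the real registry stores a factory).
    pub fn register(name: &str) {
        Self::get_instance().lock().unwrap().names.push(name.to_string());
    }

    /// Check whether a preprocessor with this name was registered.
    pub fn contains(name: &str) -> bool {
        Self::get_instance().lock().unwrap().names.iter().any(|n| n == name)
    }
}
```

The trade-off under discussion is exactly this global state: moving the registry into the Runtime or Controller would scope it per pipeline instead of per process.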
I have addressed all Rust-related comments, except moving the registry to the Runtime (or Controller). I will need some advice to do that. I haven't addressed two Java comments because I want to rebase on #5683 to improve the JSON-related errors.
mythical-fred
left a comment
Good progress. Blocker #1 (passthrough factory not registered) is fixed.
Blocker #2 still open — type coverage.
The Python integration test still uses CREATE TABLE t (i INT). Per the rules for connector changes, the integration test needs to cover the full type matrix: integer variants, float, decimal, boolean, varchar, varbinary, date/time/timestamp, nullable. A preprocessor is data-type-agnostic in principle, but connector integration tests serve as a regression net for the entire pipeline. A single INT column does not provide that coverage.
Understand the Java comments are pending the #5683 rebase — that is fine, not blocking anything here.
What is the user flow in terms of defining preprocessors? It is part of the connector configuration I see, but then @ryzhyk Per https://docs.feldera.com/connectors/secret-references secrets are resolved for
There is a Python test case which shows the intended user flow. Besides the connector configuration, you implement two or three traits in udf.rs. I don't have a test with secrets.
Will the template also show up in
In the Python tests, it's configured as:
Can the preprocessor configuration become strongly typed? Or if there is currently nothing to configure, then it can be left out?
This part of the schema is fixed. Both
I have pushed one more commit which fixes most of the remaining comments. The only thing I don't know how to do is to put the registry in the Runtime or Controller. I would appreciate a little help in figuring this out.
I plan to squash the commits before I merge, but I left them this way to show what's new.
mythical-fred
left a comment
Both blockers resolved: passthrough factory registered, and Python test now covers the full type matrix (INT, TINYINT, SMALLINT, BIGINT, REAL, DOUBLE, VARBINARY, DATE, TIME, TIMESTAMP, ARRAY, BOOLEAN, DECIMAL, VARCHAR). The JSON pointer error location is a nice improvement.
If using rebase merges: "Address most comments from first round of reviews" should be folded in before landing.
I think in general you may not be able to use the parser splitter for the preprocessor. There should be a way to say: no splitter, and don't use the parser splitter.
The preprocessor exposes its own splitter to the transport connector. Internally, it uses the parser's splitter, which respects the parser's framing. |
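The chaining described here can be pictured with a toy model. All names below are hypothetical; the real framing logic lives in `PreprocessedParser` and is not reproduced here:

```rust
/// Toy model of splitter chaining; names and signatures are illustrative.
pub trait Splitter {
    /// Returns the byte length of the next complete chunk, if one is present.
    fn next_chunk(&mut self, data: &[u8]) -> Option<usize>;
}

/// A parser-side splitter that frames records on newlines.
pub struct NewlineSplitter;

impl Splitter for NewlineSplitter {
    fn next_chunk(&mut self, data: &[u8]) -> Option<usize> {
        data.iter().position(|&b| b == b'\n').map(|i| i + 1)
    }
}

/// The splitter the preprocessor exposes to the transport connector:
/// it delegates to the parser's splitter, preserving its framing.
pub struct PreprocessedSplitter {
    pub inner: Box<dyn Splitter>,
}

impl Splitter for PreprocessedSplitter {
    fn next_chunk(&mut self, data: &[u8]) -> Option<usize> {
        self.inner.next_chunk(data)
    }
}
```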
fn clear(&mut self);
}

/// Helper for breaking a stream of data into groups of records using a
This code was moved unchanged from the adapters crate.
@ryzhyk maybe you can review the last commit for the splitter implementation in PreprocessedParser. Note that all tests with decryption were produced by Claude.
count: Arc<Mutex<u64>>,
}

impl Preprocessor for LoggerPreprocessor {
A better example than just counting bytes might be to use the snappy library to decompress the data that comes in, because it's an example that actually transforms the input.
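A transforming preprocessor could be sketched like this. To keep the example dependency-free, a toy run-length decoder stands in for real snappy decompression (a real implementation would call a compression crate such as snap); everything here is illustrative, not PR code.

```rust
/// Toy stand-in for a decompressing preprocessor: decodes a trivial
/// run-length encoding (count byte followed by value byte). A real
/// preprocessor would call into a compression library instead.
pub struct RleDecompressor;

impl RleDecompressor {
    pub fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String> {
        // Each encoded pair is (count, value); an odd length is malformed.
        if data.len() % 2 != 0 {
            return Err("truncated RLE input".to_string());
        }
        let mut out = Vec::new();
        for pair in data.chunks(2) {
            out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
        }
        Ok(out)
    }
}
```

Unlike a passthrough, a transforming preprocessor like this also exercises the error path: malformed input must surface as a preprocessor error rather than reaching the parser.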
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
crates/adapterlib/src/format.rs
Outdated
fn len(&self) -> BufferSize {
    let mut size = BufferSize::empty();
    for v in self.iter() {
        size.add_assign(v.len());
- size.add_assign(v.len());
+ size += v.len();
for v in self.iter_mut() {
    if remaining <= 0 {
        break;
    }
    let buf = v.take_some(remaining);
    if let Some(ib) = buf {
        let len = ib.len().records;
        remaining = remaining.saturating_sub(len);
        result.push(ib);
    }
}
Most of the time, this loop is going to reduce some (or all) of the inner input buffers to empty, and I would argue that in that case it should remove them entirely from the vector.
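The suggestion can be sketched on a simplified model, with `Vec<u8>` standing in for the real input buffer type (names and types here are illustrative, not the PR's code):

```rust
/// Sketch of the reviewer's suggestion: after draining up to `remaining`
/// items from a vector of buffers, remove the buffers the loop emptied
/// instead of keeping them around. `Vec<u8>` models the real buffer type.
fn take_some(buffers: &mut Vec<Vec<u8>>, mut remaining: usize) -> Vec<Vec<u8>> {
    let mut result = Vec::new();
    for v in buffers.iter_mut() {
        if remaining == 0 {
            break;
        }
        let take = remaining.min(v.len());
        if take > 0 {
            // Drain the taken prefix out of this buffer.
            result.push(v.drain(..take).collect());
            remaining -= take;
        }
    }
    // Drop buffers that the loop reduced to empty.
    buffers.retain(|v| !v.is_empty());
    result
}
```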
for chunk in chunks {
    let (mut buffer, errors) = details.parser.parse(&chunk, None);
    details.consumer.buffered(buffer.len());
    let len = InputBuffer::len(&buffer);
- let len = InputBuffer::len(&buffer);
+ let len = buffer.len();
for chunk in chunks {
    let (mut buffer, errors) = details.parser.parse(&chunk, None);
    details.consumer.buffered(buffer.len());
    let len = InputBuffer::len(&buffer);
- let len = InputBuffer::len(&buffer);
+ let len = buffer.len();
unless there's something weird going on that I don't recognize
--jdbcSource
Connection string to a database that contains table metadata
Default: <empty string>
…tagram preprocessors are supported with fault-tolerance
Fixes #5608
See the issue for a description of the architecture
Describe Manual Test Plan
I have used the web console to inspect the logs of the new Python test, which show that the preprocessor is indeed installed and executed.
Checklist
Breaking Changes?
None, but this introduces an experimental API.