[SQL][connectors] Support for preprocessors in connectors#5681

Open
mihaibudiu wants to merge 6 commits into main from preprocessor

@mihaibudiu
Contributor

@mihaibudiu mihaibudiu commented Feb 23, 2026

Fixes #5608

See the issue for a description of the architecture

Describe Manual Test Plan

I used the web console to inspect the logs of the new Python test; they show that the preprocessor is indeed installed and executed.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

None, but this introduces an experimental API

@mihaibudiu
Contributor Author

Here are some log snippets from the pipeline manager logs:

[local_test_udps] 2026-02-23T23:26:26.836802Z  INFO feldera_adapterlib::preprocess:  Registering factory for preprocessor 'logger'
[local_test_udps] 2026-02-23T23:26:26.840303Z  INFO dbsp_adapters::controller:  Creating preprocessor for logger
[local_test_udps] 2026-02-23T23:26:26.843035Z  INFO dbsp_adapters::server:  Pipeline initialization complete
[manager] 2026-02-23T23:26:27.832577Z  INFO pipeline_manager::runner::pipeline_automata: Resources transition: provisioning -> provisioned (desired: provisioned) pipeline-id="019c8cd3-4232-7fe0-b9cd-a2eba32e69f2" pipeline-name="local_test_udps"
[manager] 2026-02-23T23:26:27.832608Z  INFO pipeline_manager::runner::pipeline_automata: Runtime transition: (none) -> initializing (desired: paused) pipeline-id="019c8cd3-4232-7fe0-b9cd-a2eba32e69f2" pipeline-name="local_test_udps"
[manager] 2026-02-23T23:26:27.834499Z  INFO pipeline_manager::runner::pipeline_automata: Runtime transition: initializing -> paused (desired: paused) pipeline-id="019c8cd3-4232-7fe0-b9cd-a2eba32e69f2" pipeline-name="local_test_udps"
[local_test_udps] 2026-02-23T23:26:30.884922Z  INFO dbsp_adapters::server:  start: Transitioning from Paused to Running
[manager] 2026-02-23T23:26:30.887240Z  INFO pipeline_manager::runner::pipeline_automata: Runtime transition: paused -> running (desired: running) pipeline-id="019c8cd3-4232-7fe0-b9cd-a2eba32e69f2" pipeline-name="local_test_udps"
[local_test_udps] 2026-02-23T23:26:30.929652Z  INFO feldera_pipe_pipeline_019c8cd3_4232_7fe0_b9cd_a2eba32e69f2_globals::udf:  Processed 1068899 bytes of data log.target="feldera_pipe_pipeline_019c8cd3_4232_7fe0_b9cd_a2eba32e69f2_globals::udf" log.module_path="feldera_pipe_pipeline_019c8cd3_4232_7fe0_b9cd_a2eba32e69f2_globals::udf" log.file="crates/feldera_pipe_pipeline_019c8cd3_4232_7fe0_b9cd_a2eba32e69f2_globals/src/udf.rs" log.line=19
[local_test_udps] 2026-02-23T23:26:30.972396Z  INFO 

Contributor

Copilot AI left a comment

Pull request overview

This pull request adds experimental support for user-defined preprocessors in SQL connectors. Preprocessors are Rust components that transform raw data bytes before they reach the format parser, enabling use cases like decryption, decompression, and protocol-specific framing removal.

Changes:

  • Introduces new Preprocessor and PreprocessorFactory traits in the adapterlib crate with a global registry mechanism
  • Adds preprocessor field to connector configuration with validation in the SQL compiler
  • Implements preprocessor registration code generation in the Rust backend that auto-registers user-defined preprocessors
  • Removes deprecated --jdbcSource compiler option and fixes several documentation typos
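
To make the data flow in this overview concrete, here is a minimal sketch of a preprocessor chained in front of a parser. Everything in it is hypothetical: the trait shapes, `LineParser`, `StripHeader`, and `ChainedParser` are illustrative stand-ins, not the signatures from `crates/adapterlib/src/preprocess.rs` or the PR's `PreprocessedParser`.

```rust
/// Hypothetical stand-in for the PR's `Preprocessor` trait; the real
/// signature may differ.
pub trait Preprocessor {
    /// Transform one chunk of raw bytes before it reaches the parser.
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String>;
}

/// Hypothetical stand-in for a format parser.
pub trait Parser {
    fn parse(&mut self, data: &[u8]) -> Vec<String>;
}

/// Toy parser: one record per line.
pub struct LineParser;

impl Parser for LineParser {
    fn parse(&mut self, data: &[u8]) -> Vec<String> {
        let text = String::from_utf8_lossy(data);
        text.lines().map(|line| line.to_owned()).collect()
    }
}

/// Illustrative preprocessor: strips a fixed-size frame header.
pub struct StripHeader {
    pub header_len: usize,
}

impl Preprocessor for StripHeader {
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String> {
        match data.get(self.header_len..) {
            Some(rest) => Ok(rest.to_vec()),
            None => Err(format!("chunk shorter than {}-byte header", self.header_len)),
        }
    }
}

/// Runs the preprocessor first, then hands the result to the parser.
pub struct ChainedParser<P, Q> {
    pub pre: P,
    pub parser: Q,
}

impl<P: Preprocessor, Q: Parser> ChainedParser<P, Q> {
    pub fn parse(&mut self, raw: &[u8]) -> Result<Vec<String>, String> {
        let bytes = self.pre.process(raw)?;
        Ok(self.parser.parse(&bytes))
    }
}
```

In this sketch a preprocessor failure is surfaced to the caller instead of being passed to the parser as garbage; whether the PR makes the same choice is not visible from this thread.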

Reviewed changes

Copilot reviewed 33 out of 34 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| crates/adapterlib/src/preprocess.rs | New module defining preprocessor traits, error types, and global registry |
| crates/adapterlib/src/format.rs | Implements PreprocessedParser that chains preprocessor with parser |
| crates/adapters/src/preprocess.rs | Defines PassthroughPreprocessor for testing (not registered) |
| crates/adapters/src/controller.rs | Creates and integrates preprocessors into input pipelines |
| crates/feldera-types/src/config.rs | Adds preprocessor field to ConnectorConfig |
| sql-to-dbsp-compiler/.../SqlToRelCompiler.java | Validates preprocessor configuration at compile time |
| sql-to-dbsp-compiler/.../ToRustVisitor.java | Generates preprocessor registration code in compiled Rust output |
| sql-to-dbsp-compiler/.../ProgramMetadata.java | Collects preprocessor names from connector metadata |
| sql-to-dbsp-compiler/.../CompilerOptions.java | Removes deprecated --jdbcSource option |
| python/tests/runtime/test_udp.py | Adds integration test for user-defined preprocessors (with bug) |
| docs.feldera.com/docs/sql/udf.md | Documents preprocessor API, traits, and usage examples |
| sql-to-dbsp-compiler/.../MetadataTests.java | Adds validation and integration tests for preprocessor config |
| crates/adapters/src/controller/test.rs | Adds unit test for passthrough preprocessor |

### Example: passthrough preprocessor

The following example shows a minimal preprocessor that returns its
input unchanged but logs a message for every 1M data bytes processd.

Copilot AI Feb 24, 2026

Typo in comment: "processd" should be "processed".

Suggested change
input unchanged but logs a message for every 1M data bytes processd.
input unchanged but logs a message for every 1M data bytes processed.

Collaborator

@mythical-fred mythical-fred left a comment

Two blocking issues; see inline comments.

}
}
} catch (JsonProcessingException e) {
// Ignore
Collaborator

Silent // Ignore on JsonProcessingException is a broken window. If the connector JSON is malformed, preprocessors silently fail to register — the pipeline starts, runs without the preprocessor, and the user gets no diagnostic.

At minimum log a warning. Better: propagate the error so the compiler can reject malformed connector config at compile time rather than silently producing wrong runtime behaviour.

Contributor

@gz gz left a comment

why is this the right design? why does this need to be part of the pipeline?

from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS


# TEst user-defined preprocessor
Contributor

Suggested change
# TEst user-defined preprocessor
# Test user-defined preprocessor

Contributor

@ryzhyk ryzhyk left a comment

Thanks! Looks nice and simple. I'm not sure this covers all use cases, e.g., can a preprocessor have checkpointable state? But starting simple is probably a good idea.

Important question: will secret resolution work for this new config field? cc @snkas

} => {
write!(
f,
"Error creating preprocessor for endpoint {endpoint_name}: {error}"
Contributor

Does the preprocessor itself have a name too?

Contributor Author

I don't really see the need for a preprocessor name, unless somehow you could stack multiple copies of the same preprocessor


/// Get the unique instance of the PreprocessorRegistry
fn get_instance() -> &'static Mutex<PreprocessorRegistry> {
static INSTANCE: OnceLock<Mutex<PreprocessorRegistry>> = OnceLock::new();
Contributor

That's a good point, but this would require creating the registry inside the runtime. I haven't read the rest of the PR yet, so not sure how it's done yet.
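
For readers following along, the process-wide singleton pattern in the hunk above can be sketched as follows. The `Factory` alias, the closure-based preprocessor type, and the `register`/`create` methods are assumptions made for illustration; the PR's `PreprocessorFactory` trait and registry API are not reproduced here.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Hypothetical preprocessor representation: a boxed byte-transforming closure.
pub type BoxedPreprocessor = Box<dyn FnMut(&[u8]) -> Vec<u8> + Send>;

/// Hypothetical factory type: a plain function that builds a preprocessor.
pub type Factory = fn() -> BoxedPreprocessor;

#[derive(Default)]
pub struct PreprocessorRegistry {
    factories: HashMap<String, Factory>,
}

impl PreprocessorRegistry {
    /// Process-wide singleton, created lazily on first access.
    fn get_instance() -> &'static Mutex<PreprocessorRegistry> {
        static INSTANCE: OnceLock<Mutex<PreprocessorRegistry>> = OnceLock::new();
        INSTANCE.get_or_init(|| Mutex::new(PreprocessorRegistry::default()))
    }

    /// Register a factory; returns false if the name is already taken.
    pub fn register(name: &str, factory: Factory) -> bool {
        let mut registry = Self::get_instance().lock().unwrap();
        if registry.factories.contains_key(name) {
            return false;
        }
        registry.factories.insert(name.to_owned(), factory);
        true
    }

    /// Build a new preprocessor from the factory registered under `name`.
    pub fn create(name: &str) -> Option<BoxedPreprocessor> {
        let registry = Self::get_instance().lock().unwrap();
        let created = registry.factories.get(name).map(|factory| factory());
        created
    }
}

/// Example factory: returns the input unchanged.
pub fn passthrough() -> BoxedPreprocessor {
    Box::new(|data: &[u8]| data.to_vec())
}
```

A `static` like this is shared by everything in the process, which is the trade-off behind the suggestion to own the registry from the runtime or controller instead.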

@ryzhyk
Contributor

ryzhyk commented Feb 24, 2026

why is this the right design? why does this need to be part of the pipeline?

There are folks who don't want to build sidecars or have any other kind of a middle man between the data stream and the pipeline.

@mihaibudiu
Contributor Author

I have addressed all Rust-related comments, except moving the registry to the Runtime (or Controller). I will need some advice to do that.

I haven't addressed two Java comments because I want to rebase on #5683 to improve the JSON-related errors.

Collaborator

@mythical-fred mythical-fred left a comment

Good progress. Blocker #1 (passthrough factory not registered) is fixed.

Blocker #2 still open — type coverage.
The Python integration test still uses CREATE TABLE t (i INT). Per the rules for connector changes, the integration test needs to cover the full type matrix: integer variants, float, decimal, boolean, varchar, varbinary, date/time/timestamp, nullable. A preprocessor is data-type-agnostic in principle, but connector integration tests serve as a regression net for the entire pipeline. A single INT column does not provide that coverage.

Understand the Java comments are pending the #5683 rebase — that is fine, not blocking anything here.

@snkas
Contributor

snkas commented Feb 25, 2026

What is the user flow in terms of defining preprocessors? It is part of the connector configuration I see, but then PreprocessorConfig has only a name and an opaque Value.

@ryzhyk Per https://docs.feldera.com/connectors/secret-references secrets are resolved for transport.config and format.config, as such preprocessor would not be resolved out of the box.

@mihaibudiu
Contributor Author

There is a Python test case which shows the intended user flow. Besides the connector configuration, you implement two or three traits in udf.rs. I don't have a test with secrets.

@snkas
Contributor

snkas commented Feb 25, 2026

Will the template also show up in udf_stubs? It seems the user-defined preprocessor also has a lot of boilerplate that might make it more difficult to remember (e.g., LoggerPreprocessorFactory).

In the Python tests, it's configured as:

   "preprocessor": [{
      "name": "logger",
      "config": {}
   }]

Can the preprocessor configuration become strongly typed? Or if there is currently nothing to configure, then it can be left out?

@mihaibudiu
Contributor Author

This part of the schema is fixed. Both name and config are designed to be mandatory.
The configuration contents really depend on the type of preprocessor; each one is user-defined and can have very different needs, so the configuration cannot be strongly typed.

@mihaibudiu
Contributor Author

I have pushed one more commit which fixes most of the remaining comments.

The only thing I don't know how to do is to put the registry in the Runtime or Controller. I would appreciate a little help in figuring this out.

@mihaibudiu
Contributor Author

I plan to squash the commits before I merge, but I left them this way to show what's new

Collaborator

@mythical-fred mythical-fred left a comment

Both blockers resolved: passthrough factory registered, and Python test now covers the full type matrix (INT, TINYINT, SMALLINT, BIGINT, REAL, DOUBLE, VARBINARY, DATE, TIME, TIMESTAMP, ARRAY, BOOLEAN, DECIMAL, VARCHAR). The JSON pointer error location is a nice improvement.

If using rebase merges: "Address most comments from first round of reviews" should be folded in before landing.

@mihaibudiu
Contributor Author

I think in general you may not be able to use the parser splitter for the preprocessor. There should be a way to say: no splitter, and don't use the parser splitter.

@ryzhyk
Contributor

ryzhyk commented Feb 27, 2026

I think in general you may not be able to use the parser splitter for the preprocessor. There should be a way to say: no splitter, and don't use the parser splitter.

The preprocessor exposes its own splitter to the transport connector. Internally, it uses the parser's splitter, which respects the parser's framing.

fn clear(&mut self);
}

/// Helper for breaking a stream of data into groups of records using a
Contributor Author

This code was moved unchanged from the adapters crate

@mihaibudiu
Contributor Author

@ryzhyk maybe you can review the last commit for the splitter implementation in PreprocessedParser.

Note that all tests with decryption were produced by Claude.
They do not really cover the code completely, since there is no splitting, but I plan to produce some tests that do as well.

count: Arc<Mutex<u64>>,
}

impl Preprocessor for LoggerPreprocessor {
Contributor

A better example than just counting bytes might be to use the snappy library to decompress the incoming data, because that example actually transforms the input.
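
As a rough illustration of what a transforming example could look like, here is a sketch that needs only the standard library. It swaps in a toy run-length decoder for snappy, since a real snappy example would need an external crate. The `Preprocessor` trait shape and `RleDecoder` are hypothetical and not taken from the PR.

```rust
/// Hypothetical stand-in for the PR's `Preprocessor` trait.
pub trait Preprocessor {
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String>;
}

/// Toy decompressor (a stand-in for snappy): the input is a sequence of
/// (count, byte) pairs, so [3, b'a', 1, b'b'] decodes to b"aaab".
pub struct RleDecoder;

impl Preprocessor for RleDecoder {
    fn process(&mut self, data: &[u8]) -> Result<Vec<u8>, String> {
        if data.len() % 2 != 0 {
            return Err("truncated run-length pair".to_string());
        }
        let mut out = Vec::new();
        for pair in data.chunks_exact(2) {
            let (count, byte) = (pair[0] as usize, pair[1]);
            out.extend(std::iter::repeat(byte).take(count));
        }
        Ok(out)
    }
}
```

Unlike the byte-counting logger, the output here differs from the input, so a test can assert on the records that actually reach the table.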

Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
fn len(&self) -> BufferSize {
let mut size = BufferSize::empty();
for v in self.iter() {
size.add_assign(v.len());
Member

Suggested change
size.add_assign(v.len());
size += v.len();
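
The `+=` spelling compiles only if the size type implements `std::ops::AddAssign`; whether the PR's `BufferSize` does is not visible in this hunk. A minimal sketch under that assumption follows. The `records` field appears elsewhere in the diff, while the `bytes` field and the `total` helper are made up for illustration.

```rust
use std::ops::AddAssign;

/// Hypothetical mirror of `BufferSize`; `bytes` is an assumed field.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    pub records: usize,
    pub bytes: usize,
}

impl BufferSize {
    pub fn empty() -> Self {
        Self::default()
    }
}

impl AddAssign for BufferSize {
    /// Field-wise addition, which is what enables `size += other`.
    fn add_assign(&mut self, rhs: Self) {
        self.records += rhs.records;
        self.bytes += rhs.bytes;
    }
}

/// Sum a slice of sizes using the operator form.
pub fn total(sizes: &[BufferSize]) -> BufferSize {
    let mut size = BufferSize::empty();
    for s in sizes {
        size += *s;
    }
    size
}
```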

Comment on lines +248 to +258
for v in self.iter_mut() {
if remaining <= 0 {
break;
}
let buf = v.take_some(remaining);
if let Some(ib) = buf {
let len = ib.len().records;
remaining = remaining.saturating_sub(len);
result.push(ib);
}
}
Member

Most of the time, this loop is going to reduce some (or all) of the inner input buffers to empty, and I would argue that in that case it should remove them entirely from the vector.
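
One way to do that is a single `retain` pass after the loop. The sketch below assumes a hypothetical `Buf` type that only tracks a record count, and it assumes `remaining` is unsigned; the real buffers and `take_some` signature in the PR may differ.

```rust
/// Hypothetical stand-in for an input buffer holding some records.
#[derive(Debug, PartialEq)]
pub struct Buf {
    pub records: usize,
}

impl Buf {
    /// Remove up to `n` records and return them as a new buffer.
    pub fn take_some(&mut self, n: usize) -> Option<Buf> {
        let taken = self.records.min(n);
        if taken == 0 {
            return None;
        }
        self.records -= taken;
        Some(Buf { records: taken })
    }

    pub fn is_empty(&self) -> bool {
        self.records == 0
    }
}

/// Take up to `n` records across all buffers, then drop the emptied ones.
pub fn take_some(bufs: &mut Vec<Buf>, n: usize) -> Vec<Buf> {
    let mut remaining = n;
    let mut result = Vec::new();
    for b in bufs.iter_mut() {
        if remaining == 0 {
            break;
        }
        if let Some(taken) = b.take_some(remaining) {
            remaining = remaining.saturating_sub(taken.records);
            result.push(taken);
        }
    }
    // Remove buffers that the loop drained completely.
    bufs.retain(|b| !b.is_empty());
    result
}
```

Because the loop drains buffers from the front, the emptied buffers always form a prefix, so `drain(..k)` would also work; `retain` is used here because it does not rely on that invariant.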

for chunk in chunks {
let (mut buffer, errors) = details.parser.parse(&chunk, None);
details.consumer.buffered(buffer.len());
let len = InputBuffer::len(&buffer);
Member

Suggested change
let len = InputBuffer::len(&buffer);
let len = buffer.len();

unless there's something weird going on that I don't recognize
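
One case where the two spellings do differ: if the value is a `Vec` and the trait is implemented for `Vec`, then `buffer.len()` resolves to the inherent `Vec::len` (an element count) while `InputBuffer::len(&buffer)` calls the trait method. Whether `buffer` in this hunk has such a type is not visible here. The sketch below uses hypothetical `BufferSize`, `InputBuffer`, and `Chunk` definitions loosely modelled on the diff.

```rust
/// Hypothetical size type with only a record count.
#[derive(Debug, PartialEq)]
pub struct BufferSize {
    pub records: usize,
}

/// Hypothetical trait with a `len` that clashes with `Vec::len` by name.
pub trait InputBuffer {
    fn len(&self) -> BufferSize;
}

pub struct Chunk {
    pub records: usize,
}

impl InputBuffer for Chunk {
    fn len(&self) -> BufferSize {
        BufferSize { records: self.records }
    }
}

/// A Vec of buffers reports the total number of records it holds.
impl<T: InputBuffer> InputBuffer for Vec<T> {
    fn len(&self) -> BufferSize {
        BufferSize {
            records: self.iter().map(|b| b.len().records).sum(),
        }
    }
}

/// Method syntax picks the inherent `Vec::len`: the number of elements.
pub fn inherent_len(v: &Vec<Chunk>) -> usize {
    v.len()
}

/// Path syntax picks the trait method: the total record count.
pub fn trait_len(v: &Vec<Chunk>) -> BufferSize {
    InputBuffer::len(v)
}
```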

Comment on lines -1092 to -1094
--jdbcSource
Connection string to a database that contains table metadata
Default: <empty string>
Member

This seems unrelated to the PR.

@mihaibudiu mihaibudiu force-pushed the preprocessor branch 2 times, most recently from 3e94576 to 5ac34a1, on March 3, 2026 05:18
…tagram preprocessors are supported with fault-tolerance

Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
Development

Successfully merging this pull request may close these issues.

[Connectors] Add support for user-defined functions for pre-parsing in connectors

7 participants