Conversation
This patch replaces `awc` with `reqwest` as the HTTP client for talking to the pipeline. The main reason for this are various hard-to-explain HTTP errors we've observed in CI. On top of this, awc doesn't support a clean way to disable connection caching. It can only be achieved by setting the `connection: Close` header, which we suspect confuses the server. In contrast, reqwest supports setting connection pool size to 0. This commit was mostly generated by cursor. There are a couple of subtle points here (which cursor missed) - In awc, ClientRequest::timeout() controls timeout from request start till HTTP response header is returned. reqwest doesn't have a similar method, so we use tokio::timeout instead. - reqwest seems to handle disconnects differently from awc: in awc when the pipeline drops its end of a streaming response, the byte stream simply ends (without an error); in reqwest, the stream contains an error, which gets propagated to the API client. This may be the right thing to do, but currently the Python SDK doesn't expect that. We simulate the old behavior to avoid a breaking change. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
There was a problem hiding this comment.
Pull Request Overview
This PR replaces the awc HTTP client with reqwest for pipeline communication to address HTTP errors observed in CI and improve connection management. The main change is replacing awc::Client with reqwest::Client throughout the codebase, with special handling for connection pooling and streaming response behavior.
Key Changes
- Replaced
awc::Clientwithreqwest::Clientfor all HTTP requests to pipelines - Disabled connection pooling by setting
pool_max_idle_per_host(0)to prevent stale connection errors - Modified streaming response error handling to gracefully terminate on disconnects instead of propagating errors to maintain backward compatibility with the Python SDK
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
crates/pipeline-manager/src/runner/interaction.rs |
Core HTTP client replacement with new request/response handling and streaming error suppression |
crates/pipeline-manager/src/config.rs |
Added connection pool configuration to disable idle connection caching |
crates/pipeline-manager/src/api/support_data_collector.rs |
Updated client type references and streaming response handling |
crates/pipeline-manager/src/api/main.rs |
Modified server initialization to use both reqwest and awc clients (awc for WebSocket) |
crates/pipeline-manager/src/api/endpoints/pipeline_management.rs |
Updated function signatures to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs |
Updated function signature to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs |
Updated function signatures and added both client types for WebSocket vs HTTP handling |
crates/pipeline-manager/src/api/endpoints/metrics.rs |
Updated function signature to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/config.rs |
Updated function signature to accept reqwest::Client |
Comments suppressed due to low confidence (1)
crates/pipeline-manager/src/runner/interaction.rs:1
- The comment is incomplete. It appears to be cut off mid-sentence ('Other').
use crate::api::error::ApiError;
| Method::PATCH => reqwest::Method::PATCH, | ||
| Method::OPTIONS => reqwest::Method::OPTIONS, | ||
| Method::HEAD => reqwest::Method::HEAD, | ||
| _ => reqwest::Method::GET, |
There was a problem hiding this comment.
The default fallback to GET for unrecognized HTTP methods could mask errors. Consider returning an error or using a more explicit match pattern to handle unsupported methods explicitly.
| _ => reqwest::Method::GET, | |
| _ => { | |
| return Err(RunnerError::PipelineInteractionInvalidRequest { | |
| pipeline_name: pipeline_name.to_string(), | |
| error: format!("unsupported HTTP method: {}", method), | |
| }.into()); | |
| } |
There was a problem hiding this comment.
Which other methods cause this to not be exhaustive?
| Method::PATCH => reqwest::Method::PATCH, | ||
| Method::OPTIONS => reqwest::Method::OPTIONS, | ||
| Method::HEAD => reqwest::Method::HEAD, | ||
| _ => reqwest::Method::GET, |
There was a problem hiding this comment.
The default fallback to GET for unrecognized HTTP methods could mask errors. Consider returning an error or using a more explicit match pattern to handle unsupported methods explicitly.
| _ => reqwest::Method::GET, | |
| _ => { | |
| return Err(ManagerError::UnsupportedMethod(request.method().to_string())); | |
| } |
| // let request_str_clone = request_str.clone(); | ||
|
|
||
| // Convert reqwest streaming response to actix streaming response | ||
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | ||
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | ||
| let stream = response | ||
| .bytes_stream() | ||
| // .inspect(move |result| { | ||
| // match result { | ||
| // Ok(bytes) => println!("Stream {request_str_clone} chunk: {} bytes - {:?}", bytes.len(), bytes), | ||
| // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), | ||
| // } | ||
| // }) | ||
|
|
There was a problem hiding this comment.
Remove commented-out debugging code before merging. The commented inspection logic and unused request_str_clone should be cleaned up.
| // let request_str_clone = request_str.clone(); | |
| // Convert reqwest streaming response to actix streaming response | |
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | |
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | |
| let stream = response | |
| .bytes_stream() | |
| // .inspect(move |result| { | |
| // match result { | |
| // Ok(bytes) => println!("Stream {request_str_clone} chunk: {} bytes - {:?}", bytes.len(), bytes), | |
| // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), | |
| // } | |
| // }) | |
| // Convert reqwest streaming response to actix streaming response | |
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | |
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | |
| let stream = response | |
| .bytes_stream() |
|
In general, I'm for this change, because the fact that the |
This patch replaces
awcwithreqwestas the HTTP client for talking to the pipeline. The main reason for this are various hard-to-explain HTTP errors we've observed in CI. On top of this, awc doesn't support a clean way to disable connection caching. It can only be achieved by setting theconnection: Closeheader, which we suspect confuses the server. In contrast, reqwest supports setting connection pool size to 0.This commit was mostly generated by cursor.
There are a couple of subtle points here (which cursor missed)
In awc, ClientRequest::timeout() controls timeout from request start till HTTP response header is returned. reqwest doesn't have a similar method, so we use tokio::timeout instead.
reqwest seems to handle disconnects differently from awc: in awc when the pipeline drops its end of a streaming response, the byte stream simply ends (without an error); in reqwest, the stream contains an error, which gets propagated to the API client. This may be the right thing to do, but currently the Python SDK doesn't expect that. We simulate the old behavior to avoid a breaking change.