From f17763491c544632a46bba6f5758380305ab26d1 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 25 Jun 2026 19:30:20 -0700 Subject: [PATCH 1/2] Stream EdgeZero asset responses instead of buffering them MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The EdgeZero entry point drained every asset/image-optimizer response into the Wasm heap before sending: `dispatch_asset_fallback` ran `buffer_asset_body` (a `BoundedWriter` capped by `publisher.max_buffered_body_bytes`) and re-attached the bytes as a buffered `Body::Once`. This defeated the streaming that `handle_asset_proxy_request` already produces (it requests a streaming origin response and hands back a `Body::Stream`), so large optimized images were fully materialized in memory. The legacy `main.rs` path never buffered — its `HandlerOutcome::AssetStreaming` arm commits headers via `stream_to_client()` and pipes the body chunk-by-chunk with `stream_asset_body`. Wire the same seam into the EdgeZero path: - `dispatch_asset_fallback` now attaches the origin `Body::Stream` to the response (dropping it only for HEAD/204/304, which keep the origin Content-Length and carry no body) instead of buffering. - `edgezero_main` sends through a new `send_core_response` helper that streams `Body::Stream` responses with `stream_to_client()` + `stream_asset_body` and sends buffered `Body::Once` responses in one shot. - Remove the now-dead `buffer_asset_body`/`BoundedWriter` usage and refresh the stale buffering docs. No edgezero dependency change is needed: `edgezero_core::body::Body` already carries a `Stream` variant. Publisher responses remain buffered (unchanged). Adds `dispatch_asset_fallback_streams_origin_body_without_buffering`, which injects a streaming HTTP client and asserts the dispatched response body is `Body::Stream`. --- .../trusted-server-adapter-fastly/src/app.rs | 158 +++++++++++------- .../trusted-server-adapter-fastly/src/main.rs | 41 ++++- .../src/route_tests.rs | 14 +- 3 files changed, 147 insertions(+), 66 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/app.rs b/crates/trusted-server-adapter-fastly/src/app.rs index 4d0a94a8..a183dcdb 100644 --- a/crates/trusted-server-adapter-fastly/src/app.rs +++ b/crates/trusted-server-adapter-fastly/src/app.rs @@ -64,8 +64,10 @@ //! run on these responses. Legacy ran EC finalization on its own auth //! challenges. Like the 401 geo-skip, this is privacy-conservative: no EC //! cookies are issued to unauthenticated callers. -//! - **Streaming publisher responses** are buffered (bounded by +//! - **Publisher responses** are buffered (bounded by //! `publisher.max_buffered_body_bytes`) instead of streamed to the client. +//! Asset responses are streamed straight to the client (see +//! [`dispatch_asset_fallback`]), matching legacy. //! - **Router-level 405s** (unregistered verbs) skip EC finalization along //! with the middleware chain; the entry point still adds TS headers. //! @@ -109,12 +111,9 @@ use trusted_server_core::integrations::{ use trusted_server_core::platform::{ClientInfo, GeoInfo, PlatformKvStore, RuntimeServices}; use trusted_server_core::proxy::{ handle_asset_proxy_request, handle_first_party_click, handle_first_party_proxy, - handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, stream_asset_body, - AssetProxyCachePolicy, -}; -use trusted_server_core::publisher::{ - handle_publisher_request, handle_tsjs_dynamic, BoundedWriter, + handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, AssetProxyCachePolicy, }; +use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -726,13 +725,13 @@ async fn dispatch_fallback( attach_dispatch_extensions(response, ec, effects) } -/// Returns `true` when an asset response should carry a buffered body and a -/// recomputed `Content-Length`. +/// Returns `true` when an asset response should carry a (streamed) body. /// /// `HEAD` responses and bodiless statuses (204, 304) advertise the origin /// representation length in their `Content-Length` header while carrying no -/// body. Rewriting that header to the buffered byte count (0) would corrupt the -/// metadata, so those responses keep the origin's `Content-Length` untouched. +/// body. Attaching the origin stream would either contradict that header or +/// stream bytes a client does not expect, so those responses drop the stream and +/// keep the origin's `Content-Length` untouched. fn asset_response_carries_body(method: &Method, status: StatusCode) -> bool { *method != Method::HEAD && status != StatusCode::NO_CONTENT @@ -748,13 +747,13 @@ fn asset_response_carries_body(method: &Method, status: StatusCode) -> bool { /// is intentionally skipped: no [`EcFinalizeState`] is attached, matching the /// legacy `should_finalize_ec = false` behavior for asset responses. /// -/// Unlike legacy `route_request`, which streams asset bodies straight to the -/// client with no cap, the `EdgeZero` path buffers them: `edgezero_main` -/// converts the whole response before sending, so there is no streaming seam -/// yet. The buffer is bounded by `publisher.max_buffered_body_bytes` as an -/// interim Wasm-heap OOM guard. Reusing the publisher cap and restoring -/// uncapped streaming are both resolved by the streaming cutover (issue #495); -/// whether assets get a dedicated cap is deferred to that work. +/// Like legacy `route_request`, asset bodies are streamed straight to the client +/// with no cap: the origin stream is attached to the response and `edgezero_main` +/// commits the headers, then pipes the body chunk-by-chunk via +/// [`fastly::Response::stream_to_client`] and +/// [`stream_asset_body`](trusted_server_core::proxy::stream_asset_body). The +/// origin stream is left untouched here, so large images never materialize in +/// the Wasm heap. async fn dispatch_asset_fallback( state: &AppState, services: &RuntimeServices, @@ -771,30 +770,13 @@ async fn dispatch_asset_fallback( let cache_policy = asset_response.cache_policy(); let (mut response, stream_body) = asset_response.into_response_and_body(); + // Attach the origin stream so `edgezero_main` streams it to the + // client. HEAD and bodiless statuses (204, 304) advertise the origin + // Content-Length but carry no body, so their stream is dropped to + // preserve that header and avoid a length/body mismatch. if let Some(body) = stream_body { - match buffer_asset_body(body, state.settings.publisher.max_buffered_body_bytes) - .await - { - Ok(bytes) => { - // Preserve the origin's Content-Length for HEAD and - // bodiless statuses; only body-bearing responses get a - // recomputed length and the buffered body attached. - if asset_response_carries_body(&method, response.status()) { - response.headers_mut().insert( - header::CONTENT_LENGTH, - HeaderValue::from(bytes.len() as u64), - ); - *response.body_mut() = edgezero_core::body::Body::from(bytes); - } - } - Err(report) => { - let mut response = http_error(&report); - response - .extensions_mut() - .insert(AssetProxyCachePolicy::NoStorePrivate); - attach_request_filter_effects(&mut response, effects); - return response; - } + if asset_response_carries_body(&method, response.status()) { + *response.body_mut() = body; } } @@ -825,23 +807,6 @@ fn attach_request_filter_effects(response: &mut Response, effects: &RequestFilte } } -/// Buffers a streaming asset body into memory, bounded by `max_bytes` -/// (the interim `publisher.max_buffered_body_bytes` OOM guard; see -/// [`dispatch_asset_fallback`]). -/// -/// # Errors -/// -/// Returns an error if the body exceeds the configured cap or the underlying -/// stream yields an error. -async fn buffer_asset_body( - body: edgezero_core::body::Body, - max_bytes: usize, -) -> Result, Report> { - let mut output = BoundedWriter::new(max_bytes); - stream_asset_body(body, &mut output).await?; - Ok(output.into_inner()) -} - // --------------------------------------------------------------------------- // Error helper // --------------------------------------------------------------------------- @@ -1922,6 +1887,85 @@ mod tests { ); } + #[test] + fn dispatch_asset_fallback_streams_origin_body_without_buffering() { + // Regression guard for the EdgeZero asset streaming cutover: a successful + // asset proxy must hand `edgezero_main` a streaming body (`Body::Stream`) + // rather than draining the origin into a buffered `Body::Once`. Injecting a + // streaming HTTP client lets us assert the body type the entry point will + // pipe straight to the client via `stream_to_client`. + use crate::route_tests::{ + test_runtime_services_with_secret_http_client_and_geo, us_california_geo, FixedBackend, + FixedGeo, NoopSecretStore, StreamingRecordingHttpClient, + }; + use trusted_server_core::platform::PlatformHttpClient; + + let settings = Settings::from_toml( + r#" + [[handlers]] + path = "^/_ts/admin" + username = "admin" + password = "admin-pass" + + [publisher] + domain = "test-publisher.com" + cookie_domain = ".test-publisher.com" + origin_url = "https://origin.test-publisher.com" + proxy_secret = "unit-test-proxy-secret" + + [ec] + passphrase = "test-secret-key-32-bytes-minimum" + + [request_signing] + enabled = false + config_store_id = "test-config-store-id" + secret_store_id = "test-secret-store-id" + + [proxy] + + [[proxy.asset_routes]] + prefix = "/.images/" + origin_url = "https://assets.example.com" + "#, + ) + .expect("should parse asset-route settings"); + let state = build_state_from_settings(settings).expect("should build state"); + + let fastly_req = fastly::Request::get("https://test-publisher.com/.images/logo.png"); + let http_client = Arc::new(StreamingRecordingHttpClient::new()); + let services = test_runtime_services_with_secret_http_client_and_geo( + &fastly_req, + Arc::new(FixedBackend), + Arc::new(NoopSecretStore), + Arc::clone(&http_client) as Arc, + Arc::new(FixedGeo(us_california_geo())), + ); + let req = crate::compat::from_fastly_request(fastly_req); + let asset_route = state + .settings + .asset_route_for_path("/.images/logo.png") + .expect("should match the configured asset route"); + let effects = RequestFilterEffects::default(); + + let response = block_on(super::dispatch_asset_fallback( + &state, + &services, + req, + asset_route, + &effects, + )); + + assert_eq!( + response.status(), + StatusCode::OK, + "should preserve the streaming origin status" + ); + assert!( + matches!(response.body(), Body::Stream(_)), + "EdgeZero asset dispatch must stream the origin body, not buffer it" + ); + } + #[test] fn dispatch_runs_request_filter_and_threads_response_effects() { // Regression guard for the EdgeZero request-filter bypass: the publisher diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index f5f62f46..5fb3755e 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -370,7 +370,7 @@ fn edgezero_main(mut req: FastlyRequest, config_store: ConfigStoreHandle) { if let Some(effects) = &request_filter_effects { effects.apply_to_response(&mut response); } - compat::to_fastly_response(response).send_to_client(); + send_core_response(response); if ec_state.is_real_browser { if let Some(context) = build_pull_sync_context(&ec_state.ec_context) { @@ -399,7 +399,44 @@ fn edgezero_main(mut req: FastlyRequest, config_store: ConfigStoreHandle) { if let Some(effects) = &request_filter_effects { effects.apply_to_response(&mut response); } - compat::to_fastly_response(response).send_to_client(); + send_core_response(response); +} + +/// Sends a finalized `EdgeZero` response to the client. +/// +/// Asset responses carry an [`EdgeBody::Stream`] body: the headers are committed +/// first via [`fastly::Response::stream_to_client`], then the origin body is +/// piped chunk-by-chunk with [`stream_asset_body`] so large images never +/// materialize in the Wasm heap. This mirrors the legacy +/// `HandlerOutcome::AssetStreaming` path. All other responses carry a buffered +/// [`EdgeBody::Once`] body and are sent in a single shot. +fn send_core_response(response: HttpResponse) { + let (parts, body) = response.into_parts(); + match body { + EdgeBody::Stream(_) => { + let skeleton = compat::to_fastly_response_skeleton(HttpResponse::from_parts( + parts, + EdgeBody::empty(), + )); + let mut streaming_body = skeleton.stream_to_client(); + match futures::executor::block_on(stream_asset_body(body, &mut streaming_body)) { + Ok(()) => { + if let Err(e) = streaming_body.finish() { + log::error!("failed to finish EdgeZero asset streaming body: {e}"); + } + } + Err(e) => { + log::error!("EdgeZero asset streaming failed: {e:?}"); + // Headers already committed; drop the body so the client sees + // a truncated response (EOF mid-stream), standard proxy behavior. + drop(streaming_body); + } + } + } + once => { + compat::to_fastly_response(HttpResponse::from_parts(parts, once)).send_to_client(); + } + } } fn take_finalize_sentinel(response: &mut HttpResponse) -> bool { diff --git a/crates/trusted-server-adapter-fastly/src/route_tests.rs b/crates/trusted-server-adapter-fastly/src/route_tests.rs index 1c58a760..12bc1157 100644 --- a/crates/trusted-server-adapter-fastly/src/route_tests.rs +++ b/crates/trusted-server-adapter-fastly/src/route_tests.rs @@ -63,7 +63,7 @@ impl PlatformConfigStore for StubJwksConfigStore { } } -struct NoopSecretStore; +pub(crate) struct NoopSecretStore; struct HashMapSecretStore { data: HashMap>, @@ -145,7 +145,7 @@ struct RecordingHttpClient { response_body: Vec, } -struct StreamingRecordingHttpClient { +pub(crate) struct StreamingRecordingHttpClient { calls: Mutex>, } @@ -177,7 +177,7 @@ impl RecordingHttpClient { } impl StreamingRecordingHttpClient { - fn new() -> Self { + pub(crate) fn new() -> Self { Self { calls: Mutex::new(Vec::new()), } @@ -191,7 +191,7 @@ struct RecordedHttpCall { stream_response: bool, } -struct FixedBackend; +pub(crate) struct FixedBackend; impl PlatformBackend for FixedBackend { fn predict_name(&self, spec: &PlatformBackendSpec) -> Result> { @@ -355,7 +355,7 @@ impl AuctionProvider for DisabledRouteProvider { } } -struct FixedGeo(GeoInfo); +pub(crate) struct FixedGeo(pub(crate) GeoInfo); impl PlatformGeo for FixedGeo { fn lookup(&self, _client_ip: Option) -> Result, Report> { @@ -363,7 +363,7 @@ impl PlatformGeo for FixedGeo { } } -fn us_california_geo() -> GeoInfo { +pub(crate) fn us_california_geo() -> GeoInfo { GeoInfo { city: "Example City".to_string(), country: "US".to_string(), @@ -532,7 +532,7 @@ fn test_runtime_services_with_secret_and_http_client( ) } -fn test_runtime_services_with_secret_http_client_and_geo( +pub(crate) fn test_runtime_services_with_secret_http_client_and_geo( req: &Request, backend: Arc, secret_store: Arc, From 9673b6b85eeab2504f73e796b3c37c1c7372814d Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 26 Jun 2026 15:48:37 -0500 Subject: [PATCH 2/2] Move streaming asset test imports to module scope --- crates/trusted-server-adapter-fastly/src/app.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/app.rs b/crates/trusted-server-adapter-fastly/src/app.rs index a183dcdb..97d599cf 100644 --- a/crates/trusted-server-adapter-fastly/src/app.rs +++ b/crates/trusted-server-adapter-fastly/src/app.rs @@ -1062,6 +1062,10 @@ mod tests { use std::sync::Arc; use super::{build_state_from_settings, startup_error_router, AppState, TrustedServerApp}; + use crate::route_tests::{ + test_runtime_services_with_secret_http_client_and_geo, us_california_geo, FixedBackend, + FixedGeo, NoopSecretStore, StreamingRecordingHttpClient, + }; use edgezero_core::body::Body; use edgezero_core::http::{header, request_builder, Method, StatusCode}; @@ -1079,7 +1083,7 @@ mod tests { HeaderMutation, IntegrationRegistry, IntegrationRequestFilter, RequestFilterDecision, RequestFilterEffects, RequestFilterInput, }; - use trusted_server_core::platform::ClientInfo; + use trusted_server_core::platform::{ClientInfo, PlatformHttpClient}; use trusted_server_core::settings::Settings; fn settings_with_missing_consent_store() -> Settings { @@ -1894,12 +1898,6 @@ mod tests { // rather than draining the origin into a buffered `Body::Once`. Injecting a // streaming HTTP client lets us assert the body type the entry point will // pipe straight to the client via `stream_to_client`. - use crate::route_tests::{ - test_runtime_services_with_secret_http_client_and_geo, us_california_geo, FixedBackend, - FixedGeo, NoopSecretStore, StreamingRecordingHttpClient, - }; - use trusted_server_core::platform::PlatformHttpClient; - let settings = Settings::from_toml( r#" [[handlers]]