Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 100 additions & 58 deletions crates/trusted-server-adapter-fastly/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
//! run on these responses. Legacy ran EC finalization on its own auth
//! challenges. Like the 401 geo-skip, this is privacy-conservative: no EC
//! cookies are issued to unauthenticated callers.
//! - **Streaming publisher responses** are buffered (bounded by
//! - **Publisher responses** are buffered (bounded by
//! `publisher.max_buffered_body_bytes`) instead of streamed to the client.
//! Asset responses are streamed straight to the client (see
//! [`dispatch_asset_fallback`]), matching legacy.
//! - **Router-level 405s** (unregistered verbs) skip EC finalization along
//! with the middleware chain; the entry point still adds TS headers.
//!
Expand Down Expand Up @@ -109,12 +111,9 @@ use trusted_server_core::integrations::{
use trusted_server_core::platform::{ClientInfo, GeoInfo, PlatformKvStore, RuntimeServices};
use trusted_server_core::proxy::{
handle_asset_proxy_request, handle_first_party_click, handle_first_party_proxy,
handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, stream_asset_body,
AssetProxyCachePolicy,
};
use trusted_server_core::publisher::{
handle_publisher_request, handle_tsjs_dynamic, BoundedWriter,
handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, AssetProxyCachePolicy,
};
use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic};
use trusted_server_core::request_signing::{
handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery,
handle_verify_signature,
Expand Down Expand Up @@ -726,13 +725,13 @@ async fn dispatch_fallback(
attach_dispatch_extensions(response, ec, effects)
}

/// Returns `true` when an asset response should carry a buffered body and a
/// recomputed `Content-Length`.
/// Returns `true` when an asset response should carry a (streamed) body.
///
/// `HEAD` responses and bodiless statuses (204, 304) advertise the origin
/// representation length in their `Content-Length` header while carrying no
/// body. Rewriting that header to the buffered byte count (0) would corrupt the
/// metadata, so those responses keep the origin's `Content-Length` untouched.
/// body. Attaching the origin stream would either contradict that header or
/// stream bytes a client does not expect, so those responses drop the stream and
/// keep the origin's `Content-Length` untouched.
fn asset_response_carries_body(method: &Method, status: StatusCode) -> bool {
*method != Method::HEAD
&& status != StatusCode::NO_CONTENT
Expand All @@ -748,13 +747,13 @@ fn asset_response_carries_body(method: &Method, status: StatusCode) -> bool {
/// is intentionally skipped: no [`EcFinalizeState`] is attached, matching the
/// legacy `should_finalize_ec = false` behavior for asset responses.
///
/// Unlike legacy `route_request`, which streams asset bodies straight to the
/// client with no cap, the `EdgeZero` path buffers them: `edgezero_main`
/// converts the whole response before sending, so there is no streaming seam
/// yet. The buffer is bounded by `publisher.max_buffered_body_bytes` as an
/// interim Wasm-heap OOM guard. Reusing the publisher cap and restoring
/// uncapped streaming are both resolved by the streaming cutover (issue #495);
/// whether assets get a dedicated cap is deferred to that work.
/// Like legacy `route_request`, asset bodies are streamed straight to the client
/// with no cap: the origin stream is attached to the response and `edgezero_main`
/// commits the headers, then pipes the body chunk-by-chunk via
/// [`fastly::Response::stream_to_client`] and
/// [`stream_asset_body`](trusted_server_core::proxy::stream_asset_body). The
/// origin stream is left untouched here, so large images never materialize in
/// the Wasm heap.
async fn dispatch_asset_fallback(
state: &AppState,
services: &RuntimeServices,
Expand All @@ -771,30 +770,13 @@ async fn dispatch_asset_fallback(
let cache_policy = asset_response.cache_policy();
let (mut response, stream_body) = asset_response.into_response_and_body();

// Attach the origin stream so `edgezero_main` streams it to the
// client. HEAD and bodiless statuses (204, 304) advertise the origin
// Content-Length but carry no body, so their stream is dropped to
// preserve that header and avoid a length/body mismatch.
if let Some(body) = stream_body {
match buffer_asset_body(body, state.settings.publisher.max_buffered_body_bytes)
.await
{
Ok(bytes) => {
// Preserve the origin's Content-Length for HEAD and
// bodiless statuses; only body-bearing responses get a
// recomputed length and the buffered body attached.
if asset_response_carries_body(&method, response.status()) {
response.headers_mut().insert(
header::CONTENT_LENGTH,
HeaderValue::from(bytes.len() as u64),
);
*response.body_mut() = edgezero_core::body::Body::from(bytes);
}
}
Err(report) => {
let mut response = http_error(&report);
response
.extensions_mut()
.insert(AssetProxyCachePolicy::NoStorePrivate);
attach_request_filter_effects(&mut response, effects);
return response;
}
if asset_response_carries_body(&method, response.status()) {
*response.body_mut() = body;
}
}

Expand Down Expand Up @@ -825,23 +807,6 @@ fn attach_request_filter_effects(response: &mut Response, effects: &RequestFilte
}
}

/// Buffers a streaming asset body into memory, bounded by `max_bytes`
/// (the interim `publisher.max_buffered_body_bytes` OOM guard; see
/// [`dispatch_asset_fallback`]).
///
/// # Errors
///
/// Returns an error if the body exceeds the configured cap or the underlying
/// stream yields an error.
async fn buffer_asset_body(
body: edgezero_core::body::Body,
max_bytes: usize,
) -> Result<Vec<u8>, Report<TrustedServerError>> {
let mut output = BoundedWriter::new(max_bytes);
stream_asset_body(body, &mut output).await?;
Ok(output.into_inner())
}

// ---------------------------------------------------------------------------
// Error helper
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1097,6 +1062,10 @@ mod tests {
use std::sync::Arc;

use super::{build_state_from_settings, startup_error_router, AppState, TrustedServerApp};
use crate::route_tests::{
test_runtime_services_with_secret_http_client_and_geo, us_california_geo, FixedBackend,
FixedGeo, NoopSecretStore, StreamingRecordingHttpClient,
};

use edgezero_core::body::Body;
use edgezero_core::http::{header, request_builder, Method, StatusCode};
Expand All @@ -1114,7 +1083,7 @@ mod tests {
HeaderMutation, IntegrationRegistry, IntegrationRequestFilter, RequestFilterDecision,
RequestFilterEffects, RequestFilterInput,
};
use trusted_server_core::platform::ClientInfo;
use trusted_server_core::platform::{ClientInfo, PlatformHttpClient};
use trusted_server_core::settings::Settings;

fn settings_with_missing_consent_store() -> Settings {
Expand Down Expand Up @@ -1922,6 +1891,79 @@ mod tests {
);
}

#[test]
fn dispatch_asset_fallback_streams_origin_body_without_buffering() {
// Regression guard for the EdgeZero asset streaming cutover: a successful
// asset proxy must hand `edgezero_main` a streaming body (`Body::Stream`)
// rather than draining the origin into a buffered `Body::Once`. Injecting a
// streaming HTTP client lets us assert the body type the entry point will
// pipe straight to the client via `stream_to_client`.
let settings = Settings::from_toml(
r#"
[[handlers]]
path = "^/_ts/admin"
username = "admin"
password = "admin-pass"

[publisher]
domain = "test-publisher.com"
cookie_domain = ".test-publisher.com"
origin_url = "https://origin.test-publisher.com"
proxy_secret = "unit-test-proxy-secret"

[ec]
passphrase = "test-secret-key-32-bytes-minimum"

[request_signing]
enabled = false
config_store_id = "test-config-store-id"
secret_store_id = "test-secret-store-id"

[proxy]

[[proxy.asset_routes]]
prefix = "/.images/"
origin_url = "https://assets.example.com"
"#,
)
.expect("should parse asset-route settings");
let state = build_state_from_settings(settings).expect("should build state");

let fastly_req = fastly::Request::get("https://test-publisher.com/.images/logo.png");
let http_client = Arc::new(StreamingRecordingHttpClient::new());
let services = test_runtime_services_with_secret_http_client_and_geo(
&fastly_req,
Arc::new(FixedBackend),
Arc::new(NoopSecretStore),
Arc::clone(&http_client) as Arc<dyn PlatformHttpClient>,
Arc::new(FixedGeo(us_california_geo())),
);
let req = crate::compat::from_fastly_request(fastly_req);
let asset_route = state
.settings
.asset_route_for_path("/.images/logo.png")
.expect("should match the configured asset route");
let effects = RequestFilterEffects::default();

let response = block_on(super::dispatch_asset_fallback(
&state,
&services,
req,
asset_route,
&effects,
));

assert_eq!(
response.status(),
StatusCode::OK,
"should preserve the streaming origin status"
);
assert!(
matches!(response.body(), Body::Stream(_)),
"EdgeZero asset dispatch must stream the origin body, not buffer it"
);
}

#[test]
fn dispatch_runs_request_filter_and_threads_response_effects() {
// Regression guard for the EdgeZero request-filter bypass: the publisher
Expand Down
41 changes: 39 additions & 2 deletions crates/trusted-server-adapter-fastly/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ fn edgezero_main(mut req: FastlyRequest, config_store: ConfigStoreHandle) {
if let Some(effects) = &request_filter_effects {
effects.apply_to_response(&mut response);
}
compat::to_fastly_response(response).send_to_client();
send_core_response(response);

if ec_state.is_real_browser {
if let Some(context) = build_pull_sync_context(&ec_state.ec_context) {
Expand Down Expand Up @@ -399,7 +399,44 @@ fn edgezero_main(mut req: FastlyRequest, config_store: ConfigStoreHandle) {
if let Some(effects) = &request_filter_effects {
effects.apply_to_response(&mut response);
}
compat::to_fastly_response(response).send_to_client();
send_core_response(response);
}

/// Sends a finalized `EdgeZero` response to the client.
///
/// Asset responses carry an [`EdgeBody::Stream`] body: the headers are committed
/// first via [`fastly::Response::stream_to_client`], then the origin body is
/// piped chunk-by-chunk with [`stream_asset_body`] so large images never
/// materialize in the Wasm heap. This mirrors the legacy
/// `HandlerOutcome::AssetStreaming` path. All other responses carry a buffered
/// [`EdgeBody::Once`] body and are sent in a single shot.
fn send_core_response(response: HttpResponse) {
let (parts, body) = response.into_parts();
match body {
EdgeBody::Stream(_) => {
let skeleton = compat::to_fastly_response_skeleton(HttpResponse::from_parts(
parts,
EdgeBody::empty(),
));
let mut streaming_body = skeleton.stream_to_client();
match futures::executor::block_on(stream_asset_body(body, &mut streaming_body)) {
Ok(()) => {
if let Err(e) = streaming_body.finish() {
log::error!("failed to finish EdgeZero asset streaming body: {e}");
}
}
Err(e) => {
log::error!("EdgeZero asset streaming failed: {e:?}");
// Headers already committed; drop the body so the client sees
// a truncated response (EOF mid-stream), standard proxy behavior.
drop(streaming_body);
}
}
}
once => {
compat::to_fastly_response(HttpResponse::from_parts(parts, once)).send_to_client();
}
}
}

fn take_finalize_sentinel(response: &mut HttpResponse) -> bool {
Expand Down
14 changes: 7 additions & 7 deletions crates/trusted-server-adapter-fastly/src/route_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl PlatformConfigStore for StubJwksConfigStore {
}
}

struct NoopSecretStore;
pub(crate) struct NoopSecretStore;

struct HashMapSecretStore {
data: HashMap<String, Vec<u8>>,
Expand Down Expand Up @@ -145,7 +145,7 @@ struct RecordingHttpClient {
response_body: Vec<u8>,
}

struct StreamingRecordingHttpClient {
pub(crate) struct StreamingRecordingHttpClient {
calls: Mutex<Vec<RecordedHttpCall>>,
}

Expand Down Expand Up @@ -177,7 +177,7 @@ impl RecordingHttpClient {
}

impl StreamingRecordingHttpClient {
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
calls: Mutex::new(Vec::new()),
}
Expand All @@ -191,7 +191,7 @@ struct RecordedHttpCall {
stream_response: bool,
}

struct FixedBackend;
pub(crate) struct FixedBackend;

impl PlatformBackend for FixedBackend {
fn predict_name(&self, spec: &PlatformBackendSpec) -> Result<String, Report<PlatformError>> {
Expand Down Expand Up @@ -355,15 +355,15 @@ impl AuctionProvider for DisabledRouteProvider {
}
}

struct FixedGeo(GeoInfo);
pub(crate) struct FixedGeo(pub(crate) GeoInfo);

impl PlatformGeo for FixedGeo {
fn lookup(&self, _client_ip: Option<IpAddr>) -> Result<Option<GeoInfo>, Report<PlatformError>> {
Ok(Some(self.0.clone()))
}
}

fn us_california_geo() -> GeoInfo {
pub(crate) fn us_california_geo() -> GeoInfo {
GeoInfo {
city: "Example City".to_string(),
country: "US".to_string(),
Expand Down Expand Up @@ -532,7 +532,7 @@ fn test_runtime_services_with_secret_and_http_client(
)
}

fn test_runtime_services_with_secret_http_client_and_geo(
pub(crate) fn test_runtime_services_with_secret_http_client_and_geo(
req: &Request,
backend: Arc<dyn PlatformBackend>,
secret_store: Arc<dyn PlatformSecretStore>,
Expand Down
Loading