From 31fdadc549caf826ba8b422410b1a74e8931278e Mon Sep 17 00:00:00 2001 From: Tim Perry Date: Wed, 17 Jun 2026 16:11:50 +0200 Subject: [PATCH] quic: fix readable stream truncation on stop-sending, abort & timeout Signed-off-by: Tim Perry --- lib/internal/blob.js | 17 +++-- lib/internal/quic/quic.js | 52 ++++++++------- src/quic/streams.cc | 33 +++++----- src/quic/streams.h | 6 +- .../test-quic-stream-iteration-reset.mjs | 65 ++++++++----------- .../test-quic-stream-iteration-truncated.mjs | 58 +++++++++++++++++ .../test-quic-stream-setbody-errors.mjs | 8 ++- 7 files changed, 155 insertions(+), 84 deletions(-) create mode 100644 test/parallel/test-quic-stream-iteration-truncated.mjs diff --git a/lib/internal/blob.js b/lib/internal/blob.js index 20f7078d1d0915..4865101da3b0e6 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -542,7 +542,12 @@ function createBlobReaderStream(reader) { const kMaxBatchChunks = 16; async function* createBlobReaderIterable(reader, options = kEmptyObject) { - const { getReadError } = options; + // getEndError(status) lets the caller map a read ending to an error, or null. + // It is consulted both on a read failure (status < 0, always an error - we + // fall back to a generic one if the caller returns null) and on a clean EOS + // (status 0, an error only if the caller returns one, e.g. a stream that + // ended before its FIN). + const { getEndError } = options; let wakeup = PromiseWithResolvers(); reader.setWakeup(wakeup.resolve); @@ -568,8 +573,8 @@ async function* createBlobReaderIterable(reader, options = kEmptyObject) { break; } if (pullResult.status < 0) { - error = typeof getReadError === 'function' ? - getReadError(pullResult.status) : + error = (typeof getEndError === 'function' && + getEndError(pullResult.status)) || new ERR_INVALID_STATE('The reader is not readable'); break; } @@ -585,7 +590,11 @@ async function* createBlobReaderIterable(reader, options = kEmptyObject) { yield batch; } - if (eos) return; + if (eos) { + const eosError = typeof getEndError === 'function' ? getEndError(0) : null; + if (eosError) throw eosError; + return; + } if (error) throw error; if (blocked) { diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index ac3f50ddd34b8b..c34c1d4def87fb 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -1643,30 +1643,36 @@ class QuicStream { // Non-readable stream (outbound-only unidirectional, or closed) if (!inner.reader) return; - yield* createBlobReaderIterable(inner.reader, { - getReadError: () => { - // The read side ends for one of three reasons: - // * Clean FIN received from the peer (state.finReceived - // === true). The iterator stops without calling this; - // fall through to the generic state error if it does. - // * Peer sent us a RESET_STREAM. The C++ side records the - // code in state.resetCode regardless of whether the JS - // onreset handler was attached. state.finReceived stays - // false because no FIN was seen. - // * We aborted locally via stream.resetStream() or - // stream.stopSending(). Both paths run EndReadable in - // C++, setting state.readEnded without setting - // state.finReceived. There is no peer code to surface. - if (inner.state.readEnded && !inner.state.finReceived) { - const peerResetCode = inner.state.resetCode; - if (peerResetCode !== undefined && peerResetCode > 0n) { - return new ERR_QUIC_STREAM_RESET(Number(peerResetCode)); - } - return new ERR_QUIC_STREAM_ABORTED( - 'Stream aborted before FIN was received'); + // Maps the read side's end state to the error to surface, or null if it + // ended cleanly. state.finReceived is set only when the peer explicitly + // sent a FIN, confirming we got the whole stream - the one and only clean + // ending. Returns null in that case. Without a FIN the read side is + // truncated, for one of these reasons: + // * Peer sent us a RESET_STREAM. The C++ side records the + // code in state.resetCode regardless of whether the JS + // onreset handler was attached. state.finReceived stays + // false because no FIN was seen. + // * We aborted locally via stream.resetStream() or + // stream.stopSending(). Both paths run EndReadable in + // C++, setting state.readEnded without setting + // state.finReceived. There is no peer code to surface. + // * The session was torn down before a FIN arrived (a local or peer + // connection close, an idle timeout, or any error), again leaving + // state.finReceived false. + const readTruncationError = () => { + if (inner.state.readEnded && !inner.state.finReceived) { + const peerResetCode = inner.state.resetCode; + if (peerResetCode !== undefined && peerResetCode > 0n) { + return new ERR_QUIC_STREAM_RESET(Number(peerResetCode)); } - return new ERR_INVALID_STATE('The stream is not readable'); - }, + return new ERR_QUIC_STREAM_ABORTED( + 'Stream aborted before FIN was received'); + } + return null; + }; + + yield* createBlobReaderIterable(inner.reader, { + getEndError: readTruncationError, }); } diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 7186aed89a78e9..5bad5b7ee5d747 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -487,7 +487,7 @@ struct Stream::Impl { code = args[0].As()->Uint64Value(&unused); } - stream->EndReadable(); + stream->EndReadable(std::nullopt, /* clean_fin = */ false); if (!stream->is_pending()) { // If the stream is a local unidirectional there's nothing to do here. @@ -1415,13 +1415,6 @@ BaseObjectPtr Stream::get_reader() { return reader; } -void Stream::set_final_size(uint64_t final_size) { - DCHECK_IMPLIES(state()->fin_received == 1, - final_size <= STAT_GET(Stats, final_size)); - state()->fin_received = 1; - STAT_SET(Stats, final_size, final_size); -} - void Stream::set_outbound(std::shared_ptr source) { if (!source || !is_writable()) return; Debug(this, "Setting the outbound data source"); @@ -1616,13 +1609,20 @@ void Stream::EndWritable() { state()->write_ended = 1; } -void Stream::EndReadable(std::optional maybe_final_size) { +void Stream::EndReadable(std::optional maybe_final_size, + bool clean_fin) { if (!is_readable()) return; state()->read_ended = 1; + // fin_received marks a clean completion of the read side (a real FIN). Any + // unclean end (reset/abort/session teardown) truncates the stream, which + // the JS reader will expose as a read error later. + if (clean_fin) state()->fin_received = 1; // Flush any accumulated data before capping so the reader can see it. FlushAccumulation(); - set_final_size(maybe_final_size.value_or(STAT_GET(Stats, bytes_received))); - inbound_->cap(STAT_GET(Stats, final_size)); + const uint64_t final_size = + maybe_final_size.value_or(STAT_GET(Stats, bytes_received)); + STAT_SET(Stats, final_size, final_size); + inbound_->cap(final_size); // Notify the JS reader so it can see EOS. Pass fin=true so the // wakeup promise resolves with a value the iterator can check to // avoid waiting for another wakeup that will never come. @@ -1651,8 +1651,9 @@ void Stream::Destroy(QuicError error) { // End the writable before marking as destroyed. EndWritable(); - // Also end the readable side if it isn't already. - EndReadable(); + // Also end the readable side if it isn't already. If not already ended, + // this will eventually surface as a error, since the data is truncated. + EndReadable(std::nullopt, /* clean_fin = */ false); // We are going to release our reference to the outbound_ queue here. outbound_.reset(); @@ -1699,7 +1700,7 @@ void Stream::ReceiveData(const uint8_t* data, // end the readable side if this is the last bit of data we've received. Debug(this, "Receiving %zu bytes of data", len); if (state()->read_ended == 1 || len == 0) { - if (flags.fin) EndReadable(); + if (flags.fin) EndReadable(std::nullopt, /* clean_fin = */ true); return; } @@ -1772,7 +1773,7 @@ void Stream::ReceiveData(const uint8_t* data, if (flags.fin) { FlushAccumulation(); - EndReadable(); + EndReadable(std::nullopt, /* clean_fin = */ true); } else if (reader_ && was_empty) { // Notify the reader once when the accumulator transitions from empty // to non-empty. This wakes the reader exactly once per accumulation @@ -1811,7 +1812,7 @@ void Stream::ReceiveStreamReset(uint64_t final_size, QuicError error) { final_size, error); state()->reset_code = error.code(); - EndReadable(final_size); + EndReadable(final_size, /* clean_fin = */ false); EmitReset(error); } diff --git a/src/quic/streams.h b/src/quic/streams.h index 86cb36b2668985..197372e07d51e3 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -306,7 +306,10 @@ class Stream final : public AsyncWrap, void Commit(size_t datalen, bool fin = false); void EndWritable(); - void EndReadable(std::optional maybe_final_size = std::nullopt); + // clean_fin indicates the read side is ending because a real FIN frame was + // received from the peer (as opposed to a reset, a local abort, or the + // session being torn down). Anything else => truncated read. + void EndReadable(std::optional maybe_final_size, bool clean_fin); void EntryRead(size_t amount) override; void BeforePull() override; @@ -384,7 +387,6 @@ class Stream final : public AsyncWrap, // Gets a reader for the data received for this stream from the peer, BaseObjectPtr get_reader(); - void set_final_size(uint64_t amount); void set_outbound(std::shared_ptr source); // Streaming outbound support diff --git a/test/parallel/test-quic-stream-iteration-reset.mjs b/test/parallel/test-quic-stream-iteration-reset.mjs index 7b0d077064c563..475c04cf3b227b 100644 --- a/test/parallel/test-quic-stream-iteration-reset.mjs +++ b/test/parallel/test-quic-stream-iteration-reset.mjs @@ -1,13 +1,15 @@ // Flags: --experimental-quic --experimental-stream-iter --no-warnings -// Test: peer RESET_STREAM causes iterator to error. -// When the server resets the stream, the client's async iterator -// should throw or return early. +// Test: a peer RESET_STREAM truncates the readable. The async iterator +// delivers the data received before the reset, then throws +// ERR_QUIC_STREAM_RESET (carrying the peer's code) at the end - rather than +// ending cleanly. import { hasQuic, skip, mustCall } from '../common/index.mjs'; -import * as assert from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import assert from 'node:assert'; -const { ok, rejects } = assert; +const { strictEqual } = assert; if (!hasQuic) { skip('QUIC is not enabled'); @@ -15,52 +17,39 @@ if (!hasQuic) { const { listen, connect } = await import('../common/quic.mjs'); -const encoder = new TextEncoder(); - -const serverReady = Promise.withResolvers(); - const serverEndpoint = await listen(mustCall((serverSession) => { serverSession.onstream = mustCall(async (stream) => { - // Reset the stream from the server side. + stream.writer.write(new Uint8Array(1000).fill(7)); + while (stream.stats.maxOffsetAcknowledged < 1000n) await delay(5); stream.resetStream(42n); - await rejects(stream.closed, mustCall((err) => { - assert.ok(err); - return true; - })); - serverReady.resolve(); - await serverSession.closed; + stream.closed.catch(() => {}); }); -}), { transportParams: { maxIdleTimeout: 1 } }); +})); const clientSession = await connect(serverEndpoint.address, { transportParams: { maxIdleTimeout: 1 }, }); await clientSession.opened; -const stream = await clientSession.createBidirectionalStream({ - body: encoder.encode('will be reset by server'), -}); - -// Set up the closed handler before the reset to avoid unhandled rejection. -const closedPromise = rejects(stream.closed, mustCall((err) => { - assert.ok(err); - return true; -})); - -await serverReady.promise; +// Keep our write side open so the stream stays alive while we read. +const stream = await clientSession.createBidirectionalStream(); +await stream.writer.write(new Uint8Array([1])); +stream.closed.catch(() => {}); -// The async iterator should either throw or return early when the -// peer resets the readable side. +let received = 0; +let threw; try { - for await (const batch of stream) { - // May receive some data before the reset arrives. - ok(Array.isArray(batch)); + for await (const chunk of stream) { + for (const c of chunk) received += c.byteLength; } -} catch { - // The iterator may throw when the reset arrives mid-iteration. +} catch (err) { + threw = err; } -// Either way, the stream should close. -await closedPromise; -await clientSession.closed; +// The buffered data was delivered before the error. +strictEqual(received, 1000); +// The reset surfaced as a reset error (with its code), not a clean end. +strictEqual(threw?.code, 'ERR_QUIC_STREAM_RESET'); + +clientSession.close(); await serverEndpoint.close(); diff --git a/test/parallel/test-quic-stream-iteration-truncated.mjs b/test/parallel/test-quic-stream-iteration-truncated.mjs new file mode 100644 index 00000000000000..1340f95fc5ac4b --- /dev/null +++ b/test/parallel/test-quic-stream-iteration-truncated.mjs @@ -0,0 +1,58 @@ +// Flags: --experimental-quic --experimental-stream-iter --no-warnings + +// Test: a readable stream truncated by the connection idle timeout delivers +// the data it received, then surfaces the truncation as an error at the end of +// iteration - rather than a silent clean end-of-stream that would make an +// incomplete stream look complete. + +import { hasQuic, skip, mustCall } from '../common/index.mjs'; +import assert from 'node:assert'; + +const { strictEqual } = assert; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +const { listen, connect } = await import('../common/quic.mjs'); + +// The body sends 1000 bytes then hangs (never a FIN), so the only thing that +// ends the client's read side is the connection idle timeout. +async function* stallingBody() { + yield new Uint8Array(1000).fill(7); + await new Promise(() => {}); +} + +const serverEndpoint = await listen(mustCall((serverSession) => { + serverSession.onstream = mustCall((stream) => { + stream.setBody(stallingBody()); + stream.closed.catch(() => {}); + }); +})); + +const clientSession = await connect(serverEndpoint.address, { + // Short connection idle timeout so the truncation happens quickly. + transportParams: { maxIdleTimeout: 1 }, +}); +await clientSession.opened; + +const stream = await clientSession.createBidirectionalStream(); +await stream.writer.write(new Uint8Array([1])); +stream.closed.catch(() => {}); + +let received = 0; +let threw; +try { + for await (const chunk of stream) { + for (const c of chunk) received += c.byteLength; + } +} catch (err) { + threw = err; +} + +// All the buffered data was delivered before the error. +strictEqual(received, 1000); +// The truncation surfaced as an error at the end, not a clean end-of-stream. +strictEqual(threw?.code, 'ERR_QUIC_STREAM_ABORTED'); + +await serverEndpoint.close(); diff --git a/test/parallel/test-quic-stream-setbody-errors.mjs b/test/parallel/test-quic-stream-setbody-errors.mjs index 4b41ac4cb66ea3..d1f1e8f01d210e 100644 --- a/test/parallel/test-quic-stream-setbody-errors.mjs +++ b/test/parallel/test-quic-stream-setbody-errors.mjs @@ -55,7 +55,13 @@ await clientSession.opened; message: /writer already accessed/, }); - for await (const _ of stream) { /* drain */ } // eslint-disable-line no-unused-vars + // The server handles only the first stream and then closes its session, so + // this stream is never answered and never receives a FIN. Reading it + // therefore surfaces the truncation rather than ending cleanly. + await assert.rejects((async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream) { /* drain */ } + })(), { code: 'ERR_QUIC_STREAM_ABORTED' }); await stream.closed; }