Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,12 @@ function createBlobReaderStream(reader) {
const kMaxBatchChunks = 16;

async function* createBlobReaderIterable(reader, options = kEmptyObject) {
const { getReadError } = options;
// getEndError(status) lets the caller map a read ending to an error, or null.
// It is consulted both on a read failure (status < 0, always an error - we
// fall back to a generic one if the caller returns null) and on a clean EOS
// (status 0, an error only if the caller returns one, e.g. a stream that
// ended before its FIN).
const { getEndError } = options;
let wakeup = PromiseWithResolvers();
reader.setWakeup(wakeup.resolve);

Expand All @@ -568,8 +573,8 @@ async function* createBlobReaderIterable(reader, options = kEmptyObject) {
break;
}
if (pullResult.status < 0) {
error = typeof getReadError === 'function' ?
getReadError(pullResult.status) :
error = (typeof getEndError === 'function' &&
getEndError(pullResult.status)) ||
new ERR_INVALID_STATE('The reader is not readable');
break;
}
Expand All @@ -585,7 +590,11 @@ async function* createBlobReaderIterable(reader, options = kEmptyObject) {
yield batch;
}

if (eos) return;
if (eos) {
const eosError = typeof getEndError === 'function' ? getEndError(0) : null;
if (eosError) throw eosError;
return;
}
if (error) throw error;

if (blocked) {
Expand Down
52 changes: 29 additions & 23 deletions lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -1643,30 +1643,36 @@ class QuicStream {
// Non-readable stream (outbound-only unidirectional, or closed)
if (!inner.reader) return;

yield* createBlobReaderIterable(inner.reader, {
getReadError: () => {
// The read side ends for one of three reasons:
// * Clean FIN received from the peer (state.finReceived
// === true). The iterator stops without calling this;
// fall through to the generic state error if it does.
// * Peer sent us a RESET_STREAM. The C++ side records the
// code in state.resetCode regardless of whether the JS
// onreset handler was attached. state.finReceived stays
// false because no FIN was seen.
// * We aborted locally via stream.resetStream() or
// stream.stopSending(). Both paths run EndReadable in
// C++, setting state.readEnded without setting
// state.finReceived. There is no peer code to surface.
if (inner.state.readEnded && !inner.state.finReceived) {
const peerResetCode = inner.state.resetCode;
if (peerResetCode !== undefined && peerResetCode > 0n) {
return new ERR_QUIC_STREAM_RESET(Number(peerResetCode));
}
return new ERR_QUIC_STREAM_ABORTED(
'Stream aborted before FIN was received');
// Maps the read side's end state to the error to surface, or null if it
// ended cleanly. state.finReceived is set only when the peer explicitly
// sent a FIN, confirming we got the whole stream - the one and only clean
// ending. Returns null in that case. Without a FIN the read side is
// truncated, for one of these reasons:
// * Peer sent us a RESET_STREAM. The C++ side records the
// code in state.resetCode regardless of whether the JS
// onreset handler was attached. state.finReceived stays
// false because no FIN was seen.
// * We aborted locally via stream.resetStream() or
// stream.stopSending(). Both paths run EndReadable in
// C++, setting state.readEnded without setting
// state.finReceived. There is no peer code to surface.
// * The session was torn down before a FIN arrived (a local or peer
// connection close, an idle timeout, or any error), again leaving
// state.finReceived false.
const readTruncationError = () => {
if (inner.state.readEnded && !inner.state.finReceived) {
const peerResetCode = inner.state.resetCode;
if (peerResetCode !== undefined && peerResetCode > 0n) {
return new ERR_QUIC_STREAM_RESET(Number(peerResetCode));
}
return new ERR_INVALID_STATE('The stream is not readable');
},
return new ERR_QUIC_STREAM_ABORTED(
'Stream aborted before FIN was received');
}
return null;
};

yield* createBlobReaderIterable(inner.reader, {
getEndError: readTruncationError,
});
}

Expand Down
33 changes: 17 additions & 16 deletions src/quic/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ struct Stream::Impl {
code = args[0].As<BigInt>()->Uint64Value(&unused);
}

stream->EndReadable();
stream->EndReadable(std::nullopt, /* clean_fin = */ false);

if (!stream->is_pending()) {
// If the stream is a local unidirectional there's nothing to do here.
Expand Down Expand Up @@ -1415,13 +1415,6 @@ BaseObjectPtr<Blob::Reader> Stream::get_reader() {
return reader;
}

void Stream::set_final_size(uint64_t final_size) {
DCHECK_IMPLIES(state()->fin_received == 1,
final_size <= STAT_GET(Stats, final_size));
state()->fin_received = 1;
STAT_SET(Stats, final_size, final_size);
}

void Stream::set_outbound(std::shared_ptr<DataQueue> source) {
if (!source || !is_writable()) return;
Debug(this, "Setting the outbound data source");
Expand Down Expand Up @@ -1616,13 +1609,20 @@ void Stream::EndWritable() {
state()->write_ended = 1;
}

void Stream::EndReadable(std::optional<uint64_t> maybe_final_size) {
void Stream::EndReadable(std::optional<uint64_t> maybe_final_size,
bool clean_fin) {
if (!is_readable()) return;
state()->read_ended = 1;
// fin_received marks a clean completion of the read side (a real FIN). Any
// unclean end (reset/abort/session teardown) truncates the stream, which
// the JS reader will expose as a read error later.
if (clean_fin) state()->fin_received = 1;
// Flush any accumulated data before capping so the reader can see it.
FlushAccumulation();
set_final_size(maybe_final_size.value_or(STAT_GET(Stats, bytes_received)));
inbound_->cap(STAT_GET(Stats, final_size));
const uint64_t final_size =
maybe_final_size.value_or(STAT_GET(Stats, bytes_received));
STAT_SET(Stats, final_size, final_size);
inbound_->cap(final_size);
// Notify the JS reader so it can see EOS. Pass fin=true so the
// wakeup promise resolves with a value the iterator can check to
// avoid waiting for another wakeup that will never come.
Expand Down Expand Up @@ -1651,8 +1651,9 @@ void Stream::Destroy(QuicError error) {
// End the writable before marking as destroyed.
EndWritable();

// Also end the readable side if it isn't already.
EndReadable();
// Also end the readable side if it isn't already. If not already ended,
// this will eventually surface as a error, since the data is truncated.
EndReadable(std::nullopt, /* clean_fin = */ false);

// We are going to release our reference to the outbound_ queue here.
outbound_.reset();
Expand Down Expand Up @@ -1699,7 +1700,7 @@ void Stream::ReceiveData(const uint8_t* data,
// end the readable side if this is the last bit of data we've received.
Debug(this, "Receiving %zu bytes of data", len);
if (state()->read_ended == 1 || len == 0) {
if (flags.fin) EndReadable();
if (flags.fin) EndReadable(std::nullopt, /* clean_fin = */ true);
return;
}

Expand Down Expand Up @@ -1772,7 +1773,7 @@ void Stream::ReceiveData(const uint8_t* data,

if (flags.fin) {
FlushAccumulation();
EndReadable();
EndReadable(std::nullopt, /* clean_fin = */ true);
} else if (reader_ && was_empty) {
// Notify the reader once when the accumulator transitions from empty
// to non-empty. This wakes the reader exactly once per accumulation
Expand Down Expand Up @@ -1811,7 +1812,7 @@ void Stream::ReceiveStreamReset(uint64_t final_size, QuicError error) {
final_size,
error);
state()->reset_code = error.code();
EndReadable(final_size);
EndReadable(final_size, /* clean_fin = */ false);
EmitReset(error);
}

Expand Down
6 changes: 4 additions & 2 deletions src/quic/streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ class Stream final : public AsyncWrap,
void Commit(size_t datalen, bool fin = false);

void EndWritable();
void EndReadable(std::optional<uint64_t> maybe_final_size = std::nullopt);
// clean_fin indicates the read side is ending because a real FIN frame was
// received from the peer (as opposed to a reset, a local abort, or the
// session being torn down). Anything else => truncated read.
void EndReadable(std::optional<uint64_t> maybe_final_size, bool clean_fin);
void EntryRead(size_t amount) override;
void BeforePull() override;

Expand Down Expand Up @@ -384,7 +387,6 @@ class Stream final : public AsyncWrap,
// Gets a reader for the data received for this stream from the peer,
BaseObjectPtr<Blob::Reader> get_reader();

void set_final_size(uint64_t amount);
void set_outbound(std::shared_ptr<DataQueue> source);

// Streaming outbound support
Expand Down
65 changes: 27 additions & 38 deletions test/parallel/test-quic-stream-iteration-reset.mjs
Original file line number Diff line number Diff line change
@@ -1,66 +1,55 @@
// Flags: --experimental-quic --experimental-stream-iter --no-warnings

// Test: peer RESET_STREAM causes iterator to error.
// When the server resets the stream, the client's async iterator
// should throw or return early.
// Test: a peer RESET_STREAM truncates the readable. The async iterator
// delivers the data received before the reset, then throws
// ERR_QUIC_STREAM_RESET (carrying the peer's code) at the end - rather than
// ending cleanly.

import { hasQuic, skip, mustCall } from '../common/index.mjs';
import * as assert from 'node:assert';
import { setTimeout as delay } from 'node:timers/promises';
import assert from 'node:assert';

const { ok, rejects } = assert;
const { strictEqual } = assert;

if (!hasQuic) {
skip('QUIC is not enabled');
}

const { listen, connect } = await import('../common/quic.mjs');

const encoder = new TextEncoder();

const serverReady = Promise.withResolvers();

const serverEndpoint = await listen(mustCall((serverSession) => {
serverSession.onstream = mustCall(async (stream) => {
// Reset the stream from the server side.
stream.writer.write(new Uint8Array(1000).fill(7));
while (stream.stats.maxOffsetAcknowledged < 1000n) await delay(5);
stream.resetStream(42n);
await rejects(stream.closed, mustCall((err) => {
assert.ok(err);
return true;
}));
serverReady.resolve();
await serverSession.closed;
stream.closed.catch(() => {});
});
}), { transportParams: { maxIdleTimeout: 1 } });
}));

const clientSession = await connect(serverEndpoint.address, {
transportParams: { maxIdleTimeout: 1 },
});
await clientSession.opened;

const stream = await clientSession.createBidirectionalStream({
body: encoder.encode('will be reset by server'),
});

// Set up the closed handler before the reset to avoid unhandled rejection.
const closedPromise = rejects(stream.closed, mustCall((err) => {
assert.ok(err);
return true;
}));

await serverReady.promise;
// Keep our write side open so the stream stays alive while we read.
const stream = await clientSession.createBidirectionalStream();
await stream.writer.write(new Uint8Array([1]));
stream.closed.catch(() => {});

// The async iterator should either throw or return early when the
// peer resets the readable side.
let received = 0;
let threw;
try {
for await (const batch of stream) {
// May receive some data before the reset arrives.
ok(Array.isArray(batch));
for await (const chunk of stream) {
for (const c of chunk) received += c.byteLength;
}
} catch {
// The iterator may throw when the reset arrives mid-iteration.
} catch (err) {
threw = err;
}

// Either way, the stream should close.
await closedPromise;
await clientSession.closed;
// The buffered data was delivered before the error.
strictEqual(received, 1000);
// The reset surfaced as a reset error (with its code), not a clean end.
strictEqual(threw?.code, 'ERR_QUIC_STREAM_RESET');

clientSession.close();
await serverEndpoint.close();
58 changes: 58 additions & 0 deletions test/parallel/test-quic-stream-iteration-truncated.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Flags: --experimental-quic --experimental-stream-iter --no-warnings

// Test: a readable stream truncated by the connection idle timeout delivers
// the data it received, then surfaces the truncation as an error at the end of
// iteration - rather than a silent clean end-of-stream that would make an
// incomplete stream look complete.

import { hasQuic, skip, mustCall } from '../common/index.mjs';
import assert from 'node:assert';

const { strictEqual } = assert;

if (!hasQuic) {
skip('QUIC is not enabled');
}

const { listen, connect } = await import('../common/quic.mjs');

// The body sends 1000 bytes then hangs (never a FIN), so the only thing that
// ends the client's read side is the connection idle timeout.
async function* stallingBody() {
yield new Uint8Array(1000).fill(7);
await new Promise(() => {});
}

const serverEndpoint = await listen(mustCall((serverSession) => {
serverSession.onstream = mustCall((stream) => {
stream.setBody(stallingBody());
stream.closed.catch(() => {});
});
}));

const clientSession = await connect(serverEndpoint.address, {
// Short connection idle timeout so the truncation happens quickly.
transportParams: { maxIdleTimeout: 1 },
});
await clientSession.opened;

const stream = await clientSession.createBidirectionalStream();
await stream.writer.write(new Uint8Array([1]));
stream.closed.catch(() => {});

let received = 0;
let threw;
try {
for await (const chunk of stream) {
for (const c of chunk) received += c.byteLength;
}
} catch (err) {
threw = err;
}

// All the buffered data was delivered before the error.
strictEqual(received, 1000);
// The truncation surfaced as an error at the end, not a clean end-of-stream.
strictEqual(threw?.code, 'ERR_QUIC_STREAM_ABORTED');

await serverEndpoint.close();
8 changes: 7 additions & 1 deletion test/parallel/test-quic-stream-setbody-errors.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ await clientSession.opened;
message: /writer already accessed/,
});

for await (const _ of stream) { /* drain */ } // eslint-disable-line no-unused-vars
// The server handles only the first stream and then closes its session, so
// this stream is never answered and never receives a FIN. Reading it
// therefore surfaces the truncation rather than ending cleanly.
await assert.rejects((async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of stream) { /* drain */ }
})(), { code: 'ERR_QUIC_STREAM_ABORTED' });
await stream.closed;
}

Expand Down
Loading