Skip to content

fix(libs): janitor live-run guard, Container dispose-on-replace, vector default parity, StreamPump cancel#604

Open
sroussey wants to merge 4 commits into
claude/brave-ritchie-n3tiztfrom
claude/beautiful-mayer-74ff08
Open

fix(libs): janitor live-run guard, Container dispose-on-replace, vector default parity, StreamPump cancel#604
sroussey wants to merge 4 commits into
claude/brave-ritchie-n3tiztfrom
claude/beautiful-mayer-74ff08

Conversation

@sroussey

Copy link
Copy Markdown
Collaborator

Stacks on #602. Four independent follow-up fixes.

1. (CRITICAL) CacheJanitor live-run guard

sweepStaleRunPrivate called clearOlderThan unconditionally — createdAt is the row write time, not run liveness, so a long-running in-flight run whose rows predate the cutoff would have its cache deleted out from under it. CacheJanitor now accepts a liveRunIds: () => Iterable<string> snapshot (default () => [] + one-time console.warn) and passes the live set into clearOlderThan(olderThanInMs, excludeRunIds). The repo enumerates distinct old runIds and calls deleteRunOlderThan per runId not in the exclude set (ITabularStorage has no NOT IN operator).

2. (HIGH) Container.register dispose-on-replace

register / registerInstance overwrote a cached singleton without invoking its disposer. A DB connection, file handle, or event subscriber re-registered would leak the original. Extract the disposer chain into invokeDisposer (protocol probe) + disposeService (eviction wrapper that catches errors with console.warn so a buggy disposer cannot block re-registration). dispose() retains its AggregateError collection by calling invokeDisposer directly. Inherited (parent-owned) instances are skipped.

3. (HIGH) Vector default scoreThreshold parity

InMemoryVectorStorage and IndexedDbVectorStorage defaulted scoreThreshold to -Infinity; every SQL backend (Sqlite / Postgres / Supabase / SqliteAi) defaults to 0. Identical app code returned different result sets depending on the backing — a portability foot-gun. Switched in-memory + IndexedDB defaults to 0. Callers that want every result regardless of correlation pass scoreThreshold: -Infinity explicitly. Inverted the two existing validation tests asserting the old behaviour.

4. (HIGH) StreamPump.createStreamFromTaskEvents cancel handler

The wired stream_chunk / stream_end / status listeners only released on a normal stream_end or terminal task status. A consumer calling reader.cancel() mid-stream left every listener attached: the task kept emitting into a closed controller and the listeners pinned GC. Hoist cleanup out of start() so the cancel handler can call it. cleanup is idempotent.

Verification

  • bun run build:packages clean (incl. types).
  • bun scripts/test.ts graph vitest — 736 passed.
  • bun scripts/test.ts util vitest — 648 passed, 10 skipped.
  • bun scripts/test.ts storage vitest — 1484 passed, 13 skipped.

Generated by Claude Code

claude added 4 commits June 28, 2026 08:29
…ht cache-row deletion

CacheJanitor.sweepStaleRunPrivate previously called
RunPrivateTaskOutputRepository.clearOlderThan unconditionally, which deleted
every row older than the cutoff — including rows belonging to long-running
in-flight runs (createdAt is the row write time, not run liveness).

Add a liveRunIds: () => Iterable<string> snapshot accessor on CacheJanitor.
Default to () => [] with a one-time console.warn since defaulting is unsafe
when any run is ever concurrent with the sweep. Plumb the snapshot Set
through to clearOlderThan, which now takes excludeRunIds and, when present,
enumerates distinct old runIds and calls deleteRunOlderThan per runId not in
the exclude set (ITabularStorage has no NOT IN operator).
…ment

Container.register and Container.registerInstance overwrote a cached singleton
without invoking its disposer protocol. A DB connection, file handle, or
event subscriber registered then re-registered would leak the original.

Extract the disposer chain into invokeDisposer (the protocol probe) and a
disposeService eviction wrapper that catches errors with console.warn so a
buggy disposer cannot block re-registration. dispose() retains its own
try/catch + AggregateError collection by calling invokeDisposer directly.

Inherited (parent-owned) instances are skipped — child cannot dispose an
instance the parent still hands out.
…ld to 0 (match SQL backends)

InMemoryVectorStorage and IndexedDbVectorStorage defaulted scoreThreshold to
-Infinity, while every SQL backend (Sqlite / Postgres / Supabase /
SqliteAiVectorStorage) defaults to 0. Identical app code returned a different
result set depending on the backing — a portability foot-gun.

Switch in-memory + IndexedDB defaults to 0. Callers that want every result
regardless of correlation pass scoreThreshold: -Infinity explicitly (which is
exactly what the SQL backends already required). Inverted the two validation
tests that previously asserted the negative-correlation behaviour.
…mFromTaskEvents

createStreamFromTaskEvents wired stream_chunk / stream_end / status
listeners onto the source task but only released them on a normal stream_end
or terminal task status. A consumer that called reader.cancel() mid-stream
(downstream abort, consumer-side limit hit) left every listener attached:
the task kept emitting into a closed controller and the listeners pinned GC.

Hoist the shared cleanup closure to underlying-source scope so the
ReadableStream cancel handler can call it. cleanup is already idempotent
via the closed flag, so concurrent cancel + stream_end is safe.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants