diff --git a/packages/task-graph/src/common.ts b/packages/task-graph/src/common.ts index b995546c3..eaa1f2ced 100644 --- a/packages/task-graph/src/common.ts +++ b/packages/task-graph/src/common.ts @@ -16,6 +16,7 @@ export * from "./task-graph/ITaskGraph"; export * from "./task-graph/RunContext"; export * from "./task-graph/RunScheduler"; export * from "./task-graph/StreamPump"; +export * from "./task-graph/SubGraphEventBridge"; export * from "./task-graph/TaskGraph"; export * from "./task-graph/TaskGraphEvents"; export * from "./task-graph/TaskGraphRunner"; diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index 24d199f4b..a847b0e68 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -4,8 +4,27 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { getLogger } from "@workglow/util"; import type { TaskGraph } from "./TaskGraph"; +/** + * Default maximum bridge nesting depth before re-emit listeners are dropped. + * Each level installs one listener per event type and re-emits up; without a + * cap, a pathologically nested compound task (e.g. a MapTask containing a + * GraphAsTask containing a MapTask…) amplifies a single inner emit into N + * parent emits before reaching any wire subscriber, and downstream consumers + * with a bounded event log can evict legitimate events under sustained + * fan-out. + */ +const DEFAULT_MAX_BRIDGE_DEPTH = 16; + +/** + * Symbol-keyed marker we attach to each subgraph so nested calls can derive + * the current depth from the parent without changing call sites. Stored on a + * symbol to avoid colliding with any user-set property. + */ +const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth"); + /** * Forward a subgraph's per-task events (task_complete, task_progress, and the * task_stream_* trio) up to the parent graph, so tasks nested inside a compound @@ -14,16 +33,39 @@ import type { TaskGraph } from "./TaskGraph"; * and progress. Bubbles recursively: a nested compound forwards its own * subgraph to its parent, which forwards onward. * + * Depth is tracked on the parent graph via a symbol-keyed marker so callers do + * not need to thread a counter through. Once `depth >= maxDepth` the call + * degrades to a no-op (with a single warn log) — see {@link DEFAULT_MAX_BRIDGE_DEPTH}. + * * @returns a teardown that unsubscribes every bridged listener. Callers MUST * invoke it in a `finally` so a rejecting/aborted/early-terminated subgraph * run cannot leak subscriptions (which would double-emit on a later run). */ -export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void { +export function bridgeSubGraphTaskEvents( + subGraph: TaskGraph, + parentGraph: TaskGraph, + depth: number = (parentGraph as unknown as Record)[BRIDGE_DEPTH] ?? 0, + maxDepth: number = DEFAULT_MAX_BRIDGE_DEPTH +): () => void { // A subgraph bridging to itself would re-emit each event back onto the same // graph it just observed, looping forever. This cannot arise from normal // composition (a compound task's subGraph and parentGraph are distinct // instances) but guard anyway so a malformed hierarchy degrades to a no-op. if (subGraph === parentGraph) return () => {}; + if (depth >= maxDepth) { + getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { + depth, + maxDepth, + }); + return () => {}; + } + + // Stamp the subgraph with its bridge depth so any nested bridge call (whose + // parentGraph is this subgraph) derives `depth + 1` automatically. + const subGraphWithDepth = subGraph as unknown as Record; + const previousDepth = subGraphWithDepth[BRIDGE_DEPTH]; + subGraphWithDepth[BRIDGE_DEPTH] = depth + 1; + const offs = [ subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)), subGraph.subscribe("task_progress", (id, p, m, ...a) => @@ -37,5 +79,14 @@ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskG parentGraph.emit("task_stream_end", id, out) ), ]; - return () => offs.forEach((off) => off()); + return () => { + offs.forEach((off) => off()); + // Restore the previous depth marker so a later, independently-rooted bridge + // of the same subgraph instance does not inherit a stale counter. + if (previousDepth === undefined) { + delete subGraphWithDepth[BRIDGE_DEPTH]; + } else { + subGraphWithDepth[BRIDGE_DEPTH] = previousDepth; + } + }; } diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index 380720f31..50d2f74eb 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -6,6 +6,7 @@ import type { IExecuteContext, StreamEvent } from "@workglow/task-graph"; import { + bridgeSubGraphTaskEvents, Dataflow, FallbackTask, GraphAsTask, @@ -16,8 +17,10 @@ import { WhileTask, Workflow, } from "@workglow/task-graph"; +import type { ILogger } from "@workglow/util"; +import { NullLogger, setLogger } from "@workglow/util"; import type { DataPortSchema } from "@workglow/util/schema"; -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; class TCEAddOne extends Task<{ value: number }, { value: number }> { static override readonly type = "TCEAddOne"; @@ -398,3 +401,112 @@ describe("task_complete graph event", () => { expect(seen.filter((id) => id === "leaky")).toHaveLength(1); }); }); + +describe("bridgeSubGraphTaskEvents depth cap", () => { + // Each test installs a stub logger and restores a NullLogger after, so warn + // spies do not leak into other tests (and we never emit to the real console). + let warnSpy: ReturnType; + + function installStubLogger(): void { + warnSpy = vi.fn(); + const stub: ILogger = { + debug: () => {}, + info: () => {}, + warn: warnSpy as unknown as ILogger["warn"], + error: () => {}, + fatal: () => {}, + child() { + return stub; + }, + time: () => {}, + timeEnd: () => {}, + group: () => {}, + groupEnd: () => {}, + }; + setLogger(stub); + } + + afterEach(() => { + setLogger(new NullLogger()); + }); + + it("stops bridging past maxDepth (default 16)", () => { + installStubLogger(); + // Build a chain of TaskGraphs: g0 (top) -> g1 -> g2 -> ... -> gN (innermost). + // Bridging is chained so each gK forwards into gK-1, mirroring how a stack + // of GraphAsTask wrappers nests bridges at run time. + const N = 20; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // graphs[i] is the inner subgraph; graphs[i-1] is its parent. + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + // Emit a task_progress from the innermost graph and count how many bridges + // it traversed up to the outermost. Without the cap this would re-emit N + // times (once per parent); with the cap of 16 it stops after 16 hops. + const reachedDepths: number[] = []; + for (let i = 0; i < graphs.length; i++) { + const depth = i; + graphs[i].subscribe("task_progress", () => { + reachedDepths.push(depth); + }); + } + + graphs[N].emit("task_progress", "inner-id", 42, "msg"); + + // Innermost (depth N) saw the original emit; bridges propagate up at most + // 16 levels (the default cap), so the highest depth reached is N - 16. + const minReached = Math.min(...reachedDepths); + expect(minReached).toBeGreaterThanOrEqual(N - 16); + // The cap fired (depth reached 16), producing at least one warn. + expect(warnSpy).toHaveBeenCalled(); + + unbridges.forEach((off) => off()); + }); + + it("logs warning with depth fields when the cap is hit", () => { + installStubLogger(); + const N = 18; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls[0]; + expect(firstCall[0]).toBe("bridgeSubGraphTaskEvents depth cap hit; dropping bridge"); + expect(firstCall[1]).toEqual({ depth: 16, maxDepth: 16 }); + + unbridges.forEach((off) => off()); + }); + + it("accepts a custom maxDepth override", () => { + installStubLogger(); + const N = 6; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // Explicitly pass maxDepth=4 to each level. Depth is derived from the + // parent's stamped marker, so the first call to exceed the cap is the + // bridge whose computed depth is 4. + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, 4)); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls.find( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(firstCall).toBeDefined(); + expect(firstCall![1]).toEqual({ depth: 4, maxDepth: 4 }); + + unbridges.forEach((off) => off()); + }); +});