Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/task-graph/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from "./task-graph/ITaskGraph";
export * from "./task-graph/RunContext";
export * from "./task-graph/RunScheduler";
export * from "./task-graph/StreamPump";
export * from "./task-graph/SubGraphEventBridge";
export * from "./task-graph/TaskGraph";
export * from "./task-graph/TaskGraphEvents";
export * from "./task-graph/TaskGraphRunner";
Expand Down
55 changes: 53 additions & 2 deletions packages/task-graph/src/task-graph/SubGraphEventBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,27 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { getLogger } from "@workglow/util";
import type { TaskGraph } from "./TaskGraph";

/**
* Default maximum bridge nesting depth before re-emit listeners are dropped.
* Each level installs one listener per event type and re-emits up; without a
* cap, a pathologically nested compound task (e.g. a MapTask containing a
* GraphAsTask containing a MapTask…) amplifies a single inner emit into N
* parent emits before reaching any wire subscriber, and downstream consumers
* with a bounded event log can evict legitimate events under sustained
* fan-out.
*/
const DEFAULT_MAX_BRIDGE_DEPTH = 16;

/**
* Symbol-keyed marker we attach to each subgraph so nested calls can derive
* the current depth from the parent without changing call sites. Stored on a
* symbol to avoid colliding with any user-set property.
*/
const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth");

/**
* Forward a subgraph's per-task events (task_complete, task_progress, and the
* task_stream_* trio) up to the parent graph, so tasks nested inside a compound
Expand All @@ -14,16 +33,39 @@ import type { TaskGraph } from "./TaskGraph";
* and progress. Bubbles recursively: a nested compound forwards its own
* subgraph to its parent, which forwards onward.
*
* Depth is tracked on the parent graph via a symbol-keyed marker so callers do
* not need to thread a counter through. Once `depth >= maxDepth` the call
* degrades to a no-op (with a single warn log) — see {@link DEFAULT_MAX_BRIDGE_DEPTH}.
*
* @returns a teardown that unsubscribes every bridged listener. Callers MUST
* invoke it in a `finally` so a rejecting/aborted/early-terminated subgraph
* run cannot leak subscriptions (which would double-emit on a later run).
*/
export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void {
export function bridgeSubGraphTaskEvents(
subGraph: TaskGraph,
parentGraph: TaskGraph,
depth: number = (parentGraph as unknown as Record<symbol, number>)[BRIDGE_DEPTH] ?? 0,
maxDepth: number = DEFAULT_MAX_BRIDGE_DEPTH
): () => void {
// A subgraph bridging to itself would re-emit each event back onto the same
// graph it just observed, looping forever. This cannot arise from normal
// composition (a compound task's subGraph and parentGraph are distinct
// instances) but guard anyway so a malformed hierarchy degrades to a no-op.
if (subGraph === parentGraph) return () => {};
if (depth >= maxDepth) {
getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", {
depth,
maxDepth,
});
return () => {};
}

// Stamp the subgraph with its bridge depth so any nested bridge call (whose
// parentGraph is this subgraph) derives `depth + 1` automatically.
const subGraphWithDepth = subGraph as unknown as Record<symbol, number>;
const previousDepth = subGraphWithDepth[BRIDGE_DEPTH];
subGraphWithDepth[BRIDGE_DEPTH] = depth + 1;

const offs = [
subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)),
subGraph.subscribe("task_progress", (id, p, m, ...a) =>
Expand All @@ -37,5 +79,14 @@ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskG
parentGraph.emit("task_stream_end", id, out)
),
];
return () => offs.forEach((off) => off());
return () => {
offs.forEach((off) => off());
// Restore the previous depth marker so a later, independently-rooted bridge
// of the same subgraph instance does not inherit a stale counter.
if (previousDepth === undefined) {
delete subGraphWithDepth[BRIDGE_DEPTH];
} else {
subGraphWithDepth[BRIDGE_DEPTH] = previousDepth;
}
};
}
114 changes: 113 additions & 1 deletion packages/test/src/test/task-graph/TaskCompleteEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import type { IExecuteContext, StreamEvent } from "@workglow/task-graph";
import {
bridgeSubGraphTaskEvents,
Dataflow,
FallbackTask,
GraphAsTask,
Expand All @@ -16,8 +17,10 @@ import {
WhileTask,
Workflow,
} from "@workglow/task-graph";
import type { ILogger } from "@workglow/util";
import { NullLogger, setLogger } from "@workglow/util";
import type { DataPortSchema } from "@workglow/util/schema";
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";

class TCEAddOne extends Task<{ value: number }, { value: number }> {
static override readonly type = "TCEAddOne";
Expand Down Expand Up @@ -398,3 +401,112 @@ describe("task_complete graph event", () => {
expect(seen.filter((id) => id === "leaky")).toHaveLength(1);
});
});

describe("bridgeSubGraphTaskEvents depth cap", () => {
// Each test installs a stub logger and restores a NullLogger after, so warn
// spies do not leak into other tests (and we never emit to the real console).
let warnSpy: ReturnType<typeof vi.fn>;

function installStubLogger(): void {
warnSpy = vi.fn();
const stub: ILogger = {
debug: () => {},
info: () => {},
warn: warnSpy as unknown as ILogger["warn"],
error: () => {},
fatal: () => {},
child() {
return stub;
},
time: () => {},
timeEnd: () => {},
group: () => {},
groupEnd: () => {},
};
setLogger(stub);
}

afterEach(() => {
setLogger(new NullLogger());
});

it("stops bridging past maxDepth (default 16)", () => {
installStubLogger();
// Build a chain of TaskGraphs: g0 (top) -> g1 -> g2 -> ... -> gN (innermost).
// Bridging is chained so each gK forwards into gK-1, mirroring how a stack
// of GraphAsTask wrappers nests bridges at run time.
const N = 20;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());

const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
// graphs[i] is the inner subgraph; graphs[i-1] is its parent.
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1]));
}

// Emit a task_progress from the innermost graph and count how many bridges
// it traversed up to the outermost. Without the cap this would re-emit N
// times (once per parent); with the cap of 16 it stops after 16 hops.
const reachedDepths: number[] = [];
for (let i = 0; i < graphs.length; i++) {
const depth = i;
graphs[i].subscribe("task_progress", () => {
reachedDepths.push(depth);
});
}

graphs[N].emit("task_progress", "inner-id", 42, "msg");

// Innermost (depth N) saw the original emit; bridges propagate up at most
// 16 levels (the default cap), so the highest depth reached is N - 16.
const minReached = Math.min(...reachedDepths);
expect(minReached).toBeGreaterThanOrEqual(N - 16);
// The cap fired (depth reached 16), producing at least one warn.
expect(warnSpy).toHaveBeenCalled();

unbridges.forEach((off) => off());
});

it("logs warning with depth fields when the cap is hit", () => {
installStubLogger();
const N = 18;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());
const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1]));
}

expect(warnSpy).toHaveBeenCalled();
const firstCall = warnSpy.mock.calls[0];
expect(firstCall[0]).toBe("bridgeSubGraphTaskEvents depth cap hit; dropping bridge");
expect(firstCall[1]).toEqual({ depth: 16, maxDepth: 16 });

unbridges.forEach((off) => off());
});

it("accepts a custom maxDepth override", () => {
installStubLogger();
const N = 6;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());

const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
// Explicitly pass maxDepth=4 to each level. Depth is derived from the
// parent's stamped marker, so the first call to exceed the cap is the
// bridge whose computed depth is 4.
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, 4));
}

expect(warnSpy).toHaveBeenCalled();
const firstCall = warnSpy.mock.calls.find(
(c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge"
);
expect(firstCall).toBeDefined();
expect(firstCall![1]).toEqual({ depth: 4, maxDepth: 4 });

unbridges.forEach((off) => off());
});
});
Loading