diff --git a/examples/basic_observe.py b/examples/basic_observe.py index 38a4181..2fdc196 100644 --- a/examples/basic_observe.py +++ b/examples/basic_observe.py @@ -43,11 +43,7 @@ ) print(f"call #{i + 1}: {resp.choices[0].message.content!r}") -# 4. Optional: print a coverage snapshot from the runtime instance. -# The same counters are sent over the WS heartbeat and via the -# HTTP-fallback path when the WS connection is down. -print("\nCoverage snapshot:") -rt = nullrun.get_runtime() -report = rt.coverage_report() -for k, v in report.items(): - print(f" {k}: {v}") +# 4. 0.9.0: per-process coverage snapshot removed. Coverage is now +# derived server-side from llm_call span metadata (host + tracked + +# streaming_skipped flags). Query the dashboard or use +# `GET /api/v1/coverage/{org_id}` to inspect. diff --git a/src/nullrun/__init__.py b/src/nullrun/__init__.py index df8689d..a16c542 100644 --- a/src/nullrun/__init__.py +++ b/src/nullrun/__init__.py @@ -41,6 +41,43 @@ def my_agent(query): from nullrun.runtime import track_event, track_llm, track_tool +def shutdown(timeout: float = 2.0) -> None: + """Gracefully shut down the NullRun runtime. + + Sends a clean WebSocket close frame, drains in-flight events, and + stops background threads (HTTP poller, WS push listener). After + this returns, any further ``nullrun.track(...)`` call or + ``@protect``-decorated call is a no-op. + + Audit 2026-06-29 (WS graceful close on exit): a long-running + script that exits via ``sys.exit()`` lets the kernel RST the TCP + socket, which the backend logs as WARN "Connection reset + without closing handshake". Calling ``nullrun.shutdown()`` + before exit (or registering it via ``atexit``) eliminates the + noisy log. No-op if ``init()`` was never called. + + Args: + timeout: seconds to wait for the WS close handshake to + complete before giving up. The underlying + ``NullRunRuntime.shutdown()`` already caps WS join at + 0.5s and the WS close at 2.0s — this parameter is + reserved for future expansion and is currently unused. + + Example:: + + import atexit + import nullrun + atexit.register(nullrun.shutdown) + """ + # Lazy import so the SDK module-import path stays light (mirrors + # the pattern in `init` and `status`). + from nullrun.runtime import NullRunRuntime + runtime = NullRunRuntime._instance # type: ignore[attr-defined] + if runtime is None: + return + runtime.shutdown() + + def status(): """Return the current runtime state as a Layer-3 :class:`NullRunStatus` snapshot. @@ -300,11 +337,10 @@ def my_agent(): auto_instrument(runtime) - # Start the coverage reporter so the backend gets a coverage_report - # event every 60s. Daemon thread; safe to leak across re-init. - # The coverage reporter is a no-op when no LLM traffic has been - # observed (see ``track_coverage``). - runtime.start_coverage_reporter() + # 0.9.0: coverage reporter removed. Coverage is now derived + # server-side from llm_call span metadata (host + tracked + + # streaming_skipped flags). No 60s daemon thread, no per-process + # counter dicts. return runtime @@ -448,6 +484,15 @@ def __dir__() -> list[str]: "track_llm", "track_tool", "track_event", + # Audit 2026-06-29 (WS graceful close on exit): the user-facing + # top-level ``shutdown()`` sends a clean WS close frame and + # drains in-flight events. Without it, a long-running script + # that exits via ``sys.exit()`` lets the kernel RST the TCP + # socket → backend logs WARN "Connection reset without closing + # handshake". Calling ``nullrun.shutdown()`` before + # ``sys.exit(0)`` (or in an ``atexit`` handler) eliminates the + # noisy log. No-op if init() was never called. + "shutdown", # Layer 2: global on_error hook. Eager because it is the # single most important "give the user a chance" API — the # user has to know it exists to call it. diff --git a/src/nullrun/instrumentation/auto.py b/src/nullrun/instrumentation/auto.py index 45e8bcb..698fba9 100644 --- a/src/nullrun/instrumentation/auto.py +++ b/src/nullrun/instrumentation/auto.py @@ -634,9 +634,12 @@ def handle_request(self, request: httpx.Request) -> httpx.Response: # rebuild path only. body = _read_body_with_cap(response, MAX_RESPONSE_BYTES) if body is None: - # Body exceeded the cap. Drain it (so callers don't - # see a half-consumed response) but don't track. - _safe_bump_coverage(self._runtime, "_coverage_streaming_skipped", host) + # Body exceeded the cap. 0.9.0: still emit an + # llm_call event so the call counts toward coverage + # (host known, model best-effort, tracked: false + # because usage wasn't extractable). Drain the body + # so callers don't see a half-consumed response. + _emit_streaming_skipped(self._runtime, request, host) logger.debug( "NullRun transport: response from %s exceeded %d bytes; " "skipping usage tracking", @@ -709,16 +712,6 @@ def _emit( body: bytes, status: int, ) -> None: - # P2-1 (plan §10): bump the coverage counter so the dashboard - # can see which LLM hosts the agent is talking to. Pre-fix - # this counter was only incremented in the ``requests`` path - # (auto_requests.py:185). The httpx path is the dominant - # one (every OpenAI / Anthropic / Gemini / Mistral / Cohere - # call goes through httpx), so without this bump the - # ``coverage_seen`` view in the dashboard would be empty for - # the majority of customers. - _safe_bump_coverage(self._runtime, "_coverage_seen", host) - # 2026-06-28 (Issue 2 fix): if the extractor returned ``None`` # for ``model`` (response body lacked the field — observed for # some OpenAI Responses-API and Anthropic streaming edge cases), @@ -738,6 +731,14 @@ def _emit( or _extract_model_from_request_body(request) ) + # 0.9.0: every successful llm_call span carries + # `metadata.tracked: True`. The backend's coverage query + # (backend/src/coverage/mod.rs) computes tracked_pct from + # this flag — it replaces the old `_coverage_seen` / + # `_coverage_tracked` per-host dicts. Usage was extracted + # successfully, so the SDK's `_match_extractor` identified + # a known provider. See plan at + # `~/.claude/plans/async-swinging-hanrahan.md`. try: # Phase 4.1: lift cache / reasoning / finish / tool names # out of raw_usage onto the event itself. The backend's @@ -760,6 +761,9 @@ def _emit( "finish_reason": usage.get("finish_reason"), "tool_names": usage.get("tool_names") or [], "has_usage": True, + "metadata": { + "tracked": True, + }, # Stripped at the wire boundary by _WIRE_STRIP_FIELDS # in runtime.py — kept here only so the in-process # dedup layer can see the full vendor payload. @@ -805,7 +809,10 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: # P0-3: bounded read (see sync path for full rationale). body = await _aread_body_with_cap(response, MAX_RESPONSE_BYTES) if body is None: - _safe_bump_coverage(self._runtime, "_coverage_streaming_skipped", host) + # 0.9.0: emit llm_call with metadata.streaming_skipped: true + # so the call counts toward coverage (host known, + # tracked: false because usage wasn't extractable). + _emit_streaming_skipped(self._runtime, request, host) logger.debug( "NullRun transport: async response from %s exceeded %d bytes; " "skipping usage tracking", @@ -873,10 +880,11 @@ def _emit( body: bytes, status: int, ) -> None: - # P2-1 (plan §10): mirror the sync path — bump the coverage - # counter so the dashboard's ``coverage_seen`` view shows - # httpx-path traffic (the dominant path). - _safe_bump_coverage(self._runtime, "_coverage_seen", host) + # 0.9.0: emit llm_call with metadata.tracked: True (sync + # path is identical). Async path doesn't have the request- + # body model fallback yet (sync path's + # `_extract_model_from_request_body` is sync-only); leave + # model as the response-body value or None. try: # Phase 4.1: see sync _emit for rationale. Async path # uses identical event shape so the dedup key space @@ -896,6 +904,9 @@ def _emit( "finish_reason": usage.get("finish_reason"), "tool_names": usage.get("tool_names") or [], "has_usage": True, + "metadata": { + "tracked": True, + }, "raw_usage": usage, "_fingerprint": _fingerprint_for(host, body, status), } @@ -992,6 +1003,14 @@ def _fingerprint_for_event_dict(event: dict[str, Any]) -> str: # first runtime — silently losing track() calls from later test runs. _orig_sync_init: Callable[..., Any] | None = None _orig_async_init: Callable[..., Any] | None = None +# Audit 2026-06-29 (reset_for_tests gap): stash the originals of the +# class methods we wrap so reset_for_tests can put them back. Without +# this, a second test pass with `_langchain_patched = False` would +# double-wrap `BaseCallbackManager.__init__`, and similarly for +# `agents.Runner.run` / `Runner.run_sync`. +_orig_base_callback_manager_init: Callable[..., Any] | None = None +_orig_runner_run: Callable[..., Any] | None = None +_orig_runner_run_sync: Callable[..., Any] | None = None def patch_httpx(runtime: Any) -> bool: @@ -1165,6 +1184,12 @@ def patch_langchain_callback(runtime: Any) -> bool: return True _orig_init = BaseCallbackManager.__init__ + # Audit 2026-06-29 (reset_for_tests gap): stash the original + # on a module-level so reset_for_tests can put it back. + # Without this, a second test pass with `_langchain_patched + # = False` would double-wrap. + global _orig_base_callback_manager_init + _orig_base_callback_manager_init = _orig_init def _wrap_init(self: Any, *args: Any, **kwargs: Any) -> None: _orig_init(self, *args, **kwargs) @@ -1190,6 +1215,146 @@ def _wrap_init(self: Any, *args: Any, **kwargs: Any) -> None: return True +# --------------------------------------------------------------------------- +# D4b: patch_chat_model_invoke — defensive callback injection at the LLM +# boundary. +# --------------------------------------------------------------------------- +# Audit 2026-06-29 (silent zero-billing): the previous +# ``patch_langchain_callback`` only wrapped ``BaseCallbackManager.__init__``. +# When the user instantiates ``ChatOpenAI(...)`` *before* ``nullrun.init`` +# (a common pattern — see SDK examples), the ``ChatOpenAI`` object keeps +# no callback manager attached. The patched ``__init__`` runs only when +# a *new* ``BaseCallbackManager`` is constructed inside +# ``BaseChatModel.invoke`` / ``Runnable.invoke``. If the LangGraph path +# goes through a different construction sequence (e.g. caching, +# alternative transports, in-memory mock providers) the new manager +# might be bypassed and ``on_llm_end`` never fires. +# +# To make the wiring robust we also wrap ``BaseChatModel.invoke`` / +# ``BaseChatModel.ainvoke`` so a ``NullRunCallback`` is always present +# in the per-call handlers list. This is belt-and-suspenders over the +# ``BaseCallbackManager.__init__`` patch — if the manager constructor +# runs, the handler is added; if the manager constructor is somehow +# bypassed, the invoke wrapper still attaches it. +# +# Idempotent. Returns False if ``langchain-core`` is not installed. + +_chat_model_invoke_patched = False +_orig_chat_model_invoke: Callable[..., Any] | None = None +_orig_chat_model_ainvoke: Callable[..., Any] | None = None +_orig_chat_model_stream: Callable[..., Any] | None = None +_orig_chat_model_astream: Callable[..., Any] | None = None + + +def patch_chat_model_invoke(runtime: Any) -> bool: + """Inject ``NullRunCallback`` at the ``BaseChatModel.invoke`` / + ``ainvoke`` boundary as a defensive complement to + ``patch_langchain_callback``. + + See the audit block above for why both layers are needed. We never + *replace* user-supplied callbacks — the wrapped closure appends + our handler only when the user hasn't already supplied a + ``NullRunCallback``. + """ + global _chat_model_invoke_patched + with _langchain_lock: + if _chat_model_invoke_patched: + return True + try: + from langchain_core.language_models import BaseChatModel + except ImportError: + logger.debug("langchain-core not installed; chat-model invoke patch skipped") + return False + + if getattr(BaseChatModel, "_nullrun_invoke_patched", False): + _chat_model_invoke_patched = True + return True + + def _ensure_cb(handlers: Any) -> list[Any]: + """Append a NullRunCallback to a handler list if absent.""" + if handlers is None: + handlers = [] + try: + if any(isinstance(h, NullRunCallback) for h in handlers): + return list(handlers) + except TypeError: + return list(handlers) if handlers else [] + return list(handlers) + [NullRunCallback(runtime=runtime)] + + _orig_invoke = BaseChatModel.invoke + _orig_ainvoke = BaseChatModel.ainvoke + _orig_stream = BaseChatModel.stream + _orig_astream = BaseChatModel.astream + + def _wrap_invoke(self: Any, input: Any, config: Any = None, **kwargs: Any) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_invoke(self, input, new_config, **kwargs) + + async def _wrap_ainvoke( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return await _orig_ainvoke(self, input, new_config, **kwargs) + + def _wrap_stream( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_stream(self, input, new_config, **kwargs) + + def _wrap_astream( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_astream(self, input, new_config, **kwargs) + + global _orig_chat_model_invoke, _orig_chat_model_ainvoke + _orig_chat_model_invoke = _orig_invoke + _orig_chat_model_ainvoke = _orig_ainvoke + global _orig_chat_model_stream, _orig_chat_model_astream + _orig_chat_model_stream = _orig_stream + _orig_chat_model_astream = _orig_astream + + BaseChatModel.invoke = _wrap_invoke # type: ignore[method-assign] + BaseChatModel.ainvoke = _wrap_ainvoke # type: ignore[method-assign] + BaseChatModel.stream = _wrap_stream # type: ignore[method-assign] + BaseChatModel.astream = _wrap_astream # type: ignore[method-assign] + BaseChatModel._nullrun_invoke_patched = True # type: ignore[attr-defined] + _chat_model_invoke_patched = True + logger.info( + "LangChain BaseChatModel.invoke/ainvoke/stream/astream defensive callback injection installed" + ) + return True + + +def _inject_handler_into_config(config: Any, ensure: Callable[[Any], list[Any]]) -> Any: + """Helper: ensure ``config["callbacks"]`` (or the + ``configurable``/``callbacks`` kwarg) carries our handler. Returns + the original config untouched when we can't safely mutate it + (e.g. ``config`` is a frozen mapping). + + The user may pass either: + - a dict like ``{"callbacks": [...]}`` (standard Runnable path) + - a RunnableConfig built from ``ConfigurableFieldSpec`` etc. + - ``None`` (we synthesise a fresh dict). + """ + if config is None: + return {"callbacks": ensure(None)} + if not isinstance(config, dict): + return config + # `callbacks` is the canonical key on RunnableConfig. Some + # wrappers use `callback_manager` — we honour both. We never + # *replace* a user-supplied callback manager; we only ensure + # the handler list contains our NullRunCallback. + if "callbacks" in config: + config = dict(config) + config["callbacks"] = ensure(config["callbacks"]) + return config + config = dict(config) + config["callbacks"] = ensure(None) + return config + + # --------------------------------------------------------------------------- # D5: patch_openai_agents — OpenAI Agents SDK tracer # --------------------------------------------------------------------------- @@ -1219,6 +1384,13 @@ def patch_openai_agents(runtime: Any) -> bool: _orig_run = Runner.run _orig_run_sync = getattr(Runner, "run_sync", None) + # Audit 2026-06-29 (reset_for_tests gap): stash originals so + # reset_for_tests can restore them. Without this, a second + # test pass with `_agents_patched = False` would double-wrap + # Runner.run / Runner.run_sync. + global _orig_runner_run, _orig_runner_run_sync + _orig_runner_run = _orig_run + _orig_runner_run_sync = _orig_run_sync def _wrap_run(*args: Any, **kwargs: Any) -> Any: result = _orig_run(*args, **kwargs) @@ -1472,10 +1644,10 @@ def auto_instrument(runtime: Any) -> bool: with _auto_lock: if _auto_installed: return True - # Lazy imports — auto_requests needs `_safe_bump_coverage` (now - # defined in this module) at module import time. The framework - # patches below are silent no-ops when their respective - # packages aren't installed. + # Lazy imports — auto_requests and the framework patches below + # are silent no-ops when their respective packages aren't + # installed. Each sub-importer handles its own missing-dep + # case. from nullrun.instrumentation._safe_patch import safe_patch from nullrun.instrumentation.auto_requests import patch_requests from nullrun.instrumentation.autogen import patch_autogen @@ -1485,6 +1657,15 @@ def auto_instrument(runtime: Any) -> bool: paths = [ safe_patch("httpx", lambda: patch_httpx(runtime)), safe_patch("langchain_callback", lambda: patch_langchain_callback(runtime)), + # D4b (2026-06-29): belt-and-suspenders callback injection at + # the BaseChatModel.invoke boundary. Ensures NullRunCallback + # fires even when the user creates the LLM BEFORE init() and + # the BaseCallbackManager.__init__ patch is somehow bypassed + # (LangGraph node-internal calls, cached config paths, etc.). + safe_patch( + "chat_model_invoke", + lambda: patch_chat_model_invoke(runtime), + ), safe_patch("openai_agents", lambda: patch_openai_agents(runtime)), safe_patch("langgraph_compiled", lambda: patch_langgraph_compiled(runtime)), safe_patch("requests", lambda: patch_requests(runtime)), @@ -1529,6 +1710,10 @@ def reset_for_tests() -> None: global _orig_sync_init, _orig_async_init global _orig_pregel_invoke, _orig_pregel_stream global _orig_pregel_ainvoke, _orig_pregel_astream + global _orig_chat_model_invoke, _orig_chat_model_ainvoke + global _orig_chat_model_stream, _orig_chat_model_astream + global _orig_base_callback_manager_init + global _orig_runner_run, _orig_runner_run_sync _auto_installed = False _httpx_patched = False _langchain_patched = False @@ -1562,6 +1747,50 @@ def reset_for_tests() -> None: _orig_pregel_stream = None _orig_pregel_ainvoke = None _orig_pregel_astream = None + # D4b (2026-06-29): restore BaseChatModel.invoke/ainvoke/stream/astream + # if we patched them, otherwise the next test pass would double-wrap. + if _orig_chat_model_invoke is not None: + try: + from langchain_core.language_models import BaseChatModel + BaseChatModel.invoke = _orig_chat_model_invoke # type: ignore[method-assign] + BaseChatModel.ainvoke = _orig_chat_model_ainvoke # type: ignore[method-assign] + BaseChatModel.stream = _orig_chat_model_stream # type: ignore[method-assign] + BaseChatModel.astream = _orig_chat_model_astream # type: ignore[method-assign] + BaseChatModel._nullrun_invoke_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore BaseChatModel: %s", e) + _orig_chat_model_invoke = None + _orig_chat_model_ainvoke = None + _orig_chat_model_stream = None + _orig_chat_model_astream = None + global _chat_model_invoke_patched + _chat_model_invoke_patched = False + # Audit 2026-06-29 (reset_for_tests gap): pre-fix the function + # reset the *_patched flag for langchain_callback and openai_agents + # but did NOT restore the wrapped class methods. A second test + # pass with `auto._langchain_patched = False; auto_instrument(r)` + # would then double-wrap BaseCallbackManager.__init__ / + # Runner.run / Runner.run_sync. Fix: restore them here so a + # repeat pass gets a clean wrap. + if _orig_base_callback_manager_init is not None: + try: + from langchain_core.callbacks import BaseCallbackManager + BaseCallbackManager.__init__ = _orig_base_callback_manager_init # type: ignore[method-assign] + BaseCallbackManager._nullrun_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore BaseCallbackManager: %s", e) + _orig_base_callback_manager_init = None + if _orig_runner_run is not None: + try: + from agents import Runner + Runner.run = _orig_runner_run # type: ignore[method-assign] + if _orig_runner_run_sync is not None: + Runner.run_sync = _orig_runner_run_sync # type: ignore[method-assign] + Runner._nullrun_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore Runner: %s", e) + _orig_runner_run = None + _orig_runner_run_sync = None # --------------------------------------------------------------------------- @@ -1674,27 +1903,41 @@ def _fingerprint_is_seen(state: OrderedDict[str, None], fp: str) -> bool: return False -def _safe_bump_coverage(runtime: Any, target_attr: str, host: str) -> None: - """Bump a per-host counter on the runtime, tolerating stub runtimes - (MagicMock, custom test doubles) that don't carry the attribute. - - ``target_attr`` is one of ``_coverage_seen``, - ``_coverage_streaming_skipped``. Mirrors the structure of - ``_fingerprint_is_seen`` — never raises. - - Background: ``nullrun.instrumentation.auto_requests`` imports this - helper but the original 0.3.0 release never defined it, so the - entire ``requests`` auto-instrumentation path was unimportable. - Adding the helper here unblocks the module and the dashboard's - coverage tab. +def _emit_streaming_skipped( + runtime: Any, + request: httpx.Request, + host: str, +) -> None: + """Emit an llm_call event for a response where the body exceeded + the tracking cap and usage data could not be extracted. + + 0.9.0: replaces the old `_safe_bump_coverage(..., + "_coverage_streaming_skipped", host)` counter bump. The event + carries `metadata.streaming_skipped: True` and `metadata.tracked: + False` (extractor did not run because the body was never read) + so the backend's coverage query still counts it toward the + `llm_call_count` denominator while flagging it as not-tracked. + + `model` falls back to the request body via + `_extract_model_from_request_body` (sync-only, mirrors + `_emit`'s pattern at lines 735-739). """ - target = getattr(runtime, target_attr, None) - if target is None: - return - if isinstance(target, dict): - target[host] = int(target.get(host, 0)) + 1 - else: - try: - target[host] = int(target[host]) + 1 - except Exception as e: # pragma: no cover — defensive - logger.debug("_safe_bump_coverage: %s bump failed: %s", target_attr, e) + try: + runtime.track( + { + "type": "llm_call", + "provider": _provider_label(host), + "host": host, + "model": _extract_model_from_request_body(request), + "tokens": 0, + "input_tokens": 0, + "output_tokens": 0, + "has_usage": False, + "metadata": { + "tracked": False, + "streaming_skipped": True, + }, + } + ) + except Exception as e: # pragma: no cover — defensive + logger.debug("NullRun transport: streaming-skipped track failed: %s", e) diff --git a/src/nullrun/instrumentation/auto_requests.py b/src/nullrun/instrumentation/auto_requests.py index b1a754c..13c6cc2 100644 --- a/src/nullrun/instrumentation/auto_requests.py +++ b/src/nullrun/instrumentation/auto_requests.py @@ -14,15 +14,13 @@ - `_match_extractor(host)` — exact + subdomain match - `_provider_label(host)` — short label for the `provider` event field - `_fingerprint_for(host, body, status)` — dedup fingerprint -- `_safe_bump_coverage(runtime, target_attr, host)` — bounded counter - bump that tolerates stub runtimes (MagicMock, custom test doubles) What this module owns: - `patch_requests(runtime)` — wraps `requests.Session.send` so every call routed through a session is observed. Idempotent. - Streaming handling: `requests.get(url, stream=True)` and - `Accept: text/event-stream` are skipped with a `streaming-skipped` - coverage marker. We do NOT buffer the response — that would break + `Accept: text/event-stream` emit a `metadata.streaming_skipped: true` + llm_call event. We do NOT buffer the response — that would break user-facing streaming (the caller reads `iter_content`/`iter_lines` chunk-by-chunk). The known limit is documented in `docs/known-limitations.md`. @@ -31,6 +29,12 @@ `urllib3` patch (which `requests` uses under the hood) can skip already-tracked requests. See plan section P2 / "requests ↔ urllib3". +0.9.0: counter-bump helpers (`_safe_bump_coverage`, +`_bump_streaming_skipped`) are gone — coverage is now derived from +llm_call span metadata. Each emit site tags `metadata.tracked: bool` +and `metadata.streaming_skipped: bool` so the backend can compute +coverage_pct from `spans.metadata` directly. + `aiohttp` is deliberately out of scope for this phase — see `docs/known-limitations.md` and the plan's open questions. """ @@ -45,7 +49,6 @@ _fingerprint_for, _match_extractor, _provider_label, - _safe_bump_coverage, ) logger = logging.getLogger(__name__) @@ -77,24 +80,6 @@ def _is_streaming_request(request: Any, send_kwargs: dict[str, Any]) -> bool: return any(ct in accept for ct in _STREAMING_CONTENT_TYPES) -def _bump_streaming_skipped(runtime: Any, host: str) -> None: - """Phase P2: bump a `streaming-skipped` counter so the dashboard - surfaces *known* untracked hosts (vs. just "seen but unknown - extractor"). Mirrors the structure of `_safe_bump_coverage` to - tolerate stub runtimes. - """ - target = getattr(runtime, "_coverage_streaming_skipped", None) - if target is None: - return - bump = getattr(runtime, "_bump_coverage_counter", None) - if bump is None: - return - try: - bump(target, host) - except Exception as e: # pragma: no cover — defensive - logger.debug("NullRun streaming-skipped bump failed: %s", e) - - def _emit_to_runtime( runtime: Any, request: Any, @@ -107,8 +92,10 @@ def _emit_to_runtime( transport. Kept in this module (rather than re-exported from `auto.py`) so the requests path is self-contained and the `requests` dep is not pulled into `auto.py`'s import graph. + + 0.9.0: emits `metadata.tracked: True` — usage was extracted so + the SDK's `_match_extractor` identified a known provider. """ - _safe_bump_coverage(runtime, "_coverage_tracked", host) try: runtime.track( { @@ -120,6 +107,9 @@ def _emit_to_runtime( "input_tokens": usage.get("prompt_tokens", 0), "output_tokens": usage.get("completion_tokens", 0), "has_usage": True, + "metadata": { + "tracked": True, + }, "raw_usage": usage, "_fingerprint": _fingerprint_for(host, body, status), } @@ -128,6 +118,49 @@ def _emit_to_runtime( logger.debug("NullRun requests transport: track failed: %s", e) +def _emit_streaming_skipped_to_runtime( + runtime: Any, + request: Any, + host: str, +) -> None: + """0.9.0: emit an llm_call event for a streamed response that + we deliberately did NOT buffer (so the user keeps their chunked + read). Tags `metadata.streaming_skipped: True` and + `metadata.tracked: False` (extractor never ran because the body + was never read) — backend counts it toward `llm_call_count` but + not toward `tracked_call_count`. + + Model is best-effort from the request body (sync path; mirrors + `auto._emit_streaming_skipped`). + """ + try: + from nullrun.instrumentation.auto import ( + _extract_model_from_request_body, + ) + model = _extract_model_from_request_body(request) + except Exception: # pragma: no cover — defensive + model = None + try: + runtime.track( + { + "type": "llm_call", + "provider": _provider_label(host), + "host": host, + "model": model, + "tokens": 0, + "input_tokens": 0, + "output_tokens": 0, + "has_usage": False, + "metadata": { + "tracked": False, + "streaming_skipped": True, + }, + } + ) + except Exception as e: + logger.debug("NullRun requests transport: streaming-skipped track failed: %s", e) + + _requests_patched = False _requests_lock = threading.Lock() _orig_session_send: Any = None @@ -179,17 +212,13 @@ def _wrapped_send(self: Any, request: Any, **kwargs: Any) -> Any: host = urllib.parse.urlparse(url).hostname or "" - # Phase 1.1: bump seen-counter for *every* host, including - # ones we don't have an extractor for. Same pattern as - # the httpx transport. - _safe_bump_coverage(runtime, "_coverage_seen", host) - # Streaming skip: do NOT read `response.content` here — # that would buffer the entire stream and break the - # caller's chunked consumption. Mark as `streaming-skipped` - # so the dashboard can show "known but untracked". + # caller's chunked consumption. Emit an llm_call event + # tagged `metadata.streaming_skipped: true` so the + # backend can still see the call in coverage. if _is_streaming_request(request, kwargs): - _bump_streaming_skipped(runtime, host) + _emit_streaming_skipped_to_runtime(runtime, request, host) return _orig_session_send(self, request, **kwargs) extractor = _match_extractor(host) @@ -254,4 +283,4 @@ def reset_for_tests() -> None: Session._nullrun_patched = False # type: ignore[attr-defined] except Exception as e: # pragma: no cover — defensive logger.debug("reset_for_tests: failed to restore Session: %s", e) - _orig_session_send = None + _orig_session_send = None \ No newline at end of file diff --git a/src/nullrun/instrumentation/langgraph.py b/src/nullrun/instrumentation/langgraph.py index 69c0b41..a9c58c8 100644 --- a/src/nullrun/instrumentation/langgraph.py +++ b/src/nullrun/instrumentation/langgraph.py @@ -507,6 +507,12 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: # Phase 4.1: lift cache / reasoning / finish / tool names out # of raw_usage onto the event itself, mirroring the httpx # transport shape so the dedup key space stays unified. + # 0.9.0: tag metadata.tracked based on whether the model + # extraction produced a real value (not the literal + # "unknown" fallback). The backend's coverage query + # (backend/src/coverage/mod.rs) reads this flag to + # compute tracked_pct — see plan at + # `~/.claude/plans/async-swinging-hanrahan.md`. event = { "type": "llm_call", "model": model, @@ -521,6 +527,13 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: "tool_names": usage.get("tool_names") or [], # Flag to backend: this is raw usage, compute cost yourself "has_usage": usage["has_usage"], + "metadata": { + # Tracked iff we extracted a real model name — the + # `usage["has_usage"]` flag is independent (a 4xx + # response with empty body still yields has_usage + # = False but we DID see the response). + "tracked": model != "unknown", + }, # Stripped at the wire boundary by _WIRE_STRIP_FIELDS — # kept here for in-process dedup + test introspection. "raw_usage": usage["raw_usage"], @@ -813,13 +826,68 @@ def _extract_model_from_response(response: Any) -> str | None: # event; this DEBUG line is for the per-call site so the # operator can correlate the wire warning back to a specific # response shape. + # + # Audit 2026-06-29 (silent zero-billing): the previous version + # emitted a single DEBUG line with only the response type. That + # was insufficient when the operator needed to see *which* of + # the four fallback steps almost-but-didn't match. We now dump + # the available keys on every relevant shape so a single + # logcat-level filter surfaces the root cause: + # - `response.llm_output` keys + # - `response.response_metadata` keys + # - `gen_msg.response_metadata` keys (LLMResult callback path) + # - direct attrs `response.model_name` / `response.model` + # All four dumps are guarded so a missing attribute is silent. try: response_type = type(response).__name__ except Exception: response_type = "" + + llm_out_keys: list[str] = [] + if isinstance(llm_out, dict): + try: + llm_out_keys = sorted(str(k) for k in llm_out.keys()) + except Exception: + llm_out_keys = [""] + resp_meta_keys: list[str] = [] + if isinstance(resp_meta, dict): + try: + resp_meta_keys = sorted(str(k) for k in resp_meta.keys()) + except Exception: + resp_meta_keys = [""] + gen_msg_meta_keys: list[str] = [] + direct_attrs: dict[str, str] = {} + if gen_msg is not None: + try: + gm = getattr(gen_msg, "response_metadata", None) + if isinstance(gm, dict): + gen_msg_meta_keys = sorted(str(k) for k in gm.keys()) + except Exception: + pass + for attr in ("model_name", "model"): + try: + v = getattr(gen_msg, attr, None) + if v is not None: + direct_attrs[attr] = repr(v)[:64] + except Exception: + pass + for attr in ("model_name", "model"): + try: + v = getattr(response, attr, None) + if v is not None: + direct_attrs[attr] = repr(v)[:64] + except Exception: + pass + logger.debug( - "_extract_model_from_response returned None for response of type %s", + "_extract_model_from_response returned None for response of type %s — " + "llm_output_keys=%s response_metadata_keys=%s gen_msg_metadata_keys=%s " + "direct_attrs=%s", response_type, + llm_out_keys, + resp_meta_keys, + gen_msg_meta_keys, + direct_attrs, ) return None diff --git a/src/nullrun/runtime.py b/src/nullrun/runtime.py index 63baa65..592741a 100644 --- a/src/nullrun/runtime.py +++ b/src/nullrun/runtime.py @@ -262,18 +262,10 @@ def __init__( # first call. self._local_cost_cents_estimate: int = 0 - # Coverage counters (Phase 3 of the production-readiness plan). - # The instrumentation layer in `nullrun.instrumentation.auto` - # calls ``_safe_bump_coverage(runtime, "_coverage_seen" / - # "_coverage_tracked" / "_coverage_streaming_skipped", host)`` - # so the dashboard can show "which LLM hosts the SDK is - # seeing vs. successfully tracking". Previous versions - # relied on ``_safe_bump_coverage`` to no-op when these - # attributes were missing -- the dashboard's coverage tab - # was always empty. - self._coverage_seen: dict[str, int] = {} - self._coverage_tracked: dict[str, int] = {} - self._coverage_streaming_skipped: dict[str, int] = {} + # 0.9.0: coverage counters removed. Coverage is now derived + # server-side from the llm_call span metadata (`tracked` and + # `streaming_skipped` flags set by the instrumentation layer). + # The previous per-host dicts and 60s daemon thread are gone. # Remote control plane state (per-workflow, pushed from server via WS). # Unified model: effective_state = max(local_state, remote_state). @@ -399,21 +391,7 @@ def __init__( # a concurrent ``add`` if both interleave on a free-threaded # build). The lock is uncontended on the read path so the # cost is one acquire per call. - # - # We also reuse this lock to guard the coverage-counter - # dicts (§7.2 #33) because the bump + prune sequence must - # be atomic — otherwise two threads could both observe the - # dict at length 4095, both bump their counter, and both - # evict a different entry, growing the dict to 4097 - # before either prune lands. One lock, one source of - # truth, cheaper than two fine-grained ones. self._tools_lock = threading.Lock() - # §7.2 #33: cap the per-host coverage counters. Without - # this, a long-running process that sees thousands of - # custom LLM endpoints over its lifetime would grow these - # dicts without bound — same hazard as - # ``NullRunCallback._active_runs`` (now capped at 4096). - self._COVERAGE_CAP: int = 4096 logger.info("NullRun Runtime initialized: mode=cloud") @@ -1529,186 +1507,6 @@ def is_sensitive_tool(self, tool_name: str) -> bool: t.lower() for t in self._strict_mode_tools } - def coverage_report(self) -> dict[str, dict[str, int]]: - """ - Snapshot of the LLM-host coverage counters that the auto- - instrumentation layer maintains. The SDK tracks three - counters per host: - - - ``seen`` -- every LLM host the SDK observed a request to. - - ``tracked`` -- hosts whose response was successfully - extracted and emitted as an ``llm_call`` event. - - ``streaming_skipped`` -- hosts whose response was a - streaming SSE / ``stream=True`` and was deliberately - NOT buffered (so the user keeps their chunked read). - - The same payload is sent over the WebSocket heartbeat every - 60s and via the HTTP-fallback path when the WS connection - is down. The dashboard's coverage tab uses these counters - to surface "we know about this host but cannot track it" -- - the leading indicator that an SDK upgrade is needed. - - Returns: - ``{"seen": {...}, "tracked": {...}, - "streaming_skipped": {...}}``. Each value is a fresh - ``dict`` so callers can mutate the result without - affecting the runtime's internal state. - """ - return { - "seen": dict(self._coverage_seen), - "tracked": dict(self._coverage_tracked), - "streaming_skipped": dict(self._coverage_streaming_skipped), - } - - def track_coverage(self) -> dict[str, Any] | None: - """Emit a `coverage_report` event with the current per-host counters. - - Returned from ``track_event`` so the caller can observe the - transport-side outcome (queued, deduped, breaker open, etc.). - Returns ``None`` when there are no counters to report yet - (cold start, no LLM traffic) — the backend doesn't need an - empty row per minute per process. - - Background emission is driven by ``start_coverage_reporter``; - most callers don't invoke this method directly. - - Wire shape (2026-06-28 fix): - The per-host counter dicts live under ``metadata`` so the - backend's batch handler (backend/src/proxy/handlers.rs:5909 - -5923) reads them from ``sdk_event.metadata``. The handler - was wired to that location when the SDK moved from the - deprecated ``POST /api/v1/coverage`` endpoint onto the - shared ``/api/v1/track/batch`` pipeline — placing the - counters at the event top level silently dropped them (the - ``SdkTrackRequest`` struct uses explicit fields, no - ``#[serde(flatten)]`` catchall, so unknown top-level keys - are discarded by serde). Nesting under ``metadata`` matches - what the handler reads. - """ - stats = self.coverage_report() - seen_total = sum(stats["seen"].values()) - if seen_total == 0: - # Nothing to report — avoid empty rows. - return None - return self.track_event( - "coverage_report", - metadata={ - "seen": stats["seen"], - "tracked": stats["tracked"], - "streaming_skipped": stats["streaming_skipped"], - }, - ) - - _COVERAGE_REPORT_INTERVAL_SECONDS = 60.0 - - def start_coverage_reporter(self) -> None: - """Start a background thread that emits ``coverage_report`` events - every ``_COVERAGE_REPORT_INTERVAL_SECONDS``. - - Idempotent — second call is a no-op. Caller is responsible - for calling :meth:`stop_coverage_reporter` on shutdown, but - the thread is a daemon so a missed stop does not block exit. - """ - if getattr(self, "_coverage_reporter_thread", None) is not None: - return - thread = threading.Thread( - target=self._coverage_reporter_loop, - name="nullrun-coverage-reporter", - daemon=True, - ) - self._coverage_reporter_thread = thread - thread.start() - - def stop_coverage_reporter(self, timeout: float = 2.0) -> None: - """Signal the coverage reporter to exit and join its thread.""" - self._coverage_reporter_stop = True - thread = getattr(self, "_coverage_reporter_thread", None) - if thread is not None: - thread.join(timeout=timeout) - - def _coverage_reporter_loop(self) -> None: - """Loop body for the coverage reporter thread. - - Emits a coverage report on entry (so the dashboard has data - within ~1s of process start), then every interval until - ``stop_coverage_reporter`` is called. - """ - self._coverage_reporter_stop = False - # Emit once on entry — gives the backend a row even if the - # process is short-lived (CI, batch jobs). - try: - self.track_coverage() - except Exception as e: # noqa: BLE001 — background loop - logger.debug(f"coverage_reporter: initial emit failed: {e}") - while not getattr(self, "_coverage_reporter_stop", False): - # Sleep in short slices so shutdown is responsive. - slept = 0.0 - while slept < self._COVERAGE_REPORT_INTERVAL_SECONDS and not getattr( - self, "_coverage_reporter_stop", False - ): - time.sleep(min(0.5, self._COVERAGE_REPORT_INTERVAL_SECONDS - slept)) - slept += 0.5 - if getattr(self, "_coverage_reporter_stop", False): - break - try: - self.track_coverage() - except Exception as e: # noqa: BLE001 — background loop - logger.debug(f"coverage_reporter: emit failed: {e}") - - def bump_coverage_counter(self, target_attr: str, host: str) -> None: - """Bump a per-host coverage counter with FIFO eviction at the cap. - - §7.2 #33: replaces the previous direct-dict-mutation path - used by ``nullrun.instrumentation.auto._safe_bump_coverage``. - The pre-fix code just did ``target[host] = target.get(host, - 0) + 1``, which let a process with many custom LLM - endpoints grow the dict without bound. We now: - - 1. Take ``_tools_lock`` so concurrent bumps from - multiple threads (sync httpx + async httpx + the - requests transport) can't both pass the cap check - and evict different entries. - 2. If the dict already has the key, increment (LRU - bump via dict insertion order). - 3. If the key is new and we're at the cap, evict the - oldest entry before inserting. - - Tolerates a missing attribute (stub runtimes in tests): - no-op when ``getattr(self, target_attr, None)`` returns - ``None``. Tolerates a non-dict target (also a test-only - scenario): logs DEBUG and moves on. - """ - with self._tools_lock: - target = getattr(self, target_attr, None) - if target is None: - return - if not isinstance(target, dict): - logger.debug( - "bump_coverage_counter: %s is not a dict (%s); skipping", - target_attr, - type(target).__name__, - ) - return - if host in target: - # Insertion-order LRU bump: re-insert so this - # host moves to the end of the dict. - target[host] = int(target.get(host, 0)) + 1 - # Re-set to refresh insertion order (Python dicts - # don't auto-promote on value update). - value = target.pop(host) - target[host] = value - else: - if len(target) >= self._COVERAGE_CAP: - evicted_host, _ = next(iter(target.items())) - del target[evicted_host] - logger.warning( - "coverage counter %s hit cap %d; evicting oldest host=%s", - target_attr, - self._COVERAGE_CAP, - evicted_host, - ) - target[host] = 1 - def get_org_status(self, org_id: str | None = None) -> dict[str, Any]: """Public helper for reading ``/api/v1/orgs/{org_id}/status``. diff --git a/tests/contract/test_llm_call_model_wire.py b/tests/contract/test_llm_call_model_wire.py index 0e737ea..252dce9 100644 --- a/tests/contract/test_llm_call_model_wire.py +++ b/tests/contract/test_llm_call_model_wire.py @@ -319,3 +319,163 @@ def test_patch_httpx_eager_wrap_is_idempotent(): finally: auto.reset_for_tests() pre_existing.close() + + +# ─── patch_chat_model_invoke: init-ordering regression ─────────────── +# +# Audit 2026-06-29 (silent zero-billing): the LangGraph case the +# production trace exposed is +# +# llm = ChatOpenAI(model=...) # before init +# nullrun.init(api_key=...) # patch installed too late +# graph.invoke(input) # llm.invoke() inside the node +# +# `patch_httpx` covers the eager-sweep path (pre-existing +# ``httpx.Client`` instances are wrapped). For LangChain chat models, +# the `BaseCallbackManager.__init__` patch is the original defence. +# ``patch_chat_model_invoke`` is the new belt-and-suspenders layer +# that wraps ``BaseChatModel.invoke`` / ``ainvoke`` directly so a +# ``NullRunCallback`` is present in the per-call config even if the +# callback manager constructor is somehow bypassed. +# +# This regression test pins the wrap so a future refactor can't +# silently drop it. + + +def test_patch_chat_model_invoke_injects_callback_when_llm_pre_exists(): + """Create a fake BaseChatModel BEFORE ``nullrun.init``, then call + ``patch_chat_model_invoke``, then invoke. The wrapped invoke must + inject a ``NullRunCallback`` into the per-call config. + """ + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + from langchain_core.outputs import ChatGeneration, ChatResult + + from nullrun.instrumentation import auto + from nullrun.instrumentation.langgraph import NullRunCallback + + auto.reset_for_tests() + auto._chat_model_invoke_patched = False + + class FakeChatModel(BaseChatModel): + """Minimal BaseChatModel that records the callbacks it saw.""" + + seen_callbacks: list = [] + + @property + def _llm_type(self) -> str: + return "fake" + + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + # Record the callbacks the framework attached (or didn't) + # so the test can assert our wrapper injected one. + seen = getattr(run_manager, "handlers", None) or [] + type(self).seen_callbacks.append(list(seen)) + # Return a properly-shaped ChatResult with an AIMessage so + # the downstream on_llm_end extraction has a generations[0] + # to walk. The model_name is the same string we'd see from + # a real langchain-openai 1.x response. + return ChatResult( + generations=[ + ChatGeneration( + message=AIMessage( + content="ok", + response_metadata={"model_name": "fake-model"}, + ) + ) + ], + llm_output={"model_name": "fake-model"}, + ) + + runtime = MagicMock() + try: + # The user's order: create the LLM before init. + llm = FakeChatModel() + type(llm).seen_callbacks = [] + + ok = auto.patch_chat_model_invoke(runtime) + assert ok is True + + # Invoke the LLM through the wrapped method. The wrap must + # inject a NullRunCallback into config["callbacks"] so the + # internal _generate sees it. + llm.invoke("hello") + + # Assert: at least one NullRunCallback was seen during _generate. + saw_nullrun = any( + any(isinstance(h, NullRunCallback) for h in seen) + for seen in type(llm).seen_callbacks + ) + assert saw_nullrun, ( + "patch_chat_model_invoke did not inject NullRunCallback into " + "the per-call config — the audit fix is broken or missing." + ) + finally: + auto.reset_for_tests() + + +def test_patch_chat_model_invoke_preserves_user_callbacks(): + """If the user already supplied a callback in the config, the + wrap must NOT replace it — only add the NullRunCallback if absent. + """ + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + from langchain_core.outputs import ChatGeneration, ChatResult + from langchain_core.callbacks import BaseCallbackHandler + + from nullrun.instrumentation import auto + from nullrun.instrumentation.langgraph import NullRunCallback + + auto.reset_for_tests() + auto._chat_model_invoke_patched = False + + class UserCallback(BaseCallbackHandler): + """Real BaseCallbackHandler so LangChain's manager doesn't + trip on missing attributes (``ignore_chat_model``, + ``raise_error``, etc.) when it tries to fire the callback.""" + seen_callbacks: list = [] + + def on_chat_model_start(self, *args, **kwargs): + type(self).seen_callbacks.append(("start",)) + + def on_llm_end(self, *args, **kwargs): + type(self).seen_callbacks.append(("end",)) + + class FakeChatModel(BaseChatModel): + seen_callbacks: list = [] + + @property + def _llm_type(self) -> str: + return "fake" + + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + seen = getattr(run_manager, "handlers", None) or [] + type(self).seen_callbacks.append(list(seen)) + return ChatResult( + generations=[ + ChatGeneration(message=AIMessage(content="ok")) + ], + llm_output={"model_name": "fake-model"}, + ) + + runtime = MagicMock() + try: + llm = FakeChatModel() + type(llm).seen_callbacks = [] + ok = auto.patch_chat_model_invoke(runtime) + assert ok is True + + # User already has their own callback in config. + user_cb = UserCallback() + llm.invoke("hello", config={"callbacks": [user_cb]}) + + # The user's callback must still be there, alongside ours. + seen_lists = type(llm).seen_callbacks + assert any(user_cb in seen for seen in seen_lists), ( + "user-supplied callback was lost" + ) + assert any( + any(isinstance(h, NullRunCallback) for h in seen) for seen in seen_lists + ), "NullRunCallback was not injected alongside user callback" + finally: + auto.reset_for_tests() diff --git a/tests/test_auto_requests.py b/tests/test_auto_requests.py index 8056255..535f7c0 100644 --- a/tests/test_auto_requests.py +++ b/tests/test_auto_requests.py @@ -194,16 +194,17 @@ def test_session_send_already_tracked_returns_unchanged(monkeypatch, fresh_patch def test_session_send_streaming_skips_track(monkeypatch, fresh_patch_module): - """``stream=True`` kwarg triggers the streaming skip branch.""" + """0.9.0: ``stream=True`` triggers the streaming branch which + emits an llm_call event tagged `metadata.streaming_skipped: True` + and `metadata.tracked: False`. The call still counts toward + coverage `llm_call_count` (backend's denominator) but not toward + `tracked_call_count`. + """ _install_fake_requests(monkeypatch, streaming=True) recorder = {"track": [], "track_event": []} rt = MagicMock() rt.track.side_effect = lambda ev: recorder["track"].append(ev) rt.track_event.side_effect = lambda **kw: recorder["track_event"].append(kw) - # Pretend the runtime has a coverage counters dict so we can - # observe the streaming-skipped bump. - rt._coverage_streaming_skipped = {} - rt._bump_coverage_counter = MagicMock() from requests import Session @@ -212,14 +213,22 @@ def test_session_send_streaming_skips_track(monkeypatch, fresh_patch_module): assert patch_requests(rt) is True req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={}) Session().send(req, stream=True) - # Track was NOT called (streaming skip). - assert recorder["track"] == [] - # Streaming-skipped counter was bumped. - assert rt._bump_coverage_counter.called + # Track WAS called (with the streaming-skipped flag) — the new + # behavior replaces the old counter-bump. + assert len(recorder["track"]) == 1 + ev = recorder["track"][0] + assert ev["type"] == "llm_call" + assert ev["host"] == "api.openai.com" + assert ev["has_usage"] is False + assert ev["metadata"]["streaming_skipped"] is True + assert ev["metadata"]["tracked"] is False def test_session_send_accept_event_stream_header_skips_track(monkeypatch, fresh_patch_module): - """``Accept: text/event-stream`` header triggers the streaming skip branch.""" + """0.9.0: ``Accept: text/event-stream`` header triggers the same + streaming branch — emit llm_call tagged + `metadata.streaming_skipped: True`. + """ _install_fake_requests(monkeypatch) recorder = {"track": [], "track_event": []} rt = _fake_runtime(recorder) @@ -233,7 +242,8 @@ def test_session_send_accept_event_stream_header_skips_track(monkeypatch, fresh_ url="https://api.openai.com/v1/chat/completions", headers={"Accept": "text/event-stream"} ) Session().send(req) - assert recorder["track"] == [] + assert len(recorder["track"]) == 1 + assert recorder["track"][0]["metadata"]["streaming_skipped"] is True def test_session_send_no_extractor_for_host_returns_response(monkeypatch, fresh_patch_module): @@ -323,29 +333,10 @@ def test_session_send_track_failure_is_swallowed(monkeypatch, fresh_patch_module assert resp.status_code == 200 -def test_session_send_seen_counter_bumped(monkeypatch, fresh_patch_module): - """Every host bumps the ``_coverage_seen`` counter, including - unknown ones (so the dashboard shows visibility into all - outbound traffic, not just tracked vendors). The bump happens - via ``_safe_bump_coverage`` which mutates the dict directly. - """ - _install_fake_requests(monkeypatch) - rt = MagicMock() - rt.track.side_effect = lambda ev: None - rt.track_event.side_effect = lambda **kw: None - rt._coverage_seen = {} - rt._bump_coverage_counter = MagicMock() - - from requests import Session - - from nullrun.instrumentation.auto_requests import patch_requests - - assert patch_requests(rt) is True - req = SimpleNamespace(url="https://example.com/api", headers={}) - Session().send(req) - # Direct dict mutation: host is now present with count 1. - assert rt._coverage_seen.get("example.com") == 1 - +# 0.9.0: removed `test_session_send_seen_counter_bumped`. The +# `_coverage_seen` per-host counter dict is gone — coverage is +# derived from llm_call span metadata.host. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. # ─── reset_for_tests ───────────────────────────────────────────────── @@ -424,36 +415,9 @@ def get(self, *_args, **_kwargs): assert _is_streaming_request(req, {}) is False -def test_bump_streaming_skipped_no_attr(): - """Runtime missing the attribute → silent no-op.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - # MagicMock auto-creates attributes, so build a plain object. - class _Runtime: - pass - - rt = _Runtime() # no _coverage_streaming_skipped - _bump_streaming_skipped(rt, "x") # must not raise - - -def test_bump_streaming_skipped_no_bump_method(): - """Runtime missing the bump method → silent no-op.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - class _Runtime: - _coverage_streaming_skipped = {} - - rt = _Runtime() # no _bump_coverage_counter - _bump_streaming_skipped(rt, "x") # must not raise - - -def test_bump_streaming_skipped_calls_bump(): - """Happy path: bump is invoked with the target dict and host.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - target: dict = {} - rt = MagicMock() - rt._coverage_streaming_skipped = target - rt._bump_coverage_counter = MagicMock() - _bump_streaming_skipped(rt, "api.openai.com") - rt._bump_coverage_counter.assert_called_once_with(target, "api.openai.com") +# 0.9.0: removed three `_bump_streaming_skipped` helper tests. +# The helper is gone — streaming-skipped calls now emit an +# llm_call event tagged `metadata.streaming_skipped: True`. See +# `test_session_send_streaming_skips_track` and +# `test_session_send_accept_event_stream_header_skips_track` above +# for the new behavior assertions. diff --git a/tests/test_blocker_fixes.py b/tests/test_blocker_fixes.py index 320f57e..7a59d71 100644 --- a/tests/test_blocker_fixes.py +++ b/tests/test_blocker_fixes.py @@ -50,30 +50,11 @@ def test_auto_requests_module_importable(): import nullrun.instrumentation.auto_requests # noqa: F401 -def test_safe_bump_coverage_exported(): - """`_safe_bump_coverage` is importable and increments the runtime counter.""" - from nullrun.instrumentation.auto import _safe_bump_coverage - from nullrun.runtime import NullRunRuntime - - runtime = NullRunRuntime(api_key="test", _test_mode=True) - assert runtime._coverage_seen == {} - _safe_bump_coverage(runtime, "_coverage_seen", "api.openai.com") - assert runtime._coverage_seen == {"api.openai.com": 1} - _safe_bump_coverage(runtime, "_coverage_seen", "api.openai.com") - assert runtime._coverage_seen == {"api.openai.com": 2} - _safe_bump_coverage(runtime, "_coverage_seen", "api.anthropic.com") - assert runtime._coverage_seen == {"api.openai.com": 2, "api.anthropic.com": 1} - - -def test_safe_bump_coverage_tolerates_missing_attribute(): - """Stub runtimes (MagicMock, custom doubles) without the attribute don't crash.""" - from nullrun.instrumentation.auto import _safe_bump_coverage - - class StubRuntime: - pass - - # Should not raise. - _safe_bump_coverage(StubRuntime(), "_coverage_seen", "api.openai.com") +# 0.9.0: removed `test_safe_bump_coverage_exported` and +# `test_safe_bump_coverage_tolerates_missing_attribute`. The +# `_safe_bump_coverage` helper is gone — coverage is derived from +# llm_call span metadata. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. def test_auto_instrument_patches_requests(): diff --git a/tests/test_coverage_report.py b/tests/test_coverage_report.py deleted file mode 100644 index 32ffb15..0000000 --- a/tests/test_coverage_report.py +++ /dev/null @@ -1,145 +0,0 @@ -""" -tests/test_coverage_report.py — coverage_report event emission. - -The SDK already keeps per-host counters via ``bump_coverage_counter`` -(see §7.2 #33). Pre-fix there was no path to ship those counters -to the backend — ``get_coverage_stats()`` existed but no caller. -This test pins the new ``track_coverage`` / ``start_coverage_reporter`` -contract: - -* ``track_coverage()`` returns ``None`` when no LLM traffic has - been observed (cold start). -* After at least one counter bump, ``track_coverage()`` returns a - track-result dict (the underlying ``track_event`` result). -* The emitted event carries ``type=coverage_report`` plus the - three counter dicts and ``tokens=0`` so the backend's - ``SdkTrackRequest`` deserializer accepts it. -* The counter dicts live under ``metadata`` so the backend's batch - handler (backend/src/proxy/handlers.rs:5909-5923) reads them. - Placed at the event top level, serde deserialization would drop - them (``SdkTrackRequest`` has explicit fields, no flatten - catchall), which is exactly the bug this test guards against. -* ``start_coverage_reporter`` is idempotent and stops cleanly. -""" - -from __future__ import annotations - -import asyncio -import threading -import time - -import pytest - -from nullrun.runtime import NullRunRuntime - - -@pytest.fixture -def runtime(): - r = NullRunRuntime(api_key="test-key-12345678", _test_mode=True) - yield r - r.stop_coverage_reporter() - - -class TestTrackCoverage: - def test_track_coverage_returns_none_when_no_traffic(self, runtime): - # No counter bumps yet → no event. - result = runtime.track_coverage() - assert result is None - - def test_track_coverage_returns_event_after_counter_bump(self, runtime): - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - runtime.bump_coverage_counter("_coverage_seen", "api.anthropic.com") - - result = runtime.track_coverage() - assert result is not None - # The transport queues the event; the runtime returns the - # dedup/queue result from track_event. - assert "deduped" in result or "accepted" in result or "queued" in result or True - - def test_track_coverage_emits_wire_shape_with_metadata_nesting(self, runtime): - """Pin the wire shape so backend (handlers.rs:5909-5923) can - read the counters from ``event.metadata``. - - Pre-fix this placed the three dicts at the top level of the - event, which serde silently dropped (the batch handler's - ``SdkTrackRequest`` uses explicit fields with no - ``#[serde(flatten)]`` catchall). Page rendered - ``last_coverage_pct = null`` permanently because every - report landed with empty ``seen`` / ``tracked`` / - ``streaming_skipped`` JSONB columns. - """ - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - runtime.bump_coverage_counter( - "_coverage_streaming_skipped", "api.openai.com" - ) - - # Drain anything left in the buffer from prior tests. - buf = runtime._transport._buffer - try: - buf.clear() - except Exception: - pass - - result = runtime.track_coverage() - assert result is not None, "track_coverage must emit once counters exist" - - # Find the coverage_report event in the buffered payload. - events = [e for e in list(buf) if e.get("type") == "coverage_report"] - assert len(events) >= 1, ( - f"expected at least one coverage_report event in buffer, " - f"saw types={[e.get('type') for e in buf]}" - ) - event = events[-1] - - # Must NOT carry the counters at the top level — that was - # the bug shape (silently dropped by serde). - assert "seen" not in event, ( - "top-level 'seen' silently dropped by SdkTrackRequest; " - "must live under metadata" - ) - assert "tracked" not in event - assert "streaming_skipped" not in event - - # MUST carry them under metadata so the batch handler reads - # them correctly. - metadata = event.get("metadata") - assert isinstance(metadata, dict), ( - f"coverage_report event must have metadata dict, got {type(metadata).__name__}: {event!r}" - ) - assert "seen" in metadata - assert "tracked" in metadata - assert "streaming_skipped" in metadata - assert metadata["seen"].get("api.openai.com") == 1 - assert metadata["tracked"].get("api.openai.com") == 1 - assert metadata["streaming_skipped"].get("api.openai.com") == 1 - - # Backend requires `tokens: u64` (non-Optional) on every - # event; track_event defaults it to 0 so the request - # deserializer accepts the coverage_report row. - assert event.get("tokens") == 0 - - def test_coverage_reporter_emits_immediately(self, runtime): - # Even with no traffic, start+stop should be safe. - runtime.start_coverage_reporter() - # Idempotent. - runtime.start_coverage_reporter() - # Stop should not deadlock. - runtime.stop_coverage_reporter(timeout=2.0) - - def test_coverage_reporter_emits_periodically_with_traffic(self, runtime): - # Override interval to a tiny value so the test runs fast. - runtime._COVERAGE_REPORT_INTERVAL_SECONDS = 0.2 - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - - runtime.start_coverage_reporter() - # Give the thread time for the initial emit + at least one - # interval tick. 0.5s is comfortably > 2× the 0.2s interval. - time.sleep(0.5) - runtime.stop_coverage_reporter(timeout=2.0) - # No assertion on buffer contents — the test exists to - # confirm the reporter thread runs without crashing. A - # stronger test would mock the transport, but the SDK - # already has transport-level coverage in test_transport.py. diff --git a/tests/test_coverage_seen_httpx.py b/tests/test_coverage_seen_httpx.py deleted file mode 100644 index 381b1af..0000000 --- a/tests/test_coverage_seen_httpx.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -Regression test for plan item P2-1: coverage_seen must be incremented -in the httpx path, not only the requests path. - -Pre-fix, ``_safe_bump_coverage(runtime, "_coverage_seen", host)`` was -only called from ``auto_requests.py:185``. The httpx transport's -``_emit`` (which handles ~95% of LLM traffic — OpenAI, Anthropic, -Gemini, Mistral, Cohere all use httpx under the hood) just called -``runtime.track(...)`` without bumping the counter. - -Net effect: the dashboard's ``coverage_seen`` view was empty for the -majority of customers. Operators couldn't tell which LLM hosts an -agent was actually talking to. - -Post-fix both sync and async httpx ``_emit`` bump the counter. -""" - -import asyncio -from unittest.mock import MagicMock - -import httpx -import pytest - -from nullrun.instrumentation.auto import ( - NullRunAsyncTransport, - NullRunSyncTransport, -) - - -def _make_response(body: bytes, host: str = "api.openai.com") -> httpx.Response: - request = httpx.Request("POST", f"https://{host}/v1/chat/completions") - return httpx.Response( - 200, - headers={"content-type": "application/json"}, - content=body, - request=request, - ) - - -# A minimal OpenAI-completions response body with usage. The extractor -# for api.openai.com reads ``usage.{prompt_tokens, completion_tokens, -# total_tokens}``. -USAGE_BODY = ( - b'{"id":"chatcmpl-1","choices":[{"message":{"role":"assistant","content":"hi"}}],' - b'"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}' -) - - -def test_sync_transport_bumps_coverage_seen(): - """A successful OpenAI call via the sync httpx transport must - bump ``_coverage_seen[api.openai.com]`` to 1.""" - runtime = MagicMock() - # Provide a real dict for _coverage_seen so the bump survives - # the test assertion. - runtime._coverage_seen = {} - - inner = MagicMock() - inner.handle_request.return_value = _make_response(USAGE_BODY) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.openai.com") == 1, ( - f"coverage_seen[api.openai.com] should be 1 after one httpx " - f"call; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_bumps_for_anthropic(): - """Same bump applies to other supported hosts — the dashboard - should see Anthropic traffic too, not just OpenAI.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - # Anthropic-style response body: usage.{input_tokens, output_tokens}. - # See _anthropic_extractor in auto.py. - anthropic_body = ( - b'{"id":"msg-1","content":[{"type":"text","text":"hi"}],' - b'"usage":{"input_tokens":10,"output_tokens":4}}' - ) - inner = MagicMock() - inner.handle_request.return_value = _make_response(anthropic_body, host="api.anthropic.com") - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.anthropic.com/v1/messages") - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.anthropic.com") == 1, ( - f"coverage_seen[api.anthropic.com] should be 1; got {runtime._coverage_seen}" - ) - - -def test_async_transport_bumps_coverage_seen(): - """Async mirror: a call via the async httpx transport also - bumps the counter.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - async def fake_handle(_request): - return _make_response(USAGE_BODY) - - inner = MagicMock() - inner.handle_async_request.side_effect = fake_handle - - transport = NullRunAsyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - asyncio.run(transport.handle_async_request(request)) - - assert runtime._coverage_seen.get("api.openai.com") == 1, ( - f"async coverage_seen[api.openai.com] should be 1; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_bumps_incrementally_across_requests(): - """Multiple calls to the same host must accumulate, not overwrite - (so the counter is a real frequency, not a 0/1 flag).""" - runtime = MagicMock() - runtime._coverage_seen = {} - - inner = MagicMock() - inner.handle_request.return_value = _make_response(USAGE_BODY) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - - for _ in range(3): - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.openai.com") == 3, ( - f"3 calls should produce coverage_seen=3; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_no_bump_when_extractor_misses(): - """If the extractor returns None (no usage block in the body), - we don't call _emit, so the counter is NOT bumped. This is the - right behaviour — we only want to count LLM calls we actually - tracked, not every HTTP round-trip to an LLM host.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - body = b'{"id":"chatcmpl-1","choices":[]}' # no usage block - inner = MagicMock() - inner.handle_request.return_value = _make_response(body) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - transport.handle_request(request) - - assert runtime._coverage_seen == {}, f"no usage → no bump; got {runtime._coverage_seen}" diff --git a/tests/test_llm_call_metadata_flags.py b/tests/test_llm_call_metadata_flags.py new file mode 100644 index 0000000..9e92b9b --- /dev/null +++ b/tests/test_llm_call_metadata_flags.py @@ -0,0 +1,126 @@ +""" +Pin the wire shape of ``llm_call`` event metadata for coverage derivation. + +The backend's coverage query (backend/src/coverage/mod.rs) reads two +boolean flags off `metadata`: + + - `tracked` — True when the SDK's `_match_extractor` identified a + known provider (extractor returned a non-None usage). False for + hosts without an extractor, or where the model was the literal + "unknown" fallback. + + - `streaming_skipped` — True when the response body exceeded + `MAX_RESPONSE_BYTES` and usage was NOT extractable. The event is + still emitted (counts toward `llm_call_count` denominator) so + coverage_pct is honest about streamed calls. + +0.9.0: these flags REPLACE the old per-host `_coverage_seen` / +`_coverage_tracked` / `_coverage_streaming_skipped` counter dicts. +The previous counter-bump path is gone — see plan at +`~/.claude/plans/async-swinging-hanrahan.md`. + +These tests do NOT exercise the actual HTTP path (that's +`test_streaming_oom_cap.py` and `test_auto_requests.py`). They pin +the wire shape at the SDK boundary so a future refactor that drops +the flags will fail CI immediately. +""" + +from unittest.mock import MagicMock + +import httpx + + +# Mirror the response builder from test_streaming_oom_cap.py to keep +# these tests self-contained. +def _make_response(content: bytes, content_length: int | None = None) -> httpx.Response: + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + headers = {"content-type": "application/json"} + if content_length is not None: + headers["content-length"] = str(content_length) + return httpx.Response(200, headers=headers, content=content, request=request) + + +def test_tracked_flag_true_on_normal_call(): + """A normal call (under cap, extractor matched) emits tracked: True + and NO streaming_skipped flag.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + + runtime = MagicMock() + inner = MagicMock() + body = ( + b'{"id":"chatcmpl-1","choices":[{"message":{"role":"assistant","content":"hi"}}],' + b'"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}' + ) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + event = runtime.track.call_args[0][0] + assert event["metadata"]["tracked"] is True + # `streaming_skipped` may be absent or False; the absence is the + # honest wire shape. + assert event["metadata"].get("streaming_skipped", False) is False + + +def test_streaming_skipped_flag_on_oversized_response(): + """Oversized response → tracked: False, streaming_skipped: True.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + + runtime = MagicMock() + inner = MagicMock() + body = b"x" * (MAX_RESPONSE_BYTES + 1) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + event = runtime.track.call_args[0][0] + assert event["metadata"]["tracked"] is False + assert event["metadata"]["streaming_skipped"] is True + + +def test_track_does_not_strip_metadata_flags(): + """`metadata` is NOT in `_WIRE_STRIP_FIELDS` (runtime.py:106-108). + Verify the flags survive the wire boundary by mocking the + transport-level `track` to apply the same stripping rule the + real runtime uses.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + from nullrun.runtime import NullRunRuntime + + runtime = NullRunRuntime(api_key="test", _test_mode=True) + # Capture wire-event shape post-strip: + captured = {} + + def _capture(enriched): + # Mirror runtime.py:1427-1431 strip rule. + _WIRE_STRIP_FIELDS = frozenset({"cost_cents", "_fingerprint", "raw_usage"}) + wire = {k: v for k, v in enriched.items() if k not in _WIRE_STRIP_FIELDS and v is not None} + captured["event"] = wire + return wire + + runtime.track = _capture + inner = MagicMock() + body = b"x" * (MAX_RESPONSE_BYTES + 1) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + wire = captured["event"] + # `metadata` field is preserved on the wire — backend reads it. + assert "metadata" in wire + assert wire["metadata"]["tracked"] is False + assert wire["metadata"]["streaming_skipped"] is True \ No newline at end of file diff --git a/tests/test_runtime_branches.py b/tests/test_runtime_branches.py index 6e150c9..e659b68 100644 --- a/tests/test_runtime_branches.py +++ b/tests/test_runtime_branches.py @@ -220,62 +220,14 @@ def test_register_sensitive_tools_bulk(): assert "stripe.charge" in tools -# ─── coverage_report / bump_coverage_counter ───────────────────────── - - -def test_coverage_report_returns_independent_copies(): - rt = _make_test_runtime() - rt._coverage_seen["a"] = 1 - snap = rt.coverage_report() - snap["seen"]["b"] = 99 # mutate the snapshot - # Internal state should not observe the mutation. - assert "b" not in rt._coverage_seen - - -def test_bump_coverage_counter_missing_attr_no_op(): - """Stub runtime (duck type) without the attribute → no-op.""" - import threading - - class _Stub: - _tools_lock = threading.Lock() - - stub = _Stub() - NullRunRuntime.bump_coverage_counter(stub, "_coverage_seen", "x") # no raise - - -def test_bump_coverage_counter_non_dict_logs_and_skips(): - """If the target is not a dict, log DEBUG and skip without raising.""" - rt = _make_test_runtime() - rt.__dict__["_coverage_seen"] = "string-not-dict" # bypass dict guard - NullRunRuntime.bump_coverage_counter(rt, "_coverage_seen", "x") # no raise - - -def test_bump_coverage_counter_existing_key_increments(): - rt = _make_test_runtime() - rt._coverage_seen["a"] = 5 - rt.bump_coverage_counter("_coverage_seen", "a") - assert rt._coverage_seen["a"] == 6 - - -def test_bump_coverage_counter_new_key_inserts(): - rt = _make_test_runtime() - rt.bump_coverage_counter("_coverage_seen", "new") - assert rt._coverage_seen["new"] == 1 - - -def test_bump_coverage_counter_evicts_at_cap(caplog): - """When the cap is hit, the OLDEST host is evicted (FIFO).""" - import logging - - rt = _make_test_runtime() - rt._COVERAGE_CAP = 3 - rt._coverage_seen["a"] = 1 - rt._coverage_seen["b"] = 1 - rt._coverage_seen["c"] = 1 - with caplog.at_level(logging.WARNING, logger="nullrun.runtime"): - rt.bump_coverage_counter("_coverage_seen", "d") - assert "a" not in rt._coverage_seen # evicted - assert "d" in rt._coverage_seen +# 0.9.0: removed six `coverage_report` / `bump_coverage_counter` +# tests at lines 223-278. The `_coverage_seen` / +# `_coverage_tracked` / `_coverage_streaming_skipped` dicts, +# `coverage_report()`, `track_coverage()`, +# `start_coverage_reporter()`, `_coverage_reporter_loop()`, and +# `bump_coverage_counter()` method are all gone — coverage is now +# derived server-side from llm_call span metadata. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. # ─── execute() mode resolution ────────────────────────────────────── diff --git a/tests/test_streaming_oom_cap.py b/tests/test_streaming_oom_cap.py index e9a2ad2..17b76f9 100644 --- a/tests/test_streaming_oom_cap.py +++ b/tests/test_streaming_oom_cap.py @@ -10,18 +10,19 @@ in long-running services. Post-fix we use a bounded chunked read (``_read_body_with_cap`` / -``_aread_body_with_cap``). When the body exceeds the cap we skip -tracking and increment ``_coverage_streaming_skipped`` so the -dashboard can see which hosts are producing oversized responses. +``_aread_body_with_cap``). When the body exceeds the cap we now +(0.9.0) emit an ``llm_call`` event tagged +``metadata.streaming_skipped: True`` and ``metadata.tracked: False`` +so the backend's coverage query still counts the call toward +``llm_call_count`` but not toward ``tracked_call_count``. The +previous counter-bump on ``_coverage_streaming_skipped`` is gone. """ import asyncio from unittest.mock import MagicMock import httpx -import pytest -from nullrun.instrumentation import auto as auto_mod from nullrun.instrumentation.auto import ( MAX_RESPONSE_BYTES, NullRunAsyncTransport, @@ -89,10 +90,12 @@ def test_aread_body_with_cap_short_circuits_on_content_length(): # =========================================================================== -def test_sync_transport_skips_tracking_on_oversized_response(monkeypatch): +def test_sync_transport_emits_streaming_skipped_event(monkeypatch): """When the response body exceeds MAX_RESPONSE_BYTES, the sync - transport must NOT call ``runtime.track`` and MUST increment - ``_coverage_streaming_skipped``.""" + transport must emit an llm_call event tagged + `metadata.streaming_skipped: True` and `metadata.tracked: False`. + The body is NOT buffered (so the caller can still consume it) + and usage data is not extracted (no tokens field).""" runtime = MagicMock() inner = MagicMock() body = b"x" * (MAX_RESPONSE_BYTES + 1) @@ -104,19 +107,18 @@ def test_sync_transport_skips_tracking_on_oversized_response(monkeypatch): transport.handle_request(request) - # Body was oversized → no llm_call event was emitted. - runtime.track.assert_not_called() - # Coverage counter incremented (best-effort; the runtime mock - # accepts attribute reads). We verify the helper was called via - # the runtime attribute access path: - # ``_safe_bump_coverage(runtime, "_coverage_streaming_skipped", host)`` - # should have read runtime._coverage_streaming_skipped. - # (We don't assert on the dict contents because the mock - # returns a fresh MagicMock for each attribute access; the - # important contract is that track() was NOT called.) + # Track WAS called with the streaming-skipped event. + runtime.track.assert_called_once() + event = runtime.track.call_args[0][0] + assert event["type"] == "llm_call" + assert event["host"] == "api.openai.com" + assert event["has_usage"] is False + assert event["tokens"] == 0 + assert event["metadata"]["streaming_skipped"] is True + assert event["metadata"]["tracked"] is False -def test_async_transport_skips_tracking_on_oversized_response(): +def test_async_transport_emits_streaming_skipped_event(): """Async mirror of the sync test.""" runtime = MagicMock() inner = MagicMock() @@ -132,12 +134,16 @@ async def fake_handle(_request): asyncio.run(transport.handle_async_request(request)) - runtime.track.assert_not_called() + runtime.track.assert_called_once() + event = runtime.track.call_args[0][0] + assert event["metadata"]["streaming_skipped"] is True + assert event["metadata"]["tracked"] is False def test_sync_transport_does_track_normal_sized_response(): """Sanity: the cap doesn't break the happy path. A normal 200-byte - response with a usage block must still be tracked.""" + response with a usage block must still be tracked and the event + must carry `metadata.tracked: True`.""" runtime = MagicMock() inner = MagicMock() body = ( @@ -156,3 +162,6 @@ def test_sync_transport_does_track_normal_sized_response(): event = runtime.track.call_args[0][0] assert event["type"] == "llm_call" assert event["tokens"] == 8 + assert event["metadata"]["tracked"] is True + # streaming_skipped is not present (or False) on a normal tracked call. + assert not event["metadata"].get("streaming_skipped", False) \ No newline at end of file