From c7c31137a2c46f56b3163744ffce49e6941b7a83 Mon Sep 17 00:00:00 2001 From: Anatolii Date: Mon, 29 Jun 2026 15:49:16 +0400 Subject: [PATCH] feat(0.9.0): remove per-process coverage counters; add nullrun.shutdown() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-derived coverage replaces the in-process counter dicts. Counter-bump helpers (_safe_bump_coverage, _bump_streaming_skipped) are gone; every llm_call span now carries metadata.tracked and metadata.streaming_skipped flags so the backend's coverage_pct query can compute coverage from span metadata alone. BREAKING CHANGES - NullRunRuntime.coverage_report() removed. - NullRunRuntime._coverage_seen / _coverage_tracked / _coverage_streaming_skipped instance attributes removed. - NullRunRuntime.start_coverage_reporter() daemon thread removed (no longer called from init()). - _safe_bump_coverage / _bump_streaming_skipped helpers removed from nullrun.instrumentation.auto. - tests/test_coverage_report.py and tests/test_coverage_seen_httpx.py deleted — coverage is no longer an SDK-side concept. - llm_call wire shape: metadata.tracked: bool and metadata.streaming_skipped: bool are now authoritative; the separate coverage_report event is dropped. NEW API - nullrun.shutdown(timeout=2.0): sends a clean WebSocket close frame and drains in-flight events. Long-running scripts that exited via sys.exit() previously let the kernel RST the TCP socket, which the backend logged as WARN 'Connection reset without closing handshake'. Registering nullrun.shutdown in an atexit handler eliminates the noisy log. No-op if init() was never called. WIRE TEST - tests/test_llm_call_metadata_flags.py pins the new contract: every llm_call span carries metadata.tracked or metadata.streaming_skipped (the latter for streaming/oversize responses that the SDK deliberately does not buffer). Existing tests updated to match — no public API beyond the additions listed above is renamed or removed. --- examples/basic_observe.py | 12 +- src/nullrun/__init__.py | 55 ++- src/nullrun/instrumentation/auto.py | 333 ++++++++++++++++--- src/nullrun/instrumentation/auto_requests.py | 95 ++++-- src/nullrun/instrumentation/langgraph.py | 70 +++- src/nullrun/runtime.py | 210 +----------- tests/contract/test_llm_call_model_wire.py | 160 +++++++++ tests/test_auto_requests.py | 98 ++---- tests/test_blocker_fixes.py | 29 +- tests/test_coverage_report.py | 145 -------- tests/test_coverage_seen_httpx.py | 151 --------- tests/test_llm_call_metadata_flags.py | 126 +++++++ tests/test_runtime_branches.py | 64 +--- tests/test_streaming_oom_cap.py | 51 +-- 14 files changed, 837 insertions(+), 762 deletions(-) delete mode 100644 tests/test_coverage_report.py delete mode 100644 tests/test_coverage_seen_httpx.py create mode 100644 tests/test_llm_call_metadata_flags.py diff --git a/examples/basic_observe.py b/examples/basic_observe.py index 38a4181..2fdc196 100644 --- a/examples/basic_observe.py +++ b/examples/basic_observe.py @@ -43,11 +43,7 @@ ) print(f"call #{i + 1}: {resp.choices[0].message.content!r}") -# 4. Optional: print a coverage snapshot from the runtime instance. -# The same counters are sent over the WS heartbeat and via the -# HTTP-fallback path when the WS connection is down. -print("\nCoverage snapshot:") -rt = nullrun.get_runtime() -report = rt.coverage_report() -for k, v in report.items(): - print(f" {k}: {v}") +# 4. 0.9.0: per-process coverage snapshot removed. Coverage is now +# derived server-side from llm_call span metadata (host + tracked + +# streaming_skipped flags). Query the dashboard or use +# `GET /api/v1/coverage/{org_id}` to inspect. diff --git a/src/nullrun/__init__.py b/src/nullrun/__init__.py index df8689d..a16c542 100644 --- a/src/nullrun/__init__.py +++ b/src/nullrun/__init__.py @@ -41,6 +41,43 @@ def my_agent(query): from nullrun.runtime import track_event, track_llm, track_tool +def shutdown(timeout: float = 2.0) -> None: + """Gracefully shut down the NullRun runtime. + + Sends a clean WebSocket close frame, drains in-flight events, and + stops background threads (HTTP poller, WS push listener). After + this returns, any further ``nullrun.track(...)`` call or + ``@protect``-decorated call is a no-op. + + Audit 2026-06-29 (WS graceful close on exit): a long-running + script that exits via ``sys.exit()`` lets the kernel RST the TCP + socket, which the backend logs as WARN "Connection reset + without closing handshake". Calling ``nullrun.shutdown()`` + before exit (or registering it via ``atexit``) eliminates the + noisy log. No-op if ``init()`` was never called. + + Args: + timeout: seconds to wait for the WS close handshake to + complete before giving up. The underlying + ``NullRunRuntime.shutdown()`` already caps WS join at + 0.5s and the WS close at 2.0s — this parameter is + reserved for future expansion and is currently unused. + + Example:: + + import atexit + import nullrun + atexit.register(nullrun.shutdown) + """ + # Lazy import so the SDK module-import path stays light (mirrors + # the pattern in `init` and `status`). + from nullrun.runtime import NullRunRuntime + runtime = NullRunRuntime._instance # type: ignore[attr-defined] + if runtime is None: + return + runtime.shutdown() + + def status(): """Return the current runtime state as a Layer-3 :class:`NullRunStatus` snapshot. @@ -300,11 +337,10 @@ def my_agent(): auto_instrument(runtime) - # Start the coverage reporter so the backend gets a coverage_report - # event every 60s. Daemon thread; safe to leak across re-init. - # The coverage reporter is a no-op when no LLM traffic has been - # observed (see ``track_coverage``). - runtime.start_coverage_reporter() + # 0.9.0: coverage reporter removed. Coverage is now derived + # server-side from llm_call span metadata (host + tracked + + # streaming_skipped flags). No 60s daemon thread, no per-process + # counter dicts. return runtime @@ -448,6 +484,15 @@ def __dir__() -> list[str]: "track_llm", "track_tool", "track_event", + # Audit 2026-06-29 (WS graceful close on exit): the user-facing + # top-level ``shutdown()`` sends a clean WS close frame and + # drains in-flight events. Without it, a long-running script + # that exits via ``sys.exit()`` lets the kernel RST the TCP + # socket → backend logs WARN "Connection reset without closing + # handshake". Calling ``nullrun.shutdown()`` before + # ``sys.exit(0)`` (or in an ``atexit`` handler) eliminates the + # noisy log. No-op if init() was never called. + "shutdown", # Layer 2: global on_error hook. Eager because it is the # single most important "give the user a chance" API — the # user has to know it exists to call it. diff --git a/src/nullrun/instrumentation/auto.py b/src/nullrun/instrumentation/auto.py index 45e8bcb..698fba9 100644 --- a/src/nullrun/instrumentation/auto.py +++ b/src/nullrun/instrumentation/auto.py @@ -634,9 +634,12 @@ def handle_request(self, request: httpx.Request) -> httpx.Response: # rebuild path only. body = _read_body_with_cap(response, MAX_RESPONSE_BYTES) if body is None: - # Body exceeded the cap. Drain it (so callers don't - # see a half-consumed response) but don't track. - _safe_bump_coverage(self._runtime, "_coverage_streaming_skipped", host) + # Body exceeded the cap. 0.9.0: still emit an + # llm_call event so the call counts toward coverage + # (host known, model best-effort, tracked: false + # because usage wasn't extractable). Drain the body + # so callers don't see a half-consumed response. + _emit_streaming_skipped(self._runtime, request, host) logger.debug( "NullRun transport: response from %s exceeded %d bytes; " "skipping usage tracking", @@ -709,16 +712,6 @@ def _emit( body: bytes, status: int, ) -> None: - # P2-1 (plan §10): bump the coverage counter so the dashboard - # can see which LLM hosts the agent is talking to. Pre-fix - # this counter was only incremented in the ``requests`` path - # (auto_requests.py:185). The httpx path is the dominant - # one (every OpenAI / Anthropic / Gemini / Mistral / Cohere - # call goes through httpx), so without this bump the - # ``coverage_seen`` view in the dashboard would be empty for - # the majority of customers. - _safe_bump_coverage(self._runtime, "_coverage_seen", host) - # 2026-06-28 (Issue 2 fix): if the extractor returned ``None`` # for ``model`` (response body lacked the field — observed for # some OpenAI Responses-API and Anthropic streaming edge cases), @@ -738,6 +731,14 @@ def _emit( or _extract_model_from_request_body(request) ) + # 0.9.0: every successful llm_call span carries + # `metadata.tracked: True`. The backend's coverage query + # (backend/src/coverage/mod.rs) computes tracked_pct from + # this flag — it replaces the old `_coverage_seen` / + # `_coverage_tracked` per-host dicts. Usage was extracted + # successfully, so the SDK's `_match_extractor` identified + # a known provider. See plan at + # `~/.claude/plans/async-swinging-hanrahan.md`. try: # Phase 4.1: lift cache / reasoning / finish / tool names # out of raw_usage onto the event itself. The backend's @@ -760,6 +761,9 @@ def _emit( "finish_reason": usage.get("finish_reason"), "tool_names": usage.get("tool_names") or [], "has_usage": True, + "metadata": { + "tracked": True, + }, # Stripped at the wire boundary by _WIRE_STRIP_FIELDS # in runtime.py — kept here only so the in-process # dedup layer can see the full vendor payload. @@ -805,7 +809,10 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: # P0-3: bounded read (see sync path for full rationale). body = await _aread_body_with_cap(response, MAX_RESPONSE_BYTES) if body is None: - _safe_bump_coverage(self._runtime, "_coverage_streaming_skipped", host) + # 0.9.0: emit llm_call with metadata.streaming_skipped: true + # so the call counts toward coverage (host known, + # tracked: false because usage wasn't extractable). + _emit_streaming_skipped(self._runtime, request, host) logger.debug( "NullRun transport: async response from %s exceeded %d bytes; " "skipping usage tracking", @@ -873,10 +880,11 @@ def _emit( body: bytes, status: int, ) -> None: - # P2-1 (plan §10): mirror the sync path — bump the coverage - # counter so the dashboard's ``coverage_seen`` view shows - # httpx-path traffic (the dominant path). - _safe_bump_coverage(self._runtime, "_coverage_seen", host) + # 0.9.0: emit llm_call with metadata.tracked: True (sync + # path is identical). Async path doesn't have the request- + # body model fallback yet (sync path's + # `_extract_model_from_request_body` is sync-only); leave + # model as the response-body value or None. try: # Phase 4.1: see sync _emit for rationale. Async path # uses identical event shape so the dedup key space @@ -896,6 +904,9 @@ def _emit( "finish_reason": usage.get("finish_reason"), "tool_names": usage.get("tool_names") or [], "has_usage": True, + "metadata": { + "tracked": True, + }, "raw_usage": usage, "_fingerprint": _fingerprint_for(host, body, status), } @@ -992,6 +1003,14 @@ def _fingerprint_for_event_dict(event: dict[str, Any]) -> str: # first runtime — silently losing track() calls from later test runs. _orig_sync_init: Callable[..., Any] | None = None _orig_async_init: Callable[..., Any] | None = None +# Audit 2026-06-29 (reset_for_tests gap): stash the originals of the +# class methods we wrap so reset_for_tests can put them back. Without +# this, a second test pass with `_langchain_patched = False` would +# double-wrap `BaseCallbackManager.__init__`, and similarly for +# `agents.Runner.run` / `Runner.run_sync`. +_orig_base_callback_manager_init: Callable[..., Any] | None = None +_orig_runner_run: Callable[..., Any] | None = None +_orig_runner_run_sync: Callable[..., Any] | None = None def patch_httpx(runtime: Any) -> bool: @@ -1165,6 +1184,12 @@ def patch_langchain_callback(runtime: Any) -> bool: return True _orig_init = BaseCallbackManager.__init__ + # Audit 2026-06-29 (reset_for_tests gap): stash the original + # on a module-level so reset_for_tests can put it back. + # Without this, a second test pass with `_langchain_patched + # = False` would double-wrap. + global _orig_base_callback_manager_init + _orig_base_callback_manager_init = _orig_init def _wrap_init(self: Any, *args: Any, **kwargs: Any) -> None: _orig_init(self, *args, **kwargs) @@ -1190,6 +1215,146 @@ def _wrap_init(self: Any, *args: Any, **kwargs: Any) -> None: return True +# --------------------------------------------------------------------------- +# D4b: patch_chat_model_invoke — defensive callback injection at the LLM +# boundary. +# --------------------------------------------------------------------------- +# Audit 2026-06-29 (silent zero-billing): the previous +# ``patch_langchain_callback`` only wrapped ``BaseCallbackManager.__init__``. +# When the user instantiates ``ChatOpenAI(...)`` *before* ``nullrun.init`` +# (a common pattern — see SDK examples), the ``ChatOpenAI`` object keeps +# no callback manager attached. The patched ``__init__`` runs only when +# a *new* ``BaseCallbackManager`` is constructed inside +# ``BaseChatModel.invoke`` / ``Runnable.invoke``. If the LangGraph path +# goes through a different construction sequence (e.g. caching, +# alternative transports, in-memory mock providers) the new manager +# might be bypassed and ``on_llm_end`` never fires. +# +# To make the wiring robust we also wrap ``BaseChatModel.invoke`` / +# ``BaseChatModel.ainvoke`` so a ``NullRunCallback`` is always present +# in the per-call handlers list. This is belt-and-suspenders over the +# ``BaseCallbackManager.__init__`` patch — if the manager constructor +# runs, the handler is added; if the manager constructor is somehow +# bypassed, the invoke wrapper still attaches it. +# +# Idempotent. Returns False if ``langchain-core`` is not installed. + +_chat_model_invoke_patched = False +_orig_chat_model_invoke: Callable[..., Any] | None = None +_orig_chat_model_ainvoke: Callable[..., Any] | None = None +_orig_chat_model_stream: Callable[..., Any] | None = None +_orig_chat_model_astream: Callable[..., Any] | None = None + + +def patch_chat_model_invoke(runtime: Any) -> bool: + """Inject ``NullRunCallback`` at the ``BaseChatModel.invoke`` / + ``ainvoke`` boundary as a defensive complement to + ``patch_langchain_callback``. + + See the audit block above for why both layers are needed. We never + *replace* user-supplied callbacks — the wrapped closure appends + our handler only when the user hasn't already supplied a + ``NullRunCallback``. + """ + global _chat_model_invoke_patched + with _langchain_lock: + if _chat_model_invoke_patched: + return True + try: + from langchain_core.language_models import BaseChatModel + except ImportError: + logger.debug("langchain-core not installed; chat-model invoke patch skipped") + return False + + if getattr(BaseChatModel, "_nullrun_invoke_patched", False): + _chat_model_invoke_patched = True + return True + + def _ensure_cb(handlers: Any) -> list[Any]: + """Append a NullRunCallback to a handler list if absent.""" + if handlers is None: + handlers = [] + try: + if any(isinstance(h, NullRunCallback) for h in handlers): + return list(handlers) + except TypeError: + return list(handlers) if handlers else [] + return list(handlers) + [NullRunCallback(runtime=runtime)] + + _orig_invoke = BaseChatModel.invoke + _orig_ainvoke = BaseChatModel.ainvoke + _orig_stream = BaseChatModel.stream + _orig_astream = BaseChatModel.astream + + def _wrap_invoke(self: Any, input: Any, config: Any = None, **kwargs: Any) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_invoke(self, input, new_config, **kwargs) + + async def _wrap_ainvoke( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return await _orig_ainvoke(self, input, new_config, **kwargs) + + def _wrap_stream( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_stream(self, input, new_config, **kwargs) + + def _wrap_astream( + self: Any, input: Any, config: Any = None, **kwargs: Any + ) -> Any: + new_config = _inject_handler_into_config(config, _ensure_cb) + return _orig_astream(self, input, new_config, **kwargs) + + global _orig_chat_model_invoke, _orig_chat_model_ainvoke + _orig_chat_model_invoke = _orig_invoke + _orig_chat_model_ainvoke = _orig_ainvoke + global _orig_chat_model_stream, _orig_chat_model_astream + _orig_chat_model_stream = _orig_stream + _orig_chat_model_astream = _orig_astream + + BaseChatModel.invoke = _wrap_invoke # type: ignore[method-assign] + BaseChatModel.ainvoke = _wrap_ainvoke # type: ignore[method-assign] + BaseChatModel.stream = _wrap_stream # type: ignore[method-assign] + BaseChatModel.astream = _wrap_astream # type: ignore[method-assign] + BaseChatModel._nullrun_invoke_patched = True # type: ignore[attr-defined] + _chat_model_invoke_patched = True + logger.info( + "LangChain BaseChatModel.invoke/ainvoke/stream/astream defensive callback injection installed" + ) + return True + + +def _inject_handler_into_config(config: Any, ensure: Callable[[Any], list[Any]]) -> Any: + """Helper: ensure ``config["callbacks"]`` (or the + ``configurable``/``callbacks`` kwarg) carries our handler. Returns + the original config untouched when we can't safely mutate it + (e.g. ``config`` is a frozen mapping). + + The user may pass either: + - a dict like ``{"callbacks": [...]}`` (standard Runnable path) + - a RunnableConfig built from ``ConfigurableFieldSpec`` etc. + - ``None`` (we synthesise a fresh dict). + """ + if config is None: + return {"callbacks": ensure(None)} + if not isinstance(config, dict): + return config + # `callbacks` is the canonical key on RunnableConfig. Some + # wrappers use `callback_manager` — we honour both. We never + # *replace* a user-supplied callback manager; we only ensure + # the handler list contains our NullRunCallback. + if "callbacks" in config: + config = dict(config) + config["callbacks"] = ensure(config["callbacks"]) + return config + config = dict(config) + config["callbacks"] = ensure(None) + return config + + # --------------------------------------------------------------------------- # D5: patch_openai_agents — OpenAI Agents SDK tracer # --------------------------------------------------------------------------- @@ -1219,6 +1384,13 @@ def patch_openai_agents(runtime: Any) -> bool: _orig_run = Runner.run _orig_run_sync = getattr(Runner, "run_sync", None) + # Audit 2026-06-29 (reset_for_tests gap): stash originals so + # reset_for_tests can restore them. Without this, a second + # test pass with `_agents_patched = False` would double-wrap + # Runner.run / Runner.run_sync. + global _orig_runner_run, _orig_runner_run_sync + _orig_runner_run = _orig_run + _orig_runner_run_sync = _orig_run_sync def _wrap_run(*args: Any, **kwargs: Any) -> Any: result = _orig_run(*args, **kwargs) @@ -1472,10 +1644,10 @@ def auto_instrument(runtime: Any) -> bool: with _auto_lock: if _auto_installed: return True - # Lazy imports — auto_requests needs `_safe_bump_coverage` (now - # defined in this module) at module import time. The framework - # patches below are silent no-ops when their respective - # packages aren't installed. + # Lazy imports — auto_requests and the framework patches below + # are silent no-ops when their respective packages aren't + # installed. Each sub-importer handles its own missing-dep + # case. from nullrun.instrumentation._safe_patch import safe_patch from nullrun.instrumentation.auto_requests import patch_requests from nullrun.instrumentation.autogen import patch_autogen @@ -1485,6 +1657,15 @@ def auto_instrument(runtime: Any) -> bool: paths = [ safe_patch("httpx", lambda: patch_httpx(runtime)), safe_patch("langchain_callback", lambda: patch_langchain_callback(runtime)), + # D4b (2026-06-29): belt-and-suspenders callback injection at + # the BaseChatModel.invoke boundary. Ensures NullRunCallback + # fires even when the user creates the LLM BEFORE init() and + # the BaseCallbackManager.__init__ patch is somehow bypassed + # (LangGraph node-internal calls, cached config paths, etc.). + safe_patch( + "chat_model_invoke", + lambda: patch_chat_model_invoke(runtime), + ), safe_patch("openai_agents", lambda: patch_openai_agents(runtime)), safe_patch("langgraph_compiled", lambda: patch_langgraph_compiled(runtime)), safe_patch("requests", lambda: patch_requests(runtime)), @@ -1529,6 +1710,10 @@ def reset_for_tests() -> None: global _orig_sync_init, _orig_async_init global _orig_pregel_invoke, _orig_pregel_stream global _orig_pregel_ainvoke, _orig_pregel_astream + global _orig_chat_model_invoke, _orig_chat_model_ainvoke + global _orig_chat_model_stream, _orig_chat_model_astream + global _orig_base_callback_manager_init + global _orig_runner_run, _orig_runner_run_sync _auto_installed = False _httpx_patched = False _langchain_patched = False @@ -1562,6 +1747,50 @@ def reset_for_tests() -> None: _orig_pregel_stream = None _orig_pregel_ainvoke = None _orig_pregel_astream = None + # D4b (2026-06-29): restore BaseChatModel.invoke/ainvoke/stream/astream + # if we patched them, otherwise the next test pass would double-wrap. + if _orig_chat_model_invoke is not None: + try: + from langchain_core.language_models import BaseChatModel + BaseChatModel.invoke = _orig_chat_model_invoke # type: ignore[method-assign] + BaseChatModel.ainvoke = _orig_chat_model_ainvoke # type: ignore[method-assign] + BaseChatModel.stream = _orig_chat_model_stream # type: ignore[method-assign] + BaseChatModel.astream = _orig_chat_model_astream # type: ignore[method-assign] + BaseChatModel._nullrun_invoke_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore BaseChatModel: %s", e) + _orig_chat_model_invoke = None + _orig_chat_model_ainvoke = None + _orig_chat_model_stream = None + _orig_chat_model_astream = None + global _chat_model_invoke_patched + _chat_model_invoke_patched = False + # Audit 2026-06-29 (reset_for_tests gap): pre-fix the function + # reset the *_patched flag for langchain_callback and openai_agents + # but did NOT restore the wrapped class methods. A second test + # pass with `auto._langchain_patched = False; auto_instrument(r)` + # would then double-wrap BaseCallbackManager.__init__ / + # Runner.run / Runner.run_sync. Fix: restore them here so a + # repeat pass gets a clean wrap. + if _orig_base_callback_manager_init is not None: + try: + from langchain_core.callbacks import BaseCallbackManager + BaseCallbackManager.__init__ = _orig_base_callback_manager_init # type: ignore[method-assign] + BaseCallbackManager._nullrun_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore BaseCallbackManager: %s", e) + _orig_base_callback_manager_init = None + if _orig_runner_run is not None: + try: + from agents import Runner + Runner.run = _orig_runner_run # type: ignore[method-assign] + if _orig_runner_run_sync is not None: + Runner.run_sync = _orig_runner_run_sync # type: ignore[method-assign] + Runner._nullrun_patched = False # type: ignore[attr-defined] + except Exception as e: # pragma: no cover — defensive + logger.debug("reset_for_tests: failed to restore Runner: %s", e) + _orig_runner_run = None + _orig_runner_run_sync = None # --------------------------------------------------------------------------- @@ -1674,27 +1903,41 @@ def _fingerprint_is_seen(state: OrderedDict[str, None], fp: str) -> bool: return False -def _safe_bump_coverage(runtime: Any, target_attr: str, host: str) -> None: - """Bump a per-host counter on the runtime, tolerating stub runtimes - (MagicMock, custom test doubles) that don't carry the attribute. - - ``target_attr`` is one of ``_coverage_seen``, - ``_coverage_streaming_skipped``. Mirrors the structure of - ``_fingerprint_is_seen`` — never raises. - - Background: ``nullrun.instrumentation.auto_requests`` imports this - helper but the original 0.3.0 release never defined it, so the - entire ``requests`` auto-instrumentation path was unimportable. - Adding the helper here unblocks the module and the dashboard's - coverage tab. +def _emit_streaming_skipped( + runtime: Any, + request: httpx.Request, + host: str, +) -> None: + """Emit an llm_call event for a response where the body exceeded + the tracking cap and usage data could not be extracted. + + 0.9.0: replaces the old `_safe_bump_coverage(..., + "_coverage_streaming_skipped", host)` counter bump. The event + carries `metadata.streaming_skipped: True` and `metadata.tracked: + False` (extractor did not run because the body was never read) + so the backend's coverage query still counts it toward the + `llm_call_count` denominator while flagging it as not-tracked. + + `model` falls back to the request body via + `_extract_model_from_request_body` (sync-only, mirrors + `_emit`'s pattern at lines 735-739). """ - target = getattr(runtime, target_attr, None) - if target is None: - return - if isinstance(target, dict): - target[host] = int(target.get(host, 0)) + 1 - else: - try: - target[host] = int(target[host]) + 1 - except Exception as e: # pragma: no cover — defensive - logger.debug("_safe_bump_coverage: %s bump failed: %s", target_attr, e) + try: + runtime.track( + { + "type": "llm_call", + "provider": _provider_label(host), + "host": host, + "model": _extract_model_from_request_body(request), + "tokens": 0, + "input_tokens": 0, + "output_tokens": 0, + "has_usage": False, + "metadata": { + "tracked": False, + "streaming_skipped": True, + }, + } + ) + except Exception as e: # pragma: no cover — defensive + logger.debug("NullRun transport: streaming-skipped track failed: %s", e) diff --git a/src/nullrun/instrumentation/auto_requests.py b/src/nullrun/instrumentation/auto_requests.py index b1a754c..13c6cc2 100644 --- a/src/nullrun/instrumentation/auto_requests.py +++ b/src/nullrun/instrumentation/auto_requests.py @@ -14,15 +14,13 @@ - `_match_extractor(host)` — exact + subdomain match - `_provider_label(host)` — short label for the `provider` event field - `_fingerprint_for(host, body, status)` — dedup fingerprint -- `_safe_bump_coverage(runtime, target_attr, host)` — bounded counter - bump that tolerates stub runtimes (MagicMock, custom test doubles) What this module owns: - `patch_requests(runtime)` — wraps `requests.Session.send` so every call routed through a session is observed. Idempotent. - Streaming handling: `requests.get(url, stream=True)` and - `Accept: text/event-stream` are skipped with a `streaming-skipped` - coverage marker. We do NOT buffer the response — that would break + `Accept: text/event-stream` emit a `metadata.streaming_skipped: true` + llm_call event. We do NOT buffer the response — that would break user-facing streaming (the caller reads `iter_content`/`iter_lines` chunk-by-chunk). The known limit is documented in `docs/known-limitations.md`. @@ -31,6 +29,12 @@ `urllib3` patch (which `requests` uses under the hood) can skip already-tracked requests. See plan section P2 / "requests ↔ urllib3". +0.9.0: counter-bump helpers (`_safe_bump_coverage`, +`_bump_streaming_skipped`) are gone — coverage is now derived from +llm_call span metadata. Each emit site tags `metadata.tracked: bool` +and `metadata.streaming_skipped: bool` so the backend can compute +coverage_pct from `spans.metadata` directly. + `aiohttp` is deliberately out of scope for this phase — see `docs/known-limitations.md` and the plan's open questions. """ @@ -45,7 +49,6 @@ _fingerprint_for, _match_extractor, _provider_label, - _safe_bump_coverage, ) logger = logging.getLogger(__name__) @@ -77,24 +80,6 @@ def _is_streaming_request(request: Any, send_kwargs: dict[str, Any]) -> bool: return any(ct in accept for ct in _STREAMING_CONTENT_TYPES) -def _bump_streaming_skipped(runtime: Any, host: str) -> None: - """Phase P2: bump a `streaming-skipped` counter so the dashboard - surfaces *known* untracked hosts (vs. just "seen but unknown - extractor"). Mirrors the structure of `_safe_bump_coverage` to - tolerate stub runtimes. - """ - target = getattr(runtime, "_coverage_streaming_skipped", None) - if target is None: - return - bump = getattr(runtime, "_bump_coverage_counter", None) - if bump is None: - return - try: - bump(target, host) - except Exception as e: # pragma: no cover — defensive - logger.debug("NullRun streaming-skipped bump failed: %s", e) - - def _emit_to_runtime( runtime: Any, request: Any, @@ -107,8 +92,10 @@ def _emit_to_runtime( transport. Kept in this module (rather than re-exported from `auto.py`) so the requests path is self-contained and the `requests` dep is not pulled into `auto.py`'s import graph. + + 0.9.0: emits `metadata.tracked: True` — usage was extracted so + the SDK's `_match_extractor` identified a known provider. """ - _safe_bump_coverage(runtime, "_coverage_tracked", host) try: runtime.track( { @@ -120,6 +107,9 @@ def _emit_to_runtime( "input_tokens": usage.get("prompt_tokens", 0), "output_tokens": usage.get("completion_tokens", 0), "has_usage": True, + "metadata": { + "tracked": True, + }, "raw_usage": usage, "_fingerprint": _fingerprint_for(host, body, status), } @@ -128,6 +118,49 @@ def _emit_to_runtime( logger.debug("NullRun requests transport: track failed: %s", e) +def _emit_streaming_skipped_to_runtime( + runtime: Any, + request: Any, + host: str, +) -> None: + """0.9.0: emit an llm_call event for a streamed response that + we deliberately did NOT buffer (so the user keeps their chunked + read). Tags `metadata.streaming_skipped: True` and + `metadata.tracked: False` (extractor never ran because the body + was never read) — backend counts it toward `llm_call_count` but + not toward `tracked_call_count`. + + Model is best-effort from the request body (sync path; mirrors + `auto._emit_streaming_skipped`). + """ + try: + from nullrun.instrumentation.auto import ( + _extract_model_from_request_body, + ) + model = _extract_model_from_request_body(request) + except Exception: # pragma: no cover — defensive + model = None + try: + runtime.track( + { + "type": "llm_call", + "provider": _provider_label(host), + "host": host, + "model": model, + "tokens": 0, + "input_tokens": 0, + "output_tokens": 0, + "has_usage": False, + "metadata": { + "tracked": False, + "streaming_skipped": True, + }, + } + ) + except Exception as e: + logger.debug("NullRun requests transport: streaming-skipped track failed: %s", e) + + _requests_patched = False _requests_lock = threading.Lock() _orig_session_send: Any = None @@ -179,17 +212,13 @@ def _wrapped_send(self: Any, request: Any, **kwargs: Any) -> Any: host = urllib.parse.urlparse(url).hostname or "" - # Phase 1.1: bump seen-counter for *every* host, including - # ones we don't have an extractor for. Same pattern as - # the httpx transport. - _safe_bump_coverage(runtime, "_coverage_seen", host) - # Streaming skip: do NOT read `response.content` here — # that would buffer the entire stream and break the - # caller's chunked consumption. Mark as `streaming-skipped` - # so the dashboard can show "known but untracked". + # caller's chunked consumption. Emit an llm_call event + # tagged `metadata.streaming_skipped: true` so the + # backend can still see the call in coverage. if _is_streaming_request(request, kwargs): - _bump_streaming_skipped(runtime, host) + _emit_streaming_skipped_to_runtime(runtime, request, host) return _orig_session_send(self, request, **kwargs) extractor = _match_extractor(host) @@ -254,4 +283,4 @@ def reset_for_tests() -> None: Session._nullrun_patched = False # type: ignore[attr-defined] except Exception as e: # pragma: no cover — defensive logger.debug("reset_for_tests: failed to restore Session: %s", e) - _orig_session_send = None + _orig_session_send = None \ No newline at end of file diff --git a/src/nullrun/instrumentation/langgraph.py b/src/nullrun/instrumentation/langgraph.py index 69c0b41..a9c58c8 100644 --- a/src/nullrun/instrumentation/langgraph.py +++ b/src/nullrun/instrumentation/langgraph.py @@ -507,6 +507,12 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: # Phase 4.1: lift cache / reasoning / finish / tool names out # of raw_usage onto the event itself, mirroring the httpx # transport shape so the dedup key space stays unified. + # 0.9.0: tag metadata.tracked based on whether the model + # extraction produced a real value (not the literal + # "unknown" fallback). The backend's coverage query + # (backend/src/coverage/mod.rs) reads this flag to + # compute tracked_pct — see plan at + # `~/.claude/plans/async-swinging-hanrahan.md`. event = { "type": "llm_call", "model": model, @@ -521,6 +527,13 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: "tool_names": usage.get("tool_names") or [], # Flag to backend: this is raw usage, compute cost yourself "has_usage": usage["has_usage"], + "metadata": { + # Tracked iff we extracted a real model name — the + # `usage["has_usage"]` flag is independent (a 4xx + # response with empty body still yields has_usage + # = False but we DID see the response). + "tracked": model != "unknown", + }, # Stripped at the wire boundary by _WIRE_STRIP_FIELDS — # kept here for in-process dedup + test introspection. "raw_usage": usage["raw_usage"], @@ -813,13 +826,68 @@ def _extract_model_from_response(response: Any) -> str | None: # event; this DEBUG line is for the per-call site so the # operator can correlate the wire warning back to a specific # response shape. + # + # Audit 2026-06-29 (silent zero-billing): the previous version + # emitted a single DEBUG line with only the response type. That + # was insufficient when the operator needed to see *which* of + # the four fallback steps almost-but-didn't match. We now dump + # the available keys on every relevant shape so a single + # logcat-level filter surfaces the root cause: + # - `response.llm_output` keys + # - `response.response_metadata` keys + # - `gen_msg.response_metadata` keys (LLMResult callback path) + # - direct attrs `response.model_name` / `response.model` + # All four dumps are guarded so a missing attribute is silent. try: response_type = type(response).__name__ except Exception: response_type = "" + + llm_out_keys: list[str] = [] + if isinstance(llm_out, dict): + try: + llm_out_keys = sorted(str(k) for k in llm_out.keys()) + except Exception: + llm_out_keys = [""] + resp_meta_keys: list[str] = [] + if isinstance(resp_meta, dict): + try: + resp_meta_keys = sorted(str(k) for k in resp_meta.keys()) + except Exception: + resp_meta_keys = [""] + gen_msg_meta_keys: list[str] = [] + direct_attrs: dict[str, str] = {} + if gen_msg is not None: + try: + gm = getattr(gen_msg, "response_metadata", None) + if isinstance(gm, dict): + gen_msg_meta_keys = sorted(str(k) for k in gm.keys()) + except Exception: + pass + for attr in ("model_name", "model"): + try: + v = getattr(gen_msg, attr, None) + if v is not None: + direct_attrs[attr] = repr(v)[:64] + except Exception: + pass + for attr in ("model_name", "model"): + try: + v = getattr(response, attr, None) + if v is not None: + direct_attrs[attr] = repr(v)[:64] + except Exception: + pass + logger.debug( - "_extract_model_from_response returned None for response of type %s", + "_extract_model_from_response returned None for response of type %s — " + "llm_output_keys=%s response_metadata_keys=%s gen_msg_metadata_keys=%s " + "direct_attrs=%s", response_type, + llm_out_keys, + resp_meta_keys, + gen_msg_meta_keys, + direct_attrs, ) return None diff --git a/src/nullrun/runtime.py b/src/nullrun/runtime.py index 63baa65..592741a 100644 --- a/src/nullrun/runtime.py +++ b/src/nullrun/runtime.py @@ -262,18 +262,10 @@ def __init__( # first call. self._local_cost_cents_estimate: int = 0 - # Coverage counters (Phase 3 of the production-readiness plan). - # The instrumentation layer in `nullrun.instrumentation.auto` - # calls ``_safe_bump_coverage(runtime, "_coverage_seen" / - # "_coverage_tracked" / "_coverage_streaming_skipped", host)`` - # so the dashboard can show "which LLM hosts the SDK is - # seeing vs. successfully tracking". Previous versions - # relied on ``_safe_bump_coverage`` to no-op when these - # attributes were missing -- the dashboard's coverage tab - # was always empty. - self._coverage_seen: dict[str, int] = {} - self._coverage_tracked: dict[str, int] = {} - self._coverage_streaming_skipped: dict[str, int] = {} + # 0.9.0: coverage counters removed. Coverage is now derived + # server-side from the llm_call span metadata (`tracked` and + # `streaming_skipped` flags set by the instrumentation layer). + # The previous per-host dicts and 60s daemon thread are gone. # Remote control plane state (per-workflow, pushed from server via WS). # Unified model: effective_state = max(local_state, remote_state). @@ -399,21 +391,7 @@ def __init__( # a concurrent ``add`` if both interleave on a free-threaded # build). The lock is uncontended on the read path so the # cost is one acquire per call. - # - # We also reuse this lock to guard the coverage-counter - # dicts (§7.2 #33) because the bump + prune sequence must - # be atomic — otherwise two threads could both observe the - # dict at length 4095, both bump their counter, and both - # evict a different entry, growing the dict to 4097 - # before either prune lands. One lock, one source of - # truth, cheaper than two fine-grained ones. self._tools_lock = threading.Lock() - # §7.2 #33: cap the per-host coverage counters. Without - # this, a long-running process that sees thousands of - # custom LLM endpoints over its lifetime would grow these - # dicts without bound — same hazard as - # ``NullRunCallback._active_runs`` (now capped at 4096). - self._COVERAGE_CAP: int = 4096 logger.info("NullRun Runtime initialized: mode=cloud") @@ -1529,186 +1507,6 @@ def is_sensitive_tool(self, tool_name: str) -> bool: t.lower() for t in self._strict_mode_tools } - def coverage_report(self) -> dict[str, dict[str, int]]: - """ - Snapshot of the LLM-host coverage counters that the auto- - instrumentation layer maintains. The SDK tracks three - counters per host: - - - ``seen`` -- every LLM host the SDK observed a request to. - - ``tracked`` -- hosts whose response was successfully - extracted and emitted as an ``llm_call`` event. - - ``streaming_skipped`` -- hosts whose response was a - streaming SSE / ``stream=True`` and was deliberately - NOT buffered (so the user keeps their chunked read). - - The same payload is sent over the WebSocket heartbeat every - 60s and via the HTTP-fallback path when the WS connection - is down. The dashboard's coverage tab uses these counters - to surface "we know about this host but cannot track it" -- - the leading indicator that an SDK upgrade is needed. - - Returns: - ``{"seen": {...}, "tracked": {...}, - "streaming_skipped": {...}}``. Each value is a fresh - ``dict`` so callers can mutate the result without - affecting the runtime's internal state. - """ - return { - "seen": dict(self._coverage_seen), - "tracked": dict(self._coverage_tracked), - "streaming_skipped": dict(self._coverage_streaming_skipped), - } - - def track_coverage(self) -> dict[str, Any] | None: - """Emit a `coverage_report` event with the current per-host counters. - - Returned from ``track_event`` so the caller can observe the - transport-side outcome (queued, deduped, breaker open, etc.). - Returns ``None`` when there are no counters to report yet - (cold start, no LLM traffic) — the backend doesn't need an - empty row per minute per process. - - Background emission is driven by ``start_coverage_reporter``; - most callers don't invoke this method directly. - - Wire shape (2026-06-28 fix): - The per-host counter dicts live under ``metadata`` so the - backend's batch handler (backend/src/proxy/handlers.rs:5909 - -5923) reads them from ``sdk_event.metadata``. The handler - was wired to that location when the SDK moved from the - deprecated ``POST /api/v1/coverage`` endpoint onto the - shared ``/api/v1/track/batch`` pipeline — placing the - counters at the event top level silently dropped them (the - ``SdkTrackRequest`` struct uses explicit fields, no - ``#[serde(flatten)]`` catchall, so unknown top-level keys - are discarded by serde). Nesting under ``metadata`` matches - what the handler reads. - """ - stats = self.coverage_report() - seen_total = sum(stats["seen"].values()) - if seen_total == 0: - # Nothing to report — avoid empty rows. - return None - return self.track_event( - "coverage_report", - metadata={ - "seen": stats["seen"], - "tracked": stats["tracked"], - "streaming_skipped": stats["streaming_skipped"], - }, - ) - - _COVERAGE_REPORT_INTERVAL_SECONDS = 60.0 - - def start_coverage_reporter(self) -> None: - """Start a background thread that emits ``coverage_report`` events - every ``_COVERAGE_REPORT_INTERVAL_SECONDS``. - - Idempotent — second call is a no-op. Caller is responsible - for calling :meth:`stop_coverage_reporter` on shutdown, but - the thread is a daemon so a missed stop does not block exit. - """ - if getattr(self, "_coverage_reporter_thread", None) is not None: - return - thread = threading.Thread( - target=self._coverage_reporter_loop, - name="nullrun-coverage-reporter", - daemon=True, - ) - self._coverage_reporter_thread = thread - thread.start() - - def stop_coverage_reporter(self, timeout: float = 2.0) -> None: - """Signal the coverage reporter to exit and join its thread.""" - self._coverage_reporter_stop = True - thread = getattr(self, "_coverage_reporter_thread", None) - if thread is not None: - thread.join(timeout=timeout) - - def _coverage_reporter_loop(self) -> None: - """Loop body for the coverage reporter thread. - - Emits a coverage report on entry (so the dashboard has data - within ~1s of process start), then every interval until - ``stop_coverage_reporter`` is called. - """ - self._coverage_reporter_stop = False - # Emit once on entry — gives the backend a row even if the - # process is short-lived (CI, batch jobs). - try: - self.track_coverage() - except Exception as e: # noqa: BLE001 — background loop - logger.debug(f"coverage_reporter: initial emit failed: {e}") - while not getattr(self, "_coverage_reporter_stop", False): - # Sleep in short slices so shutdown is responsive. - slept = 0.0 - while slept < self._COVERAGE_REPORT_INTERVAL_SECONDS and not getattr( - self, "_coverage_reporter_stop", False - ): - time.sleep(min(0.5, self._COVERAGE_REPORT_INTERVAL_SECONDS - slept)) - slept += 0.5 - if getattr(self, "_coverage_reporter_stop", False): - break - try: - self.track_coverage() - except Exception as e: # noqa: BLE001 — background loop - logger.debug(f"coverage_reporter: emit failed: {e}") - - def bump_coverage_counter(self, target_attr: str, host: str) -> None: - """Bump a per-host coverage counter with FIFO eviction at the cap. - - §7.2 #33: replaces the previous direct-dict-mutation path - used by ``nullrun.instrumentation.auto._safe_bump_coverage``. - The pre-fix code just did ``target[host] = target.get(host, - 0) + 1``, which let a process with many custom LLM - endpoints grow the dict without bound. We now: - - 1. Take ``_tools_lock`` so concurrent bumps from - multiple threads (sync httpx + async httpx + the - requests transport) can't both pass the cap check - and evict different entries. - 2. If the dict already has the key, increment (LRU - bump via dict insertion order). - 3. If the key is new and we're at the cap, evict the - oldest entry before inserting. - - Tolerates a missing attribute (stub runtimes in tests): - no-op when ``getattr(self, target_attr, None)`` returns - ``None``. Tolerates a non-dict target (also a test-only - scenario): logs DEBUG and moves on. - """ - with self._tools_lock: - target = getattr(self, target_attr, None) - if target is None: - return - if not isinstance(target, dict): - logger.debug( - "bump_coverage_counter: %s is not a dict (%s); skipping", - target_attr, - type(target).__name__, - ) - return - if host in target: - # Insertion-order LRU bump: re-insert so this - # host moves to the end of the dict. - target[host] = int(target.get(host, 0)) + 1 - # Re-set to refresh insertion order (Python dicts - # don't auto-promote on value update). - value = target.pop(host) - target[host] = value - else: - if len(target) >= self._COVERAGE_CAP: - evicted_host, _ = next(iter(target.items())) - del target[evicted_host] - logger.warning( - "coverage counter %s hit cap %d; evicting oldest host=%s", - target_attr, - self._COVERAGE_CAP, - evicted_host, - ) - target[host] = 1 - def get_org_status(self, org_id: str | None = None) -> dict[str, Any]: """Public helper for reading ``/api/v1/orgs/{org_id}/status``. diff --git a/tests/contract/test_llm_call_model_wire.py b/tests/contract/test_llm_call_model_wire.py index 0e737ea..252dce9 100644 --- a/tests/contract/test_llm_call_model_wire.py +++ b/tests/contract/test_llm_call_model_wire.py @@ -319,3 +319,163 @@ def test_patch_httpx_eager_wrap_is_idempotent(): finally: auto.reset_for_tests() pre_existing.close() + + +# ─── patch_chat_model_invoke: init-ordering regression ─────────────── +# +# Audit 2026-06-29 (silent zero-billing): the LangGraph case the +# production trace exposed is +# +# llm = ChatOpenAI(model=...) # before init +# nullrun.init(api_key=...) # patch installed too late +# graph.invoke(input) # llm.invoke() inside the node +# +# `patch_httpx` covers the eager-sweep path (pre-existing +# ``httpx.Client`` instances are wrapped). For LangChain chat models, +# the `BaseCallbackManager.__init__` patch is the original defence. +# ``patch_chat_model_invoke`` is the new belt-and-suspenders layer +# that wraps ``BaseChatModel.invoke`` / ``ainvoke`` directly so a +# ``NullRunCallback`` is present in the per-call config even if the +# callback manager constructor is somehow bypassed. +# +# This regression test pins the wrap so a future refactor can't +# silently drop it. + + +def test_patch_chat_model_invoke_injects_callback_when_llm_pre_exists(): + """Create a fake BaseChatModel BEFORE ``nullrun.init``, then call + ``patch_chat_model_invoke``, then invoke. The wrapped invoke must + inject a ``NullRunCallback`` into the per-call config. + """ + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + from langchain_core.outputs import ChatGeneration, ChatResult + + from nullrun.instrumentation import auto + from nullrun.instrumentation.langgraph import NullRunCallback + + auto.reset_for_tests() + auto._chat_model_invoke_patched = False + + class FakeChatModel(BaseChatModel): + """Minimal BaseChatModel that records the callbacks it saw.""" + + seen_callbacks: list = [] + + @property + def _llm_type(self) -> str: + return "fake" + + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + # Record the callbacks the framework attached (or didn't) + # so the test can assert our wrapper injected one. + seen = getattr(run_manager, "handlers", None) or [] + type(self).seen_callbacks.append(list(seen)) + # Return a properly-shaped ChatResult with an AIMessage so + # the downstream on_llm_end extraction has a generations[0] + # to walk. The model_name is the same string we'd see from + # a real langchain-openai 1.x response. + return ChatResult( + generations=[ + ChatGeneration( + message=AIMessage( + content="ok", + response_metadata={"model_name": "fake-model"}, + ) + ) + ], + llm_output={"model_name": "fake-model"}, + ) + + runtime = MagicMock() + try: + # The user's order: create the LLM before init. + llm = FakeChatModel() + type(llm).seen_callbacks = [] + + ok = auto.patch_chat_model_invoke(runtime) + assert ok is True + + # Invoke the LLM through the wrapped method. The wrap must + # inject a NullRunCallback into config["callbacks"] so the + # internal _generate sees it. + llm.invoke("hello") + + # Assert: at least one NullRunCallback was seen during _generate. + saw_nullrun = any( + any(isinstance(h, NullRunCallback) for h in seen) + for seen in type(llm).seen_callbacks + ) + assert saw_nullrun, ( + "patch_chat_model_invoke did not inject NullRunCallback into " + "the per-call config — the audit fix is broken or missing." + ) + finally: + auto.reset_for_tests() + + +def test_patch_chat_model_invoke_preserves_user_callbacks(): + """If the user already supplied a callback in the config, the + wrap must NOT replace it — only add the NullRunCallback if absent. + """ + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + from langchain_core.outputs import ChatGeneration, ChatResult + from langchain_core.callbacks import BaseCallbackHandler + + from nullrun.instrumentation import auto + from nullrun.instrumentation.langgraph import NullRunCallback + + auto.reset_for_tests() + auto._chat_model_invoke_patched = False + + class UserCallback(BaseCallbackHandler): + """Real BaseCallbackHandler so LangChain's manager doesn't + trip on missing attributes (``ignore_chat_model``, + ``raise_error``, etc.) when it tries to fire the callback.""" + seen_callbacks: list = [] + + def on_chat_model_start(self, *args, **kwargs): + type(self).seen_callbacks.append(("start",)) + + def on_llm_end(self, *args, **kwargs): + type(self).seen_callbacks.append(("end",)) + + class FakeChatModel(BaseChatModel): + seen_callbacks: list = [] + + @property + def _llm_type(self) -> str: + return "fake" + + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + seen = getattr(run_manager, "handlers", None) or [] + type(self).seen_callbacks.append(list(seen)) + return ChatResult( + generations=[ + ChatGeneration(message=AIMessage(content="ok")) + ], + llm_output={"model_name": "fake-model"}, + ) + + runtime = MagicMock() + try: + llm = FakeChatModel() + type(llm).seen_callbacks = [] + ok = auto.patch_chat_model_invoke(runtime) + assert ok is True + + # User already has their own callback in config. + user_cb = UserCallback() + llm.invoke("hello", config={"callbacks": [user_cb]}) + + # The user's callback must still be there, alongside ours. + seen_lists = type(llm).seen_callbacks + assert any(user_cb in seen for seen in seen_lists), ( + "user-supplied callback was lost" + ) + assert any( + any(isinstance(h, NullRunCallback) for h in seen) for seen in seen_lists + ), "NullRunCallback was not injected alongside user callback" + finally: + auto.reset_for_tests() diff --git a/tests/test_auto_requests.py b/tests/test_auto_requests.py index 8056255..535f7c0 100644 --- a/tests/test_auto_requests.py +++ b/tests/test_auto_requests.py @@ -194,16 +194,17 @@ def test_session_send_already_tracked_returns_unchanged(monkeypatch, fresh_patch def test_session_send_streaming_skips_track(monkeypatch, fresh_patch_module): - """``stream=True`` kwarg triggers the streaming skip branch.""" + """0.9.0: ``stream=True`` triggers the streaming branch which + emits an llm_call event tagged `metadata.streaming_skipped: True` + and `metadata.tracked: False`. The call still counts toward + coverage `llm_call_count` (backend's denominator) but not toward + `tracked_call_count`. + """ _install_fake_requests(monkeypatch, streaming=True) recorder = {"track": [], "track_event": []} rt = MagicMock() rt.track.side_effect = lambda ev: recorder["track"].append(ev) rt.track_event.side_effect = lambda **kw: recorder["track_event"].append(kw) - # Pretend the runtime has a coverage counters dict so we can - # observe the streaming-skipped bump. - rt._coverage_streaming_skipped = {} - rt._bump_coverage_counter = MagicMock() from requests import Session @@ -212,14 +213,22 @@ def test_session_send_streaming_skips_track(monkeypatch, fresh_patch_module): assert patch_requests(rt) is True req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={}) Session().send(req, stream=True) - # Track was NOT called (streaming skip). - assert recorder["track"] == [] - # Streaming-skipped counter was bumped. - assert rt._bump_coverage_counter.called + # Track WAS called (with the streaming-skipped flag) — the new + # behavior replaces the old counter-bump. + assert len(recorder["track"]) == 1 + ev = recorder["track"][0] + assert ev["type"] == "llm_call" + assert ev["host"] == "api.openai.com" + assert ev["has_usage"] is False + assert ev["metadata"]["streaming_skipped"] is True + assert ev["metadata"]["tracked"] is False def test_session_send_accept_event_stream_header_skips_track(monkeypatch, fresh_patch_module): - """``Accept: text/event-stream`` header triggers the streaming skip branch.""" + """0.9.0: ``Accept: text/event-stream`` header triggers the same + streaming branch — emit llm_call tagged + `metadata.streaming_skipped: True`. + """ _install_fake_requests(monkeypatch) recorder = {"track": [], "track_event": []} rt = _fake_runtime(recorder) @@ -233,7 +242,8 @@ def test_session_send_accept_event_stream_header_skips_track(monkeypatch, fresh_ url="https://api.openai.com/v1/chat/completions", headers={"Accept": "text/event-stream"} ) Session().send(req) - assert recorder["track"] == [] + assert len(recorder["track"]) == 1 + assert recorder["track"][0]["metadata"]["streaming_skipped"] is True def test_session_send_no_extractor_for_host_returns_response(monkeypatch, fresh_patch_module): @@ -323,29 +333,10 @@ def test_session_send_track_failure_is_swallowed(monkeypatch, fresh_patch_module assert resp.status_code == 200 -def test_session_send_seen_counter_bumped(monkeypatch, fresh_patch_module): - """Every host bumps the ``_coverage_seen`` counter, including - unknown ones (so the dashboard shows visibility into all - outbound traffic, not just tracked vendors). The bump happens - via ``_safe_bump_coverage`` which mutates the dict directly. - """ - _install_fake_requests(monkeypatch) - rt = MagicMock() - rt.track.side_effect = lambda ev: None - rt.track_event.side_effect = lambda **kw: None - rt._coverage_seen = {} - rt._bump_coverage_counter = MagicMock() - - from requests import Session - - from nullrun.instrumentation.auto_requests import patch_requests - - assert patch_requests(rt) is True - req = SimpleNamespace(url="https://example.com/api", headers={}) - Session().send(req) - # Direct dict mutation: host is now present with count 1. - assert rt._coverage_seen.get("example.com") == 1 - +# 0.9.0: removed `test_session_send_seen_counter_bumped`. The +# `_coverage_seen` per-host counter dict is gone — coverage is +# derived from llm_call span metadata.host. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. # ─── reset_for_tests ───────────────────────────────────────────────── @@ -424,36 +415,9 @@ def get(self, *_args, **_kwargs): assert _is_streaming_request(req, {}) is False -def test_bump_streaming_skipped_no_attr(): - """Runtime missing the attribute → silent no-op.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - # MagicMock auto-creates attributes, so build a plain object. - class _Runtime: - pass - - rt = _Runtime() # no _coverage_streaming_skipped - _bump_streaming_skipped(rt, "x") # must not raise - - -def test_bump_streaming_skipped_no_bump_method(): - """Runtime missing the bump method → silent no-op.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - class _Runtime: - _coverage_streaming_skipped = {} - - rt = _Runtime() # no _bump_coverage_counter - _bump_streaming_skipped(rt, "x") # must not raise - - -def test_bump_streaming_skipped_calls_bump(): - """Happy path: bump is invoked with the target dict and host.""" - from nullrun.instrumentation.auto_requests import _bump_streaming_skipped - - target: dict = {} - rt = MagicMock() - rt._coverage_streaming_skipped = target - rt._bump_coverage_counter = MagicMock() - _bump_streaming_skipped(rt, "api.openai.com") - rt._bump_coverage_counter.assert_called_once_with(target, "api.openai.com") +# 0.9.0: removed three `_bump_streaming_skipped` helper tests. +# The helper is gone — streaming-skipped calls now emit an +# llm_call event tagged `metadata.streaming_skipped: True`. See +# `test_session_send_streaming_skips_track` and +# `test_session_send_accept_event_stream_header_skips_track` above +# for the new behavior assertions. diff --git a/tests/test_blocker_fixes.py b/tests/test_blocker_fixes.py index 320f57e..7a59d71 100644 --- a/tests/test_blocker_fixes.py +++ b/tests/test_blocker_fixes.py @@ -50,30 +50,11 @@ def test_auto_requests_module_importable(): import nullrun.instrumentation.auto_requests # noqa: F401 -def test_safe_bump_coverage_exported(): - """`_safe_bump_coverage` is importable and increments the runtime counter.""" - from nullrun.instrumentation.auto import _safe_bump_coverage - from nullrun.runtime import NullRunRuntime - - runtime = NullRunRuntime(api_key="test", _test_mode=True) - assert runtime._coverage_seen == {} - _safe_bump_coverage(runtime, "_coverage_seen", "api.openai.com") - assert runtime._coverage_seen == {"api.openai.com": 1} - _safe_bump_coverage(runtime, "_coverage_seen", "api.openai.com") - assert runtime._coverage_seen == {"api.openai.com": 2} - _safe_bump_coverage(runtime, "_coverage_seen", "api.anthropic.com") - assert runtime._coverage_seen == {"api.openai.com": 2, "api.anthropic.com": 1} - - -def test_safe_bump_coverage_tolerates_missing_attribute(): - """Stub runtimes (MagicMock, custom doubles) without the attribute don't crash.""" - from nullrun.instrumentation.auto import _safe_bump_coverage - - class StubRuntime: - pass - - # Should not raise. - _safe_bump_coverage(StubRuntime(), "_coverage_seen", "api.openai.com") +# 0.9.0: removed `test_safe_bump_coverage_exported` and +# `test_safe_bump_coverage_tolerates_missing_attribute`. The +# `_safe_bump_coverage` helper is gone — coverage is derived from +# llm_call span metadata. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. def test_auto_instrument_patches_requests(): diff --git a/tests/test_coverage_report.py b/tests/test_coverage_report.py deleted file mode 100644 index 32ffb15..0000000 --- a/tests/test_coverage_report.py +++ /dev/null @@ -1,145 +0,0 @@ -""" -tests/test_coverage_report.py — coverage_report event emission. - -The SDK already keeps per-host counters via ``bump_coverage_counter`` -(see §7.2 #33). Pre-fix there was no path to ship those counters -to the backend — ``get_coverage_stats()`` existed but no caller. -This test pins the new ``track_coverage`` / ``start_coverage_reporter`` -contract: - -* ``track_coverage()`` returns ``None`` when no LLM traffic has - been observed (cold start). -* After at least one counter bump, ``track_coverage()`` returns a - track-result dict (the underlying ``track_event`` result). -* The emitted event carries ``type=coverage_report`` plus the - three counter dicts and ``tokens=0`` so the backend's - ``SdkTrackRequest`` deserializer accepts it. -* The counter dicts live under ``metadata`` so the backend's batch - handler (backend/src/proxy/handlers.rs:5909-5923) reads them. - Placed at the event top level, serde deserialization would drop - them (``SdkTrackRequest`` has explicit fields, no flatten - catchall), which is exactly the bug this test guards against. -* ``start_coverage_reporter`` is idempotent and stops cleanly. -""" - -from __future__ import annotations - -import asyncio -import threading -import time - -import pytest - -from nullrun.runtime import NullRunRuntime - - -@pytest.fixture -def runtime(): - r = NullRunRuntime(api_key="test-key-12345678", _test_mode=True) - yield r - r.stop_coverage_reporter() - - -class TestTrackCoverage: - def test_track_coverage_returns_none_when_no_traffic(self, runtime): - # No counter bumps yet → no event. - result = runtime.track_coverage() - assert result is None - - def test_track_coverage_returns_event_after_counter_bump(self, runtime): - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - runtime.bump_coverage_counter("_coverage_seen", "api.anthropic.com") - - result = runtime.track_coverage() - assert result is not None - # The transport queues the event; the runtime returns the - # dedup/queue result from track_event. - assert "deduped" in result or "accepted" in result or "queued" in result or True - - def test_track_coverage_emits_wire_shape_with_metadata_nesting(self, runtime): - """Pin the wire shape so backend (handlers.rs:5909-5923) can - read the counters from ``event.metadata``. - - Pre-fix this placed the three dicts at the top level of the - event, which serde silently dropped (the batch handler's - ``SdkTrackRequest`` uses explicit fields with no - ``#[serde(flatten)]`` catchall). Page rendered - ``last_coverage_pct = null`` permanently because every - report landed with empty ``seen`` / ``tracked`` / - ``streaming_skipped`` JSONB columns. - """ - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - runtime.bump_coverage_counter( - "_coverage_streaming_skipped", "api.openai.com" - ) - - # Drain anything left in the buffer from prior tests. - buf = runtime._transport._buffer - try: - buf.clear() - except Exception: - pass - - result = runtime.track_coverage() - assert result is not None, "track_coverage must emit once counters exist" - - # Find the coverage_report event in the buffered payload. - events = [e for e in list(buf) if e.get("type") == "coverage_report"] - assert len(events) >= 1, ( - f"expected at least one coverage_report event in buffer, " - f"saw types={[e.get('type') for e in buf]}" - ) - event = events[-1] - - # Must NOT carry the counters at the top level — that was - # the bug shape (silently dropped by serde). - assert "seen" not in event, ( - "top-level 'seen' silently dropped by SdkTrackRequest; " - "must live under metadata" - ) - assert "tracked" not in event - assert "streaming_skipped" not in event - - # MUST carry them under metadata so the batch handler reads - # them correctly. - metadata = event.get("metadata") - assert isinstance(metadata, dict), ( - f"coverage_report event must have metadata dict, got {type(metadata).__name__}: {event!r}" - ) - assert "seen" in metadata - assert "tracked" in metadata - assert "streaming_skipped" in metadata - assert metadata["seen"].get("api.openai.com") == 1 - assert metadata["tracked"].get("api.openai.com") == 1 - assert metadata["streaming_skipped"].get("api.openai.com") == 1 - - # Backend requires `tokens: u64` (non-Optional) on every - # event; track_event defaults it to 0 so the request - # deserializer accepts the coverage_report row. - assert event.get("tokens") == 0 - - def test_coverage_reporter_emits_immediately(self, runtime): - # Even with no traffic, start+stop should be safe. - runtime.start_coverage_reporter() - # Idempotent. - runtime.start_coverage_reporter() - # Stop should not deadlock. - runtime.stop_coverage_reporter(timeout=2.0) - - def test_coverage_reporter_emits_periodically_with_traffic(self, runtime): - # Override interval to a tiny value so the test runs fast. - runtime._COVERAGE_REPORT_INTERVAL_SECONDS = 0.2 - runtime.bump_coverage_counter("_coverage_seen", "api.openai.com") - runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com") - - runtime.start_coverage_reporter() - # Give the thread time for the initial emit + at least one - # interval tick. 0.5s is comfortably > 2× the 0.2s interval. - time.sleep(0.5) - runtime.stop_coverage_reporter(timeout=2.0) - # No assertion on buffer contents — the test exists to - # confirm the reporter thread runs without crashing. A - # stronger test would mock the transport, but the SDK - # already has transport-level coverage in test_transport.py. diff --git a/tests/test_coverage_seen_httpx.py b/tests/test_coverage_seen_httpx.py deleted file mode 100644 index 381b1af..0000000 --- a/tests/test_coverage_seen_httpx.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -Regression test for plan item P2-1: coverage_seen must be incremented -in the httpx path, not only the requests path. - -Pre-fix, ``_safe_bump_coverage(runtime, "_coverage_seen", host)`` was -only called from ``auto_requests.py:185``. The httpx transport's -``_emit`` (which handles ~95% of LLM traffic — OpenAI, Anthropic, -Gemini, Mistral, Cohere all use httpx under the hood) just called -``runtime.track(...)`` without bumping the counter. - -Net effect: the dashboard's ``coverage_seen`` view was empty for the -majority of customers. Operators couldn't tell which LLM hosts an -agent was actually talking to. - -Post-fix both sync and async httpx ``_emit`` bump the counter. -""" - -import asyncio -from unittest.mock import MagicMock - -import httpx -import pytest - -from nullrun.instrumentation.auto import ( - NullRunAsyncTransport, - NullRunSyncTransport, -) - - -def _make_response(body: bytes, host: str = "api.openai.com") -> httpx.Response: - request = httpx.Request("POST", f"https://{host}/v1/chat/completions") - return httpx.Response( - 200, - headers={"content-type": "application/json"}, - content=body, - request=request, - ) - - -# A minimal OpenAI-completions response body with usage. The extractor -# for api.openai.com reads ``usage.{prompt_tokens, completion_tokens, -# total_tokens}``. -USAGE_BODY = ( - b'{"id":"chatcmpl-1","choices":[{"message":{"role":"assistant","content":"hi"}}],' - b'"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}' -) - - -def test_sync_transport_bumps_coverage_seen(): - """A successful OpenAI call via the sync httpx transport must - bump ``_coverage_seen[api.openai.com]`` to 1.""" - runtime = MagicMock() - # Provide a real dict for _coverage_seen so the bump survives - # the test assertion. - runtime._coverage_seen = {} - - inner = MagicMock() - inner.handle_request.return_value = _make_response(USAGE_BODY) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.openai.com") == 1, ( - f"coverage_seen[api.openai.com] should be 1 after one httpx " - f"call; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_bumps_for_anthropic(): - """Same bump applies to other supported hosts — the dashboard - should see Anthropic traffic too, not just OpenAI.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - # Anthropic-style response body: usage.{input_tokens, output_tokens}. - # See _anthropic_extractor in auto.py. - anthropic_body = ( - b'{"id":"msg-1","content":[{"type":"text","text":"hi"}],' - b'"usage":{"input_tokens":10,"output_tokens":4}}' - ) - inner = MagicMock() - inner.handle_request.return_value = _make_response(anthropic_body, host="api.anthropic.com") - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.anthropic.com/v1/messages") - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.anthropic.com") == 1, ( - f"coverage_seen[api.anthropic.com] should be 1; got {runtime._coverage_seen}" - ) - - -def test_async_transport_bumps_coverage_seen(): - """Async mirror: a call via the async httpx transport also - bumps the counter.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - async def fake_handle(_request): - return _make_response(USAGE_BODY) - - inner = MagicMock() - inner.handle_async_request.side_effect = fake_handle - - transport = NullRunAsyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - asyncio.run(transport.handle_async_request(request)) - - assert runtime._coverage_seen.get("api.openai.com") == 1, ( - f"async coverage_seen[api.openai.com] should be 1; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_bumps_incrementally_across_requests(): - """Multiple calls to the same host must accumulate, not overwrite - (so the counter is a real frequency, not a 0/1 flag).""" - runtime = MagicMock() - runtime._coverage_seen = {} - - inner = MagicMock() - inner.handle_request.return_value = _make_response(USAGE_BODY) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - - for _ in range(3): - transport.handle_request(request) - - assert runtime._coverage_seen.get("api.openai.com") == 3, ( - f"3 calls should produce coverage_seen=3; got {runtime._coverage_seen}" - ) - - -def test_sync_transport_no_bump_when_extractor_misses(): - """If the extractor returns None (no usage block in the body), - we don't call _emit, so the counter is NOT bumped. This is the - right behaviour — we only want to count LLM calls we actually - tracked, not every HTTP round-trip to an LLM host.""" - runtime = MagicMock() - runtime._coverage_seen = {} - - body = b'{"id":"chatcmpl-1","choices":[]}' # no usage block - inner = MagicMock() - inner.handle_request.return_value = _make_response(body) - - transport = NullRunSyncTransport(inner=inner, runtime=runtime) - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - transport.handle_request(request) - - assert runtime._coverage_seen == {}, f"no usage → no bump; got {runtime._coverage_seen}" diff --git a/tests/test_llm_call_metadata_flags.py b/tests/test_llm_call_metadata_flags.py new file mode 100644 index 0000000..9e92b9b --- /dev/null +++ b/tests/test_llm_call_metadata_flags.py @@ -0,0 +1,126 @@ +""" +Pin the wire shape of ``llm_call`` event metadata for coverage derivation. + +The backend's coverage query (backend/src/coverage/mod.rs) reads two +boolean flags off `metadata`: + + - `tracked` — True when the SDK's `_match_extractor` identified a + known provider (extractor returned a non-None usage). False for + hosts without an extractor, or where the model was the literal + "unknown" fallback. + + - `streaming_skipped` — True when the response body exceeded + `MAX_RESPONSE_BYTES` and usage was NOT extractable. The event is + still emitted (counts toward `llm_call_count` denominator) so + coverage_pct is honest about streamed calls. + +0.9.0: these flags REPLACE the old per-host `_coverage_seen` / +`_coverage_tracked` / `_coverage_streaming_skipped` counter dicts. +The previous counter-bump path is gone — see plan at +`~/.claude/plans/async-swinging-hanrahan.md`. + +These tests do NOT exercise the actual HTTP path (that's +`test_streaming_oom_cap.py` and `test_auto_requests.py`). They pin +the wire shape at the SDK boundary so a future refactor that drops +the flags will fail CI immediately. +""" + +from unittest.mock import MagicMock + +import httpx + + +# Mirror the response builder from test_streaming_oom_cap.py to keep +# these tests self-contained. +def _make_response(content: bytes, content_length: int | None = None) -> httpx.Response: + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + headers = {"content-type": "application/json"} + if content_length is not None: + headers["content-length"] = str(content_length) + return httpx.Response(200, headers=headers, content=content, request=request) + + +def test_tracked_flag_true_on_normal_call(): + """A normal call (under cap, extractor matched) emits tracked: True + and NO streaming_skipped flag.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + + runtime = MagicMock() + inner = MagicMock() + body = ( + b'{"id":"chatcmpl-1","choices":[{"message":{"role":"assistant","content":"hi"}}],' + b'"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}' + ) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + event = runtime.track.call_args[0][0] + assert event["metadata"]["tracked"] is True + # `streaming_skipped` may be absent or False; the absence is the + # honest wire shape. + assert event["metadata"].get("streaming_skipped", False) is False + + +def test_streaming_skipped_flag_on_oversized_response(): + """Oversized response → tracked: False, streaming_skipped: True.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + + runtime = MagicMock() + inner = MagicMock() + body = b"x" * (MAX_RESPONSE_BYTES + 1) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + event = runtime.track.call_args[0][0] + assert event["metadata"]["tracked"] is False + assert event["metadata"]["streaming_skipped"] is True + + +def test_track_does_not_strip_metadata_flags(): + """`metadata` is NOT in `_WIRE_STRIP_FIELDS` (runtime.py:106-108). + Verify the flags survive the wire boundary by mocking the + transport-level `track` to apply the same stripping rule the + real runtime uses.""" + from nullrun.instrumentation.auto import ( + MAX_RESPONSE_BYTES, + NullRunSyncTransport, + ) + from nullrun.runtime import NullRunRuntime + + runtime = NullRunRuntime(api_key="test", _test_mode=True) + # Capture wire-event shape post-strip: + captured = {} + + def _capture(enriched): + # Mirror runtime.py:1427-1431 strip rule. + _WIRE_STRIP_FIELDS = frozenset({"cost_cents", "_fingerprint", "raw_usage"}) + wire = {k: v for k, v in enriched.items() if k not in _WIRE_STRIP_FIELDS and v is not None} + captured["event"] = wire + return wire + + runtime.track = _capture + inner = MagicMock() + body = b"x" * (MAX_RESPONSE_BYTES + 1) + inner.handle_request.return_value = _make_response(body, content_length=len(body)) + transport = NullRunSyncTransport(inner=inner, runtime=runtime) + transport.handle_request( + httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + ) + + wire = captured["event"] + # `metadata` field is preserved on the wire — backend reads it. + assert "metadata" in wire + assert wire["metadata"]["tracked"] is False + assert wire["metadata"]["streaming_skipped"] is True \ No newline at end of file diff --git a/tests/test_runtime_branches.py b/tests/test_runtime_branches.py index 6e150c9..e659b68 100644 --- a/tests/test_runtime_branches.py +++ b/tests/test_runtime_branches.py @@ -220,62 +220,14 @@ def test_register_sensitive_tools_bulk(): assert "stripe.charge" in tools -# ─── coverage_report / bump_coverage_counter ───────────────────────── - - -def test_coverage_report_returns_independent_copies(): - rt = _make_test_runtime() - rt._coverage_seen["a"] = 1 - snap = rt.coverage_report() - snap["seen"]["b"] = 99 # mutate the snapshot - # Internal state should not observe the mutation. - assert "b" not in rt._coverage_seen - - -def test_bump_coverage_counter_missing_attr_no_op(): - """Stub runtime (duck type) without the attribute → no-op.""" - import threading - - class _Stub: - _tools_lock = threading.Lock() - - stub = _Stub() - NullRunRuntime.bump_coverage_counter(stub, "_coverage_seen", "x") # no raise - - -def test_bump_coverage_counter_non_dict_logs_and_skips(): - """If the target is not a dict, log DEBUG and skip without raising.""" - rt = _make_test_runtime() - rt.__dict__["_coverage_seen"] = "string-not-dict" # bypass dict guard - NullRunRuntime.bump_coverage_counter(rt, "_coverage_seen", "x") # no raise - - -def test_bump_coverage_counter_existing_key_increments(): - rt = _make_test_runtime() - rt._coverage_seen["a"] = 5 - rt.bump_coverage_counter("_coverage_seen", "a") - assert rt._coverage_seen["a"] == 6 - - -def test_bump_coverage_counter_new_key_inserts(): - rt = _make_test_runtime() - rt.bump_coverage_counter("_coverage_seen", "new") - assert rt._coverage_seen["new"] == 1 - - -def test_bump_coverage_counter_evicts_at_cap(caplog): - """When the cap is hit, the OLDEST host is evicted (FIFO).""" - import logging - - rt = _make_test_runtime() - rt._COVERAGE_CAP = 3 - rt._coverage_seen["a"] = 1 - rt._coverage_seen["b"] = 1 - rt._coverage_seen["c"] = 1 - with caplog.at_level(logging.WARNING, logger="nullrun.runtime"): - rt.bump_coverage_counter("_coverage_seen", "d") - assert "a" not in rt._coverage_seen # evicted - assert "d" in rt._coverage_seen +# 0.9.0: removed six `coverage_report` / `bump_coverage_counter` +# tests at lines 223-278. The `_coverage_seen` / +# `_coverage_tracked` / `_coverage_streaming_skipped` dicts, +# `coverage_report()`, `track_coverage()`, +# `start_coverage_reporter()`, `_coverage_reporter_loop()`, and +# `bump_coverage_counter()` method are all gone — coverage is now +# derived server-side from llm_call span metadata. See plan at +# `~/.claude/plans/async-swinging-hanrahan.md`. # ─── execute() mode resolution ────────────────────────────────────── diff --git a/tests/test_streaming_oom_cap.py b/tests/test_streaming_oom_cap.py index e9a2ad2..17b76f9 100644 --- a/tests/test_streaming_oom_cap.py +++ b/tests/test_streaming_oom_cap.py @@ -10,18 +10,19 @@ in long-running services. Post-fix we use a bounded chunked read (``_read_body_with_cap`` / -``_aread_body_with_cap``). When the body exceeds the cap we skip -tracking and increment ``_coverage_streaming_skipped`` so the -dashboard can see which hosts are producing oversized responses. +``_aread_body_with_cap``). When the body exceeds the cap we now +(0.9.0) emit an ``llm_call`` event tagged +``metadata.streaming_skipped: True`` and ``metadata.tracked: False`` +so the backend's coverage query still counts the call toward +``llm_call_count`` but not toward ``tracked_call_count``. The +previous counter-bump on ``_coverage_streaming_skipped`` is gone. """ import asyncio from unittest.mock import MagicMock import httpx -import pytest -from nullrun.instrumentation import auto as auto_mod from nullrun.instrumentation.auto import ( MAX_RESPONSE_BYTES, NullRunAsyncTransport, @@ -89,10 +90,12 @@ def test_aread_body_with_cap_short_circuits_on_content_length(): # =========================================================================== -def test_sync_transport_skips_tracking_on_oversized_response(monkeypatch): +def test_sync_transport_emits_streaming_skipped_event(monkeypatch): """When the response body exceeds MAX_RESPONSE_BYTES, the sync - transport must NOT call ``runtime.track`` and MUST increment - ``_coverage_streaming_skipped``.""" + transport must emit an llm_call event tagged + `metadata.streaming_skipped: True` and `metadata.tracked: False`. + The body is NOT buffered (so the caller can still consume it) + and usage data is not extracted (no tokens field).""" runtime = MagicMock() inner = MagicMock() body = b"x" * (MAX_RESPONSE_BYTES + 1) @@ -104,19 +107,18 @@ def test_sync_transport_skips_tracking_on_oversized_response(monkeypatch): transport.handle_request(request) - # Body was oversized → no llm_call event was emitted. - runtime.track.assert_not_called() - # Coverage counter incremented (best-effort; the runtime mock - # accepts attribute reads). We verify the helper was called via - # the runtime attribute access path: - # ``_safe_bump_coverage(runtime, "_coverage_streaming_skipped", host)`` - # should have read runtime._coverage_streaming_skipped. - # (We don't assert on the dict contents because the mock - # returns a fresh MagicMock for each attribute access; the - # important contract is that track() was NOT called.) + # Track WAS called with the streaming-skipped event. + runtime.track.assert_called_once() + event = runtime.track.call_args[0][0] + assert event["type"] == "llm_call" + assert event["host"] == "api.openai.com" + assert event["has_usage"] is False + assert event["tokens"] == 0 + assert event["metadata"]["streaming_skipped"] is True + assert event["metadata"]["tracked"] is False -def test_async_transport_skips_tracking_on_oversized_response(): +def test_async_transport_emits_streaming_skipped_event(): """Async mirror of the sync test.""" runtime = MagicMock() inner = MagicMock() @@ -132,12 +134,16 @@ async def fake_handle(_request): asyncio.run(transport.handle_async_request(request)) - runtime.track.assert_not_called() + runtime.track.assert_called_once() + event = runtime.track.call_args[0][0] + assert event["metadata"]["streaming_skipped"] is True + assert event["metadata"]["tracked"] is False def test_sync_transport_does_track_normal_sized_response(): """Sanity: the cap doesn't break the happy path. A normal 200-byte - response with a usage block must still be tracked.""" + response with a usage block must still be tracked and the event + must carry `metadata.tracked: True`.""" runtime = MagicMock() inner = MagicMock() body = ( @@ -156,3 +162,6 @@ def test_sync_transport_does_track_normal_sized_response(): event = runtime.track.call_args[0][0] assert event["type"] == "llm_call" assert event["tokens"] == 8 + assert event["metadata"]["tracked"] is True + # streaming_skipped is not present (or False) on a normal tracked call. + assert not event["metadata"].get("streaming_skipped", False) \ No newline at end of file