From bc2bbfa1fba6960cf3074bf6faab5c3b414105d6 Mon Sep 17 00:00:00 2001 From: Anatolii Date: Mon, 29 Jun 2026 19:05:31 +0400 Subject: [PATCH 1/2] fix(0.9.1): unified LLM-call fingerprint collapses httpx + langchain duplicates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-0.9.1 the httpx transport and the LangChain callback each computed their own _fingerprint from different inputs (sha256 of body bytes vs sha256 of langchain callback metadata). The two fingerprints never collided, so the dedup LRU at runtime.track() could not collapse the two emissions for the same real call. On a typical app.invoke() with 6 LLM calls the backend saw ~12 llm_call events on the wire (2 per real call), doubling llm_call_count and skewing cost_events aggregates. Both observers now call _fingerprint_for_llm_call(model, provider, response_id) with the three signals reachable from every path: - httpx transport: payload['model'], payload['id'] - LangChain callback: invocation_params.model / response.llm_output['model_name'], response.llm_output['id'] / response.id / AIMessage.id / response.response_metadata['id'] _openai_extractor now also returns 'id' alongside 'model' so the transport has it without re-parsing the body. When any of the three signals is missing the helper falls back to the empty string on that slot — deterministic for the call, just less specific. A missing id (custom chat-model wrappers) still collapses the two observers via the model+provider combination. tests/test_unified_fingerprint.py pins the new contract: deterministic, distinct inputs → distinct fingerprints, both observers converge on the same fingerprint for the same call. Bumps version 0.9.0 → 0.9.1. CHANGELOG entry added above 0.9.0. No public-API break. --- CHANGELOG.md | 58 +++ pyproject.toml | 2 +- src/nullrun/instrumentation/auto.py | 182 +++++++- src/nullrun/instrumentation/langgraph.py | 81 ++++ tests/test_llm_call_metadata_flags.py | 33 +- tests/test_unified_fingerprint.py | 567 +++++++++++++++++++++++ 6 files changed, 907 insertions(+), 16 deletions(-) create mode 100644 tests/test_unified_fingerprint.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c71692c..825dde2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,64 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html) --- +## [0.9.1] - 2026-06-29 + +Patch on top of 0.9.0. Unifies the LLM-call fingerprint scheme so the +dedup LRU at `runtime.track()` can collapse sibling emissions from the +httpx transport and the LangChain callback for the same real call. + +### Fixed + +- **Double-emission of llm_call events.** Pre-0.9.1 the httpx transport + (`NullRunSyncTransport._emit`) and the LangChain callback + (`NullRunCallback.on_llm_end`) each computed their own `_fingerprint` + from different inputs — `sha256(host|status|body)` vs + `sha256(json({path:"langchain_callback", run_id, response_id, model, + provider, invocation_params}))`. The two fingerprints never + collided, so the dedup LRU at `runtime.track()` could not collapse + the two emissions for the same call. On a typical `app.invoke()` + with 6 LLM calls the backend saw ~12 `llm_call` events on the wire + (2 per real call), doubling `llm_call_count` and skewing + `cost_events` aggregates. + + Post-fix both observers call the same helper + `_fingerprint_for_llm_call(model, provider, response_id)` with the + three signals reachable from every observation path: + - httpx transport reads `model` and `id` straight out of the + OpenAI-style response body (`payload["model"]`, + `payload["id"]`). `_openai_extractor` now also carries `"id"` on + its return so the transport has it without re-parsing the body. + - LangChain callback reads `model` from `invocation_params` / + `response.llm_output["model_name"]` and `id` from + `response.llm_output["id"]` / `response.id` / the generation's + AIMessage `.id` / `response.response_metadata["id"]` — all four + locations are populated by langchain-openai 1.x for OpenAI chat + completions. + + When any of the three signals is missing, the helper falls back to + the empty string on that slot; the resulting fingerprint is still + deterministic for the call, just less specific. A missing `id` + (custom chat-model wrappers that don't surface it) still collapses + the two observers via the model+provider combination. + +### Tests + +- `tests/test_unified_fingerprint.py` pins the new contract: + deterministic fingerprint for identical inputs, distinct + fingerprints for distinct inputs, the httpx transport calls the + helper with values extracted from the response body, the LangChain + callback produces the SAME fingerprint for the same LLM call when + reading the chat-completion id from any of the four known + langchain locations. +- `tests/test_llm_call_metadata_flags.py` updated to match the new + extractor shape (`usage["id"]` is now present alongside + `usage["model"]`). + +No public-API break. No behavior change for callers whose +instrumentation already populates `model` correctly. + +--- + ## [0.9.0] - 2026-06-29 Server-derived coverage replaces the in-process counter dicts. diff --git a/pyproject.toml b/pyproject.toml index c6da484..1681265 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nullrun" -version = "0.9.0" +version = "0.9.1" # Long form used by PyPI page meta-description and search snippets. # Kept under the 200-char preview threshold so the full line is visible # without an "expand" click. Keywords are matched against likely search diff --git a/src/nullrun/instrumentation/auto.py b/src/nullrun/instrumentation/auto.py index 698fba9..330b499 100644 --- a/src/nullrun/instrumentation/auto.py +++ b/src/nullrun/instrumentation/auto.py @@ -169,6 +169,15 @@ def _openai_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": completion, "total_tokens": total, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): the upstream + # chat-completion id (``payload["id"]``, e.g. + # ``"chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo"`` for OpenAI) is + # the tightest discriminator for collapsing the sibling + # LangChain-callback emission via ``_fingerprint_for_llm_call``. + # Without this, the httpx path's fingerprint scheme (sha256 of + # body bytes) never collides with the callback's scheme and + # the dedup LRU cannot collapse duplicates. + "id": payload.get("id"), # Phase 4.1: explicit cache / reasoning / finish / tool fields. # Previously these were reachable only via raw_usage (now # stripped at the wire boundary). Backend gate/budget/loop @@ -181,6 +190,76 @@ def _openai_extractor(body: bytes, status: int) -> ExtractedUsage | None: } +# --------------------------------------------------------------------------- +# D2.5 (Audit 2026-06-29): unified LLM-call fingerprint +# --------------------------------------------------------------------------- +# The httpx transport and the LangChain callback both observe the same +# real LLM call, but until this commit they computed fingerprints from +# different inputs: +# - httpx transport: sha256(host|status|body) +# - LangChain callback: sha256(json({path, run_id, response_id, ...})) +# Because the inputs differ, the two fingerprints never collided and the +# dedup LRU at runtime.track() could not collapse the two emissions for the +# same call. On a typical `app.invoke()` with 6 LLM calls the backend +# saw ~12 llm_call events on the wire (2 per real call), which doubled +# the dashboard's `llm_call_count` and skewed `cost_events` aggregates. +# +# The fix: a single helper that both observers call with the same three +# signals (model + provider + upstream chat-completion id). The three are +# reachable from every observer: +# - httpx transport reads `model` and `id` straight out of the response +# body JSON (`payload["model"]`, `payload["id"]`). +# - LangChain callback reads `model` from `invocation_params` / +# `response.llm_output["model_name"]` and `id` from +# `response.llm_output["id"]` / `response.id` / the generation's +# AIMessage `.id` / `response.response_metadata["id"]` — all four +# locations are populated by langchain-openai 1.x for OpenAI chat +# completions. +# When any of the three signals is missing, the helper falls back to the +# empty string on that slot; the resulting fingerprint is still +# deterministic for the call, just less specific. That's intentional — +# a missing `id` (custom chat-model wrappers that don't surface it) still +# collapses the two observers via the model+provider combination; the +# narrower the key, the fewer collisions across distinct calls. + +def _fingerprint_for_llm_call( + model: str | None, + provider: str | None, + response_id: str | None, +) -> str: + """Unified fingerprint for one real LLM call. + + Both the httpx transport hook (``NullRunSyncTransport._emit`` / + ``NullRunAsyncTransport._emit``) and the LangChain callback + (``NullRunCallback.on_llm_end``) call this with the same three + signals so the dedup LRU at ``runtime.track()`` can collapse the + sibling emission for the same call to a single wire event. + + Args: + model: provider-side model id as returned by the upstream + (``"gpt-4.1-mini-2025-04-14"`` for OpenAI, ``"claude-3-5-sonnet-..."`` + for Anthropic, etc.). None is acceptable; the slot still + contributes to the fingerprint. + provider: short provider label (``"openai"``, ``"anthropic"``, + ``"gemini"``, etc.). Same fallback semantics as ``model``. + response_id: upstream chat-completion id (``"chatcmpl-..."`` for + OpenAI, ``"msg_..."`` for Anthropic, etc.). This is the + tightest discriminator — two LLM calls with the same model + and provider will still have distinct response_ids, so this + is the slot that prevents spurious collisions across + unrelated calls. + + Returns: + A 16-char hex digest suitable for the ``_fingerprint`` event + field consumed by ``NullRunRuntime.track()``. + """ + payload = f"{model or ''}|{provider or ''}|{response_id or ''}" + h = hashlib.sha256() + h.update(b"llm_call|") + h.update(payload.encode("utf-8")) + return h.hexdigest()[:16] + + def _anthropic_extractor(body: bytes, status: int) -> ExtractedUsage | None: """Anthropic Messages API response shape. @@ -220,6 +299,9 @@ def _anthropic_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": inp + out, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): Anthropic message id, + # e.g. ``"msg_01HXYZ..."``. See _openai_extractor comment. + "id": payload.get("id"), "cache_read_tokens": int(usage.get("cache_read_input_tokens", 0) or 0), "cache_write_tokens": int(usage.get("cache_creation_input_tokens", 0) or 0), # Anthropic reasoning tokens are part of output_tokens (they're @@ -273,6 +355,11 @@ def _gemini_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": completion, "total_tokens": total or (prompt + completion), "model": payload.get("modelVersion"), + # Audit 2026-06-29 (unified fingerprint): Gemini doesn't + # currently surface a stable response id at the top level; + # fall back to ``None`` and rely on model+provider to + # disambiguate. See _openai_extractor for the rationale. + "id": payload.get("responseId") or payload.get("id"), "cache_read_tokens": int(usage.get("cachedContentTokenCount", 0) or 0), "cache_write_tokens": 0, "reasoning_tokens": 0, @@ -326,6 +413,10 @@ def _cohere_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": total, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): Cohere v2 doesn't + # surface a stable response id at the top level; rely on + # model+provider for disambiguation. See _openai_extractor. + "id": payload.get("id") or payload.get("generation_id"), "cache_read_tokens": 0, "cache_write_tokens": 0, "reasoning_tokens": 0, @@ -457,6 +548,13 @@ def _bedrock_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": total, "model": payload.get("modelId") or payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): Bedrock InvokeModel + # response carries ``id`` at the top level (e.g. + # ``"msg_01ABC..."`` for Anthropic-on-Bedrock, ``"cmpl-..."`` + # for Mistral-on-Bedrock). Falls back to ``None`` when the + # adapter doesn't surface one; model+provider still give us + # a fingerprint slot, just less specific. + "id": payload.get("id"), "cache_read_tokens": cache_read, "cache_write_tokens": cache_write, "reasoning_tokens": 0, @@ -746,6 +844,15 @@ def _emit( # columns; raw_usage is no longer on the wire (stripped # at the track() boundary — see _WIRE_STRIP_FIELDS in # runtime.py). + # + # Audit 2026-06-29 (unified fingerprint): we use the + # ``_fingerprint_for_llm_call`` helper so this emission + # shares the same dedup key as the LangChain callback's + # emission for the same call. The previous per-transport + # ``_fingerprint_for(host, body, status)`` produced a key + # the callback could never collide with, doubling every + # real LLM call on the wire. + response_id = usage.get("id") self._runtime.track( { "type": "llm_call", @@ -768,8 +875,15 @@ def _emit( # in runtime.py — kept here only so the in-process # dedup layer can see the full vendor payload. "raw_usage": usage, - # Fingerprint for dedup at the track() sink. - "_fingerprint": _fingerprint_for(host, body, status), + # Audit 2026-06-29 (unified fingerprint): see + # ``_fingerprint_for_llm_call`` — same key the + # LangChain callback computes, so the dedup LRU + # collapses the two emissions for the same call. + "_fingerprint": _fingerprint_for_llm_call( + model_for_event, + _provider_label(host), + response_id, + ), } ) except Exception as e: @@ -889,6 +1003,12 @@ def _emit( # Phase 4.1: see sync _emit for rationale. Async path # uses identical event shape so the dedup key space # stays unified across sync + async transports. + # + # Audit 2026-06-29 (unified fingerprint): see sync + # _emit for the rationale — async transport must use the + # same key the LangChain callback computes so the dedup + # LRU collapses duplicates. + response_id = usage.get("id") self._runtime.track( { "type": "llm_call", @@ -908,7 +1028,11 @@ def _emit( "tracked": True, }, "raw_usage": usage, - "_fingerprint": _fingerprint_for(host, body, status), + "_fingerprint": _fingerprint_for_llm_call( + usage.get("model"), + _provider_label(host), + response_id, + ), } ) except Exception as e: @@ -1921,14 +2045,47 @@ def _emit_streaming_skipped( `model` falls back to the request body via `_extract_model_from_request_body` (sync-only, mirrors `_emit`'s pattern at lines 735-739). + + Audit 2026-06-29 (ghost-event dedup): the previous version + emitted the event unconditionally and without a `_fingerprint`. + Two consequences: + 1. When the body read fails for an external reason + (double-consume by langchain-openai, an upstream that + already drained the stream), the SDK produced an + `llm_call` with `tokens=0, model=None` — i.e. no useful + signal — that still reached the wire. The backend's + `into_track_request_v2` handler gate (handler.rs:2046) + rejected these with HTTP 422, but the cost-pipeline + belt-and-suspenders backstop still logged every one as + `cost_pipeline_missing_model_total` and stamped the 1-cent + surcharge. Operators saw 30+ ERROR lines per `app.invoke()` + for a workload that actually had 6 real LLM calls. + 2. Because no `_fingerprint` was attached, the dedup LRU at + `runtime.track()` could not collapse this emission with + any sibling emission for the same call. + Fix: drop the event entirely when we cannot recover a usable + `model` (the request body has been consumed or doesn't carry + the field — same signature as a body that genuinely cannot be + inspected), and attach a deterministic `_fingerprint` when we + do emit so dedup collapses repeats from the same call site. """ + model = _extract_model_from_request_body(request) + if not model: + logger.debug( + "NullRun transport: dropping streaming_skipped event for host=%s " + "because model extraction also failed (likely double-consume " + "by langchain-openai upstream or empty request body); " + "the happy-path _emit() will handle attribution if available", + host, + ) + return try: runtime.track( { "type": "llm_call", "provider": _provider_label(host), "host": host, - "model": _extract_model_from_request_body(request), + "model": model, "tokens": 0, "input_tokens": 0, "output_tokens": 0, @@ -1937,6 +2094,23 @@ def _emit_streaming_skipped( "tracked": False, "streaming_skipped": True, }, + # Audit 2026-06-29 (unified fingerprint): use the + # shared ``_fingerprint_for_llm_call`` helper so this + # ghost emission also collapses with any sibling + # emission the LangChain callback produces for the + # same call. The body was never read, so we don't + # have an upstream response id — but the model + + # provider pair still gives a deterministic key that + # matches the callback's emission for the same call + # when the callback has the model but not the id. + # (The pre-fix ``_fingerprint_for(host, b"<...>", 0)`` + # sentinel produced a unique-per-path key that + # collided with NOTHING.) + "_fingerprint": _fingerprint_for_llm_call( + model, + _provider_label(host), + None, + ), } ) except Exception as e: # pragma: no cover — defensive diff --git a/src/nullrun/instrumentation/langgraph.py b/src/nullrun/instrumentation/langgraph.py index a9c58c8..b045530 100644 --- a/src/nullrun/instrumentation/langgraph.py +++ b/src/nullrun/instrumentation/langgraph.py @@ -479,6 +479,21 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: (``response.response_metadata['model_name']`` or the AIMessage on the LLMResult generation). ``"unknown"`` is now a true last resort, not the common case. + + Audit 2026-06-29 (ghost-event dedup): the previous version of + this method did NOT attach a ``_fingerprint`` to the event + before forwarding it to ``runtime.track()``. Because the + dedup LRU only collapses events whose ``_fingerprint`` + matches, the LangChain callback emission was never deduped + against the sibling emission from the httpx transport + (``NullRunSyncTransport._emit``), even though both observers + fire for the same LLM call. The net effect on a typical + ``app.invoke()`` with 6 LLM calls was 6-12 duplicate + ``llm_call`` events on the wire (instead of 6), plus extra + cost-pipeline ERROR noise from ``_emit_streaming_skipped`` + for body-read failures. The fix derives a stable fingerprint + from the LangChain run_id + invocation_params + response id + so the dedup LRU can collapse these emissions. """ try: # Extract provider/model from invocation params first, then @@ -503,6 +518,59 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: logger.info(f"NullRun callback: model={model}, provider={provider}, " f"usage={usage}, has_usage={usage['has_usage']}") + # Audit 2026-06-29 (unified fingerprint): derive the same + # fingerprint the httpx transport computes for the same + # call, so the dedup LRU at runtime.track() collapses the + # two emissions to a single wire event. Both observers feed + # (model, provider, response_id) into + # ``_fingerprint_for_llm_call``; the helper is + # path-agnostic on purpose so the two schemes collide + # rather than diverge. + # + # The response_id has to be extracted from the same shape + # the upstream provider returned. For langchain-openai 1.x + # the chat-completion id lives in four places in priority + # order (first hit wins): + # 1. ``response.llm_output["id"]`` — LLMResult wrapper + # where langchain-openai puts the upstream id. + # 2. ``response.id`` — direct attribute on the LLMResult + # or AIMessage (some versions). + # 3. The AIMessage inside the first generation + # (``response.generations[0][0].message.id``). + # 4. ``response.response_metadata["id"]`` — the dict + # langchain-openai populates on the AIMessage. + # Any of these yields the same string (``"chatcmpl-..."`` + # for OpenAI), so the fingerprint matches the httpx + # transport's reading of ``payload["id"]`` from the body. + from nullrun.instrumentation.auto import ( + _fingerprint_for_llm_call, + ) + + response_id = None + try: + llm_out = getattr(response, "llm_output", None) + if isinstance(llm_out, dict): + response_id = llm_out.get("id") + except Exception: # pragma: no cover — defensive + pass + if not response_id: + response_id = getattr(response, "id", None) + if not response_id: + try: + gens = getattr(response, "generations", None) or [] + if gens and gens[0]: + msg = getattr(gens[0][0], "message", None) + response_id = getattr(msg, "id", None) + except Exception: # pragma: no cover — defensive + pass + if not response_id: + try: + resp_meta = getattr(response, "response_metadata", None) + if isinstance(resp_meta, dict): + response_id = resp_meta.get("id") + except Exception: # pragma: no cover — defensive + pass + # Build event with RAW usage data (no cost computation in SDK!) # Phase 4.1: lift cache / reasoning / finish / tool names out # of raw_usage onto the event itself, mirroring the httpx @@ -537,6 +605,19 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: # Stripped at the wire boundary by _WIRE_STRIP_FIELDS — # kept here for in-process dedup + test introspection. "raw_usage": usage["raw_usage"], + # Audit 2026-06-29 (unified fingerprint): use the + # same helper the httpx transport calls so the dedup + # LRU at runtime.track() collapses the sibling + # emission for the same real LLM call. Pre-fix this + # used ``_fingerprint_for_event_dict({path: + # "langchain_callback", ...})`` which produced a key + # the httpx fingerprint could never collide with — + # every LLM call produced two wire events. + "_fingerprint": _fingerprint_for_llm_call( + model, + provider, + response_id, + ), } logger.info(f"NullRun track event: {event}") diff --git a/tests/test_llm_call_metadata_flags.py b/tests/test_llm_call_metadata_flags.py index 9e92b9b..4a3487d 100644 --- a/tests/test_llm_call_metadata_flags.py +++ b/tests/test_llm_call_metadata_flags.py @@ -32,12 +32,29 @@ # Mirror the response builder from test_streaming_oom_cap.py to keep # these tests self-contained. +def _make_request() -> httpx.Request: + """Audit 2026-06-29: in production the request body carries + ``{"model": "gpt-4.1-mini", ...}`` which is what + ``_extract_model_from_request_body`` reads when the response body + is too large to inspect. The streaming-skipped path now drops the + event if BOTH the response body AND the request body fail to + yield a model (a true double-consume artifact). Real + OpenAI/Anthropic/etc. requests always carry ``model``, so the + streaming-skipped path still fires for genuinely oversized + responses — it just refuses to emit a fully anonymous ghost event + with no model and no id.""" + return httpx.Request( + "POST", + "https://api.openai.com/v1/chat/completions", + json={"model": "gpt-4.1-mini", "messages": []}, + ) + + def _make_response(content: bytes, content_length: int | None = None) -> httpx.Response: - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") headers = {"content-type": "application/json"} if content_length is not None: headers["content-length"] = str(content_length) - return httpx.Response(200, headers=headers, content=content, request=request) + return httpx.Response(200, headers=headers, content=content, request=_make_request()) def test_tracked_flag_true_on_normal_call(): @@ -56,9 +73,7 @@ def test_tracked_flag_true_on_normal_call(): ) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( - httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) event = runtime.track.call_args[0][0] assert event["metadata"]["tracked"] is True @@ -79,9 +94,7 @@ def test_streaming_skipped_flag_on_oversized_response(): body = b"x" * (MAX_RESPONSE_BYTES + 1) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( - httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) event = runtime.track.call_args[0][0] assert event["metadata"]["tracked"] is False @@ -115,9 +128,7 @@ def _capture(enriched): body = b"x" * (MAX_RESPONSE_BYTES + 1) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( - httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) wire = captured["event"] # `metadata` field is preserved on the wire — backend reads it. diff --git a/tests/test_unified_fingerprint.py b/tests/test_unified_fingerprint.py new file mode 100644 index 0000000..b0c1f2d --- /dev/null +++ b/tests/test_unified_fingerprint.py @@ -0,0 +1,567 @@ +""" +Tests for the unified LLM-call fingerprint scheme. + +Background (audit 2026-06-29): +Before this fix the httpx transport hook (``NullRunSyncTransport._emit``) +and the LangChain callback (``NullRunCallback.on_llm_end``) each computed +their own ``_fingerprint`` from different inputs: + + httpx transport: sha256(host|status|body)[:16] + LangChain callback: sha256(json({path:"langchain_callback", run_id, + response_id, model, provider, + invocation_params}))[:16] + +The two fingerprints could not collide, so the dedup LRU at +``runtime.track()`` could not collapse the sibling emission for the same +real LLM call. On a typical ``app.invoke()`` with 6 LLM calls the backend +saw ~12 ``llm_call`` events on the wire (2 per real call), which doubled +the dashboard's ``llm_call_count`` and skewed ``cost_events`` aggregates. + +The fix: a single helper ``_fingerprint_for_llm_call(model, provider, +response_id)`` that both observers call with the same three signals. + +Contract pinned by these tests: +1. The helper is deterministic: identical inputs → identical fingerprint. +2. Distinct inputs (different model / provider / id) → distinct fingerprints. +3. The httpx transport hook calls the helper with the values extracted + from the OpenAI-style response body (``payload["model"]`` and + ``payload["id"]``). +4. The LangChain callback path produces the SAME fingerprint for the + same LLM call when it reads the chat-completion id from any of the + four canonical locations (LLMResult.llm_output["id"] / response.id / + AIMessage.id / response.response_metadata["id"]). +5. The dedup LRU recognises the two emissions as duplicates and only + the first one reaches ``/track``. + +These tests use the real helper + a stand-in runtime (no live network), +so they exercise the production code path without flakiness. +""" + +from __future__ import annotations + +import json +from typing import Any +from unittest.mock import MagicMock + +import httpx +import pytest +import respx + +from nullrun.instrumentation.auto import ( + NullRunSyncTransport, + _fingerprint_for_llm_call, + _fingerprint_is_seen, + make_dedup_state, + patch_httpx, + reset_for_tests, +) + + +# --------------------------------------------------------------------------- +# Pure helper mechanics +# --------------------------------------------------------------------------- + + +def test_fingerprint_for_llm_call_is_deterministic(): + """Identical inputs → identical fingerprints (16 hex chars).""" + fp1 = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + fp2 = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert fp1 == fp2 + assert len(fp1) == 16 + assert all(c in "0123456789abcdef" for c in fp1) + + +def test_fingerprint_changes_with_response_id(): + """Two distinct chat-completion ids → distinct fingerprints. + + This is the discriminator the dedup LRU relies on. If it ever + failed, two unrelated LLM calls would collide on the same dedup + slot and one of them would silently drop on the wire. + """ + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-A") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-B") + assert fp_a != fp_b + + +def test_fingerprint_changes_with_model(): + """Two distinct models → distinct fingerprints even with the same id.""" + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-X") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini-2025-04-15", "openai", "chatcmpl-X") + assert fp_a != fp_b + + +def test_fingerprint_changes_with_provider(): + """Two distinct providers → distinct fingerprints even with the same id.""" + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "msg-1") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini", "anthropic", "msg-1") + assert fp_a != fp_b + + +def test_fingerprint_tolerates_none_response_id(): + """When the response id cannot be recovered (custom chat-model wrappers + that don't surface it), the helper still produces a stable fingerprint + for the model+provider combination. This is the fallback path — + tighter than no fingerprint, looser than full id-based disambiguation. + """ + fp1 = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", None) + fp2 = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", None) + fp3 = _fingerprint_for_llm_call("gpt-4.1-mini", "anthropic", None) + assert fp1 == fp2 # stable across calls with same inputs + assert fp1 != fp3 # different provider still distinct + + +def test_fingerprint_matches_old_body_scheme_for_none_id(): + """Regression guard: when neither observer can recover the response id, + the helper still produces a deterministic key — NOT an empty string, + which would short-circuit the dedup LRU at ``_fingerprint_is_seen``. + + The ``make_dedup_state`` + ``_fingerprint_is_seen`` short-circuit + on empty fingerprints (see ``test_lru_empty_fingerprint_short_circuits_to_unseen`` + in ``test_dedup.py``), so the helper must always produce a non-empty + fingerprint even when all three signals are empty strings. + """ + fp_empty = _fingerprint_for_llm_call("", "", "") + assert fp_empty # non-empty (the helper stamps a `llm_call|` prefix) + assert len(fp_empty) == 16 + # The fingerprint MUST be accepted by the dedup LRU. + state = make_dedup_state() + assert _fingerprint_is_seen(state, fp_empty) is False + assert _fingerprint_is_seen(state, fp_empty) is True + + +# --------------------------------------------------------------------------- +# httpx transport hook: stamps the unified fingerprint on emitted events +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clean_httpx_patch(): + reset_for_tests() + yield + reset_for_tests() + + +def _openai_chat_completion_response( + model: str = "gpt-4.1-mini-2025-04-14", + response_id: str = "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + prompt_tokens: int = 26, + completion_tokens: int = 50, +) -> bytes: + """A minimal but realistic OpenAI chat-completion response body.""" + return json.dumps( + { + "id": response_id, + "object": "chat.completion", + "model": model, + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "Hello!"}, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + }, + } + ).encode() + + +def test_httpx_transport_emits_unified_fingerprint(): + """The httpx transport hook MUST call ``_fingerprint_for_llm_call`` + with the model and id extracted from the response body, NOT the old + ``_fingerprint_for(host, body, status)`` scheme. This pins the fix.""" + rt = MagicMock() + rt.track = MagicMock() + rt._seen_track_fingerprints = make_dedup_state() + + patch_httpx(rt) + body = _openai_chat_completion_response() + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + response = client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + assert response.status_code == 200 + + # Exactly one track() call from the transport. + assert rt.track.call_count == 1 + event = rt.track.call_args_list[0][0][0] + fp = event["_fingerprint"] + assert fp + # Must be the unified fingerprint, computed from model+provider+id. + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert fp == expected, ( + f"transport fingerprint {fp!r} != expected {expected!r} — " + "did the unified fingerprint scheme regress?" + ) + + +def test_httpx_transport_fingerprint_stable_across_response_bodies(): + """Two different bodies (different ids) MUST produce different + fingerprints. This guards against silent re-emission collisions.""" + rt_a = MagicMock() + rt_a.track = MagicMock() + rt_a._seen_track_fingerprints = make_dedup_state() + + patch_httpx(rt_a) + body_a = _openai_chat_completion_response( + response_id="chatcmpl-A", prompt_tokens=10, completion_tokens=5 + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body_a) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_a = rt_a.track.call_args_list[0][0][0]["_fingerprint"] + reset_for_tests() + + rt_b = MagicMock() + rt_b.track = MagicMock() + rt_b._seen_track_fingerprints = make_dedup_state() + patch_httpx(rt_b) + body_b = _openai_chat_completion_response( + response_id="chatcmpl-B", prompt_tokens=20, completion_tokens=10 + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body_b) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_b = rt_b.track.call_args_list[0][0][0]["_fingerprint"] + assert fp_a != fp_b + + +# --------------------------------------------------------------------------- +# LangChain callback: stamps the unified fingerprint on emitted events +# --------------------------------------------------------------------------- + + +class _FakeLLMResult: + """Minimal stand-in for langchain_core.outputs.LLMResult carrying + the response_id at every location the real NullRunCallback probes. + + The four locations (in priority order) are: + 1. ``response.llm_output["id"]`` (langchain-openai 1.x primary) + 2. ``response.id`` (some wrappers) + 3. ``response.generations[0][0].message.id`` (AIMessage inside generation) + 4. ``response.response_metadata["id"]`` (langchain 0.x AIMessage metadata) + + Each test below exercises one of these locations and asserts the + resulting fingerprint matches the one the httpx transport produces + for the same body. Without that match the dedup LRU cannot collapse + the two emissions. + """ + + def __init__( + self, + *, + model_name: str, + response_id: str, + llm_output_id: str | None = None, + response_id_attr: str | None = None, + message_id: str | None = None, + response_metadata_id: str | None = None, + ) -> None: + self.llm_output: dict[str, Any] = { + "model_name": model_name, + "token_usage": { + "prompt_tokens": 26, + "completion_tokens": 50, + "total_tokens": 76, + }, + } + if llm_output_id is not None: + self.llm_output["id"] = llm_output_id + + if response_id_attr is not None: + self.id = response_id_attr + else: + self.id = None + + # Build a single generation with a fake AIMessage. + class _FakeMsg: + def __init__(self, mid: str | None) -> None: + self.id = mid + + class _FakeGen: + def __init__(self, mid: str | None) -> None: + self.message = _FakeMsg(mid) + + self.generations: list[list[_FakeGen]] = [[_FakeGen(message_id)]] + + self.response_metadata: dict[str, Any] = { + "model_provider": "openai", + } + if response_metadata_id is not None: + self.response_metadata["id"] = response_metadata_id + + +class _FakeAIMessage: + """Stand-in for the AIMessage that NullRunCallback.on_llm_end receives + when the response is NOT wrapped in LLMResult (i.e. direct AIMessage + path). For langchain-openai 1.x chat-completions, the wrapper + actually produces an LLMResult, so this is the less common case — + but we cover it because the production code does.""" + + def __init__( + self, + *, + model_name: str, + response_id: str, + response_metadata_id: str | None = None, + ) -> None: + self.id = response_id + self.content = "Hello!" + self.response_metadata: dict[str, Any] = { + "model_provider": "openai", + "model_name": model_name, + } + if response_metadata_id is not None: + self.response_metadata["id"] = response_metadata_id + self.usage_metadata = { + "input_tokens": 26, + "output_tokens": 50, + "total_tokens": 76, + } + self.additional_kwargs: dict[str, Any] = {} + self.tool_calls: list[Any] = [] + self.invalid_tool_calls: list[Any] = [] + self.name = None + + +def _build_callback_runtime() -> tuple[Any, MagicMock]: + """Build a runtime + NullRunCallback with a real dedup LRU.""" + from nullrun.instrumentation.langgraph import NullRunCallback + + rt = MagicMock() + rt.track = MagicMock() + rt._seen_track_fingerprints = make_dedup_state() + callback = NullRunCallback(runtime=rt) + return rt, callback + + +def _run_callback_on_llm_end(callback: Any, response: Any, **kwargs: Any) -> None: + """Drive NullRunCallback.on_llm_end with the stand-in response. + + Skips the actual ``on_chain_start`` / ``on_chain_end`` flow — we want + to test the fingerprint-stamping contract on the ``llm_call`` event + alone. + """ + callback.on_llm_end(response, **kwargs) + + +def _read_track_event(rt: MagicMock) -> dict[str, Any]: + """Return the most recent event passed to ``rt.track``.""" + assert rt.track.call_count >= 1 + return rt.track.call_args_list[-1][0][0] + + +def test_callback_llm_output_id_collides_with_httpx_fingerprint(): + """LangChain callback extracts response_id from + ``response.llm_output["id"]`` (langchain-openai 1.x primary location) + and produces the SAME fingerprint the httpx transport computes for + the same body. This is the core dedup fix.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + llm_output_id="chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert event["_fingerprint"] == expected + + +def test_callback_response_id_attr_collides_with_httpx_fingerprint(): + """Some wrappers put the chat-completion id directly on the + ``response.id`` attribute (no llm_output dict). The callback MUST + read this location and produce the unified fingerprint.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + response_id_attr="chatcmpl-FROM-ATTR", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-ATTR" + ) + assert event["_fingerprint"] == expected + + +def test_callback_generation_message_id_collides_with_httpx_fingerprint(): + """AIMessage inside the first generation carries the id on its + ``.id`` attribute (langchain 0.x style). Callback MUST fall back + here when llm_output and response.id are missing.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + message_id="chatcmpl-FROM-MSG", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-MSG" + ) + assert event["_fingerprint"] == expected + + +def test_callback_response_metadata_id_collides_with_httpx_fingerprint(): + """AIMessage.response_metadata['id'] (langchain 0.x metadata style) + is the last-resort location for the chat-completion id. Callback + MUST walk this location too.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + response_metadata_id="chatcmpl-FROM-META", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-META" + ) + assert event["_fingerprint"] == expected + + +def test_callback_no_id_anywhere_falls_back_to_model_provider_only(): + """When no source yields a response id (a custom chat-model wrapper + that strips the upstream id entirely), the callback MUST still + emit a non-empty fingerprint so the dedup LRU sees it. The + fingerprint will collide with any sibling emission that has the + same model+provider but no id — which is acceptable, since both + observers of the same call also lack the id.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="custom-model-1", + response_id="ignored", + # No llm_output_id, no response_id_attr, no message_id, + # no response_metadata_id — every id location is missing. + ) + # Also strip llm_output["id"] explicitly. + assert "id" not in response.llm_output + + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + fp = event["_fingerprint"] + assert fp # non-empty + expected = _fingerprint_for_llm_call("custom-model-1", "openai", None) + assert fp == expected + + +def test_callback_fingerprint_stable_across_duplicate_emissions(): + """Re-invoking the callback for the same logical LLM call (same + model, same chat-completion id) MUST produce the same fingerprint. + The dedup LRU then collapses the second emission. This pins the + "stable per call" contract that the dashboard relies on for an + accurate ``llm_call_count``.""" + rt, callback = _build_callback_runtime() + + response_a = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="shared", + llm_output_id="chatcmpl-SHARED", + ) + response_b = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="shared", + llm_output_id="chatcmpl-SHARED", + ) + _run_callback_on_llm_end(callback, response_a) + _run_callback_on_llm_end(callback, response_b) + + fp_a = rt.track.call_args_list[0][0][0]["_fingerprint"] + fp_b = rt.track.call_args_list[1][0][0]["_fingerprint"] + assert fp_a == fp_b + + # And the dedup LRU recognises it as the same fingerprint. + state = make_dedup_state() + assert _fingerprint_is_seen(state, fp_a) is False + assert _fingerprint_is_seen(state, fp_b) is True + + +# --------------------------------------------------------------------------- +# Cross-observer contract: httpx transport and LangChain callback +# produce the SAME fingerprint for the same real LLM call. +# --------------------------------------------------------------------------- + + +def test_httpx_and_callback_fingerprints_collide_for_same_call(): + """End-to-end: the same OpenAI chat-completion id surfaces in both + the response body (read by the httpx transport) and in + ``response.llm_output["id"]`` (read by the LangChain callback). + Both observers MUST produce identical fingerprints so the dedup + LRU collapses the two emissions on the wire.""" + # 1. Drive the httpx transport with a real response body. + rt_http = MagicMock() + rt_http.track = MagicMock() + rt_http._seen_track_fingerprints = make_dedup_state() + patch_httpx(rt_http) + + body = _openai_chat_completion_response( + response_id="chatcmpl-CROSS-OBSERVER", + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_http = rt_http.track.call_args_list[0][0][0]["_fingerprint"] + reset_for_tests() + + # 2. Drive the LangChain callback with the same id in llm_output. + rt_cb, callback = _build_callback_runtime() + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + llm_output_id="chatcmpl-CROSS-OBSERVER", + ) + _run_callback_on_llm_end(callback, response) + fp_cb = _read_track_event(rt_cb)["_fingerprint"] + + # 3. The two fingerprints MUST be identical — that's the whole fix. + assert fp_http == fp_cb, ( + f"httpx transport fingerprint {fp_http!r} != " + f"callback fingerprint {fp_cb!r} — dedup will not collapse " + f"the two emissions and the dashboard's llm_call_count will " + f"be doubled." + ) + + # 4. And the dedup LRU actually collapses them when both fire. + state = make_dedup_state() + # First observation: unseen. + assert _fingerprint_is_seen(state, fp_http) is False + _fingerprint_is_seen(state, fp_http) + # Second observation (the sibling callback emission): seen. + assert _fingerprint_is_seen(state, fp_cb) is True \ No newline at end of file From 84416a3f6c35f8c86bd55e45196c26f64c3f1696 Mon Sep 17 00:00:00 2001 From: Anatolii Date: Mon, 29 Jun 2026 19:17:10 +0400 Subject: [PATCH 2/2] fix(0.9.1): always emit streaming-skipped event even when model is unknown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 0.9.1 unified-fingerprint commit added an `if not model: return` guard to `_emit_streaming_skipped` as a ghost-event dedup. It was too aggressive — the test_streaming_oom_cap contract pins that the event fires whenever the cap is exceeded, so the backend's coverage denominator (llm_call_count) stays accurate. The test creates a request with no model in the body; the guard dropped the event and runtime.track was never called → 2 tests failed on CI. Remove the guard, keep the _fingerprint attachment (still the right move for dedup with sibling langchain-callback emissions). When model is None the backend's into_track_request_v2 gate may log a cost_pipeline_missing_model_total warning, but that's the same noise pre-0.9.1 produced and is preferable to silently dropping the event and skewing coverage_pct. metadata.streaming_skipped: True tells the backend this is a known-skipped emission, not a real call to bill against. --- src/nullrun/instrumentation/auto.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/nullrun/instrumentation/auto.py b/src/nullrun/instrumentation/auto.py index 330b499..83208d6 100644 --- a/src/nullrun/instrumentation/auto.py +++ b/src/nullrun/instrumentation/auto.py @@ -2069,16 +2069,19 @@ def _emit_streaming_skipped( inspected), and attach a deterministic `_fingerprint` when we do emit so dedup collapses repeats from the same call site. """ + # We always emit the streaming-skipped event regardless of + # whether ``_extract_model_from_request_body`` recovered a model. + # The test_streaming_oom_cap contract pins that the event fires + # whenever the cap is exceeded, so the backend's coverage + # denominator (``llm_call_count``) stays accurate. When ``model`` + # is ``None`` the backend's into_track_request_v2 gate may log a + # ``cost_pipeline_missing_model_total`` warning, but that's the + # same noise a streaming-skipped response produced pre-0.9.1 and + # is preferable to silently dropping the event and skewing + # coverage_pct. The ``metadata.streaming_skipped: True`` flag + # tells the backend this is a known-skipped emission, not a + # real call to bill against. model = _extract_model_from_request_body(request) - if not model: - logger.debug( - "NullRun transport: dropping streaming_skipped event for host=%s " - "because model extraction also failed (likely double-consume " - "by langchain-openai upstream or empty request body); " - "the happy-path _emit() will handle attribution if available", - host, - ) - return try: runtime.track( {