diff --git a/CHANGELOG.md b/CHANGELOG.md index c71692c..825dde2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,64 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html) --- +## [0.9.1] - 2026-06-29 + +Patch on top of 0.9.0. Unifies the LLM-call fingerprint scheme so the +dedup LRU at `runtime.track()` can collapse sibling emissions from the +httpx transport and the LangChain callback for the same real call. + +### Fixed + +- **Double-emission of llm_call events.** Pre-0.9.1 the httpx transport + (`NullRunSyncTransport._emit`) and the LangChain callback + (`NullRunCallback.on_llm_end`) each computed their own `_fingerprint` + from different inputs — `sha256(host|status|body)` vs + `sha256(json({path:"langchain_callback", run_id, response_id, model, + provider, invocation_params}))`. The two fingerprints never + collided, so the dedup LRU at `runtime.track()` could not collapse + the two emissions for the same call. On a typical `app.invoke()` + with 6 LLM calls the backend saw ~12 `llm_call` events on the wire + (2 per real call), doubling `llm_call_count` and skewing + `cost_events` aggregates. + + Post-fix both observers call the same helper + `_fingerprint_for_llm_call(model, provider, response_id)` with the + three signals reachable from every observation path: + - httpx transport reads `model` and `id` straight out of the + OpenAI-style response body (`payload["model"]`, + `payload["id"]`). `_openai_extractor` now also carries `"id"` on + its return so the transport has it without re-parsing the body. + - LangChain callback reads `model` from `invocation_params` / + `response.llm_output["model_name"]` and `id` from + `response.llm_output["id"]` / `response.id` / the generation's + AIMessage `.id` / `response.response_metadata["id"]` — all four + locations are populated by langchain-openai 1.x for OpenAI chat + completions. 
+ + When any of the three signals is missing, the helper falls back to + the empty string on that slot; the resulting fingerprint is still + deterministic for the call, just less specific. A missing `id` + (custom chat-model wrappers that don't surface it) still collapses + the two observers via the model+provider combination. + +### Tests + +- `tests/test_unified_fingerprint.py` pins the new contract: + deterministic fingerprint for identical inputs, distinct + fingerprints for distinct inputs, the httpx transport calls the + helper with values extracted from the response body, the LangChain + callback produces the SAME fingerprint for the same LLM call when + reading the chat-completion id from any of the four known + langchain locations. +- `tests/test_llm_call_metadata_flags.py` updated to match the new + extractor shape (`usage["id"]` is now present alongside + `usage["model"]`). + +No public-API break. No behavior change for callers whose +instrumentation already populates `model` correctly. + +--- + ## [0.9.0] - 2026-06-29 Server-derived coverage replaces the in-process counter dicts. diff --git a/pyproject.toml b/pyproject.toml index c6da484..1681265 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nullrun" -version = "0.9.0" +version = "0.9.1" # Long form used by PyPI page meta-description and search snippets. # Kept under the 200-char preview threshold so the full line is visible # without an "expand" click. 
Keywords are matched against likely search diff --git a/src/nullrun/instrumentation/auto.py b/src/nullrun/instrumentation/auto.py index 698fba9..83208d6 100644 --- a/src/nullrun/instrumentation/auto.py +++ b/src/nullrun/instrumentation/auto.py @@ -169,6 +169,15 @@ def _openai_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": completion, "total_tokens": total, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): the upstream + # chat-completion id (``payload["id"]``, e.g. + # ``"chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo"`` for OpenAI) is + # the tightest discriminator for collapsing the sibling + # LangChain-callback emission via ``_fingerprint_for_llm_call``. + # Without this, the httpx path's fingerprint scheme (sha256 of + # body bytes) never collides with the callback's scheme and + # the dedup LRU cannot collapse duplicates. + "id": payload.get("id"), # Phase 4.1: explicit cache / reasoning / finish / tool fields. # Previously these were reachable only via raw_usage (now # stripped at the wire boundary). Backend gate/budget/loop @@ -181,6 +190,76 @@ def _openai_extractor(body: bytes, status: int) -> ExtractedUsage | None: } +# --------------------------------------------------------------------------- +# D2.5 (Audit 2026-06-29): unified LLM-call fingerprint +# --------------------------------------------------------------------------- +# The httpx transport and the LangChain callback both observe the same +# real LLM call, but until this commit they computed fingerprints from +# different inputs: +# - httpx transport: sha256(host|status|body) +# - LangChain callback: sha256(json({path, run_id, response_id, ...})) +# Because the inputs differ, the two fingerprints never collided and the +# dedup LRU at runtime.track() could not collapse the two emissions for the +# same call. 
On a typical `app.invoke()` with 6 LLM calls the backend +# saw ~12 llm_call events on the wire (2 per real call), which doubled +# the dashboard's `llm_call_count` and skewed `cost_events` aggregates. +# +# The fix: a single helper that both observers call with the same three +# signals (model + provider + upstream chat-completion id). The three are +# reachable from every observer: +# - httpx transport reads `model` and `id` straight out of the response +# body JSON (`payload["model"]`, `payload["id"]`). +# - LangChain callback reads `model` from `invocation_params` / +# `response.llm_output["model_name"]` and `id` from +# `response.llm_output["id"]` / `response.id` / the generation's +# AIMessage `.id` / `response.response_metadata["id"]` — all four +# locations are populated by langchain-openai 1.x for OpenAI chat +# completions. +# When any of the three signals is missing, the helper falls back to the +# empty string on that slot; the resulting fingerprint is still +# deterministic for the call, just less specific. That's intentional — +# a missing `id` (custom chat-model wrappers that don't surface it) still +# collapses the two observers via the model+provider combination; the +# narrower the key, the fewer collisions across distinct calls. + +def _fingerprint_for_llm_call( + model: str | None, + provider: str | None, + response_id: str | None, +) -> str: + """Unified fingerprint for one real LLM call. + + Both the httpx transport hook (``NullRunSyncTransport._emit`` / + ``NullRunAsyncTransport._emit``) and the LangChain callback + (``NullRunCallback.on_llm_end``) call this with the same three + signals so the dedup LRU at ``runtime.track()`` can collapse the + sibling emission for the same call to a single wire event. + + Args: + model: provider-side model id as returned by the upstream + (``"gpt-4.1-mini-2025-04-14"`` for OpenAI, ``"claude-3-5-sonnet-..."`` + for Anthropic, etc.). None is acceptable; the slot still + contributes to the fingerprint. 
+ provider: short provider label (``"openai"``, ``"anthropic"``, + ``"gemini"``, etc.). Same fallback semantics as ``model``. + response_id: upstream chat-completion id (``"chatcmpl-..."`` for + OpenAI, ``"msg_..."`` for Anthropic, etc.). This is the + tightest discriminator — two LLM calls with the same model + and provider will still have distinct response_ids, so this + is the slot that prevents spurious collisions across + unrelated calls. + + Returns: + A 16-char hex digest suitable for the ``_fingerprint`` event + field consumed by ``NullRunRuntime.track()``. + """ + payload = f"{model or ''}|{provider or ''}|{response_id or ''}" + h = hashlib.sha256() + h.update(b"llm_call|") + h.update(payload.encode("utf-8")) + return h.hexdigest()[:16] + + def _anthropic_extractor(body: bytes, status: int) -> ExtractedUsage | None: """Anthropic Messages API response shape. @@ -220,6 +299,9 @@ def _anthropic_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": inp + out, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): Anthropic message id, + # e.g. ``"msg_01HXYZ..."``. See _openai_extractor comment. + "id": payload.get("id"), "cache_read_tokens": int(usage.get("cache_read_input_tokens", 0) or 0), "cache_write_tokens": int(usage.get("cache_creation_input_tokens", 0) or 0), # Anthropic reasoning tokens are part of output_tokens (they're @@ -273,6 +355,11 @@ def _gemini_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": completion, "total_tokens": total or (prompt + completion), "model": payload.get("modelVersion"), + # Audit 2026-06-29 (unified fingerprint): Gemini response id is + # read from top-level ``responseId``, then ``id``. When neither + # is present the slot is ``None`` and only model+provider + # disambiguate. See _openai_extractor for the rationale.
+ "id": payload.get("responseId") or payload.get("id"), "cache_read_tokens": int(usage.get("cachedContentTokenCount", 0) or 0), "cache_write_tokens": 0, "reasoning_tokens": 0, @@ -326,6 +413,10 @@ def _cohere_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": total, "model": payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): use the top-level + # ``id``, falling back to ``generation_id``; when neither is set + # only model+provider disambiguate. See _openai_extractor. + "id": payload.get("id") or payload.get("generation_id"), "cache_read_tokens": 0, "cache_write_tokens": 0, "reasoning_tokens": 0, @@ -457,6 +548,13 @@ def _bedrock_extractor(body: bytes, status: int) -> ExtractedUsage | None: "completion_tokens": out, "total_tokens": total, "model": payload.get("modelId") or payload.get("model"), + # Audit 2026-06-29 (unified fingerprint): Bedrock InvokeModel + # response carries ``id`` at the top level (e.g. + # ``"msg_01ABC..."`` for Anthropic-on-Bedrock, ``"cmpl-..."`` + # for Mistral-on-Bedrock). Falls back to ``None`` when the + # adapter doesn't surface one; model+provider still give us + # a fingerprint slot, just less specific. + "id": payload.get("id"), "cache_read_tokens": cache_read, "cache_write_tokens": cache_write, "reasoning_tokens": 0, @@ -746,6 +844,15 @@ def _emit( # columns; raw_usage is no longer on the wire (stripped # at the track() boundary — see _WIRE_STRIP_FIELDS in # runtime.py). + # + # Audit 2026-06-29 (unified fingerprint): we use the + # ``_fingerprint_for_llm_call`` helper so this emission + # shares the same dedup key as the LangChain callback's + # emission for the same call. The previous per-transport + # ``_fingerprint_for(host, body, status)`` produced a key + # the callback could never collide with, doubling every + # real LLM call on the wire.
+ response_id = usage.get("id") self._runtime.track( { "type": "llm_call", @@ -768,8 +875,15 @@ def _emit( # in runtime.py — kept here only so the in-process # dedup layer can see the full vendor payload. "raw_usage": usage, - # Fingerprint for dedup at the track() sink. - "_fingerprint": _fingerprint_for(host, body, status), + # Audit 2026-06-29 (unified fingerprint): see + # ``_fingerprint_for_llm_call`` — same key the + # LangChain callback computes, so the dedup LRU + # collapses the two emissions for the same call. + "_fingerprint": _fingerprint_for_llm_call( + model_for_event, + _provider_label(host), + response_id, + ), } ) except Exception as e: @@ -889,6 +1003,12 @@ def _emit( # Phase 4.1: see sync _emit for rationale. Async path # uses identical event shape so the dedup key space # stays unified across sync + async transports. + # + # Audit 2026-06-29 (unified fingerprint): see sync + # _emit for the rationale — async transport must use the + # same key the LangChain callback computes so the dedup + # LRU collapses duplicates. + response_id = usage.get("id") self._runtime.track( { "type": "llm_call", @@ -908,7 +1028,11 @@ def _emit( "tracked": True, }, "raw_usage": usage, - "_fingerprint": _fingerprint_for(host, body, status), + "_fingerprint": _fingerprint_for_llm_call( + usage.get("model"), + _provider_label(host), + response_id, + ), } ) except Exception as e: @@ -1921,14 +2045,50 @@ def _emit_streaming_skipped( `model` falls back to the request body via `_extract_model_from_request_body` (sync-only, mirrors `_emit`'s pattern at lines 735-739). + + Audit 2026-06-29 (ghost-event dedup): the previous version + emitted the event unconditionally and without a `_fingerprint`. + Two consequences: + 1. When the body read fails for an external reason + (double-consume by langchain-openai, an upstream that + already drained the stream), the SDK produced an + `llm_call` with `tokens=0, model=None` — i.e. no useful + signal — that still reached the wire. 
The backend's + `into_track_request_v2` handler gate (handler.rs:2046) + rejected these with HTTP 422, but the cost-pipeline + belt-and-suspenders backstop still logged every one as + `cost_pipeline_missing_model_total` and stamped the 1-cent + surcharge. Operators saw 30+ ERROR lines per `app.invoke()` + for a workload that actually had 6 real LLM calls. + 2. Because no `_fingerprint` was attached, the dedup LRU at + `runtime.track()` could not collapse this emission with + any sibling emission for the same call. + Fix: attach a deterministic `_fingerprint` so the dedup LRU can + collapse repeats of this emission. The event is NOT dropped when + `model` cannot be recovered from the request body: it is still + emitted with `model=None`, because dropping it would undercount + `llm_call_count` (see the comment below this docstring). + """ + # We always emit the streaming-skipped event regardless of + # whether ``_extract_model_from_request_body`` recovered a model. + # The test_streaming_oom_cap contract pins that the event fires + # whenever the cap is exceeded, so the backend's coverage + # denominator (``llm_call_count``) stays accurate. When ``model`` + # is ``None`` the backend's into_track_request_v2 gate may log a + # ``cost_pipeline_missing_model_total`` warning, but that's the + # same noise a streaming-skipped response produced pre-0.9.1 and + # is preferable to silently dropping the event and skewing + # coverage_pct. The ``metadata.streaming_skipped: True`` flag + # tells the backend this is a known-skipped emission, not a + # real call to bill against.
+ model = _extract_model_from_request_body(request) try: runtime.track( { "type": "llm_call", "provider": _provider_label(host), "host": host, - "model": _extract_model_from_request_body(request), + "model": model, "tokens": 0, "input_tokens": 0, "output_tokens": 0, @@ -1937,6 +2097,23 @@ def _emit_streaming_skipped( "tracked": False, "streaming_skipped": True, }, + # Audit 2026-06-29 (unified fingerprint): use the + # shared ``_fingerprint_for_llm_call`` helper so this + # ghost emission also collapses with any sibling + # emission the LangChain callback produces for the + # same call. The body was never read, so we don't + # have an upstream response id — but the model + + # provider pair still gives a deterministic key that + # matches the callback's emission for the same call + # when the callback has the model but not the id. + # (The pre-fix ``_fingerprint_for(host, b"<...>", 0)`` + # sentinel produced a unique-per-path key that + # collided with NOTHING.) + "_fingerprint": _fingerprint_for_llm_call( + model, + _provider_label(host), + None, + ), } ) except Exception as e: # pragma: no cover — defensive diff --git a/src/nullrun/instrumentation/langgraph.py b/src/nullrun/instrumentation/langgraph.py index a9c58c8..b045530 100644 --- a/src/nullrun/instrumentation/langgraph.py +++ b/src/nullrun/instrumentation/langgraph.py @@ -479,6 +479,21 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: (``response.response_metadata['model_name']`` or the AIMessage on the LLMResult generation). ``"unknown"`` is now a true last resort, not the common case. + + Audit 2026-06-29 (ghost-event dedup): the previous version of + this method did NOT attach a ``_fingerprint`` to the event + before forwarding it to ``runtime.track()``. 
Because the + dedup LRU only collapses events whose ``_fingerprint`` + matches, the LangChain callback emission was never deduped + against the sibling emission from the httpx transport + (``NullRunSyncTransport._emit``), even though both observers + fire for the same LLM call. The net effect on a typical + ``app.invoke()`` with 6 LLM calls was 6-12 duplicate + ``llm_call`` events on the wire (instead of 6), plus extra + cost-pipeline ERROR noise from ``_emit_streaming_skipped`` + for body-read failures. The fix derives a stable fingerprint + from model + provider + upstream response id (no run_id) + so the dedup LRU can collapse these emissions. """ try: # Extract provider/model from invocation params first, then @@ -503,6 +518,59 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: logger.info(f"NullRun callback: model={model}, provider={provider}, " f"usage={usage}, has_usage={usage['has_usage']}") + # Audit 2026-06-29 (unified fingerprint): derive the same + # fingerprint the httpx transport computes for the same + # call, so the dedup LRU at runtime.track() collapses the + # two emissions to a single wire event. Both observers feed + # (model, provider, response_id) into + # ``_fingerprint_for_llm_call``; the helper is + # path-agnostic on purpose so the two schemes collide + # rather than diverge. + # + # The response_id has to be extracted from the same shape + # the upstream provider returned. For langchain-openai 1.x + # the chat-completion id lives in four places in priority + # order (first hit wins): + # 1. ``response.llm_output["id"]`` — LLMResult wrapper + # where langchain-openai puts the upstream id. + # 2. ``response.id`` — direct attribute on the LLMResult + # or AIMessage (some versions). + # 3. The AIMessage inside the first generation + # (``response.generations[0][0].message.id``). + # 4. ``response.response_metadata["id"]`` — the dict + # langchain-openai populates on the AIMessage.
+ # Any of these yields the same string (``"chatcmpl-..."`` + # for OpenAI), so the fingerprint matches the httpx + # transport's reading of ``payload["id"]`` from the body. + from nullrun.instrumentation.auto import ( + _fingerprint_for_llm_call, + ) + + response_id = None + try: + llm_out = getattr(response, "llm_output", None) + if isinstance(llm_out, dict): + response_id = llm_out.get("id") + except Exception: # pragma: no cover — defensive + pass + if not response_id: + response_id = getattr(response, "id", None) + if not response_id: + try: + gens = getattr(response, "generations", None) or [] + if gens and gens[0]: + msg = getattr(gens[0][0], "message", None) + response_id = getattr(msg, "id", None) + except Exception: # pragma: no cover — defensive + pass + if not response_id: + try: + resp_meta = getattr(response, "response_metadata", None) + if isinstance(resp_meta, dict): + response_id = resp_meta.get("id") + except Exception: # pragma: no cover — defensive + pass + # Build event with RAW usage data (no cost computation in SDK!) # Phase 4.1: lift cache / reasoning / finish / tool names out # of raw_usage onto the event itself, mirroring the httpx @@ -537,6 +605,19 @@ def on_llm_end(self, response: Any, **kwargs: Any) -> None: # Stripped at the wire boundary by _WIRE_STRIP_FIELDS — # kept here for in-process dedup + test introspection. "raw_usage": usage["raw_usage"], + # Audit 2026-06-29 (unified fingerprint): use the + # same helper the httpx transport calls so the dedup + # LRU at runtime.track() collapses the sibling + # emission for the same real LLM call. Pre-fix this + # used ``_fingerprint_for_event_dict({path: + # "langchain_callback", ...})`` which produced a key + # the httpx fingerprint could never collide with — + # every LLM call produced two wire events. 
+ "_fingerprint": _fingerprint_for_llm_call( + model, + provider, + response_id, + ), } logger.info(f"NullRun track event: {event}") diff --git a/tests/test_llm_call_metadata_flags.py b/tests/test_llm_call_metadata_flags.py index 9e92b9b..4a3487d 100644 --- a/tests/test_llm_call_metadata_flags.py +++ b/tests/test_llm_call_metadata_flags.py @@ -32,12 +32,29 @@ # Mirror the response builder from test_streaming_oom_cap.py to keep # these tests self-contained. +def _make_request() -> httpx.Request: + """Audit 2026-06-29: in production the request body carries + ``{"model": "gpt-4.1-mini", ...}`` which is what + ``_extract_model_from_request_body`` reads when the response body + is too large to inspect. The streaming-skipped path emits its + event even when no model can be recovered (with ``model=None``), + so the event itself does not depend on this body. Real + OpenAI/Anthropic/etc. requests always carry ``model``, so the + fixture includes it to mirror production and to give the + streaming-skipped event a non-``None`` ``model`` for its + ``_fingerprint``.""" + return httpx.Request( + "POST", + "https://api.openai.com/v1/chat/completions", + json={"model": "gpt-4.1-mini", "messages": []}, + ) + + def _make_response(content: bytes, content_length: int | None = None) -> httpx.Response: - request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") headers = {"content-type": "application/json"} if content_length is not None: headers["content-length"] = str(content_length) - return httpx.Response(200, headers=headers, content=content, request=request) + return httpx.Response(200, headers=headers, content=content, request=_make_request()) def test_tracked_flag_true_on_normal_call(): @@ -56,9 +73,7 @@ def test_tracked_flag_true_on_normal_call(): ) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( -
httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) event = runtime.track.call_args[0][0] assert event["metadata"]["tracked"] is True @@ -79,9 +94,7 @@ def test_streaming_skipped_flag_on_oversized_response(): body = b"x" * (MAX_RESPONSE_BYTES + 1) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( - httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) event = runtime.track.call_args[0][0] assert event["metadata"]["tracked"] is False @@ -115,9 +128,7 @@ def _capture(enriched): body = b"x" * (MAX_RESPONSE_BYTES + 1) inner.handle_request.return_value = _make_response(body, content_length=len(body)) transport = NullRunSyncTransport(inner=inner, runtime=runtime) - transport.handle_request( - httpx.Request("POST", "https://api.openai.com/v1/chat/completions") - ) + transport.handle_request(_make_request()) wire = captured["event"] # `metadata` field is preserved on the wire — backend reads it. diff --git a/tests/test_unified_fingerprint.py b/tests/test_unified_fingerprint.py new file mode 100644 index 0000000..b0c1f2d --- /dev/null +++ b/tests/test_unified_fingerprint.py @@ -0,0 +1,567 @@ +""" +Tests for the unified LLM-call fingerprint scheme. + +Background (audit 2026-06-29): +Before this fix the httpx transport hook (``NullRunSyncTransport._emit``) +and the LangChain callback (``NullRunCallback.on_llm_end``) each computed +their own ``_fingerprint`` from different inputs: + + httpx transport: sha256(host|status|body)[:16] + LangChain callback: sha256(json({path:"langchain_callback", run_id, + response_id, model, provider, + invocation_params}))[:16] + +The two fingerprints could not collide, so the dedup LRU at +``runtime.track()`` could not collapse the sibling emission for the same +real LLM call. 
On a typical ``app.invoke()`` with 6 LLM calls the backend +saw ~12 ``llm_call`` events on the wire (2 per real call), which doubled +the dashboard's ``llm_call_count`` and skewed ``cost_events`` aggregates. + +The fix: a single helper ``_fingerprint_for_llm_call(model, provider, +response_id)`` that both observers call with the same three signals. + +Contract pinned by these tests: +1. The helper is deterministic: identical inputs → identical fingerprint. +2. Distinct inputs (different model / provider / id) → distinct fingerprints. +3. The httpx transport hook calls the helper with the values extracted + from the OpenAI-style response body (``payload["model"]`` and + ``payload["id"]``). +4. The LangChain callback path produces the SAME fingerprint for the + same LLM call when it reads the chat-completion id from any of the + four canonical locations (LLMResult.llm_output["id"] / response.id / + AIMessage.id / response.response_metadata["id"]). +5. The dedup LRU recognises the two emissions as duplicates and only + the first one reaches ``/track``. + +These tests use the real helper + a stand-in runtime (no live network), +so they exercise the production code path without flakiness. 
+""" + +from __future__ import annotations + +import json +from typing import Any +from unittest.mock import MagicMock + +import httpx +import pytest +import respx + +from nullrun.instrumentation.auto import ( + NullRunSyncTransport, + _fingerprint_for_llm_call, + _fingerprint_is_seen, + make_dedup_state, + patch_httpx, + reset_for_tests, +) + + +# --------------------------------------------------------------------------- +# Pure helper mechanics +# --------------------------------------------------------------------------- + + +def test_fingerprint_for_llm_call_is_deterministic(): + """Identical inputs → identical fingerprints (16 hex chars).""" + fp1 = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + fp2 = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert fp1 == fp2 + assert len(fp1) == 16 + assert all(c in "0123456789abcdef" for c in fp1) + + +def test_fingerprint_changes_with_response_id(): + """Two distinct chat-completion ids → distinct fingerprints. + + This is the discriminator the dedup LRU relies on. If it ever + failed, two unrelated LLM calls would collide on the same dedup + slot and one of them would silently drop on the wire. 
+ """ + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-A") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-B") + assert fp_a != fp_b + + +def test_fingerprint_changes_with_model(): + """Two distinct models → distinct fingerprints even with the same id.""" + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "chatcmpl-X") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini-2025-04-15", "openai", "chatcmpl-X") + assert fp_a != fp_b + + +def test_fingerprint_changes_with_provider(): + """Two distinct providers → distinct fingerprints even with the same id.""" + fp_a = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", "msg-1") + fp_b = _fingerprint_for_llm_call("gpt-4.1-mini", "anthropic", "msg-1") + assert fp_a != fp_b + + +def test_fingerprint_tolerates_none_response_id(): + """When the response id cannot be recovered (custom chat-model wrappers + that don't surface it), the helper still produces a stable fingerprint + for the model+provider combination. This is the fallback path — + tighter than no fingerprint, looser than full id-based disambiguation. + """ + fp1 = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", None) + fp2 = _fingerprint_for_llm_call("gpt-4.1-mini", "openai", None) + fp3 = _fingerprint_for_llm_call("gpt-4.1-mini", "anthropic", None) + assert fp1 == fp2 # stable across calls with same inputs + assert fp1 != fp3 # different provider still distinct + + +def test_fingerprint_matches_old_body_scheme_for_none_id(): + """Regression guard: when neither observer can recover the response id, + the helper still produces a deterministic key — NOT an empty string, + which would short-circuit the dedup LRU at ``_fingerprint_is_seen``. 
+ + The ``make_dedup_state`` + ``_fingerprint_is_seen`` short-circuit + on empty fingerprints (see ``test_lru_empty_fingerprint_short_circuits_to_unseen`` + in ``test_dedup.py``), so the helper must always produce a non-empty + fingerprint even when all three signals are empty strings. + """ + fp_empty = _fingerprint_for_llm_call("", "", "") + assert fp_empty # non-empty (the helper stamps a `llm_call|` prefix) + assert len(fp_empty) == 16 + # The fingerprint MUST be accepted by the dedup LRU. + state = make_dedup_state() + assert _fingerprint_is_seen(state, fp_empty) is False + assert _fingerprint_is_seen(state, fp_empty) is True + + +# --------------------------------------------------------------------------- +# httpx transport hook: stamps the unified fingerprint on emitted events +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clean_httpx_patch(): + reset_for_tests() + yield + reset_for_tests() + + +def _openai_chat_completion_response( + model: str = "gpt-4.1-mini-2025-04-14", + response_id: str = "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + prompt_tokens: int = 26, + completion_tokens: int = 50, +) -> bytes: + """A minimal but realistic OpenAI chat-completion response body.""" + return json.dumps( + { + "id": response_id, + "object": "chat.completion", + "model": model, + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "Hello!"}, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + }, + } + ).encode() + + +def test_httpx_transport_emits_unified_fingerprint(): + """The httpx transport hook MUST call ``_fingerprint_for_llm_call`` + with the model and id extracted from the response body, NOT the old + ``_fingerprint_for(host, body, status)`` scheme. 
This pins the fix.""" + rt = MagicMock() + rt.track = MagicMock() + rt._seen_track_fingerprints = make_dedup_state() + + patch_httpx(rt) + body = _openai_chat_completion_response() + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + response = client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + assert response.status_code == 200 + + # Exactly one track() call from the transport. + assert rt.track.call_count == 1 + event = rt.track.call_args_list[0][0][0] + fp = event["_fingerprint"] + assert fp + # Must be the unified fingerprint, computed from model+provider+id. + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert fp == expected, ( + f"transport fingerprint {fp!r} != expected {expected!r} — " + "did the unified fingerprint scheme regress?" + ) + + +def test_httpx_transport_fingerprint_stable_across_response_bodies(): + """Two different bodies (different ids) MUST produce different + fingerprints. 
This guards against silent re-emission collisions.""" + rt_a = MagicMock() + rt_a.track = MagicMock() + rt_a._seen_track_fingerprints = make_dedup_state() + + patch_httpx(rt_a) + body_a = _openai_chat_completion_response( + response_id="chatcmpl-A", prompt_tokens=10, completion_tokens=5 + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body_a) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_a = rt_a.track.call_args_list[0][0][0]["_fingerprint"] + reset_for_tests() + + rt_b = MagicMock() + rt_b.track = MagicMock() + rt_b._seen_track_fingerprints = make_dedup_state() + patch_httpx(rt_b) + body_b = _openai_chat_completion_response( + response_id="chatcmpl-B", prompt_tokens=20, completion_tokens=10 + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body_b) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_b = rt_b.track.call_args_list[0][0][0]["_fingerprint"] + assert fp_a != fp_b + + +# --------------------------------------------------------------------------- +# LangChain callback: stamps the unified fingerprint on emitted events +# --------------------------------------------------------------------------- + + +class _FakeLLMResult: + """Minimal stand-in for langchain_core.outputs.LLMResult carrying + the response_id at every location the real NullRunCallback probes. + + The four locations (in priority order) are: + 1. ``response.llm_output["id"]`` (langchain-openai 1.x primary) + 2. ``response.id`` (some wrappers) + 3. ``response.generations[0][0].message.id`` (AIMessage inside generation) + 4. 
``response.response_metadata["id"]`` (langchain 0.x AIMessage metadata) + + Each test below exercises one of these locations and asserts the + resulting fingerprint matches the one the httpx transport produces + for the same body. Without that match the dedup LRU cannot collapse + the two emissions. + """ + + def __init__( + self, + *, + model_name: str, + response_id: str, + llm_output_id: str | None = None, + response_id_attr: str | None = None, + message_id: str | None = None, + response_metadata_id: str | None = None, + ) -> None: + self.llm_output: dict[str, Any] = { + "model_name": model_name, + "token_usage": { + "prompt_tokens": 26, + "completion_tokens": 50, + "total_tokens": 76, + }, + } + if llm_output_id is not None: + self.llm_output["id"] = llm_output_id + + if response_id_attr is not None: + self.id = response_id_attr + else: + self.id = None + + # Build a single generation with a fake AIMessage. + class _FakeMsg: + def __init__(self, mid: str | None) -> None: + self.id = mid + + class _FakeGen: + def __init__(self, mid: str | None) -> None: + self.message = _FakeMsg(mid) + + self.generations: list[list[_FakeGen]] = [[_FakeGen(message_id)]] + + self.response_metadata: dict[str, Any] = { + "model_provider": "openai", + } + if response_metadata_id is not None: + self.response_metadata["id"] = response_metadata_id + + +class _FakeAIMessage: + """Stand-in for the AIMessage that NullRunCallback.on_llm_end receives + when the response is NOT wrapped in LLMResult (i.e. direct AIMessage + path). For langchain-openai 1.x chat-completions, the wrapper + actually produces an LLMResult, so this is the less common case — + but we cover it because the production code does.""" + + def __init__( + self, + *, + model_name: str, + response_id: str, + response_metadata_id: str | None = None, + ) -> None: + self.id = response_id + self.content = "Hello!" 
+ self.response_metadata: dict[str, Any] = { + "model_provider": "openai", + "model_name": model_name, + } + if response_metadata_id is not None: + self.response_metadata["id"] = response_metadata_id + self.usage_metadata = { + "input_tokens": 26, + "output_tokens": 50, + "total_tokens": 76, + } + self.additional_kwargs: dict[str, Any] = {} + self.tool_calls: list[Any] = [] + self.invalid_tool_calls: list[Any] = [] + self.name = None + + +def _build_callback_runtime() -> tuple[Any, MagicMock]: + """Build a runtime + NullRunCallback with a real dedup LRU.""" + from nullrun.instrumentation.langgraph import NullRunCallback + + rt = MagicMock() + rt.track = MagicMock() + rt._seen_track_fingerprints = make_dedup_state() + callback = NullRunCallback(runtime=rt) + return rt, callback + + +def _run_callback_on_llm_end(callback: Any, response: Any, **kwargs: Any) -> None: + """Drive NullRunCallback.on_llm_end with the stand-in response. + + Skips the actual ``on_chain_start`` / ``on_chain_end`` flow — we want + to test the fingerprint-stamping contract on the ``llm_call`` event + alone. + """ + callback.on_llm_end(response, **kwargs) + + +def _read_track_event(rt: MagicMock) -> dict[str, Any]: + """Return the most recent event passed to ``rt.track``.""" + assert rt.track.call_count >= 1 + return rt.track.call_args_list[-1][0][0] + + +def test_callback_llm_output_id_collides_with_httpx_fingerprint(): + """LangChain callback extracts response_id from + ``response.llm_output["id"]`` (langchain-openai 1.x primary location) + and produces the SAME fingerprint the httpx transport computes for + the same body. 
This is the core dedup fix.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + llm_output_id="chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-Dw7288WJI4bBDFyQ4DnZvhPUKfaZo" + ) + assert event["_fingerprint"] == expected + + +def test_callback_response_id_attr_collides_with_httpx_fingerprint(): + """Some wrappers put the chat-completion id directly on the + ``response.id`` attribute (no llm_output dict). The callback MUST + read this location and produce the unified fingerprint.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + response_id_attr="chatcmpl-FROM-ATTR", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-ATTR" + ) + assert event["_fingerprint"] == expected + + +def test_callback_generation_message_id_collides_with_httpx_fingerprint(): + """AIMessage inside the first generation carries the id on its + ``.id`` attribute (langchain 0.x style). 
Callback MUST fall back + here when llm_output and response.id are missing.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + message_id="chatcmpl-FROM-MSG", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-MSG" + ) + assert event["_fingerprint"] == expected + + +def test_callback_response_metadata_id_collides_with_httpx_fingerprint(): + """AIMessage.response_metadata['id'] (langchain 0.x metadata style) + is the last-resort location for the chat-completion id. Callback + MUST walk this location too.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + response_metadata_id="chatcmpl-FROM-META", + ) + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + expected = _fingerprint_for_llm_call( + "gpt-4.1-mini-2025-04-14", "openai", "chatcmpl-FROM-META" + ) + assert event["_fingerprint"] == expected + + +def test_callback_no_id_anywhere_falls_back_to_model_provider_only(): + """When no source yields a response id (a custom chat-model wrapper + that strips the upstream id entirely), the callback MUST still + emit a non-empty fingerprint so the dedup LRU sees it. The + fingerprint will collide with any sibling emission that has the + same model+provider but no id — which is acceptable, since both + observers of the same call also lack the id.""" + rt, callback = _build_callback_runtime() + + response = _FakeLLMResult( + model_name="custom-model-1", + response_id="ignored", + # No llm_output_id, no response_id_attr, no message_id, + # no response_metadata_id — every id location is missing. + ) + # Also strip llm_output["id"] explicitly. 
+ assert "id" not in response.llm_output + + _run_callback_on_llm_end(callback, response) + + event = _read_track_event(rt) + fp = event["_fingerprint"] + assert fp # non-empty + expected = _fingerprint_for_llm_call("custom-model-1", "openai", None) + assert fp == expected + + +def test_callback_fingerprint_stable_across_duplicate_emissions(): + """Re-invoking the callback for the same logical LLM call (same + model, same chat-completion id) MUST produce the same fingerprint. + The dedup LRU then collapses the second emission. This pins the + "stable per call" contract that the dashboard relies on for an + accurate ``llm_call_count``.""" + rt, callback = _build_callback_runtime() + + response_a = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="shared", + llm_output_id="chatcmpl-SHARED", + ) + response_b = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="shared", + llm_output_id="chatcmpl-SHARED", + ) + _run_callback_on_llm_end(callback, response_a) + _run_callback_on_llm_end(callback, response_b) + + fp_a = rt.track.call_args_list[0][0][0]["_fingerprint"] + fp_b = rt.track.call_args_list[1][0][0]["_fingerprint"] + assert fp_a == fp_b + + # And the dedup LRU recognises it as the same fingerprint. + state = make_dedup_state() + assert _fingerprint_is_seen(state, fp_a) is False + assert _fingerprint_is_seen(state, fp_b) is True + + +# --------------------------------------------------------------------------- +# Cross-observer contract: httpx transport and LangChain callback +# produce the SAME fingerprint for the same real LLM call. +# --------------------------------------------------------------------------- + + +def test_httpx_and_callback_fingerprints_collide_for_same_call(): + """End-to-end: the same OpenAI chat-completion id surfaces in both + the response body (read by the httpx transport) and in + ``response.llm_output["id"]`` (read by the LangChain callback). 
+ Both observers MUST produce identical fingerprints so the dedup + LRU collapses the two emissions on the wire.""" + # 1. Drive the httpx transport with a real response body. + rt_http = MagicMock() + rt_http.track = MagicMock() + rt_http._seen_track_fingerprints = make_dedup_state() + patch_httpx(rt_http) + + body = _openai_chat_completion_response( + response_id="chatcmpl-CROSS-OBSERVER", + ) + with respx.mock(base_url="https://api.openai.com") as mock: + mock.post("/v1/chat/completions").mock( + return_value=httpx.Response(200, content=body) + ) + with httpx.Client(base_url="https://api.openai.com") as client: + client.post("/v1/chat/completions", json={"model": "gpt-4.1-mini"}) + + fp_http = rt_http.track.call_args_list[0][0][0]["_fingerprint"] + reset_for_tests() + + # 2. Drive the LangChain callback with the same id in llm_output. + rt_cb, callback = _build_callback_runtime() + response = _FakeLLMResult( + model_name="gpt-4.1-mini-2025-04-14", + response_id="ignored", + llm_output_id="chatcmpl-CROSS-OBSERVER", + ) + _run_callback_on_llm_end(callback, response) + fp_cb = _read_track_event(rt_cb)["_fingerprint"] + + # 3. The two fingerprints MUST be identical — that's the whole fix. + assert fp_http == fp_cb, ( + f"httpx transport fingerprint {fp_http!r} != " + f"callback fingerprint {fp_cb!r} — dedup will not collapse " + f"the two emissions and the dashboard's llm_call_count will " + f"be doubled." + ) + + # 4. And the dedup LRU actually collapses them when both fire. + state = make_dedup_state() + # First observation: unseen. + assert _fingerprint_is_seen(state, fp_http) is False + _fingerprint_is_seen(state, fp_http) + # Second observation (the sibling callback emission): seen. + assert _fingerprint_is_seen(state, fp_cb) is True \ No newline at end of file