From 2a4f0dc81e54ff9e905f484fc96907afa86c36fc Mon Sep 17 00:00:00 2001 From: Prajna1999 Date: Mon, 15 Jun 2026 18:35:45 +0530 Subject: [PATCH 1/4] feat: guardrails endpoint --- .../067_add_meta_and_guardrails_jobtype.py | 46 +++ .../api/docs/guardrails/apply_guardrails.md | 63 ++++ backend/app/api/main.py | 2 + backend/app/api/routes/guardrails.py | 206 +++++++++++ backend/app/celery/tasks/job_execution.py | 18 + backend/app/celery/utils.py | 18 + backend/app/crud/jobs.py | 9 +- backend/app/models/guardrails/__init__.py | 25 ++ backend/app/models/guardrails/request.py | 69 ++++ backend/app/models/guardrails/response.py | 78 ++++ backend/app/models/job.py | 19 +- backend/app/services/guardrails/__init__.py | 0 backend/app/services/guardrails/jobs.py | 340 ++++++++++++++++++ backend/app/services/llm/guardrails.py | 114 ++++++ backend/app/services/llm/jobs.py | 101 ++---- 15 files changed, 1037 insertions(+), 71 deletions(-) create mode 100644 backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py create mode 100644 backend/app/api/docs/guardrails/apply_guardrails.md create mode 100644 backend/app/api/routes/guardrails.py create mode 100644 backend/app/models/guardrails/__init__.py create mode 100644 backend/app/models/guardrails/request.py create mode 100644 backend/app/models/guardrails/response.py create mode 100644 backend/app/services/guardrails/__init__.py create mode 100644 backend/app/services/guardrails/jobs.py diff --git a/backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py b/backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py new file mode 100644 index 000000000..2f240d46b --- /dev/null +++ b/backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py @@ -0,0 +1,46 @@ +"""add job.meta JSONB column and LLM_GUARDRAILS jobtype enum value + +Revision ID: 067 +Revises: 066 +Create Date: 2026-06-15 00:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import 
def upgrade() -> None:
    """Register the ``LLM_GUARDRAILS`` job type and add the ``job.meta`` column."""
    # ALTER TYPE ... ADD VALUE cannot run inside a transaction block, so the
    # enum change is issued from an autocommit block before the column DDL.
    migration_ctx = op.get_context()
    with migration_ctx.autocommit_block():
        op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_GUARDRAILS'")

    # Nullable with no default: existing rows and job types that never write
    # to the column are left untouched.
    meta_comment = (
        "Per-job-type tracking payload. For LLM_GUARDRAILS this stores "
        "{'request': {...}, 'response': {...}} capturing the inbound "
        "guardrails request and the upstream guardrails service response."
    )
    meta_column = sa.Column(
        "meta",
        postgresql.JSONB(astext_type=sa.Text()),
        nullable=True,
        comment=meta_comment,
    )
    op.add_column("job", meta_column)


def downgrade() -> None:
    """Drop the ``job.meta`` column; the enum value is deliberately kept."""
    op.drop_column("job", "meta")
    # NOTE: Postgres has no clean way to remove a single enum value. Leaving
    # 'LLM_GUARDRAILS' on the type on downgrade is intentional and harmless.
Kaapi creates a job (`job_type=LLM_GUARDRAILS`), returns `job_id` with HTTP 200 + immediately. +3. A Celery worker resolves the validators, calls the guardrails service, and + POSTs the sanitised text (or a hard-block error) to `callback_url`. +4. The full upstream guardrails response and the original request body are + persisted on `job.meta` for traceability and can be inspected via + `GET /api/v1/guardrails/{job_id}`. + +### Webhook payload + +The webhook receives a standard `APIResponse` envelope: + +```json +{ + "success": true, + "data": { + "response": { + "response_id": "<server-generated UUID>", + "output": { + "type": "text", + "content": { "format": "text", "value": "<sanitised text>" } + } + }, + "usage": { + "input_tokens": 0, + "output_tokens": 0, + "total_tokens": 0, + "reasoning_tokens": 0 + }, + "provider_raw_response": null + }, + "error": null, + "metadata": { "<your request_metadata keys>": "...", "warnings": [] } +} +``` + +If the guardrails service hard-blocks the text, `success` is `false`, `error` +carries the upstream message, and `data` is `null`. If the guardrails service +is unreachable the job still succeeds but the webhook carries the original +text unchanged and `metadata.warnings` includes +`guardrails_service_unavailable_text_returned_unchanged`. + +### Notes + +- `guardrail_config[].type` and `guardrail_config[].tag` are caller-side + bookkeeping. They are not interpreted by the server but are useful for your + own correlation (echoed back via `request_metadata` if you include them + there). +- For output-guardrail flows that need the original prompt paired with the + LLM output, v1 of this endpoint sends only `text`; pairing is not exposed. +- The same webhook signing scheme as `/llm/call` is used when a webhook secret + is configured. 
diff --git a/backend/app/api/main.py b/backend/app/api/main.py index 8ccf9b8d6..dc4ece395 100644 --- a/backend/app/api/main.py +++ b/backend/app/api/main.py @@ -16,6 +16,7 @@ evaluations, features, fine_tuning, + guardrails, languages, llm, llm_chain, @@ -54,6 +55,7 @@ api_router.include_router(languages.router) api_router.include_router(llm.router) api_router.include_router(llm_chain.router) +api_router.include_router(guardrails.router) api_router.include_router(login.router) api_router.include_router(model_config.router) api_router.include_router(model_evaluation.router) diff --git a/backend/app/api/routes/guardrails.py b/backend/app/api/routes/guardrails.py new file mode 100644 index 000000000..ab86cc3d0 --- /dev/null +++ b/backend/app/api/routes/guardrails.py @@ -0,0 +1,206 @@ +import logging +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException +from opentelemetry import trace + +from app.api.deps import AuthContextDep, SessionDep +from app.api.permissions import Permission, require_permission +from app.core.rate_monitor import monitor_rate +from app.core.telemetry import log_context +from app.crud.jobs import JobCrud +from app.models import JobStatus, JobType +from app.models.guardrails import ( + GuardrailsCallbackData, + GuardrailsJobImmediatePublic, + GuardrailsJobPublic, + GuardrailsRequest, +) +from app.services.guardrails.jobs import start_job +from app.utils import APIResponse, load_description, validate_callback_url + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["Guardrails"]) +guardrails_callback_router = APIRouter() + + +@guardrails_callback_router.post( + "{$callback_url}", + name="guardrails_callback", +) +def guardrails_callback_notification(body: APIResponse[GuardrailsCallbackData]): + """ + Callback endpoint specification for /guardrails completion. 
+ + The callback will receive: + - On success: APIResponse with success=True and data containing + GuardrailsCallbackData (sanitised text under data.response.output). + - On hard-block / failure: APIResponse with success=False and error set. + - metadata field will always include any request_metadata supplied with + the original request, plus a `warnings` list. + """ + ... + + +@router.post( + "/guardrails", + description=load_description("guardrails/apply_guardrails.md"), + response_model=APIResponse[GuardrailsJobImmediatePublic], + callbacks=guardrails_callback_router.routes, + dependencies=[ + Depends(require_permission(Permission.REQUIRE_PROJECT)), + Depends(monitor_rate("llm_call")), + ], +) +def apply_guardrails_endpoint( + _current_user: AuthContextDep, + session: SessionDep, + request: GuardrailsRequest, +): + """Initiate a guardrails-only job. Returns the job_id immediately; the + sanitised text is delivered via callback_url (or polled via GET).""" + project_id = _current_user.project_.id + organization_id = _current_user.organization_.id + + with log_context( + tag="guardrails", + system="guardrails", + lifecycle="api.guardrails.apply", + project_id=project_id, + organization_id=organization_id, + callback_enabled=request.callback_url is not None, + ): + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("kaapi.organization_id", organization_id) + span.set_attribute( + "guardrails.callback_enabled", request.callback_url is not None + ) + + if request.callback_url: + validate_callback_url(str(request.callback_url)) + + job_id = start_job( + db=session, + request=request, + project_id=project_id, + organization_id=organization_id, + ) + + if span.is_recording(): + span.set_attribute("guardrails.job_id", str(job_id)) + + job = JobCrud(session=session).get(job_id=job_id, project_id=project_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + 
# Counter names carried by the ``usage`` object of the guardrails response.
_USAGE_FIELDS = ("input_tokens", "output_tokens", "total_tokens", "reasoning_tokens")


def _as_dict(value: object) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _usage_counts(raw_usage: object) -> dict[str, int]:
    """Coerce a persisted upstream usage blob into integer counters.

    Any missing, ``None`` or non-numeric counter becomes 0, so a malformed
    upstream payload cannot make model validation fail while polling.
    """
    usage = _as_dict(raw_usage)
    counts: dict[str, int] = {}
    for name in _USAGE_FIELDS:
        raw_value = usage.get(name)
        if raw_value is None:
            counts[name] = 0
            continue
        try:
            counts[name] = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            counts[name] = 0
    return counts


@router.get(
    "/guardrails/{job_id}",
    response_model=APIResponse[GuardrailsJobPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def get_guardrails_job_status(
    _current_user: AuthContextDep,
    session: SessionDep,
    job_id: UUID,
) -> APIResponse[GuardrailsJobPublic]:
    """Poll for a /guardrails job's status and result.

    On SUCCESS the sanitised text is rehydrated from the data persisted on
    ``job.meta``: ``response.data.safe_text`` when it is a string, otherwise
    the original ``request.text`` (or an empty string).

    Raises:
        HTTPException: 404 when the job does not exist for this project or is
            not an ``LLM_GUARDRAILS`` job.
    """
    project_id = _current_user.project_.id

    with log_context(
        tag="guardrails",
        system="guardrails",
        lifecycle="api.guardrails.status",
        job_id=job_id,
        project_id=project_id,
        organization_id=_current_user.organization_.id,
    ):
        job = JobCrud(session=session).get(job_id=job_id, project_id=project_id)
        if not job or job.job_type != JobType.LLM_GUARDRAILS:
            # Hide non-guardrails jobs behind a 404 so this endpoint cannot
            # be used to enumerate or peek at LLM-call / chain job rows.
            raise HTTPException(status_code=404, detail="Job not found")

        meta = _as_dict(job.meta)
        callback_blob = _as_dict(meta.get("callback"))

        raw_warnings = callback_blob.get("warnings")
        warnings: list[str] = (
            [w for w in raw_warnings if isinstance(w, str)]
            if isinstance(raw_warnings, list)
            else []
        )

        guardrails_response: GuardrailsCallbackData | None = None
        # Compare enum members directly rather than a raw value to a member.
        if job.status == JobStatus.SUCCESS:
            data_blob = _as_dict(_as_dict(meta.get("response")).get("data"))
            safe_text = data_blob.get("safe_text")
            original_text = _as_dict(meta.get("request")).get("text")
            if isinstance(safe_text, str):
                value = safe_text
            elif isinstance(original_text, str):
                value = original_text
            else:
                value = ""

            # Prefer the server-minted response_id stamped on the callback
            # blob; fall back to None if (for older rows) it is absent.
            rid = callback_blob.get("response_id")
            response_id: str | None = rid if isinstance(rid, str) else None

            guardrails_response = GuardrailsCallbackData.model_validate(
                {
                    "response": {
                        "response_id": response_id,
                        "output": {
                            "type": "text",
                            "content": {"format": "text", "value": value},
                        },
                    },
                    # Coerced so a malformed upstream usage value cannot make
                    # validation fail here.
                    "usage": _usage_counts(data_blob.get("usage")),
                    "provider_raw_response": None,
                }
            )

        return APIResponse.success_response(
            data=GuardrailsJobPublic(
                job_id=job.id,
                status=job.status.value,
                guardrails_response=guardrails_response,
                error_message=job.error_message,
                warnings=warnings,
            )
        )
def start_guardrails_job(
    project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
    """Enqueue the guardrails Celery task for a job and return the task ID.

    Extra keyword arguments are forwarded unchanged to the task.
    """
    # Imported inside the function rather than at module level.
    from app.celery.tasks.job_execution import run_guardrails_job as guardrails_task

    task_kwargs = {
        "project_id": project_id,
        "job_id": job_id,
        "trace_id": trace_id,
        **kwargs,
    }
    task_id = _enqueue_with_trace_context(guardrails_task, **task_kwargs)

    logger.info(
        f"[start_guardrails_job] Started job {job_id} with Celery task {task_id}"
    )
    return task_id
+ "GuardrailsOutput", + "GuardrailsOutputContent", +] diff --git a/backend/app/models/guardrails/request.py b/backend/app/models/guardrails/request.py new file mode 100644 index 000000000..74a82b977 --- /dev/null +++ b/backend/app/models/guardrails/request.py @@ -0,0 +1,69 @@ +from typing import Any +from uuid import UUID + +from pydantic import HttpUrl +from sqlmodel import Field, SQLModel + + +class GuardrailValidator(SQLModel): + """Single validator reference inside a /guardrails request. + + Only ``validator_config_id`` is meaningful to the server. ``type`` and + ``tag`` are caller-side bookkeeping fields: they are accepted and + persisted on ``job.meta.request`` for traceability, but they are not + echoed back on the webhook payload. To correlate webhooks with + requests, put your correlation keys in ``request_metadata`` (which is + echoed under the callback's ``metadata`` field). + """ + + validator_config_id: UUID = Field( + ..., + description="ID of a validator configuration registered with the guardrails service.", + ) + type: str | None = Field( + default=None, + description=( + "Optional bookkeeping label for the caller (e.g. 'input_guardrail' " + "or 'output_guardrail'). Server does not interpret this field." + ), + ) + tag: str | None = Field( + default=None, + description="Optional caller-side tag. Server does not interpret this field.", + ) + + +class GuardrailsRequest(SQLModel): + """Request body for ``POST /api/v1/guardrails``. + + The endpoint is symmetric for input and output guardrails: send the text + that needs sanitisation in ``text`` along with the validator IDs to apply. + The sanitised text is delivered via the configured ``callback_url``. + """ + + text: str = Field( + ..., + min_length=1, + description="Text to validate/sanitise. 
May be a user prompt or an LLM response.", + ) + guardrail_config: list[GuardrailValidator] = Field( + ..., + min_length=1, + description="Validators to apply, identified by validator_config_id.", + ) + callback_url: HttpUrl | None = Field( + default=None, + description=( + "Webhook URL that will receive the sanitised result. When omitted " + "the caller must poll GET /guardrails/{job_id}." + ), + ) + request_metadata: dict[str, Any] | None = Field( + default=None, + description=( + "Client-provided metadata passed through unchanged in the callback's " + "`metadata` field. Use this to correlate callbacks with requests. " + "Note: the server appends a `warnings: [...]` key to this object " + "on the wire; any caller-supplied `warnings` key will be overwritten." + ), + ) diff --git a/backend/app/models/guardrails/response.py b/backend/app/models/guardrails/response.py new file mode 100644 index 000000000..954cea441 --- /dev/null +++ b/backend/app/models/guardrails/response.py @@ -0,0 +1,78 @@ +from datetime import datetime +from typing import Any, Literal +from uuid import UUID + +from sqlmodel import Field, SQLModel + + +class GuardrailsJobImmediatePublic(SQLModel): + """Immediate 200 response from POST /guardrails before the job runs.""" + + job_id: UUID + status: str + message: str + job_inserted_at: datetime + job_updated_at: datetime + + +class GuardrailsOutputContent(SQLModel): + format: Literal["text"] = "text" + value: str = Field( + ..., description="Sanitised text returned by the guardrails service." + ) + + +class GuardrailsOutput(SQLModel): + type: Literal["text"] = "text" + content: GuardrailsOutputContent + + +class GuardrailsCallbackUsage(SQLModel): + """Token usage reported by the upstream guardrails service, if any. + + All fields default to 0 so the callback shape stays stable when the + upstream payload omits usage data. 
class GuardrailsCallbackResponse(SQLModel):
    """The ``response`` object of a guardrails result: an ID plus the output."""

    response_id: str | None = Field(
        default=None,
        description=(
            "Server-generated correlation ID (UUID4 string) for this result. "
            "May be null when polling a job that has no stored ID."
        ),
    )
    output: GuardrailsOutput


class GuardrailsCallbackData(SQLModel):
    """Payload delivered to the webhook on completion.

    Wrapped by the standard APIResponse envelope: ``data`` carries this model,
    ``metadata`` carries the request's ``request_metadata`` plus a server
    ``warnings`` list.
    """

    response: GuardrailsCallbackResponse
    usage: GuardrailsCallbackUsage = Field(default_factory=GuardrailsCallbackUsage)
    provider_raw_response: dict[str, Any] | None = None
+ ), + ) diff --git a/backend/app/models/job.py b/backend/app/models/job.py index 3ea4d8650..32737e2d9 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -1,7 +1,10 @@ from datetime import datetime from enum import Enum +from typing import Any from uuid import UUID, uuid4 +from sqlalchemy import Column +from sqlalchemy.dialects.postgresql import JSONB from sqlmodel import Field, SQLModel from app.core.util import now @@ -18,6 +21,7 @@ class JobType(str, Enum): RESPONSE = "RESPONSE" LLM_API = "LLM_API" LLM_CHAIN = "LLM_CHAIN" + LLM_GUARDRAILS = "LLM_GUARDRAILS" class Job(SQLModel, table=True): @@ -63,9 +67,21 @@ class Job(SQLModel, table=True): job_type: JobType = Field( description="Type of job being executed (e.g., response, ingestion).", sa_column_kwargs={ - "comment": "Type of job being executed (e.g., RESPONSE, LLM_API)" + "comment": "Type of job being executed (e.g., RESPONSE, LLM_API, LLM_CHAIN, LLM_GUARDRAILS)" }, ) + meta: dict[str, Any] | None = Field( + default=None, + sa_column=Column( + JSONB, + nullable=True, + comment=( + "Per-job-type tracking payload. For LLM_GUARDRAILS this stores " + "{'request': {...}, 'response': {...}} capturing the inbound " + "guardrails request and the upstream guardrails service response." 
+ ), + ), + ) # Timestamps inserted_at: datetime = Field( @@ -82,3 +98,4 @@ class JobUpdate(SQLModel): status: JobStatus | None = None error_message: str | None = None task_id: str | None = None + meta: dict[str, Any] | None = None diff --git a/backend/app/services/guardrails/__init__.py b/backend/app/services/guardrails/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/services/guardrails/jobs.py b/backend/app/services/guardrails/jobs.py new file mode 100644 index 000000000..5e7b674c7 --- /dev/null +++ b/backend/app/services/guardrails/jobs.py @@ -0,0 +1,340 @@ +import logging +from typing import Any +from uuid import UUID + +from asgi_correlation_id import correlation_id +from fastapi import HTTPException +from opentelemetry import trace +from sqlmodel import Session + +from app.celery.utils import start_guardrails_job +from app.core.db import engine +from app.core.telemetry import log_context +from app.crud.jobs import JobCrud +from app.models import JobStatus, JobType, JobUpdate +from app.models.guardrails import ( + GuardrailsCallbackData, + GuardrailsCallbackResponse, + GuardrailsCallbackUsage, + GuardrailsOutput, + GuardrailsOutputContent, + GuardrailsRequest, +) +from app.models.llm.request import Validator +from app.services.llm.guardrails import apply_guardrails +from app.utils import APIResponse, get_webhook_secret, send_callback + +logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + + +def start_job( + db: Session, + request: GuardrailsRequest, + project_id: int, + organization_id: int, +) -> UUID: + """Create a guardrails-only job and schedule its Celery task.""" + trace_id = correlation_id.get() or "N/A" + + with log_context( + tag="guardrails", + lifecycle="guardrails.start_job", + project_id=project_id, + organization_id=organization_id, + ), tracer.start_as_current_span("guardrails.start_job") as span: + span.set_attribute("kaapi.project_id", project_id) + 
span.set_attribute("kaapi.organization_id", organization_id) + + job_crud = JobCrud(session=db) + job = job_crud.create( + job_type=JobType.LLM_GUARDRAILS, + trace_id=trace_id, + project_id=project_id, + ) + span.set_attribute("guardrails.job_id", str(job.id)) + + # Persist the inbound request for traceability before the worker runs. + # We intentionally store the raw text: this endpoint exists to inspect + # potentially-unsafe content, so the request body itself is the audit log. + job_crud.update( + job_id=job.id, + job_update=JobUpdate(meta={"request": request.model_dump(mode="json")}), + ) + + try: + task_id = start_guardrails_job( + project_id=project_id, + job_id=str(job.id), + trace_id=trace_id, + request_data=request.model_dump(mode="json"), + organization_id=organization_id, + ) + except Exception as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + logger.error( + f"[start_job] Error starting Celery task: {e} | job_id={job.id}, project_id={project_id}", + exc_info=True, + ) + job_crud.update( + job_id=job.id, + job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e)), + ) + raise HTTPException( + status_code=500, + detail="Internal server error while scheduling guardrails job", + ) + + logger.info( + f"[start_job] Job scheduled for guardrails | job_id={job.id}, " + f"project_id={project_id}, task_id={task_id}" + ) + return job.id + + +def _coerce_int(value: Any) -> int: + """Best-effort int coercion that tolerates malformed upstream usage payloads.""" + if value is None: + return 0 + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def _build_callback_payload( + *, + response_id: str, + safe_text: str, + raw: dict[str, Any], + request_metadata: dict[str, Any] | None, + warnings: list[str], +) -> dict[str, Any]: + """Construct the webhook payload in the documented /guardrails shape. 
+ + The structure mirrors LLM-call callbacks so callers can reuse parsing: + ``data.response.output.content.value`` is always the sanitised text. + + ``response_id`` is generated server-side (a UUID4 string) so callers + always have a stable correlation handle even when the upstream guardrails + service omits an ID in its payload. + """ + upstream_data = raw.get("data") if isinstance(raw, dict) else None + usage_payload: dict[str, Any] = {} + if isinstance(upstream_data, dict): + raw_usage = upstream_data.get("usage") + if isinstance(raw_usage, dict): + usage_payload = raw_usage + + callback_data = GuardrailsCallbackData( + response=GuardrailsCallbackResponse( + response_id=response_id, + output=GuardrailsOutput( + content=GuardrailsOutputContent(value=safe_text), + ), + ), + usage=GuardrailsCallbackUsage( + input_tokens=_coerce_int(usage_payload.get("input_tokens")), + output_tokens=_coerce_int(usage_payload.get("output_tokens")), + total_tokens=_coerce_int(usage_payload.get("total_tokens")), + reasoning_tokens=_coerce_int(usage_payload.get("reasoning_tokens")), + ), + provider_raw_response=None, + ) + + metadata = dict(request_metadata or {}) + if "warnings" in metadata: + logger.info( + "[_build_callback_payload] Caller-supplied 'warnings' key in " + "request_metadata overwritten by server warnings." 
def execute_job(
    *,
    project_id: int,
    job_id: str,
    task_id: str,
    task_instance: Any,
    request_data: dict[str, Any],
    organization_id: int,
    **_: Any,
) -> dict[str, Any]:
    """Celery worker entrypoint for /guardrails jobs.

    Marks the job PROCESSING, de-duplicates the requested validators, calls
    the guardrails service via ``apply_guardrails``, persists the request and
    upstream response on ``job.meta``, and sends the result to the caller's
    webhook when a ``callback_url`` was supplied.

    Args:
        project_id: Project that owns the job.
        job_id: Job UUID in string form.
        task_id: Celery task ID recorded on the job row.
        task_instance: Bound Celery task; not used by this function.
        request_data: JSON-dumped ``GuardrailsRequest``.
        organization_id: Organization that owns the job.

    Returns:
        The success callback payload, or ``{"success": False, "error": ...}``
        when guardrails execution raised or the text was hard-blocked.
    """
    # Imported here because the module-level import only brings in ``UUID``;
    # without this the success path fails with NameError on ``uuid4``.
    from uuid import uuid4

    job_uuid = UUID(job_id)
    request = GuardrailsRequest.model_validate(request_data)
    callback_url = str(request.callback_url) if request.callback_url else None

    with log_context(
        tag="guardrails",
        lifecycle="guardrails.execute_job",
        job_id=job_id,
        project_id=project_id,
        organization_id=organization_id,
    ), tracer.start_as_current_span("guardrails.execute_job") as span:
        span.set_attribute("guardrails.job_id", job_id)
        span.set_attribute("kaapi.project_id", project_id)
        span.set_attribute("kaapi.organization_id", organization_id)

        with Session(engine) as session:
            JobCrud(session=session).update(
                job_id=job_uuid,
                job_update=JobUpdate(status=JobStatus.PROCESSING, task_id=task_id),
            )

        try:
            # Deduplicate validator IDs while preserving submission order so a
            # caller passing the same ID twice does not double-bill the
            # upstream guardrails service.
            seen_ids: set[UUID] = set()
            validators: list[Validator] = []
            for g in request.guardrail_config:
                if g.validator_config_id in seen_ids:
                    continue
                seen_ids.add(g.validator_config_id)
                validators.append(Validator(validator_config_id=g.validator_config_id))
            outcome = apply_guardrails(
                text=request.text,
                validators=validators,
                job_id=job_uuid,
                project_id=project_id,
                organization_id=organization_id,
            )
        except Exception as e:
            # Task boundary: record the failure on the job and notify the
            # caller instead of letting the exception escape the worker.
            logger.error(
                f"[execute_job] Guardrails execution crashed | job_id={job_id}: {e}",
                exc_info=True,
            )
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            with Session(engine) as session:
                JobCrud(session=session).update(
                    job_id=job_uuid,
                    job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e)),
                )
            _send_failure_callback(
                callback_url=callback_url,
                error=str(e),
                request_metadata=request.request_metadata,
                project_id=project_id,
                organization_id=organization_id,
            )
            return {"success": False, "error": str(e)}

        warnings: list[str] = []
        if outcome.bypassed:
            warnings.append("guardrails_service_unavailable_text_returned_unchanged")
        elif not outcome.raw and validators:
            # apply_guardrails short-circuited with raw={} despite the caller
            # sending non-empty validators — list_validators_config returned
            # an empty list. The most common cause is the guardrails service
            # being unreachable during the config-fetch step (it swallows
            # transport errors and returns []). Surface this as a warning so
            # callers do not mistake an effective no-op for a clean pass.
            warnings.append("validator_configs_unresolved_text_returned_unchanged")
            logger.warning(
                f"[execute_job] Validators were submitted but none resolved against "
                f"the guardrails service; returning original text. job_id={job_id}"
            )

        # Hard block: guardrails service rejected the text.
        if outcome.error is not None:
            with Session(engine) as session:
                JobCrud(session=session).update(
                    job_id=job_uuid,
                    job_update=JobUpdate(
                        status=JobStatus.FAILED,
                        error_message=outcome.error,
                        meta={
                            "request": request.model_dump(mode="json"),
                            "response": outcome.raw,
                        },
                    ),
                )
            _send_failure_callback(
                callback_url=callback_url,
                error=outcome.error,
                request_metadata=request.request_metadata,
                project_id=project_id,
                organization_id=organization_id,
            )
            return {"success": False, "error": outcome.error}

        safe_text = outcome.safe_text if outcome.safe_text is not None else request.text
        # response_id is server-minted so callers always have a stable
        # correlation handle, independent of whether the upstream guardrails
        # service returns one.
        response_id = str(uuid4())
        callback_payload = _build_callback_payload(
            response_id=response_id,
            safe_text=safe_text,
            raw=outcome.raw,
            request_metadata=request.request_metadata,
            warnings=warnings,
        )

        # Persist the outcome before attempting webhook delivery.
        with Session(engine) as session:
            JobCrud(session=session).update(
                job_id=job_uuid,
                job_update=JobUpdate(
                    status=JobStatus.SUCCESS,
                    meta={
                        "request": request.model_dump(mode="json"),
                        "response": outcome.raw,
                        "callback": {
                            "response_id": response_id,
                            "delivered": callback_url is not None,
                            "warnings": warnings,
                        },
                    },
                ),
            )

        if callback_url:
            webhook_secret = get_webhook_secret(project_id, organization_id)
            with tracer.start_as_current_span("guardrails.send_callback") as cb_span:
                cb_span.set_attribute("callback.url", callback_url)
                cb_span.set_attribute("callback.status", "success")
                send_callback(
                    callback_url=callback_url,
                    data=callback_payload,
                    webhook_secret=webhook_secret,
                )

        return callback_payload
Callers should treat this as a soft pass.""" + + rephrase_needed: bool + """Input-guardrails-only signal: when True, safe_text is a canned response + that should be returned directly to the user without invoking the LLM.""" + + raw: dict[str, Any] = field(default_factory=dict) + + @property + def applied(self) -> bool: + return bool(self.raw) and not self.bypassed + + +def apply_guardrails( + *, + text: str, + validators: list[Validator] | None, + job_id: UUID, + project_id: int | None, + organization_id: int | None, + output_text: str | None = None, +) -> GuardrailsOutcome: + """Resolve validator configs by ID and run validation against the + guardrails service. Transport-only — no domain wrappers. + + Used by: + - /llm/call and /llm/chain (via the apply_input_guardrails / + apply_output_guardrails adapters in app.services.llm.jobs) + - /guardrails (dedicated endpoint) directly + + Args: + text: Primary text to validate. Sent as `input` to the service. + validators: Validator references (config IDs). When None/empty the + function short-circuits with a no-op outcome. + output_text: When provided, sent as `output` to the service for + output-guardrail validators that compare input/output pairs. + Also routes the IDs through the output-config fetch path. 
+ """ + if not validators: + return GuardrailsOutcome( + safe_text=text, error=None, bypassed=False, rephrase_needed=False, raw={} + ) + + is_output = output_text is not None + input_cfgs, output_cfgs = list_validators_config( + organization_id=organization_id, + project_id=project_id, + input_validator_configs=None if is_output else validators, + output_validator_configs=validators if is_output else None, + ) + resolved = output_cfgs if is_output else input_cfgs + if not resolved: + return GuardrailsOutcome( + safe_text=text, error=None, bypassed=False, rephrase_needed=False, raw={} + ) + + safe = run_guardrails_validation( + text, + resolved, + job_id, + project_id, + organization_id, + suppress_pass_logs=True, + output_text=output_text, + ) + + logger.info( + f"[apply_guardrails] Validation result | success={safe.get('success')}, " + f"bypassed={safe.get('bypassed', False)}, job_id={job_id}" + ) + + if safe.get("bypassed"): + return GuardrailsOutcome( + safe_text=text, + error=None, + bypassed=True, + rephrase_needed=False, + raw=safe, + ) + + if safe.get("success"): + data = safe.get("data", {}) or {} + return GuardrailsOutcome( + safe_text=data.get("safe_text", text), + error=None, + bypassed=False, + rephrase_needed=bool(data.get("rephrase_needed")), + raw=safe, + ) + + return GuardrailsOutcome( + safe_text=None, + error=safe.get("error"), + bypassed=False, + rephrase_needed=False, + raw=safe, + ) + + def run_guardrails_validation( input_text: str, guardrail_config: list[Validator | dict[str, Any]], diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index 94472abfe..f2fff9b6e 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -63,10 +63,7 @@ Usage, ) from app.services.llm.chain.types import BlockResult -from app.services.llm.guardrails import ( - list_validators_config, - run_guardrails_validation, -) +from app.services.llm.guardrails import apply_guardrails from app.services.llm.mappers 
import transform_kaapi_config_to_native from app.services.llm.providers.registry import get_llm_provider from app.utils import ( @@ -370,6 +367,9 @@ def apply_input_guardrails( ) -> tuple[QueryParams, str | None, str | None]: """Apply input guardrails from a config_blob. Shared with llm-call and llm-chain. + Thin adapter over ``apply_guardrails`` that maps the outcome onto a + ``QueryParams`` payload. + Returns (query, error, guardrail_direct_response) where: - error is set when guardrails hard-block the request - guardrail_direct_response is set when rephrase_needed=True and the safe_text @@ -386,46 +386,27 @@ def apply_input_guardrails( ) return query, None, None - input_guardrails, _ = list_validators_config( - organization_id=organization_id, + outcome = apply_guardrails( + text=query.input.content.value, + validators=config_blob.input_guardrails, + job_id=job_id, project_id=project_id, - input_validator_configs=config_blob.input_guardrails, - output_validator_configs=None, - ) - - if not input_guardrails: - return query, None, None - - safe = run_guardrails_validation( - query.input.content.value, - input_guardrails, - job_id, - project_id, - organization_id, - suppress_pass_logs=True, + organization_id=organization_id, ) - logger.info( - f"[apply_input_guardrails] Validation result | success={safe['success']}, job_id={job_id}" - ) + if outcome.error is not None: + return query, outcome.error, None - if safe.get("bypassed"): + if outcome.rephrase_needed: logger.info( - f"[apply_input_guardrails] Guardrails bypassed (service unavailable) | job_id={job_id}" + f"[apply_input_guardrails] rephrase_needed=True, returning safe_text directly | job_id={job_id}" ) - return query, None, None - - if safe["success"]: - safe_text = safe["data"]["safe_text"] - if safe["data"].get("rephrase_needed"): - logger.info( - f"[apply_input_guardrails] rephrase_needed=True, returning safe_text directly | job_id={job_id}" - ) - return query, None, safe_text - query.input.content.value 
= safe_text - return query, None, None + return query, None, outcome.safe_text - return query, safe["error"], None + # No-op paths (no validators, bypassed) leave the query untouched. + if outcome.applied and outcome.safe_text is not None: + query.input.content.value = outcome.safe_text + return query, None, None def apply_output_guardrails( @@ -439,6 +420,9 @@ def apply_output_guardrails( ) -> tuple[BlockResult, str | None]: """Apply output guardrails from a config_blob. Shared by /llm/call and /llm/chain. + Thin adapter over ``apply_guardrails`` that maps the outcome onto a + ``BlockResult``. + Returns (modified_result, None) on success, or (result, error_string) on failure. """ if not config_blob or not config_blob.output_guardrails: @@ -452,42 +436,21 @@ def apply_output_guardrails( ) return result, None - _, output_guardrails = list_validators_config( - organization_id=organization_id, + outcome = apply_guardrails( + text=input_text or "", + validators=config_blob.output_guardrails, + job_id=job_id, project_id=project_id, - input_validator_configs=None, - output_validator_configs=config_blob.output_guardrails, - ) - - if not output_guardrails: - return result, None - - llm_output = result.response.response.output.content.value - safe = run_guardrails_validation( - input_text or "", - output_guardrails, - job_id, - project_id, - organization_id, - suppress_pass_logs=True, - output_text=llm_output, - ) - - logger.info( - f"[apply_output_guardrails] Validation result | success={safe['success']}, job_id={job_id}" + organization_id=organization_id, + output_text=result.response.response.output.content.value, ) - if safe.get("bypassed"): - logger.info( - f"[apply_output_guardrails] Guardrails bypassed (service unavailable) | job_id={job_id}" - ) - return result, None - - if safe["success"]: - result.response.response.output.content.value = safe["data"]["safe_text"] - return result, None + if outcome.error is not None: + return result, outcome.error - return result, 
safe["error"] + if outcome.applied and outcome.safe_text is not None: + result.response.response.output.content.value = outcome.safe_text + return result, None def execute_llm_call( From a5485deab92fcda1a6e117f61fe5ce0b5dc2484f Mon Sep 17 00:00:00 2001 From: Prajna Prayas Date: Thu, 18 Jun 2026 08:26:15 +0530 Subject: [PATCH 2/4] Update backend/app/services/guardrails/jobs.py Co-authored-by: Ayush <80516839+Ayush8923@users.noreply.github.com> --- backend/app/services/guardrails/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/services/guardrails/jobs.py b/backend/app/services/guardrails/jobs.py index 5e7b674c7..cee684418 100644 --- a/backend/app/services/guardrails/jobs.py +++ b/backend/app/services/guardrails/jobs.py @@ -1,6 +1,6 @@ import logging from typing import Any -from uuid import UUID +from uuid import UUID, uuid4 from asgi_correlation_id import correlation_id from fastapi import HTTPException From 14ba05541af29267e9cb6652a658fa681213d7db Mon Sep 17 00:00:00 2001 From: Prajna Prayas Date: Thu, 18 Jun 2026 08:28:06 +0530 Subject: [PATCH 3/4] Apply suggestion from @Ayush8923 Co-authored-by: Ayush <80516839+Ayush8923@users.noreply.github.com> --- backend/app/api/routes/guardrails.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/api/routes/guardrails.py b/backend/app/api/routes/guardrails.py index ab86cc3d0..b0a76ac2e 100644 --- a/backend/app/api/routes/guardrails.py +++ b/backend/app/api/routes/guardrails.py @@ -57,7 +57,7 @@ def apply_guardrails_endpoint( _current_user: AuthContextDep, session: SessionDep, request: GuardrailsRequest, -): +) -> APIResponse[GuardrailsJobImmediatePublic]: """Initiate a guardrails-only job. 
Returns the job_id immediately; the sanitised text is delivered via callback_url (or polled via GET).""" project_id = _current_user.project_.id From 176679be9d3a738c17a32e61ae611ccc9a0bd757 Mon Sep 17 00:00:00 2001 From: Prajna1999 Date: Thu, 18 Jun 2026 12:45:39 +0530 Subject: [PATCH 4/4] fix: fix comments and add logging --- ...=> 068_add_meta_and_guardrails_jobtype.py} | 8 +- .../api/docs/guardrails/apply_guardrails.md | 10 +- backend/app/api/routes/guardrails.py | 9 +- backend/app/models/guardrails/request.py | 2 +- backend/app/services/guardrails/jobs.py | 112 ++++++++++++------ backend/app/services/llm/guardrails.py | 64 ++++++++-- 6 files changed, 144 insertions(+), 61 deletions(-) rename backend/app/alembic/versions/{067_add_meta_and_guardrails_jobtype.py => 068_add_meta_and_guardrails_jobtype.py} (95%) diff --git a/backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py b/backend/app/alembic/versions/068_add_meta_and_guardrails_jobtype.py similarity index 95% rename from backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py rename to backend/app/alembic/versions/068_add_meta_and_guardrails_jobtype.py index 2f240d46b..6285204be 100644 --- a/backend/app/alembic/versions/067_add_meta_and_guardrails_jobtype.py +++ b/backend/app/alembic/versions/068_add_meta_and_guardrails_jobtype.py @@ -1,7 +1,7 @@ """add job.meta JSONB column and LLM_GUARDRAILS jobtype enum value -Revision ID: 067 -Revises: 066 +Revision ID: 068 +Revises: 067 Create Date: 2026-06-15 00:00:00.000000 """ @@ -10,8 +10,8 @@ from alembic import op from sqlalchemy.dialects import postgresql -revision = "067" -down_revision = "066" +revision = "068" +down_revision = "067" branch_labels = None depends_on = None diff --git a/backend/app/api/docs/guardrails/apply_guardrails.md b/backend/app/api/docs/guardrails/apply_guardrails.md index e943c5161..62ff35f91 100644 --- a/backend/app/api/docs/guardrails/apply_guardrails.md +++ b/backend/app/api/docs/guardrails/apply_guardrails.md 
@@ -8,7 +8,7 @@ more `validator_config_id`s, and receive the sanitised text on your ### Flow -1. Caller POSTs `{text, guardrail_config, callback_url}` to `/api/v1/guardrails`. +1. Caller POSTs `{text, config, callback_url}` to `/api/v1/guardrails`. 2. Kaapi creates a job (`job_type=LLM_GUARDRAILS`), returns `job_id` with HTTP 200 immediately. 3. A Celery worker resolves the validators, calls the guardrails service, and @@ -48,12 +48,14 @@ The webhook receives a standard `APIResponse` envelope: If the guardrails service hard-blocks the text, `success` is `false`, `error` carries the upstream message, and `data` is `null`. If the guardrails service is unreachable the job still succeeds but the webhook carries the original -text unchanged and `metadata.warnings` includes -`guardrails_service_unavailable_text_returned_unchanged`. +text unchanged and `metadata.warnings` carries a human-readable note (e.g. +`"Guardrails service was unavailable; original text was returned unchanged."`). +Other warnings may surface for duplicate validator IDs, an empty validator +list, or a missing sanitised text in the upstream response. ### Notes -- `guardrail_config[].type` and `guardrail_config[].tag` are caller-side +- `config[].type` and `config[].tag` are caller-side bookkeeping. They are not interpreted by the server but are useful for your own correlation (echoed back via `request_metadata` if you include them there). diff --git a/backend/app/api/routes/guardrails.py b/backend/app/api/routes/guardrails.py index b0a76ac2e..e22db3410 100644 --- a/backend/app/api/routes/guardrails.py +++ b/backend/app/api/routes/guardrails.py @@ -57,7 +57,7 @@ def apply_guardrails_endpoint( _current_user: AuthContextDep, session: SessionDep, request: GuardrailsRequest, -) -> APIResponse[GuardrailsJobImmediatePublic]: +) -> APIResponse[GuardrailsJobImmediatePublic]: """Initiate a guardrails-only job. 
Returns the job_id immediately; the sanitised text is delivered via callback_url (or polled via GET).""" project_id = _current_user.project_.id @@ -140,8 +140,7 @@ def get_guardrails_job_status( ): job = JobCrud(session=session).get(job_id=job_id, project_id=project_id) if not job or job.job_type != JobType.LLM_GUARDRAILS: - # Hide non-guardrails jobs behind a 404 so this endpoint cannot - # be used to enumerate or peek at LLM-call / chain job rows. + # 404 (not 403) to avoid leaking existence of non-guardrails jobs. raise HTTPException(status_code=404, detail="Job not found") meta = job.meta if isinstance(job.meta, dict) else {} @@ -154,7 +153,7 @@ def get_guardrails_job_status( warnings = [w for w in raw_warnings if isinstance(w, str)] guardrails_response: GuardrailsCallbackData | None = None - if job.status.value == JobStatus.SUCCESS: + if job.status == JobStatus.SUCCESS: response_blob = meta.get("response") or {} data_blob = ( response_blob.get("data") if isinstance(response_blob, dict) else None @@ -168,8 +167,6 @@ def get_guardrails_job_status( ) value = safe_text if isinstance(safe_text, str) else (original_text or "") - # Prefer the server-minted response_id stamped on the callback - # blob; fall back to None if (for older rows) it is absent. response_id: str | None = None if isinstance(callback_blob, dict): rid = callback_blob.get("response_id") diff --git a/backend/app/models/guardrails/request.py b/backend/app/models/guardrails/request.py index 74a82b977..04fb6916c 100644 --- a/backend/app/models/guardrails/request.py +++ b/backend/app/models/guardrails/request.py @@ -46,7 +46,7 @@ class GuardrailsRequest(SQLModel): min_length=1, description="Text to validate/sanitise. 
May be a user prompt or an LLM response.", ) - guardrail_config: list[GuardrailValidator] = Field( + config: list[GuardrailValidator] = Field( ..., min_length=1, description="Validators to apply, identified by validator_config_id.", diff --git a/backend/app/services/guardrails/jobs.py b/backend/app/services/guardrails/jobs.py index cee684418..a02132632 100644 --- a/backend/app/services/guardrails/jobs.py +++ b/backend/app/services/guardrails/jobs.py @@ -36,6 +36,11 @@ def start_job( ) -> UUID: """Create a guardrails-only job and schedule its Celery task.""" trace_id = correlation_id.get() or "N/A" + logger.debug( + f"[start_job] Received guardrails request | text_len={len(request.text)}, " + f"validators={len(request.config)}, callback={request.callback_url is not None}, " + f"project_id={project_id}, organization_id={organization_id}" + ) with log_context( tag="guardrails", @@ -47,19 +52,16 @@ def start_job( span.set_attribute("kaapi.organization_id", organization_id) job_crud = JobCrud(session=db) + # Raw text persisted intentionally: this endpoint inspects unsafe content. job = job_crud.create( job_type=JobType.LLM_GUARDRAILS, trace_id=trace_id, project_id=project_id, + meta={"request": request.model_dump(mode="json")}, ) span.set_attribute("guardrails.job_id", str(job.id)) - - # Persist the inbound request for traceability before the worker runs. - # We intentionally store the raw text: this endpoint exists to inspect - # potentially-unsafe content, so the request body itself is the audit log. - job_crud.update( - job_id=job.id, - job_update=JobUpdate(meta={"request": request.model_dump(mode="json")}), + logger.debug( + f"[start_job] Job row created | job_id={job.id}, trace_id={trace_id}" ) try: @@ -111,15 +113,7 @@ def _build_callback_payload( request_metadata: dict[str, Any] | None, warnings: list[str], ) -> dict[str, Any]: - """Construct the webhook payload in the documented /guardrails shape. 
- - The structure mirrors LLM-call callbacks so callers can reuse parsing: - ``data.response.output.content.value`` is always the sanitised text. - - ``response_id`` is generated server-side (a UUID4 string) so callers - always have a stable correlation handle even when the upstream guardrails - service omits an ID in its payload. - """ + """Build the /guardrails webhook payload; sanitised text at data.response.output.content.value.""" upstream_data = raw.get("data") if isinstance(raw, dict) else None usage_payload: dict[str, Any] = {} if isinstance(upstream_data, dict): @@ -174,6 +168,10 @@ def _send_failure_callback( with tracer.start_as_current_span("guardrails.send_callback") as cb_span: cb_span.set_attribute("callback.url", callback_url) cb_span.set_attribute("callback.status", "failure") + logger.debug( + f"[_send_failure_callback] Dispatching failure callback | " + f"callback_url={callback_url}, error={error}" + ) send_callback( callback_url=callback_url, data=payload, @@ -191,15 +189,15 @@ def execute_job( organization_id: int, **_: Any, ) -> dict[str, Any]: - """Celery worker entrypoint for /guardrails jobs. - - Resolves validators, calls the guardrails service via ``apply_guardrails``, - persists the upstream response on ``job.meta.response`` for traceability, - and dispatches the result to the caller's webhook. 
- """ + """Celery worker entrypoint for /guardrails jobs.""" job_uuid = UUID(job_id) request = GuardrailsRequest.model_validate(request_data) callback_url = str(request.callback_url) if request.callback_url else None + logger.debug( + f"[execute_job] Picked up job | job_id={job_id}, task_id={task_id}, " + f"text_len={len(request.text)}, validators={len(request.config)}, " + f"callback={callback_url is not None}" + ) with log_context( tag="guardrails", @@ -218,17 +216,33 @@ def execute_job( job_update=JobUpdate(status=JobStatus.PROCESSING, task_id=task_id), ) + warnings: list[str] = [] + try: - # Deduplicate validator IDs while preserving submission order so a - # caller passing the same ID twice does not double-bill the - # upstream guardrails service. + # Dedupe validator IDs (preserve order) to avoid double-billing upstream. seen_ids: set[UUID] = set() validators: list[Validator] = [] - for g in request.guardrail_config: + duplicates = 0 + for g in request.config: if g.validator_config_id in seen_ids: + duplicates += 1 continue seen_ids.add(g.validator_config_id) validators.append(Validator(validator_config_id=g.validator_config_id)) + if duplicates: + warnings.append( + f"Request contained {duplicates} duplicate validator_config_id " + "entries; duplicates were ignored before calling the guardrails service." + ) + if not validators: + warnings.append( + "Request contained no usable validators after deduplication; " + "original text was returned unchanged." 
+ ) + logger.debug( + f"[execute_job] Calling guardrails service | job_id={job_id}, " + f"unique_validators={len(validators)}, duplicates_skipped={duplicates}" + ) outcome = apply_guardrails( text=request.text, validators=validators, @@ -236,6 +250,12 @@ def execute_job( project_id=project_id, organization_id=organization_id, ) + logger.debug( + f"[execute_job] Guardrails outcome | job_id={job_id}, " + f"bypassed={outcome.bypassed}, has_error={outcome.error is not None}, " + f"safe_text_present={outcome.safe_text is not None}, " + f"raw_keys={list(outcome.raw.keys()) if isinstance(outcome.raw, dict) else None}" + ) except Exception as e: logger.error( f"[execute_job] Guardrails execution crashed | job_id={job_id}: {e}", @@ -257,24 +277,33 @@ def execute_job( ) return {"success": False, "error": str(e)} - warnings: list[str] = [] if outcome.bypassed: - warnings.append("guardrails_service_unavailable_text_returned_unchanged") + warnings.append( + "Guardrails service was unavailable; original text was returned unchanged." + ) elif not outcome.raw and validators: - # apply_guardrails short-circuited with raw={} despite the caller - # sending non-empty validators — list_validators_config returned - # an empty list. The most common cause is the guardrails service - # being unreachable during the config-fetch step (it swallows - # transport errors and returns []). Surface this as a warning so - # callers do not mistake an effective no-op for a clean pass. - warnings.append("validator_configs_unresolved_text_returned_unchanged") + # Validators submitted but none resolved — likely upstream unreachable. + warnings.append( + "Validators were submitted but none resolved against the guardrails " + "service; original text was returned unchanged." + ) logger.warning( f"[execute_job] Validators were submitted but none resolved against " f"the guardrails service; returning original text. 
job_id={job_id}" ) + if outcome.error is None and outcome.safe_text is None and validators: + warnings.append( + "Guardrails service did not return a sanitised text; original text " + "was returned unchanged." + ) + # Hard block: guardrails service rejected the text. if outcome.error is not None: + logger.info( + f"[execute_job] Guardrails hard-blocked | job_id={job_id}, " + f"error={outcome.error}" + ) with Session(engine) as session: JobCrud(session=session).update( job_id=job_uuid, @@ -297,9 +326,7 @@ def execute_job( return {"success": False, "error": outcome.error} safe_text = outcome.safe_text if outcome.safe_text is not None else request.text - # response_id is server-minted so callers always have a stable - # correlation handle, independent of whether the upstream guardrails - # service returns one. + # Server-minted so callers always have a stable correlation handle. response_id = str(uuid4()) callback_payload = _build_callback_payload( response_id=response_id, @@ -326,11 +353,20 @@ def execute_job( ), ) + logger.info( + f"[execute_job] Job completed | job_id={job_id}, " + f"warnings={len(warnings)}, callback={callback_url is not None}" + ) + if callback_url: webhook_secret = get_webhook_secret(project_id, organization_id) with tracer.start_as_current_span("guardrails.send_callback") as cb_span: cb_span.set_attribute("callback.url", callback_url) cb_span.set_attribute("callback.status", "success") + logger.debug( + f"[execute_job] Dispatching success callback | job_id={job_id}, " + f"callback_url={callback_url}" + ) send_callback( callback_url=callback_url, data=callback_payload, diff --git a/backend/app/services/llm/guardrails.py b/backend/app/services/llm/guardrails.py index 9aaf409a0..68d99c27f 100644 --- a/backend/app/services/llm/guardrails.py +++ b/backend/app/services/llm/guardrails.py @@ -1,7 +1,9 @@ +import json +import logging +import time from dataclasses import dataclass, field from typing import Any from uuid import UUID -import logging import 
httpx @@ -77,6 +79,12 @@ def apply_guardrails( ) resolved = output_cfgs if is_output else input_cfgs if not resolved: + logger.info( + f"[apply_guardrails] No validator configs resolved upstream; skipping " + f"POST /guardrails. job_id={job_id}, requested_ids=" + f"{[str(v.validator_config_id) for v in validators]}, " + f"is_output={is_output}" + ) return GuardrailsOutcome( safe_text=text, error=None, bypassed=False, rephrase_needed=False, raw={} ) @@ -173,20 +181,45 @@ def run_guardrails_validation( "Content-Type": "application/json", } + url = f"{settings.KAAPI_GUARDRAILS_URL}" + payload_bytes = json.dumps(payload).encode() + logger.info( + f"[run_guardrails_validation] POST guardrails | job_id={job_id}, url={url}, " + f"validators={len(validators)}, input_len={len(input_text)}, " + f"output_len={len(output_text) if output_text else 0}, " + f"payload_bytes={len(payload_bytes)}" + ) + logger.info( + f"[run_guardrails_validation] Request payload | job_id={job_id}, " + f"payload={json.dumps(payload)}" + ) + + started = time.monotonic() try: with httpx.Client(timeout=45.0) as client: response = client.post( - f"{settings.KAAPI_GUARDRAILS_URL}/", + url, json=payload, params={"suppress_pass_logs": str(suppress_pass_logs).lower()}, headers=headers, ) - + elapsed_ms = int((time.monotonic() - started) * 1000) + logger.info( + f"[run_guardrails_validation] Response received | job_id={job_id}, " + f"status={response.status_code}, elapsed_ms={elapsed_ms}, " + f"response_bytes={len(response.content)}" + ) + logger.info( + f"[run_guardrails_validation] Response body | job_id={job_id}, " + f"body={response.text}" + ) response.raise_for_status() return response.json() except Exception as e: + elapsed_ms = int((time.monotonic() - started) * 1000) logger.warning( - f"[run_guardrails_validation] Service unavailable. Bypassing guardrails. job_id={job_id}. error={e}" + f"[run_guardrails_validation] Service unavailable. Bypassing guardrails. 
" + f"job_id={job_id}, elapsed_ms={elapsed_ms}, error={e}" ) return { @@ -246,10 +279,25 @@ def _fetch_by_ids(validator_ids: list[UUID]) -> list[dict[str, Any]]: if not validator_ids: return [] - response = client.get( - endpoint, - params=_build_params(validator_ids), - headers=headers, + params = _build_params(validator_ids) + logger.info( + f"[list_validators_config] GET validator configs | " + f"endpoint={endpoint}, ids={len(validator_ids)}, " + f"organization_id={organization_id}, project_id={project_id}" + ) + logger.debug( + f"[list_validators_config] Request params | params={params}" + ) + started = time.monotonic() + response = client.get(endpoint, params=params, headers=headers) + elapsed_ms = int((time.monotonic() - started) * 1000) + logger.info( + f"[list_validators_config] Response received | " + f"status={response.status_code}, elapsed_ms={elapsed_ms}, " + f"response_bytes={len(response.content)}" + ) + logger.info( + f"[list_validators_config] Response body | body={response.text}" ) response.raise_for_status()