Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""add job.meta JSONB column and LLM_GUARDRAILS jobtype enum value

Revision ID: 068
Revises: 067
Create Date: 2026-06-15 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# Alembic revision identifiers: this migration is "068" and applies on top of "067".
revision = "068"
down_revision = "067"
# No branch labels or cross-revision dependencies are declared for this revision.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply revision 068: extend the ``jobtype`` enum and add ``job.meta``.

    Two schema changes are made, in this order:

    1. The ``jobtype`` enum gains the ``LLM_GUARDRAILS`` value, used for
       guardrails-only jobs.
    2. The ``job`` table gains a nullable JSONB ``meta`` column that holds
       per-job-type tracking payloads (e.g. guardrails request/response).
    """
    # ALTER TYPE ... ADD VALUE cannot run inside a transaction block, so the
    # statement is issued from the migration context's autocommit block.
    migration_context = op.get_context()
    with migration_context.autocommit_block():
        op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_GUARDRAILS'")

    # The column is nullable and has no default, so it imposes no cost on
    # existing rows or on job types that never write to it.
    meta_comment = (
        "Per-job-type tracking payload. For LLM_GUARDRAILS this stores "
        "{'request': {...}, 'response': {...}} capturing the inbound "
        "guardrails request and the upstream guardrails service response."
    )
    meta_column = sa.Column(
        "meta",
        postgresql.JSONB(astext_type=sa.Text()),
        nullable=True,
        comment=meta_comment,
    )
    op.add_column("job", meta_column)


def downgrade() -> None:
    """Revert revision 068 by dropping the ``job.meta`` column.

    The ``LLM_GUARDRAILS`` value is intentionally left on the ``jobtype``
    enum: Postgres has no clean way to remove a single enum value, and
    leaving it in place is harmless.
    """
    op.drop_column(table_name="job", column_name="meta")
65 changes: 65 additions & 0 deletions backend/app/api/docs/guardrails/apply_guardrails.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
Apply guardrails to a piece of text and deliver the sanitised result via a webhook callback.

This endpoint exists for callers who manage their own LLM workflow but want to
reuse Kaapi's guardrails service. It is symmetric for input and output
guardrails: send the text that needs sanitisation in `text` along with one or
more `validator_config_id`s, and receive the sanitised text on your
`callback_url`.

### Flow

1. Caller POSTs `{text, config, callback_url}` to `/api/v1/guardrails`.
2. Kaapi creates a job (`job_type=LLM_GUARDRAILS`), returns `job_id` with HTTP 200
immediately.
3. A Celery worker resolves the validators, calls the guardrails service, and,
when a `callback_url` was supplied, POSTs the sanitised text (or a hard-block
error) to it.
4. The full upstream guardrails response and the original request body are
persisted on `job.meta` for traceability and can be inspected via
`GET /api/v1/guardrails/{job_id}`.

### Webhook payload

The webhook receives a standard `APIResponse` envelope:

```json
{
"success": true,
"data": {
"response": {
"response_id": "<guardrails-response-id-or-null>",
"output": {
"type": "text",
"content": { "format": "text", "value": "<sanitised text>" }
}
},
"usage": {
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0,
"reasoning_tokens": 0
},
"provider_raw_response": null
},
"error": null,
"metadata": { "<your request_metadata>": "...", "warnings": [] }
}
```

If the guardrails service hard-blocks the text, `success` is `false`, `error`
carries the upstream message, and `data` is `null`. If the guardrails service
is unreachable, the job still succeeds, but the webhook carries the original
text unchanged and `metadata.warnings` carries a human-readable note (e.g.
`"Guardrails service was unavailable; original text was returned unchanged."`).
Other warnings may surface for duplicate validator IDs, an empty validator
list, or a missing sanitised text in the upstream response.

### Notes

- `config[].type` and `config[].tag` are caller-side
bookkeeping. They are not interpreted by the server but are useful for your
own correlation (echoed back via `request_metadata` if you include them
there).
- For output-guardrail flows that need the original prompt paired with the
LLM output: v1 of this endpoint sends only `text`; prompt/output pairing is
not exposed.
- The same webhook signing scheme as `/llm/call` is used when a webhook secret
is configured.
2 changes: 2 additions & 0 deletions backend/app/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
evaluations,
features,
fine_tuning,
guardrails,
languages,
llm,
llm_chain,
Expand Down Expand Up @@ -55,6 +56,7 @@
api_router.include_router(languages.router)
api_router.include_router(llm.router)
api_router.include_router(llm_chain.router)
api_router.include_router(guardrails.router)
api_router.include_router(llm_sts.router)
api_router.include_router(login.router)
api_router.include_router(model_config.router)
Expand Down
203 changes: 203 additions & 0 deletions backend/app/api/routes/guardrails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import logging
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from opentelemetry import trace

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.core.rate_monitor import monitor_rate
from app.core.telemetry import log_context
from app.crud.jobs import JobCrud
from app.models import JobStatus, JobType
from app.models.guardrails import (
GuardrailsCallbackData,
GuardrailsJobImmediatePublic,
GuardrailsJobPublic,
GuardrailsRequest,
)
from app.services.guardrails.jobs import start_job
from app.utils import APIResponse, load_description, validate_callback_url

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router carrying the guardrails endpoints defined below; tagged "Guardrails".
router = APIRouter(tags=["Guardrails"])
# Separate router that holds only the callback specification route; its
# ``.routes`` are passed as ``callbacks=`` on the POST /guardrails endpoint.
guardrails_callback_router = APIRouter()


@guardrails_callback_router.post("{$callback_url}", name="guardrails_callback")
def guardrails_callback_notification(body: APIResponse[GuardrailsCallbackData]):
    """
    Specification of the callback request sent when a /guardrails job completes.

    This function has no implementation; it only describes the payload that
    the caller's callback URL receives:

    - Success: an APIResponse with success=True whose data is a
      GuardrailsCallbackData (the sanitised text is under
      data.response.output).
    - Hard-block or failure: an APIResponse with success=False and error set.
    - In every case, metadata includes any request_metadata supplied with the
      original request, plus a `warnings` list.
    """
    ...


@router.post(
    "/guardrails",
    description=load_description("guardrails/apply_guardrails.md"),
    response_model=APIResponse[GuardrailsJobImmediatePublic],
    callbacks=guardrails_callback_router.routes,
    dependencies=[
        Depends(require_permission(Permission.REQUIRE_PROJECT)),
        Depends(monitor_rate("llm_call")),
    ],
)
def apply_guardrails_endpoint(
    _current_user: AuthContextDep,
    session: SessionDep,
    request: GuardrailsRequest,
) -> APIResponse[GuardrailsJobImmediatePublic]:
    """Start a guardrails-only job and return its identifier right away.

    The sanitised text is not part of this response; it is delivered to
    ``request.callback_url`` when one is given, or fetched by polling the
    GET endpoint otherwise.

    Args:
        _current_user: Auth context; supplies the project and organization ids.
        session: Database session used to start and re-read the job.
        request: Guardrails request body, including the optional callback URL.

    Returns:
        A success ``APIResponse`` wrapping ``GuardrailsJobImmediatePublic``
        (job id, current status, a human-readable message, and timestamps).

    Raises:
        HTTPException: 404 when the job returned by ``start_job`` cannot be
            read back for this project.
    """
    project_id = _current_user.project_.id
    organization_id = _current_user.organization_.id
    callback_enabled = request.callback_url is not None

    with log_context(
        tag="guardrails",
        system="guardrails",
        lifecycle="api.guardrails.apply",
        project_id=project_id,
        organization_id=organization_id,
        callback_enabled=callback_enabled,
    ):
        span = trace.get_current_span()
        if span.is_recording():
            span_attributes = (
                ("kaapi.project_id", project_id),
                ("kaapi.organization_id", organization_id),
                ("guardrails.callback_enabled", callback_enabled),
            )
            for attr_name, attr_value in span_attributes:
                span.set_attribute(attr_name, attr_value)

        # Validate the callback target before any job is started.
        # NOTE(review): presumably raises on an unacceptable URL - confirm
        # against validate_callback_url.
        if request.callback_url:
            validate_callback_url(str(request.callback_url))

        job_id = start_job(
            db=session,
            request=request,
            project_id=project_id,
            organization_id=organization_id,
        )

        if span.is_recording():
            span.set_attribute("guardrails.job_id", str(job_id))

        # Re-read the job so the response reflects its stored status/timestamps.
        job = JobCrud(session=session).get(job_id=job_id, project_id=project_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")

        # NOTE: the poll message is a plain string, so "{job_id}" is emitted
        # literally as a route placeholder rather than the actual id.
        if request.callback_url:
            message = "Guardrails are being applied; the sanitised text will be delivered via callback."
        else:
            message = "Guardrails are being applied; poll GET /guardrails/{job_id} for the result."

        immediate = GuardrailsJobImmediatePublic(
            job_id=job.id,
            status=job.status.value,
            message=message,
            job_inserted_at=job.inserted_at,
            job_updated_at=job.updated_at,
        )
        return APIResponse.success_response(data=immediate)


def _dict_or_empty(value: object) -> dict:
    """Return ``value`` unchanged when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


@router.get(
    "/guardrails/{job_id}",
    response_model=APIResponse[GuardrailsJobPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def get_guardrails_job_status(
    _current_user: AuthContextDep,
    session: SessionDep,
    job_id: UUID,
) -> APIResponse[GuardrailsJobPublic]:
    """Poll for a /guardrails job's status and result.

    On SUCCESS the sanitised text is rehydrated from the persisted upstream
    response stored on ``job.meta``. When that response holds no string
    ``safe_text``, the original request text (or an empty string) is returned
    instead. A job that does not exist for the caller's project, or that is
    not a guardrails job, yields a 404.
    """
    project_id = _current_user.project_.id

    with log_context(
        tag="guardrails",
        system="guardrails",
        lifecycle="api.guardrails.status",
        job_id=job_id,
        project_id=project_id,
        organization_id=_current_user.organization_.id,
    ):
        job = JobCrud(session=session).get(job_id=job_id, project_id=project_id)
        if not job or job.job_type != JobType.LLM_GUARDRAILS:
            # 404 (not 403) to avoid leaking existence of non-guardrails jobs.
            raise HTTPException(status_code=404, detail="Job not found")

        # job.meta is free-form JSON, so every nested level is narrowed to a
        # dict before it is read; anything else is treated as absent.
        meta = _dict_or_empty(job.meta)
        callback_blob = _dict_or_empty(meta.get("callback"))

        # Only string entries of a list-valued "warnings" are surfaced.
        raw_warnings = callback_blob.get("warnings")
        warnings: list[str] = (
            [w for w in raw_warnings if isinstance(w, str)]
            if isinstance(raw_warnings, list)
            else []
        )

        guardrails_response: GuardrailsCallbackData | None = None
        if job.status == JobStatus.SUCCESS:
            response_blob = _dict_or_empty(meta.get("response"))
            data_blob = _dict_or_empty(response_blob.get("data"))
            request_blob = _dict_or_empty(meta.get("request"))

            # Prefer the upstream sanitised text; fall back to the original
            # request text, and finally to an empty string.
            safe_text = data_blob.get("safe_text")
            if isinstance(safe_text, str):
                value = safe_text
            else:
                value = request_blob.get("text") or ""

            rid = callback_blob.get("response_id")
            response_id: str | None = rid if isinstance(rid, str) else None

            guardrails_response = GuardrailsCallbackData.model_validate(
                {
                    "response": {
                        "response_id": response_id,
                        "output": {
                            "type": "text",
                            "content": {"format": "text", "value": value},
                        },
                    },
                    # A missing or non-dict usage is passed as an empty dict.
                    "usage": _dict_or_empty(data_blob.get("usage")),
                    "provider_raw_response": None,
                }
            )

        return APIResponse.success_response(
            data=GuardrailsJobPublic(
                job_id=job.id,
                status=job.status.value,
                guardrails_response=guardrails_response,
                error_message=job.error_message,
                warnings=warnings,
            )
        )
18 changes: 18 additions & 0 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ def run_llm_chain_job(self, project_id: int, job_id: str, trace_id: str, **kwarg
)


@celery_app.task(bind=True, queue="high_priority", priority=9)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_guardrails_job")
Comment on lines +97 to +98

@Ayush8923 Ayush8923 Jun 16, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing: currently the traffic volume is expected to be maybe low from TAP, but I agree that guardrails jobs could eventually compete with interactive LLM requests. Let’s track this as a follow-up and evaluate a dedicated queue/lower priority once usage increases.

def run_guardrails_job(self, project_id: int, job_id: str, trace_id: str, **kwargs):
    """Run the guardrails ``execute_job`` service for one job.

    Sets the trace id, then calls ``execute_job`` through
    ``_run_with_otel_parent`` and returns whatever that helper returns.

    Args:
        self: The bound task instance; forwarded as ``task_instance``.
        project_id: Project that owns the job.
        job_id: Identifier of the job to execute.
        trace_id: Trace identifier handed to ``_set_trace``.
        **kwargs: Extra keyword arguments forwarded to ``execute_job``.
    """
    # Imported at call time rather than module import time.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from app.services.guardrails.jobs import execute_job

    _set_trace(trace_id)

    def _execute():
        # The current task's request id is looked up when this runs, i.e.
        # inside _run_with_otel_parent.
        return execute_job(
            project_id=project_id,
            job_id=job_id,
            task_id=current_task.request.id,
            task_instance=self,
            **kwargs,
        )

    return _run_with_otel_parent(self, _execute)


@celery_app.task(bind=True, queue="high_priority", priority=9)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_response_job")
def run_response_job(self, project_id: int, job_id: str, trace_id: str, **kwargs):
Expand Down
18 changes: 18 additions & 0 deletions backend/app/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ def start_llm_chain_job(
return task_id


def start_guardrails_job(
    project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
    """Enqueue ``run_guardrails_job`` for a job and return the task id.

    Args:
        project_id: Project that owns the job.
        job_id: Identifier of the job to run.
        trace_id: Trace identifier forwarded to the task; defaults to "N/A".
        **kwargs: Extra keyword arguments forwarded to the task unchanged.

    Returns:
        The value returned by ``_enqueue_with_trace_context``, logged here as
        the Celery task id.
    """
    # Imported at call time rather than module import time.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from app.celery.tasks.job_execution import run_guardrails_job

    task_arguments = {
        "project_id": project_id,
        "job_id": job_id,
        "trace_id": trace_id,
        **kwargs,
    }
    celery_task_id = _enqueue_with_trace_context(run_guardrails_job, **task_arguments)

    logger.info(
        f"[start_guardrails_job] Started job {job_id} with Celery task {celery_task_id}"
    )
    return celery_task_id


def start_response_job(
project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
Expand Down
Loading
Loading