-
Notifications
You must be signed in to change notification settings - Fork 10
feat: guardrails endpoint #939
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Prajna1999
wants to merge
8
commits into
main
Choose a base branch
from
feature/guardrails-standalone-api
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
2a4f0dc
feat: guardrails endpoint
Prajna1999 7433691
Merge branch 'main' into feature/guardrails-standalone-api
Prajna1999 2b13613
Merge branch 'main' into feature/guardrails-standalone-api
Ayush8923 a5485de
Update backend/app/services/guardrails/jobs.py
Prajna1999 14ba055
Apply suggestion from @Ayush8923
Prajna1999 e2520d5
Merge branch 'main' into feature/guardrails-standalone-api
Prajna1999 176679b
fix: fix comments and add logging
Prajna1999 3aaea2c
Merge branch 'main' into feature/guardrails-standalone-api
Ayush8923 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
46 changes: 46 additions & 0 deletions
46
backend/app/alembic/versions/068_add_meta_and_guardrails_jobtype.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| """add job.meta JSONB column and LLM_GUARDRAILS jobtype enum value | ||
|
|
||
| Revision ID: 068 | ||
| Revises: 067 | ||
| Create Date: 2026-06-15 00:00:00.000000 | ||
|
|
||
| """ | ||
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op | ||
| from sqlalchemy.dialects import postgresql | ||
|
|
||
| revision = "068" | ||
| down_revision = "067" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| # Add new enum value for guardrails-only jobs. ALTER TYPE ... ADD VALUE | ||
| # cannot run inside a transaction block, hence the autocommit guard. | ||
| with op.get_context().autocommit_block(): | ||
| op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_GUARDRAILS'") | ||
|
|
||
| # Add nullable meta JSONB column on job for per-job-type tracking payloads | ||
| # (e.g. guardrails request/response). Nullable + no default so it imposes | ||
| # zero cost on existing rows and on job types that do not write to it. | ||
| op.add_column( | ||
| "job", | ||
| sa.Column( | ||
| "meta", | ||
| postgresql.JSONB(astext_type=sa.Text()), | ||
| nullable=True, | ||
| comment=( | ||
| "Per-job-type tracking payload. For LLM_GUARDRAILS this stores " | ||
| "{'request': {...}, 'response': {...}} capturing the inbound " | ||
| "guardrails request and the upstream guardrails service response." | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
|
|
||
| def downgrade() -> None: | ||
| op.drop_column("job", "meta") | ||
| # NOTE: Postgres has no clean way to remove a single enum value. Leaving | ||
| # 'LLM_GUARDRAILS' on the type on downgrade is intentional and harmless. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| Apply guardrails to a piece of text and deliver the sanitised result via a webhook callback. | ||
|
|
||
| This endpoint exists for callers who manage their own LLM workflow but want to | ||
| reuse Kaapi's guardrails service. It is symmetric for input and output | ||
| guardrails: send the text that needs sanitisation in `text` along with one or | ||
| more `validator_config_id`s, and receive the sanitised text on your | ||
| `callback_url`. | ||
|
|
||
| ### Flow | ||
|
|
||
| 1. Caller POSTs `{text, config, callback_url}` to `/api/v1/guardrails`. | ||
| 2. Kaapi creates a job (`job_type=LLM_GUARDRAILS`), returns `job_id` with HTTP 200 | ||
| immediately. | ||
| 3. A Celery worker resolves the validators, calls the guardrails service, and | ||
| POSTs the sanitised text (or a hard-block error) to `callback_url`. | ||
| 4. The full upstream guardrails response and the original request body are | ||
| persisted on `job.meta` for traceability and can be inspected via | ||
| `GET /api/v1/guardrails/{job_id}`. | ||
|
|
||
| ### Webhook payload | ||
|
|
||
| The webhook receives a standard `APIResponse` envelope: | ||
|
|
||
| ```json | ||
| { | ||
| "success": true, | ||
| "data": { | ||
| "response": { | ||
| "response_id": "<guardrails-response-id-or-null>", | ||
| "output": { | ||
| "type": "text", | ||
| "content": { "format": "text", "value": "<sanitised text>" } | ||
| } | ||
| }, | ||
| "usage": { | ||
| "input_tokens": 0, | ||
| "output_tokens": 0, | ||
| "total_tokens": 0, | ||
| "reasoning_tokens": 0 | ||
| }, | ||
| "provider_raw_response": null | ||
| }, | ||
| "error": null, | ||
| "metadata": { "<your request_metadata>": "...", "warnings": [] } | ||
| } | ||
| ``` | ||
|
|
||
| If the guardrails service hard-blocks the text, `success` is `false`, `error` | ||
| carries the upstream message, and `data` is `null`. If the guardrails service | ||
| is unreachable the job still succeeds but the webhook carries the original | ||
| text unchanged and `metadata.warnings` carries a human-readable note (e.g. | ||
| `"Guardrails service was unavailable; original text was returned unchanged."`). | ||
| Other warnings may surface for duplicate validator IDs, an empty validator | ||
| list, or a missing sanitised text in the upstream response. | ||
|
|
||
| ### Notes | ||
|
|
||
| - `config[].type` and `config[].tag` are caller-side | ||
| bookkeeping. They are not interpreted by the server but are useful for your | ||
| own correlation (echoed back via `request_metadata` if you include them | ||
| there). | ||
| - For output-guardrail flows that need the original prompt paired with the | ||
| LLM output, this endpoint v1 sends only `text`; pairing is not exposed. | ||
| - The same webhook signing scheme as `/llm/call` is used when a webhook secret | ||
| is configured. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,203 @@ | ||
| import logging | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException | ||
| from opentelemetry import trace | ||
|
|
||
| from app.api.deps import AuthContextDep, SessionDep | ||
| from app.api.permissions import Permission, require_permission | ||
| from app.core.rate_monitor import monitor_rate | ||
| from app.core.telemetry import log_context | ||
| from app.crud.jobs import JobCrud | ||
| from app.models import JobStatus, JobType | ||
| from app.models.guardrails import ( | ||
| GuardrailsCallbackData, | ||
| GuardrailsJobImmediatePublic, | ||
| GuardrailsJobPublic, | ||
| GuardrailsRequest, | ||
| ) | ||
| from app.services.guardrails.jobs import start_job | ||
| from app.utils import APIResponse, load_description, validate_callback_url | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| router = APIRouter(tags=["Guardrails"]) | ||
| guardrails_callback_router = APIRouter() | ||
|
|
||
|
|
||
| @guardrails_callback_router.post( | ||
| "{$callback_url}", | ||
| name="guardrails_callback", | ||
| ) | ||
| def guardrails_callback_notification(body: APIResponse[GuardrailsCallbackData]): | ||
| """ | ||
| Callback endpoint specification for /guardrails completion. | ||
|
|
||
| The callback will receive: | ||
| - On success: APIResponse with success=True and data containing | ||
| GuardrailsCallbackData (sanitised text under data.response.output). | ||
| - On hard-block / failure: APIResponse with success=False and error set. | ||
| - metadata field will always include any request_metadata supplied with | ||
| the original request, plus a `warnings` list. | ||
| """ | ||
| ... | ||
|
|
||
|
|
||
| @router.post( | ||
| "/guardrails", | ||
| description=load_description("guardrails/apply_guardrails.md"), | ||
| response_model=APIResponse[GuardrailsJobImmediatePublic], | ||
| callbacks=guardrails_callback_router.routes, | ||
| dependencies=[ | ||
| Depends(require_permission(Permission.REQUIRE_PROJECT)), | ||
| Depends(monitor_rate("llm_call")), | ||
| ], | ||
| ) | ||
| def apply_guardrails_endpoint( | ||
| _current_user: AuthContextDep, | ||
| session: SessionDep, | ||
| request: GuardrailsRequest, | ||
| ) -> APIResponse[GuardrailsJobImmediatePublic]: | ||
| """Initiate a guardrails-only job. Returns the job_id immediately; the | ||
| sanitised text is delivered via callback_url (or polled via GET).""" | ||
| project_id = _current_user.project_.id | ||
| organization_id = _current_user.organization_.id | ||
|
|
||
| with log_context( | ||
| tag="guardrails", | ||
| system="guardrails", | ||
| lifecycle="api.guardrails.apply", | ||
| project_id=project_id, | ||
| organization_id=organization_id, | ||
| callback_enabled=request.callback_url is not None, | ||
| ): | ||
| span = trace.get_current_span() | ||
| if span.is_recording(): | ||
| span.set_attribute("kaapi.project_id", project_id) | ||
| span.set_attribute("kaapi.organization_id", organization_id) | ||
| span.set_attribute( | ||
| "guardrails.callback_enabled", request.callback_url is not None | ||
| ) | ||
|
|
||
| if request.callback_url: | ||
| validate_callback_url(str(request.callback_url)) | ||
|
|
||
| job_id = start_job( | ||
| db=session, | ||
| request=request, | ||
| project_id=project_id, | ||
| organization_id=organization_id, | ||
| ) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_attribute("guardrails.job_id", str(job_id)) | ||
|
|
||
| job = JobCrud(session=session).get(job_id=job_id, project_id=project_id) | ||
| if not job: | ||
| raise HTTPException(status_code=404, detail="Job not found") | ||
|
|
||
| message = ( | ||
| "Guardrails are being applied; the sanitised text will be delivered via callback." | ||
| if request.callback_url | ||
| else "Guardrails are being applied; poll GET /guardrails/{job_id} for the result." | ||
| ) | ||
|
|
||
| return APIResponse.success_response( | ||
| data=GuardrailsJobImmediatePublic( | ||
| job_id=job.id, | ||
| status=job.status.value, | ||
| message=message, | ||
| job_inserted_at=job.inserted_at, | ||
| job_updated_at=job.updated_at, | ||
| ) | ||
| ) | ||
|
|
||
|
|
||
| @router.get( | ||
| "/guardrails/{job_id}", | ||
| response_model=APIResponse[GuardrailsJobPublic], | ||
| dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], | ||
| ) | ||
| def get_guardrails_job_status( | ||
| _current_user: AuthContextDep, | ||
| session: SessionDep, | ||
| job_id: UUID, | ||
| ) -> APIResponse[GuardrailsJobPublic]: | ||
| """Poll for a /guardrails job's status and result. | ||
|
|
||
| On SUCCESS the sanitised text is rehydrated from the persisted upstream | ||
| response stored on ``job.meta``. | ||
| """ | ||
| project_id = _current_user.project_.id | ||
|
|
||
| with log_context( | ||
| tag="guardrails", | ||
| system="guardrails", | ||
| lifecycle="api.guardrails.status", | ||
| job_id=job_id, | ||
| project_id=project_id, | ||
| organization_id=_current_user.organization_.id, | ||
| ): | ||
| job = JobCrud(session=session).get(job_id=job_id, project_id=project_id) | ||
| if not job or job.job_type != JobType.LLM_GUARDRAILS: | ||
| # 404 (not 403) to avoid leaking existence of non-guardrails jobs. | ||
| raise HTTPException(status_code=404, detail="Job not found") | ||
|
|
||
| meta = job.meta if isinstance(job.meta, dict) else {} | ||
| callback_blob = meta.get("callback") if isinstance(meta, dict) else None | ||
|
|
||
| warnings: list[str] = [] | ||
| if isinstance(callback_blob, dict): | ||
| raw_warnings = callback_blob.get("warnings") | ||
| if isinstance(raw_warnings, list): | ||
| warnings = [w for w in raw_warnings if isinstance(w, str)] | ||
|
|
||
| guardrails_response: GuardrailsCallbackData | None = None | ||
| if job.status == JobStatus.SUCCESS: | ||
| response_blob = meta.get("response") or {} | ||
| data_blob = ( | ||
| response_blob.get("data") if isinstance(response_blob, dict) else None | ||
| ) or {} | ||
| safe_text = ( | ||
| data_blob.get("safe_text") if isinstance(data_blob, dict) else None | ||
| ) | ||
| request_blob = meta.get("request") or {} | ||
| original_text = ( | ||
| request_blob.get("text") if isinstance(request_blob, dict) else None | ||
| ) | ||
| value = safe_text if isinstance(safe_text, str) else (original_text or "") | ||
|
|
||
| response_id: str | None = None | ||
| if isinstance(callback_blob, dict): | ||
| rid = callback_blob.get("response_id") | ||
| if isinstance(rid, str): | ||
| response_id = rid | ||
|
|
||
| guardrails_response = GuardrailsCallbackData.model_validate( | ||
| { | ||
| "response": { | ||
| "response_id": response_id, | ||
| "output": { | ||
| "type": "text", | ||
| "content": {"format": "text", "value": value}, | ||
| }, | ||
| }, | ||
| "usage": ( | ||
| data_blob.get("usage") | ||
| if isinstance(data_blob, dict) | ||
| and isinstance(data_blob.get("usage"), dict) | ||
| else {} | ||
| ), | ||
| "provider_raw_response": None, | ||
| } | ||
| ) | ||
|
|
||
| return APIResponse.success_response( | ||
| data=GuardrailsJobPublic( | ||
| job_id=job.id, | ||
| status=job.status.value, | ||
| guardrails_response=guardrails_response, | ||
| error_message=job.error_message, | ||
| warnings=warnings, | ||
| ) | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing: currently the traffic volume is expected to be maybe low from TAP, but I agree that guardrails jobs could eventually compete with interactive LLM requests. Let’s track this as a follow-up and evaluate a dedicated queue/lower priority once usage increases.