Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,16 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) {
SourceRef: key,
})
if err := d.Queue.Enqueue(ctx, queue.Job{
Kind: queue.KindIngestDocument,
Payload: payload,
DedupeKey: string(docID),
Kind: queue.KindIngestDocument,
Payload: payload,
// Cap retries so a transient failure (e.g. a parse-timeout or a
// not-yet-visible source under heavy concurrent ingestion) gets a
// few chances to recover, without the queue's default 25-attempt
// exponential backoff dragging a genuinely-bad document out for
// hours. The document stays "parsing" across these attempts and
// only flips to "failed" on the last one (see Pipeline.fail).
MaxRetries: 5,
DedupeKey: string(docID),
}); err != nil {
d.Logger.Error("ingest: enqueue failed", "err", err)
writeErr(w, http.StatusInternalServerError, "enqueue failed")
Expand Down
51 changes: 47 additions & 4 deletions pkg/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,38 @@ func (p *Pipeline) acquireGlobalLLM(ctx context.Context) (release func(), ok boo
}
}

// lastAttemptCtxKey carries whether the current ingest job is on its final
// queue attempt. When it isn't, a transient failure must NOT be surfaced as a
// terminal "failed" status — the queue will retry it.
type lastAttemptCtxKeyT struct{}

func withLastAttempt(ctx context.Context, last bool) context.Context {
return context.WithValue(ctx, lastAttemptCtxKeyT{}, last)
}

// isLastAttempt reports whether this is the final attempt. Defaults to true
// when unset (callers that don't track attempts, e.g. tests, keep the
// fail-immediately behaviour).
func isLastAttempt(ctx context.Context) bool {
v, ok := ctx.Value(lastAttemptCtxKeyT{}).(bool)
if !ok {
return true
}
return v
}

// Handler returns a queue.Handler suitable for queue.KindIngestDocument.
func (p *Pipeline) Handler() queue.Handler {
return func(ctx context.Context, j queue.Job) error {
var payload Payload
if err := json.Unmarshal(j.Payload, &payload); err != nil {
return fmt.Errorf("decode payload: %w", err)
}
return p.Run(ctx, payload)
// A job with no attempt tracking (MaxAttempts<=0) is treated as its
// own last attempt; otherwise it's the last attempt only when the
// queue has used up its retries.
last := j.MaxAttempts <= 0 || j.Attempt >= j.MaxAttempts
return p.Run(withLastAttempt(ctx, last), payload)
}
}

Expand Down Expand Up @@ -1005,11 +1029,30 @@ func fallbackSummary(title, body string) string {

func (p *Pipeline) fail(ctx context.Context, store docPersister, id tree.DocumentID, stage string, cause error) {
msg := fmt.Sprintf("%s: %s", stage, cause.Error())
// Use a FRESH context for the failure write — the inbound one is
// almost certainly the reason we're failing (timeout/cancel) and
// reusing it would leave the doc stuck on "parsing" forever.
// Use a FRESH context for the status write — the inbound one is almost
// certainly the reason we're failing (timeout/cancel) and reusing it
// would leave the doc stuck mid-flight.
failCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// If the queue will retry this job, the failure is (so far) transient —
// under heavy concurrent ingestion, parse can hit a parse-timeout or a
// not-yet-visible source and recover on a later attempt. Surfacing
// "failed" now would tell a polling client the document is dead when it
// isn't, so keep it in "parsing" and let the retry run. Only the final
// attempt produces a terminal "failed".
if !isLastAttempt(ctx) {
p.Logger.Warn("ingest: transient failure, will retry",
"document_id", string(id), "stage", stage, "cause", cause.Error())
if err := store.SetDocumentStatus(failCtx, id, db.StatusParsing, ""); err != nil {
p.Logger.Error("ingest: failed to reset document to parsing", "err", err)
}
return
}

// Terminal failure: log it (previously silent — failures only showed up
// in the DB error_message) and mark the document failed.
p.Logger.Error("ingest: failed", "document_id", string(id), "stage", stage, "cause", cause.Error())
if err := store.SetDocumentStatus(failCtx, id, db.StatusFailed, msg); err != nil {
p.Logger.Error("ingest: failed to mark document failed", "err", err, "cause", cause)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type Job struct {

// Optional: max retries before dead-lettering.
MaxRetries int `json:"max_retries,omitempty"`

// Attempt is the 1-based current attempt number, set by the queue when
// it dispatches a job to its handler (0 if the queue doesn't track
// attempts). MaxAttempts is the total before dead-lettering. Handlers
// use these to tell a transient, will-be-retried failure apart from a
// terminal one — e.g. so a document isn't marked "failed" while the
// queue will still retry it.
Attempt int `json:"-"`
MaxAttempts int `json:"-"`
}

// Handler processes a single job.
Expand Down
14 changes: 13 additions & 1 deletion pkg/queue/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,19 @@ func (w *envelopeWorker) Work(ctx context.Context, job *river.Job[envelopeArgs])
if !ok {
return fmt.Errorf("%w: %q", ErrUnknownKind, job.Args.DomainKind)
}
return h(ctx, Job{Kind: job.Args.DomainKind, Payload: job.Args.Payload})
// Attempt/MaxAttempts live on River's embedded *JobRow, which is always
// populated in production but may be nil in unit tests that construct a
// bare river.Job. Guard the deref.
attempt, maxAttempts := 0, 0
if job.JobRow != nil {
attempt, maxAttempts = job.Attempt, job.MaxAttempts
}
return h(ctx, Job{
Kind: job.Args.DomainKind,
Payload: job.Args.Payload,
Attempt: attempt,
MaxAttempts: maxAttempts,
})
}

// NewRiver constructs a new River-backed Queue. It opens its own pgxpool
Expand Down
Loading