Skip to content

talon-one/anyqueue

Repository files navigation

anyqueue

A generic, transactional queue-draining engine for Go.

anyqueue discovers a set of topics (independent logical queues, each with its own destination), and for each topic it continuously pulls batches of pending items out of a persistent store, fans them out across a pool of workers, hands each item to a caller-supplied handler, and records the result back into the store — all within a single unit of work per batch.

The engine is storage- and transport-agnostic: it knows nothing about SQL, HTTP, or any concrete item type. Consumers wire it up by implementing Store, Batch, Topic, Metrics, and a Handler. Each topic runs in its own goroutine, so a slow or backed-up topic never blocks the others.

Usage

err := anyqueue.Run[MyTopic, MyItem, MyResult](
    ctx,
    logger,   // *slog.Logger
    metrics,  // anyqueue.Metrics
    anyqueue.Config{},
    store,    // anyqueue.Store[MyTopic, MyItem, MyResult]
    handler,  // anyqueue.Handler[MyItem, MyResult]
)

The engine is generic over three consumer-defined types:

  • TopicType — the logical queue (implements Topic)
  • ItemType — a unit of work handed to the Handler
  • ResultType — the Handler's per-item output, persisted by Batch.Record

Run blocks until ctx is cancelled and returns nil on graceful shutdown.

Logging

The engine logs via the standard library log/slog. Adapt your own logger by passing an *slog.Logger backed by a custom slog.Handler.

Development

go test -race ./...        # run tests
go tool golangci-lint run  # lint (config in .golangci.yml)

Mocks under mock_*_test.go are generated with mockery from .mockery.yaml:

go run github.com/vektra/mockery/v2@latest

About

Package anyqueue implements a generic transactional queue-draining engine.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages