Adapter Architecture
Overview
Section titled “Overview”This document describes the design philosophy behind Queuert’s adapter system, including factory patterns, context management, and notification optimization.
Provider vs Adapter
Section titled “Provider vs Adapter”Queuert uses a two-layer abstraction for external integrations:
- Provider — a minimal interface that users implement to wrap their chosen database or messaging client. It contains only low-level operations (
executeSql,withTransaction,publish/subscribe). Each driver library (pg, better-sqlite3, ioredis, etc.) gets its own provider implementation. - Adapter — a high-level interface that Queuert builds from a provider via a
create*factory function. Adapters contain the full domain logic (job lifecycle, state transitions, notification semantics) and are whatcreateClientandcreateInProcessWorkerconsume.
The factory transforms a provider into an adapter:
PgStateProvider → createPgStateAdapter() → StateAdapterSqliteStateProvider → createSqliteStateAdapter() → StateAdapterPgNotifyProvider → createPgNotifyAdapter() → NotifyAdapterRedisNotifyProvider → createRedisNotifyAdapter() → NotifyAdapter createNatsNotifyAdapter() → NotifyAdapterThis separation keeps driver-specific code isolated in the provider while the adapter layer remains database-agnostic. Users only implement the provider; they never implement the adapter interface directly.
Conformance
Section titled “Conformance”Because each create* factory produces an adapter with the same contract regardless of the provider underneath, Queuert ships a conformance suite that validates any provider-built adapter against that contract.
The suite is exposed as a framework-agnostic runner under the queuert/conformance subpath. Users wire it into a single test() block from their framework of choice; internal Queuert specs go through the same case list via a thin vitest binding so there’s no drift between end-user validation and internal coverage.
See the Conformance reference for the API and the Custom Adapters guide for a walkthrough.
Async Factory Pattern
Section titled “Async Factory Pattern”Public-facing adapter factories that may perform I/O are async for consistency. In-process and internal-only factories remain sync since they have no I/O.
Rationale
Section titled “Rationale”- Consistency: All public factories follow the same async pattern, reducing cognitive load
- Future-proofing: Factories can add initialization I/O without breaking API
- Explicit async: Callers know to
awaitand handle potential errors
StateAdapter Design
Section titled “StateAdapter Design”Atomic Operations Principle
Section titled “Atomic Operations Principle”All StateAdapter methods must complete in a single database round-trip, where the database engine supports it. This is a core design principle:
- O(1) round trips: Each method—regardless of how many jobs it affects—executes exactly one database operation
- O(n) is incorrect: If an adapter implementation requires multiple round trips proportional to input size, the implementation is wrong
- Batch operations: Methods accepting arrays (e.g.,
deleteJobChains,addJobBlockers) must use batch SQL (multi-row INSERT, UPDATE with IN clause, CTEs) rather than loops
This principle ensures predictable performance and proper atomicity. Use batch SQL (multi-row INSERT, UPDATE with IN/ANY clause, CTEs) rather than loops.
SQLite exception: SQLite does not support writeable CTEs with RETURNING in the same way as PostgreSQL. Operations like addJobBlockers and deleteJobChains use multiple sequential queries within a single transaction instead of a single CTE. This is safe under SQLite’s exclusive transaction locking model (which serializes all writes), but results in more round-trips per operation. This is an accepted trade-off for SQLite support.
Context Architecture
Section titled “Context Architecture”The StateAdapter type accepts two generic parameters: TTxContext (transaction context containing database client/session info) and TJobId (the job ID type for input parameters).
The context is named TTxContext (transaction context) because it’s exclusively used within transactions. When you call withTransaction, the callback receives a context that represents an active transaction.
StateProvider Interface
Section titled “StateProvider Interface”Users create a StateProvider implementation to integrate with their database client. The concrete interfaces live in @queuert/postgres and @queuert/sqlite; the shape below is an illustrative reduction — see the TSDoc on PgStateProvider and SqliteStateProvider for the authoritative signatures (including paramTypes/columnTypes annotations required by the typed-SQL layer).
interface PgStateProvider<TTxContext> { // Manages connection and transaction - called for transactional operations withTransaction: <T>(fn: (txCtx: TTxContext) => Promise<T>) => Promise<T>;
// Execute SQL - when txCtx is provided uses it, when omitted manages own connection executeSql: (options: { txCtx?: TTxContext; sql: string; params?: unknown[]; paramTypes: Record<number, RuntimeType>; columnTypes: Record<string, RuntimeType>; }) => Promise<unknown[]>;}Optional txCtx Semantics
Section titled “Optional txCtx Semantics”All StateAdapter operation methods accept an optional txCtx parameter:
- With txCtx: Uses the provided transaction connection (must come from a
withTransactioncallback) - Without txCtx: Acquires its own connection from the pool, executes, and releases
This enables transactional operations, standalone operations, and DDL operations (like CREATE INDEX CONCURRENTLY) that cannot run inside transactions.
NotifyProvider Interface
Section titled “NotifyProvider Interface”NotifyProvider implementations manage connections internally - no context parameters:
interface PgNotifyProvider { publish: (channel: string, message: string) => Promise<void>; subscribe: ( channel: string, onMessage: (message: string) => void, ) => Promise<() => Promise<void>>;}The provider maintains a dedicated connection for subscriptions and acquires/releases connections for publish operations automatically.
Reaper Support
Section titled “Reaper Support”The reapExpiredJobLease method supports an ignoredJobIds parameter to prevent race conditions when a worker runs with multiple concurrent slots (concurrency > 1). Without it, a worker could reap its own in-progress job if the lease expires before renewal, causing corrupted state. Custom adapter implementations must filter out these job IDs when selecting expired leases.
Internal Type Design
Section titled “Internal Type Design”StateJob is a non-generic type with string for all ID fields. The StateAdapter methods accept TJobId for input parameters but return plain StateJob. This simplifies internal code while allowing adapters to expose typed IDs to consumers via type helpers like GetStateAdapterJobId<TStateAdapter>.
NotifyAdapter Design
Section titled “NotifyAdapter Design”Broadcast Semantics
Section titled “Broadcast Semantics”All notifications use broadcast (pub/sub) semantics with three notify/listen pairs: job scheduling, chain completion, and ownership loss. See the NotifyAdapter type TSDoc for method details.
Hint-Based Optimization
Section titled “Hint-Based Optimization”To prevent thundering herd when many workers are idle, notifications include a hint count:
- Scheduling:
notifyJobScheduled(typeName, count)creates a hint key with the count and publishes with a unique hintId - Receiving: Workers atomically decrement the hint count. Only workers that successfully decrement (hint > 0) proceed to query the database
- Effect: When N jobs are scheduled, exactly N workers query the database; others skip and wait for the next notification
Implementation varies by adapter:
- Redis: Lua scripts for atomic decrement
- NATS with JetStream KV: Revision-based CAS operations
- PostgreSQL/NATS without KV: No optimization (all listeners query database)
- In-process: Synchronous counter operations
Callback Pattern
Section titled “Callback Pattern”All listen* methods accept a callback and return a dispose function. Subscription is active when the promise resolves, and the callback is called synchronously when notifications arrive (no race condition).
ObservabilityAdapter Design
Section titled “ObservabilityAdapter Design”The ObservabilityAdapter provides two observability mechanisms:
-
Metrics: Methods accept primitive data types (not domain objects) for decoupling and stability. Counters, histograms, and gauges track worker lifecycle, job events, and durations.
-
Tracing:
startJobSpanandstartAttemptSpanmethods return handles for managing span lifecycle. Spans follow OpenTelemetry messaging conventions with PRODUCER spans for job creation and CONSUMER spans for processing.
When no adapter is provided, a noop implementation is used automatically, making observability opt-in. See OTEL Tracing for span hierarchy and OTEL Metrics for available metrics. See OTEL Internals for adapter architecture and trace context propagation.
Transactional Buffering
Section titled “Transactional Buffering”Observability events emitted inside database transactions are buffered and only flushed after the transaction commits. If the transaction rolls back, buffered events are discarded — no misleading metrics or spans leak out. Buffering uses TransactionHooks — the same mechanism that flushes notify events on commit.
Buffered — events that represent write claims inside transactions:
- Creation:
jobChainCreated,jobCreated,jobBlocked, and PRODUCER span ends fromcreateStateJobs - Completion:
jobCompleted,jobDuration,completeJobSpan(workerless),jobChainCompleted,jobChainDuration,completeBlockerSpan,jobUnblockedfromfinishJob - Worker complete:
jobAttemptCompletedand continuation PRODUCER span ends from the complete transaction injob-process - Error handling:
jobAttemptFailedfrom the error-handling transaction injob-process
Not buffered — events that either need immediate context or occur outside transactions:
- Span starts: Need trace context immediately for DB writes that store trace IDs
- Events outside transactions:
jobAttemptStarted,jobAttemptDuration,jobAttemptLeaseRenewed, attempt span ends (these occur outside the guarded transaction) - Read-only observations:
refetchJobForUpdateevents observe state without making write claims
Self-Cleaning
Section titled “Self-Cleaning”Both createStateJobs and finishJob use TransactionHooks savepoints (via withSavepoint) to automatically roll back buffered observability events on throw, ensuring partial events from a failed operation don’t accumulate in the buffer. The checkpoint callback on each hook definition captures the buffer position, and the savepoint restores it on rollback.
See Also
Section titled “See Also”- OTEL Metrics — Counters, histograms, and gauges
- OTEL Tracing — Span hierarchy and messaging conventions
- OTEL Internals — Adapter architecture, W3C context propagation, and transactional buffering
- Client API — Mutation and query methods
- In-Process Worker — Worker lifecycle and lease management