In-Process Worker
Overview
This document describes the worker design in Queuert: how workers coordinate job processing, manage concurrency, and handle failures.
A worker runs a main loop that coordinates job processing across multiple slots. Each slot processes one job at a time; the worker manages concurrency and scaling.
Concurrency Model
Workers process jobs in parallel using slots. See the createInProcessWorker TSDoc for configuration options. Default: single slot (concurrency: 1).
Architecture
How it works:
- Main loop spawns slots up to concurrency
- Each slot acquires a job and processes it independently
- When a slot completes, main loop spawns a replacement
- Slots compete for jobs via database-level locking (FOR UPDATE SKIP LOCKED in PostgreSQL)
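The slot competition above relies on PostgreSQL row-level locking. As a rough sketch of what an acquisition query of this kind could look like: the jobs table, its column names, and the buildAcquireQuery helper are hypothetical and are not taken from Queuert's actual schema or API.

```typescript
// Hypothetical query builder. Table and column names are illustrative
// assumptions, not Queuert's actual schema.
interface AcquireQuery {
  text: string;
  values: [string[], string, number];
}

function buildAcquireQuery(
  typeNames: string[],
  workerId: string,
  leaseMs: number,
): AcquireQuery {
  const text = `
    UPDATE jobs
    SET status = 'running',
        leased_by = $2,
        leased_until = now() + $3::double precision * interval '1 millisecond'
    WHERE id = (
      SELECT id
      FROM jobs
      WHERE status = 'pending'
        AND type_name = ANY($1)
      ORDER BY created_at
      LIMIT 1
      FOR UPDATE SKIP LOCKED -- rows locked by other slots are skipped, not waited on
    )
    RETURNING id, type_name`;
  return { text, values: [typeNames, workerId, leaseMs] };
}
```

Because SKIP LOCKED passes over rows that another transaction has already locked, two slots running a statement like this concurrently would claim different jobs instead of blocking on each other.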
Worker Lifecycle
Main Loop
The worker runs a single coordinating loop:
- Fill: Spawn slots up to concurrency
- Reap: Reclaim one expired lease (if any idle slots remain)
- Wait: Listen for notification, poll timeout, or slot completion
- Repeat
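The Fill, Reap, Wait cycle above can be sketched roughly as follows. This is a simplified illustration, not Queuert's implementation: the LoopDeps interface and all of its members are hypothetical, and job acquisition happens in the loop itself rather than inside each slot to keep the example short.

```typescript
type Job = { id: string };

// Hypothetical dependencies; none of these names come from Queuert.
interface LoopDeps {
  concurrency: number;
  acquireJob(): Promise<Job | undefined>; // undefined when no job is ready
  processJob(job: Job): Promise<void>;
  reapOne(): Promise<void>; // reclaims at most one expired lease
  waitForWake(): Promise<void>; // resolves on notification or poll timeout
}

async function runMainLoop(deps: LoopDeps, signal: AbortSignal): Promise<void> {
  const inFlight = new Set<Promise<void>>();

  while (!signal.aborted) {
    // Fill: start slots until the concurrency limit is reached or no job is ready.
    while (inFlight.size < deps.concurrency) {
      const job = await deps.acquireJob();
      if (!job) break;
      const slot: Promise<void> = deps
        .processJob(job)
        .catch(() => {}) // retry scheduling is out of scope for this sketch
        .finally(() => inFlight.delete(slot));
      inFlight.add(slot);
    }

    // Reap: only when at least one slot is idle.
    if (inFlight.size < deps.concurrency) await deps.reapOne();

    // Wait: wake on notification, poll timeout, or any slot completing.
    await Promise.race([deps.waitForWake(), ...inFlight]);
  }

  // After abort, let in-flight jobs finish before returning.
  await Promise.all(inFlight);
}
```

Racing the wake-up promise against the in-flight slots is what lets a freed slot be refilled immediately rather than after the next poll interval.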
Shutdown
On startup, the worker emits a workerStarted observability event.
Calling stop() triggers graceful shutdown:
- Signal abort controller
- Stop spawning new slots
- Wait for all in-flight jobs to complete (or abandon via lease expiry)
- Emit workerStopping and workerStopped observability events
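A minimal sketch of the shutdown sequence above, under stated assumptions: startWorker, runLoop, and emit are hypothetical names, and the timing of the events (workerStopping when stop() is called, workerStopped once the loop has drained) is an assumption, since the steps above do not pin it down.

```typescript
type WorkerEvent = "workerStarted" | "workerStopping" | "workerStopped";

// Hypothetical wrapper; runLoop stands in for the worker's main loop and
// is expected to resolve once all in-flight jobs have finished.
function startWorker(
  runLoop: (signal: AbortSignal) => Promise<void>,
  emit: (event: WorkerEvent) => void,
): { stop: () => Promise<void> } {
  const controller = new AbortController();
  emit("workerStarted");
  const loop = runLoop(controller.signal);
  let stopping: Promise<void> | undefined;

  return {
    stop(): Promise<void> {
      // Repeated calls share the same shutdown promise.
      if (!stopping) {
        stopping = (async () => {
          emit("workerStopping"); // assumed to fire as shutdown begins
          controller.abort(); // the loop stops spawning new slots
          await loop; // wait for in-flight jobs to complete
          emit("workerStopped");
        })();
      }
      return stopping;
    },
  };
}
```

Keeping a single shutdown promise makes stop() safe to call more than once, for example from both a signal handler and application code.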
Worker Identity
Each worker has a unique identity stored in leasedBy. The worker tracks active jobs internally and routes abort signals by job ID; no per-slot identity is needed.
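One way to picture routing abort signals by job ID is a map from job ID to AbortController. The ActiveJobs class below is a hypothetical illustration of that idea, not Queuert's internal data structure.

```typescript
// Hypothetical registry of in-progress jobs, keyed by job ID.
class ActiveJobs {
  private readonly controllers = new Map<string, AbortController>();

  // Called when a slot starts a job; the handler would receive this signal.
  start(jobId: string): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(jobId, controller);
    return controller.signal;
  }

  // Called when the job finishes, whether it succeeded or failed.
  finish(jobId: string): void {
    this.controllers.delete(jobId);
  }

  // Routes an abort to one job; returns false if this worker is not running it.
  abort(jobId: string): boolean {
    const controller = this.controllers.get(jobId);
    if (!controller) return false;
    controller.abort();
    return true;
  }

  // IDs of all jobs currently being processed by this worker.
  ids(): string[] {
    return Array.from(this.controllers.keys());
  }
}
```

Because lookups use the job ID alone, it does not matter which slot happens to be running the job.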
Reaper
The reaper reclaims jobs with expired leases, making them available for retry.
When idle slots remain in the main loop:
- Find oldest running job where leasedUntil < now() and type matches registered types
- Transition job: running → pending, clear leasedBy and leasedUntil
- Emit jobReaped observability event
- Notify via jobScheduled (workers wake up) and jobOwnershipLost (original worker aborts)
Design decisions:
- Integrated with main loop: Runs once per iteration, no separate process needed.
- One job per iteration: Reaps at most one job to avoid blocking slot spawning.
- Type-scoped: Only reaps job types the worker is registered to handle.
- Concurrent-safe: Database locking prevents conflicts between workers.
- Self-aware: When running with multiple slots, the reaper excludes jobs currently being processed by the same worker (via ignoredJobIds). This prevents a race condition where a worker could reap its own in-progress job if the lease expires before renewal.
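The selection and transition rules above can be illustrated with an in-memory sketch. The JobRow shape and the reapOne function are hypothetical, "oldest" is assumed here to mean the earliest lease expiry, and the real reaper would run against the database and also emit events and notifications.

```typescript
// Hypothetical in-memory job record; field names mirror those used above.
interface JobRow {
  id: string;
  typeName: string;
  status: "pending" | "running" | "completed";
  leasedBy: string | null;
  leasedUntil: number | null; // epoch milliseconds
}

interface ReapOptions {
  now: number;
  typeNames: string[]; // types this worker is registered to handle
  ignoredJobIds: string[]; // jobs this worker is currently processing
}

// Reaps at most one job and returns it, or undefined if nothing qualifies.
function reapOne(jobs: JobRow[], opts: ReapOptions): JobRow | undefined {
  let oldest: JobRow | undefined;
  let oldestExpiry = Infinity;

  for (const job of jobs) {
    if (job.status !== "running" || job.leasedUntil === null) continue;
    if (job.leasedUntil >= opts.now) continue; // lease still valid
    if (!opts.typeNames.includes(job.typeName)) continue; // type-scoped
    if (opts.ignoredJobIds.includes(job.id)) continue; // self-aware
    if (job.leasedUntil < oldestExpiry) {
      oldest = job;
      oldestExpiry = job.leasedUntil;
    }
  }

  if (!oldest) return undefined;

  // Transition running -> pending and clear the lease.
  oldest.status = "pending";
  oldest.leasedBy = null;
  oldest.leasedUntil = null;
  return oldest;
}
```

In this sketch the caller would be responsible for emitting jobReaped and sending the jobScheduled and jobOwnershipLost notifications once a job is returned.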
Retry and Backoff
When a job handler throws, the worker reschedules it with exponential backoff:
delay = min(initialDelayMs * multiplier^(attempt-1), maxDelayMs)
Example with defaults: 10s → 20s → 40s → 80s → 160s → 300s → 300s…
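The formula translates directly into code. In the sketch below, backoffDelayMs and the BackoffConfig type are illustrative names, and the values 10000, 2, and 300000 are inferred from the example sequence above rather than confirmed as the library's defaults.

```typescript
interface BackoffConfig {
  initialDelayMs: number;
  multiplier: number;
  maxDelayMs: number;
}

// Values inferred from the example sequence; treat them as an assumption.
const assumedDefaults: BackoffConfig = {
  initialDelayMs: 10_000,
  multiplier: 2,
  maxDelayMs: 300_000,
};

// attempt is 1-based: the first retry waits initialDelayMs.
function backoffDelayMs(attempt: number, config: BackoffConfig): number {
  const uncapped = config.initialDelayMs * Math.pow(config.multiplier, attempt - 1);
  return Math.min(uncapped, config.maxDelayMs);
}
```

With these values, attempt 6 would compute 10s × 2^5 = 320s uncapped, which is why the sequence flattens at the 300s cap from that point on.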
See Job Processing for details on error handling and abort signals.
Client-Based Construction
createInProcessWorker accepts a client instance and extracts infrastructure (stateAdapter, notifyAdapter, observabilityAdapter, jobTypes, log) from it internally. Worker-specific options (processors, concurrency, pollIntervalMs, recoveryBackoffConfig, defaults, requiredAttemptMiddleware) remain separate parameters.
The top-level recoveryBackoffConfig controls the worker’s own main loop retry behavior (e.g., recovery from database connection errors). Per-attempt configuration (backoffConfig, leaseConfig, attemptMiddleware) lives on the processor registry; the worker-level defaults.backoffConfig / defaults.leaseConfig provide a fallback for processors that don’t set their own (resolution order: processor → registry → worker defaults → library default).
requiredAttemptMiddleware enforces that every slice merged into the worker includes a fixed set of middleware instances as an in-order subsequence, which is useful for guaranteeing that cross-cutting concerns like auth or tracing are present on every slice. The worker only enforces presence; slices continue to run their own middleware chains.
Passing the client is purely a construction convenience: no lifecycle coupling is introduced. The client and worker remain independent after construction.
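Two rules from this section lend themselves to small sketches: the configuration resolution order and the in-order subsequence check behind requiredAttemptMiddleware. Both helpers below (resolveConfig, includesInOrder) are hypothetical names. The sketch also assumes that a configuration is taken whole from the first level that defines it, and that middleware instances are compared by identity.

```typescript
// Resolution order: processor -> registry -> worker defaults -> library default.
function resolveConfig<T>(levels: {
  processor?: T;
  registry?: T;
  workerDefault?: T;
  libraryDefault: T;
}): T {
  return (
    levels.processor ??
    levels.registry ??
    levels.workerDefault ??
    levels.libraryDefault
  );
}

// True when every item of `required` appears in `chain` in the same relative
// order; other middleware may sit between the required ones.
function includesInOrder<T>(chain: readonly T[], required: readonly T[]): boolean {
  let matched = 0;
  for (const item of chain) {
    if (matched < required.length && item === required[matched]) matched++;
  }
  return matched === required.length;
}
```

A chain of [auth, logging, tracing] satisfies a requirement of [auth, tracing], while [tracing, auth] does not, because the required order is reversed.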
Extensibility
Multi-Type Workers
A single worker can handle multiple job types. Slots poll all registered types and process whichever is available first. Per-type configuration (lease, retry) overrides worker defaults.
Attempt Middlewares
Handler middleware hook into one or more job processing phases via wrapHandler, wrapPrepare, and wrapComplete. Each hook can inject typed context into the inner handler/callback through next(ctx). Middleware compose as an onion: the first middleware’s “before” runs outermost. Middleware enable cross-cutting concerns like contextual logging, audit trails, tracing spans, and resource injection.
Middleware are declared on the processor registry via createProcessors({ attemptMiddleware: [...] }). Handler signatures inside that registry auto-infer their typed context from the middleware tuple. When multiple slices are passed as an array to createInProcessWorker, each processor keeps its slice’s middleware chain.
To share middleware across multiple registries, list them inline at each createProcessors call — the tuple inference narrows without requiring as const. Each slice’s middleware chain is the runtime source of truth; the worker simply dispatches to whichever processor matches the job’s typeName.
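The onion composition described in this section can be sketched with a simplified, untyped middleware shape. The Middleware type and compose function below are hypothetical and much looser than the typed wrapHandler hooks; they only illustrate the ordering and how context passed to next accumulates.

```typescript
type Ctx = Record<string, unknown>;
type Handler = (ctx: Ctx) => Promise<void>;

// Hypothetical shape: a middleware receives the context so far and a `next`
// callback; anything passed to `next` is merged into the inner context.
type Middleware = (ctx: Ctx, next: (extra?: Ctx) => Promise<void>) => Promise<void>;

// Folding from the right wraps the handler with the last middleware first,
// so the first middleware in the list ends up outermost.
function compose(middleware: readonly Middleware[], handler: Handler): Handler {
  return middleware.reduceRight<Handler>(
    (inner, mw) => (ctx) => mw(ctx, (extra = {}) => inner({ ...ctx, ...extra })),
    handler,
  );
}
```

With two middleware A and B listed in that order, A's "before" code runs first, then B's, then the handler, and the "after" code unwinds in reverse.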
Summary
The worker design emphasizes:
- Simplicity: Single main loop coordinating parallel slots
- Efficiency: Slots are self-contained, main loop just manages concurrency
- Reliability: Integrated reaper ensures recovery from failures
- Flexibility: Per-type configuration, multi-type workers
- Extensibility: Handler middleware enable cross-cutting concerns
See Also
- Job Processing: Prepare/complete pattern, abort signals, timeouts
- Adapters: Notification optimization, state provider design