In-Process Worker

This document describes the worker design in Queuert: how workers coordinate job processing, manage concurrency, and handle failures.

A worker runs a main loop that coordinates job processing across multiple slots. Each slot processes one job at a time; the worker manages concurrency and scaling.

Workers process jobs in parallel using slots. See createInProcessWorker TSDoc for configuration options. Default: single slot (concurrency: 1).

How it works:

  1. Main loop spawns slots up to concurrency
  2. Each slot acquires a job and processes it independently
  3. When a slot completes, main loop spawns a replacement
  4. Slots compete for jobs via database-level locking (FOR UPDATE SKIP LOCKED in PostgreSQL)

The worker runs a single coordinating loop:

  1. Fill: Spawn slots up to concurrency
  2. Reap: Reclaim one expired lease (if any idle slots remain)
  3. Wait: Listen for notification, poll timeout, or slot completion
  4. Repeat
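
The fill and wait steps above can be sketched with an in-memory queue. This is a minimal illustration, not the library's implementation: runLoopSketch and the Job type are hypothetical names, the reap step is left out, and the wait step only reacts to slot completion (the real loop also wakes on notifications and the poll timeout). It assumes concurrency is at least 1.

```typescript
type Job = { id: number };

// Hypothetical sketch: an array stands in for the job table. A real worker
// would acquire jobs through the state adapter with database-level locking.
async function runLoopSketch(
  jobs: Job[],
  concurrency: number,
): Promise<{ processed: number[]; maxActive: number }> {
  const queue = [...jobs];
  const active = new Set<Promise<void>>();
  const processed: number[] = [];
  let maxActive = 0;

  const runSlot = async (job: Job): Promise<void> => {
    // Simulated handler work.
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    processed.push(job.id);
  };

  while (queue.length > 0 || active.size > 0) {
    // Fill: spawn slots up to the concurrency limit.
    while (active.size < concurrency && queue.length > 0) {
      const job = queue.shift()!;
      const slot: Promise<void> = runSlot(job).then(() => {
        active.delete(slot);
      });
      active.add(slot);
    }
    maxActive = Math.max(maxActive, active.size);

    // Reap: omitted in this sketch.

    // Wait: resume as soon as any slot completes, then repeat.
    await Promise.race(Array.from(active));
  }

  return { processed, maxActive };
}
```

Because each slot removes itself from the active set before its promise settles, the next iteration always sees an accurate count of free slots.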

On startup, the worker emits a workerStarted observability event.

Calling stop() triggers graceful shutdown:

  1. Signal abort controller
  2. Stop spawning new slots
  3. Wait for all in-flight jobs to complete (or abandon via lease expiry)
  4. Emit workerStopping and workerStopped observability events
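
The shutdown sequence above might look roughly like the following. ShutdownSketch and trySpawn are hypothetical names, and the timing of the two events is an assumption: this sketch records workerStopping when stop() begins and workerStopped once all in-flight work has settled.

```typescript
// Hypothetical sketch of graceful shutdown around a shared AbortController.
class ShutdownSketch {
  private readonly abortController = new AbortController();
  private readonly inFlight = new Set<Promise<void>>();
  readonly events: string[] = [];

  // Returns false once shutdown has begun, so no new slots are spawned.
  trySpawn(work: (signal: AbortSignal) => Promise<void>): boolean {
    if (this.abortController.signal.aborted) return false;
    const slot: Promise<void> = work(this.abortController.signal)
      .catch(() => undefined) // a failed job must not block shutdown
      .then(() => {
        this.inFlight.delete(slot);
      });
    this.inFlight.add(slot);
    return true;
  }

  async stop(): Promise<void> {
    this.events.push("workerStopping"); // assumed to fire when stop() begins
    this.abortController.abort(); // 1. signal abort; 2. trySpawn now refuses
    await Promise.all(Array.from(this.inFlight)); // 3. wait for in-flight jobs
    this.events.push("workerStopped"); // 4. shutdown complete
  }
}
```

Handlers that honor the signal can finish early; handlers that ignore it simply run to completion before stop() resolves.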

Each worker has a unique identity stored in leasedBy. The worker tracks active jobs internally and routes abort signals by job ID—no per-slot identity is needed.
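
One way to picture routing abort signals by job ID is a map from job ID to AbortController. The class and method names below are hypothetical and only illustrate the idea; they are not Queuert's internal types.

```typescript
// Hypothetical sketch: tracks active jobs and aborts them by job ID.
class ActiveJobRegistry {
  private readonly controllers = new Map<string, AbortController>();

  // Called when a slot starts a job; the signal is handed to the handler.
  start(jobId: string): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(jobId, controller);
    return controller.signal;
  }

  // Called when the job finishes, whatever the outcome.
  finish(jobId: string): void {
    this.controllers.delete(jobId);
  }

  // Called when ownership of a job is lost; returns false if the job is not
  // active on this worker.
  abort(jobId: string): boolean {
    const controller = this.controllers.get(jobId);
    if (!controller) return false;
    controller.abort();
    return true;
  }

  // IDs of jobs currently being processed by this worker.
  activeJobIds(): string[] {
    return Array.from(this.controllers.keys());
  }
}
```

Since the lookup key is the job ID, it does not matter which slot is running the job.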

The reaper reclaims jobs with expired leases, making them available for retry.

When idle slots remain in the main loop:

  1. Find oldest running job where leasedUntil < now() and type matches registered types
  2. Transition job: running → pending, clear leasedBy and leasedUntil
  3. Emit jobReaped observability event
  4. Notify via jobScheduled (workers wake up) and jobOwnershipLost (original worker aborts)

Design decisions:

  • Integrated with main loop: Runs once per iteration, no separate process needed.
  • One job per iteration: Reaps at most one job to avoid blocking slot spawning.
  • Type-scoped: Only reaps job types the worker is registered to handle.
  • Concurrent-safe: Database locking prevents conflicts between workers.
  • Self-aware: When running with multiple slots, the reaper excludes jobs currently being processed by the same worker (via ignoredJobIds). This prevents a race condition where a worker could reap its own in-progress job if the lease expires before renewal.
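
The selection rules above can be sketched against an in-memory array. This is an illustration only: JobRow and reapOne are hypothetical names, "oldest" is assumed here to mean the earliest leasedUntil, and a real reaper would perform the lookup and transition atomically in the database and then emit the events and notifications listed above.

```typescript
type JobStatus = "pending" | "running";

// Hypothetical row shape; field names follow the ones used in this document.
interface JobRow {
  id: string;
  typeName: string;
  status: JobStatus;
  leasedBy: string | null;
  leasedUntil: number | null; // epoch milliseconds
}

// Reaps at most one job per call, mirroring "one job per iteration".
function reapOne(
  jobs: JobRow[],
  now: number,
  registeredTypes: ReadonlySet<string>,
  ignoredJobIds: ReadonlySet<string>,
): JobRow | undefined {
  const candidates = jobs
    .filter(
      (job) =>
        job.status === "running" &&
        job.leasedUntil !== null &&
        job.leasedUntil < now && // lease has expired
        registeredTypes.has(job.typeName) && // type-scoped
        !ignoredJobIds.has(job.id), // self-aware: skip own in-progress jobs
    )
    // Assumption: "oldest" means the earliest lease expiry.
    .sort((a, b) => (a.leasedUntil ?? 0) - (b.leasedUntil ?? 0));

  const job = candidates[0];
  if (!job) return undefined;

  // Transition running → pending and clear the lease.
  job.status = "pending";
  job.leasedBy = null;
  job.leasedUntil = null;
  return job;
}
```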

When a job handler throws, the worker reschedules it with exponential backoff:

delay = min(initialDelayMs * multiplier^(attempt-1), maxDelayMs)

Example with defaults: 10s → 20s → 40s → 80s → 160s → 300s → 300s…
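
The formula can be checked with a small function. The default values below (10 s initial delay, multiplier 2, 300 s cap) are inferred from the example sequence above rather than taken from the library source, and backoffDelayMs is a hypothetical helper name.

```typescript
// Field names follow the formula above.
interface BackoffConfig {
  initialDelayMs: number;
  multiplier: number;
  maxDelayMs: number;
}

// Assumption: values inferred from the documented example sequence.
const assumedDefaults: BackoffConfig = {
  initialDelayMs: 10_000,
  multiplier: 2,
  maxDelayMs: 300_000,
};

// attempt is 1-based: the first failed attempt waits initialDelayMs.
function backoffDelayMs(
  attempt: number,
  config: BackoffConfig = assumedDefaults,
): number {
  const uncapped =
    config.initialDelayMs * Math.pow(config.multiplier, attempt - 1);
  return Math.min(uncapped, config.maxDelayMs);
}

const delaysInSeconds = [1, 2, 3, 4, 5, 6, 7].map(
  (attempt) => backoffDelayMs(attempt) / 1000,
);
console.log(delaysInSeconds.join(" → ")); // 10 → 20 → 40 → 80 → 160 → 300 → 300
```

The sixth attempt would be 320 s uncapped, so the cap takes effect from that point on.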

See Job Processing for details on error handling and abort signals.

createInProcessWorker accepts a client instance and extracts infrastructure (stateAdapter, notifyAdapter, observabilityAdapter, jobTypes, log) from it internally. Worker-specific options (processors, concurrency, pollIntervalMs, recoveryBackoffConfig, defaults, requiredAttemptMiddleware) remain separate parameters.

Accepting the client is purely a construction convenience: no lifecycle coupling is introduced. The client and worker remain independent after construction.

The top-level recoveryBackoffConfig controls the worker’s own main loop retry behavior (e.g., recovery from database connection errors). Per-attempt configuration (backoffConfig, leaseConfig, attemptMiddleware) lives on the processor registry; the worker-level defaults.backoffConfig / defaults.leaseConfig provide a fallback for processors that don’t set their own (resolution order: processor → registry → worker defaults → library default).

requiredAttemptMiddleware enforces that every slice merged into the worker includes a fixed set of middleware instances as an in-order subsequence, which is useful for guaranteeing that cross-cutting concerns like auth or tracing are present on every slice. The worker only enforces presence; slices continue to run their own middleware chains.
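
The in-order subsequence requirement for requiredAttemptMiddleware can be illustrated with a short check. isInOrderSubsequence is a hypothetical helper, not a Queuert export, and it assumes middleware instances are compared by identity.

```typescript
// Returns true when every element of `required` appears in `chain`
// in the same relative order. Other middleware may sit in between.
function isInOrderSubsequence<T>(
  required: readonly T[],
  chain: readonly T[],
): boolean {
  let next = 0; // index of the next required element still to be found
  for (const item of chain) {
    if (next < required.length && item === required[next]) {
      next++;
    }
  }
  return next === required.length;
}

// Placeholder objects standing in for middleware instances.
const auth = { name: "auth" };
const tracing = { name: "tracing" };
const logging = { name: "logging" };

console.log(isInOrderSubsequence([auth, tracing], [auth, logging, tracing])); // true
console.log(isInOrderSubsequence([auth, tracing], [tracing, auth])); // false
```

A slice may add its own middleware anywhere in its chain, as long as the required instances all appear and keep their relative order.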

A single worker can handle multiple job types. Slots poll all registered types and process whichever is available first. Per-type configuration (lease, retry) overrides worker defaults.
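
A hedged sketch of how such an override chain could resolve, following the order processor → registry → worker defaults → library default. ResolutionLayers and resolveConfig are hypothetical names, the numbers are illustrative, and the sketch assumes a more specific layer replaces the whole config object; the library may merge field by field instead.

```typescript
interface BackoffConfig {
  initialDelayMs: number;
  multiplier: number;
  maxDelayMs: number;
}

interface ResolutionLayers<T> {
  processor?: T; // set on an individual processor
  registry?: T; // set on the processor registry
  workerDefault?: T; // worker-level defaults
  libraryDefault: T; // built-in fallback, always present
}

// Picks the most specific layer that is defined.
function resolveConfig<T>(layers: ResolutionLayers<T>): T {
  return (
    layers.processor ??
    layers.registry ??
    layers.workerDefault ??
    layers.libraryDefault
  );
}

// Illustrative values only.
const libraryBackoff: BackoffConfig = {
  initialDelayMs: 10_000,
  multiplier: 2,
  maxDelayMs: 300_000,
};
const registryBackoff: BackoffConfig = {
  initialDelayMs: 1_000,
  multiplier: 3,
  maxDelayMs: 60_000,
};

const resolved = resolveConfig({
  registry: registryBackoff,
  workerDefault: { initialDelayMs: 5_000, multiplier: 2, maxDelayMs: 120_000 },
  libraryDefault: libraryBackoff,
});
console.log(resolved.initialDelayMs); // 1000: the registry wins over worker defaults
```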

Handler middleware hook into one or more job processing phases via wrapHandler, wrapPrepare, and wrapComplete. Each hook can inject typed context into the inner handler/callback through next(ctx). Middleware compose as an onion: the first middleware in the list is outermost, so its “before” logic runs first and its “after” logic runs last. Middleware enable cross-cutting concerns like contextual logging, audit trails, tracing spans, and resource injection.
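
The onion ordering can be demonstrated with a simplified, synchronous stand-in for a wrapHandler hook. The WrapHandler type and compose function below are hypothetical and untyped compared with the real hooks, whose actual signatures are documented in the TSDoc; only the ordering and the next(ctx) context injection are being illustrated.

```typescript
type Ctx = Record<string, unknown>;
type Handler = (ctx: Ctx) => void;

// Simplified hook: receives the context so far and a `next` callback that
// continues inward with extra context merged in.
type WrapHandler = (ctx: Ctx, next: (extra: Ctx) => void) => void;

// reduceRight builds from the innermost handler outward, so the first
// middleware in the array ends up outermost.
function compose(middleware: WrapHandler[], handler: Handler): Handler {
  return middleware.reduceRight<Handler>(
    (inner, wrap) => (ctx) =>
      wrap(ctx, (extra) => inner({ ...ctx, ...extra })),
    handler,
  );
}

const trace: string[] = [];

const loggingMiddleware: WrapHandler = (_ctx, next) => {
  trace.push("logging:before");
  next({ requestId: "r-1" });
  trace.push("logging:after");
};

const tracingMiddleware: WrapHandler = (_ctx, next) => {
  trace.push("tracing:before");
  next({ spanId: "s-1" });
  trace.push("tracing:after");
};

const run = compose([loggingMiddleware, tracingMiddleware], (ctx) => {
  trace.push(`handler:${String(ctx["requestId"])}:${String(ctx["spanId"])}`);
});

run({});
console.log(trace.join(" > "));
// logging:before > tracing:before > handler:r-1:s-1 > tracing:after > logging:after
```

The handler sees context from both middleware, and the "after" steps unwind in reverse order.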

Middleware are declared on the processor registry via createProcessors({ attemptMiddleware: [...] }). Handler signatures inside that registry auto-infer their typed context from the middleware tuple. When multiple slices are passed as an array to createInProcessWorker, each processor keeps its slice’s middleware chain.

To share middleware across multiple registries, list them inline at each createProcessors call — the tuple inference narrows without requiring as const. Each slice’s middleware chain is the runtime source of truth; the worker simply dispatches to whichever processor matches the job’s typeName.

The worker design emphasizes:

  1. Simplicity: Single main loop coordinating parallel slots
  2. Efficiency: Slots are self-contained, main loop just manages concurrency
  3. Reliability: Integrated reaper ensures recovery from failures
  4. Flexibility: Per-type configuration, multi-type workers
  5. Extensibility: Handler middleware enable cross-cutting concerns

See also:

  • Job Processing: Prepare/complete pattern, abort signals, timeouts
  • Adapters: Notification optimization, state provider design