jobs
jobs — Crystal port of River
River is a robust high-performance job processing system for Postgres (and SQLite). This repository is a behavior-faithful Crystal port preserving upstream semantics exactly.
Upstream ref: v0.37.1, pinned via git submodule at vendor/river/
Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work until commit.
Installation
# shard.yml
dependencies:
jobs:
github: dsisnero/jobs
Requires Crystal >= 1.20.2 and PostgreSQL (or SQLite).
Quick Start
require "jobs"
require "db"
require "pg"
# Define a job
struct SortArgs
include JSON::Serializable
include Jobs::Core::JobArgs
property strings : Array(String)
def initialize(@strings : Array(String) = [] of String); end
def kind : String; "sort"; end
end
class SortWorker
include Jobs::Core::Worker(SortArgs)
def work(job : Jobs::Core::Job(SortArgs)) : Nil
job.args.strings.sort!
puts "Sorted: #{job.args.strings}"
end
end
# Setup
db = DB.open("postgres://localhost/river_test?sslmode=disable")
driver = RealPostgresDriver.new(db)
workers = Jobs::Core::Workers.new
workers.add(SortWorker.new)
config = Jobs::Client::Config.new(
queues: {"default" => Jobs::Client::QueueConfig.new(max_workers: 100)},
workers: workers,
).with_defaults
client = Jobs::Client::Client.new(config, driver)
# Insert and work
client.insert("sort", SortArgs.new(strings: ["whale", "tiger", "bear"]).to_json.to_slice)
client.start
# Subscribe to events
ch, cancel = client.subscribe(Jobs::Core::EventKind::JobCompleted)
spawn do
loop do
event = ch.receive
puts "Job ##{event.job.try(&.id)} completed"
end
end
# Graceful shutdown
client.stop
cancel.call
CLI Tool
# Build
crystal build src/cli/jobs.cr -o bin/jobs
# Run migrations
./bin/jobs up --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs list --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs validate --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs version
See CLI Tools for full usage.
Documentation
| Document | Description |
|---|---|
| Getting Started | First job, client setup, subscriptions |
| Job Insertion | Batch insert, unique jobs, scheduled, listing |
| Workers | Workers, hooks, middleware, retry policies, snoozing |
| Periodic Jobs | Cron-style scheduling, intervals |
| Subscriptions | Event monitoring and metrics |
| Error Handling | Error handlers, retries, cancellation |
| Graceful Shutdown | Soft/hard stop, shutdown sequences |
| CLI Tools | Migration commands |
| State Machine | Job state transitions diagram |
| Architecture | Package structure and design |
| Development | Setup, testing, contributing |
| Testing | Test patterns and helpers |
| Coding Guidelines | Code conventions |
| PR Workflow | Pull request process |
Architecture
src/jobs/
├── client.cr # Config, QueueConfig, validation
├── client_cls.cr # Client: insert, subscribe, start/stop
├── core/ # Job, Worker, RetryPolicy, Event, etc.
├── driver/ # Driver interface, params, real_pg, sqlite
├── internal/ # Producer, JobExecutor, JobCompleter, etc.
│ ├── maintenance.cr # QueueCleaner, JobCleaner, JobRescuer, Reindexer, etc.
│ └── leadership.cr # Elector (leader election)
├── migrate/ # Migration framework + SQL files
├── shared/ # StartStop, CircuitBreaker, ServiceUtil, TimeUtil
└── types/ # JobRow, JobState, error types
Features
- Transactional enqueueing — Jobs enqueued in a transaction are guaranteed to exist after commit and not before.
- Job Args + Workers — Define jobs with
JobArgsfor serialization andWorkerfor execution. - Multiple queues, priority, scheduling — Queue isolation, periodic/cron jobs, scheduled jobs, and snoozing.
- Middleware + Hooks — Global and per-job lifecycle interception.
- Error handling — Error handlers, retry policies (exponential backoff: attempt^4 seconds), and cancellation support.
- Graceful shutdown — Soft stop (wait for jobs) and hard stop modes.
- Subscriptions — Event-driven monitoring for logging, metrics, and alerts.
- Maintenance services — All 7 services: QueueCleaner, JobCleaner, JobRescuer, Reindexer, JobScheduler, PeriodicJobEnqueuer, QueueMaintainer.
- Leader election — Postgres advisory lock-based leader election.
- Unique jobs — By args, period, queue, and state (SHA256 key generation).
- Migrations — CLI tool (
jobs) and programmatic migration API. - Dual drivers — PostgreSQL (production) and SQLite (in-memory testing).
Drivers
| Driver | File | Status |
|---|---|---|
| PostgreSQL | src/jobs/driver/real_pg.cr |
Complete (993 lines) |
| SQLite | src/jobs/driver/sqlite.cr |
Complete (48 methods, 25 TDD specs) |
Development
make install # Install dependencies
make format # Format Crystal source
make lint # Run ameba linter
make test # Run Crystal specs
make clean # Remove build artifacts
License
MIT — Port of River upstream code.
jobs
- 0
- 0
- 0
- 0
- 3
- about 3 hours ago
- May 26, 2026
MIT License
Tue, 26 May 2026 09:09:48 GMT