jobs

Crystal port of River — a fast, reliable job processing system for Postgres and SQLite.

River is a robust, high-performance job processing system for Postgres (and SQLite). This repository is a behavior-faithful Crystal port that preserves upstream semantics exactly.

Upstream ref: v0.37.1, pinned via git submodule at vendor/river/

Because River is built for Postgres, it encourages using the same database for application data and the job queue. Enqueueing jobs in the same transaction as other database changes avoids whole classes of distributed-systems problems: jobs are guaranteed to be enqueued if their transaction commits, are discarded if it rolls back, and aren't visible to workers until commit.
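
The commit and rollback rules above can be illustrated with a toy in-memory model. This is only a sketch of the semantics: ToyStore and ToyTx are hypothetical names, not part of the jobs API, and in a real deployment the database transaction itself provides these guarantees.

```crystal
# Toy model (hypothetical, not the jobs API): writes staged in a transaction
# become visible only on commit and are discarded on rollback.
class ToyTx
  getter rows = [] of String # staged application rows
  getter jobs = [] of String # staged jobs

  def insert_row(row : String) : Nil
    @rows << row
  end

  def enqueue(job : String) : Nil
    @jobs << job
  end
end

class ToyStore
  getter rows = [] of String # committed application rows
  getter jobs = [] of String # committed jobs, visible to workers

  # Yields a fresh ToyTx. Commits its staged writes if the block returns
  # normally; discards them (rollback) if the block raises.
  def transaction(&) : Bool
    tx = ToyTx.new
    begin
      yield tx
    rescue
      return false # rollback: nothing staged is published
    end
    @rows.concat(tx.rows)
    @jobs.concat(tx.jobs)
    true
  end
end

store = ToyStore.new
visible_mid_tx = -1

# Commit: the row and its job appear together.
store.transaction do |tx|
  tx.insert_row("user:1")
  tx.enqueue("welcome_email:1")
  visible_mid_tx = store.jobs.size # still 0: not visible before commit
end

# Rollback: neither the row nor the job survives.
store.transaction do |tx|
  tx.insert_row("user:2")
  tx.enqueue("welcome_email:2")
  raise "constraint violation"
end

p store.rows     # => ["user:1"]
p store.jobs     # => ["welcome_email:1"]
p visible_mid_tx # => 0
```

The point of the model is that the application row and the job share one fate: there is no state in which user:2 exists without its job, or the job exists without the row.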

Installation

# shard.yml
dependencies:
  jobs:
    github: dsisnero/jobs

Requires Crystal >= 1.20.2 and PostgreSQL (or SQLite).

Quick Start

require "jobs"
require "db"
require "pg"

# Define a job
struct SortArgs
  include JSON::Serializable
  include Jobs::Core::JobArgs

  property strings : Array(String)

  def initialize(@strings : Array(String) = [] of String); end
  def kind : String; "sort"; end
end

class SortWorker
  include Jobs::Core::Worker(SortArgs)

  def work(job : Jobs::Core::Job(SortArgs)) : Nil
    job.args.strings.sort!
    puts "Sorted: #{job.args.strings}"
  end
end

# Setup
db = DB.open("postgres://localhost/river_test?sslmode=disable")
driver = RealPostgresDriver.new(db)

workers = Jobs::Core::Workers.new
workers.add(SortWorker.new)

config = Jobs::Client::Config.new(
  queues: {"default" => Jobs::Client::QueueConfig.new(max_workers: 100)},
  workers: workers,
).with_defaults

client = Jobs::Client::Client.new(config, driver)

# Insert and work
client.insert("sort", SortArgs.new(strings: ["whale", "tiger", "bear"]).to_json.to_slice)
client.start

# Subscribe to events
ch, cancel = client.subscribe(Jobs::Core::EventKind::JobCompleted)
spawn do
  loop do
    event = ch.receive
    puts "Job ##{event.job.try(&.id)} completed"
  end
end

# Graceful shutdown: stop the client, then cancel the subscription
client.stop
cancel.call

CLI Tool

# Build
crystal build src/cli/jobs.cr -o bin/jobs

# Run migrations
# Quote the URL so the shell does not treat "?" as a glob character
./bin/jobs up --database-url "postgres://localhost/river_test?sslmode=disable"
./bin/jobs list --database-url "postgres://localhost/river_test?sslmode=disable"
./bin/jobs validate --database-url "postgres://localhost/river_test?sslmode=disable"
./bin/jobs version

See CLI Tools for full usage.

Documentation

Document            Description
Getting Started     First job, client setup, subscriptions
Job Insertion       Batch insert, unique jobs, scheduled, listing
Workers             Workers, hooks, middleware, retry policies, snoozing
Periodic Jobs       Cron-style scheduling, intervals
Subscriptions       Event monitoring and metrics
Error Handling      Error handlers, retries, cancellation
Graceful Shutdown   Soft/hard stop, shutdown sequences
CLI Tools           Migration commands
State Machine       Job state transitions diagram
Architecture        Package structure and design
Development         Setup, testing, contributing
Testing             Test patterns and helpers
Coding Guidelines   Code conventions
PR Workflow         Pull request process

Architecture

src/jobs/
├── client.cr          # Config, QueueConfig, validation
├── client_cls.cr      # Client: insert, subscribe, start/stop
├── core/              # Job, Worker, RetryPolicy, Event, etc.
├── driver/            # Driver interface, params, real_pg, sqlite
├── internal/          # Producer, JobExecutor, JobCompleter, etc.
│   ├── maintenance.cr # QueueCleaner, JobCleaner, JobRescuer, Reindexer, etc.
│   └── leadership.cr  # Elector (leader election)
├── migrate/           # Migration framework + SQL files
├── shared/            # StartStop, CircuitBreaker, ServiceUtil, TimeUtil
└── types/             # JobRow, JobState, error types

Features

  • Transactional enqueueing — Jobs enqueued in a transaction are guaranteed to exist after commit and not before.
  • Job Args + Workers — Define jobs with JobArgs for serialization and Worker for execution.
  • Multiple queues, priority, scheduling — Queue isolation, periodic/cron jobs, scheduled jobs, and snoozing.
  • Middleware + Hooks — Global and per-job lifecycle interception.
  • Error handling — Error handlers, retry policies (exponential backoff: attempt^4 seconds), and cancellation support.
  • Graceful shutdown — Soft stop (wait for jobs) and hard stop modes.
  • Subscriptions — Event-driven monitoring for logging, metrics, and alerts.
  • Maintenance services — All 7 services: QueueCleaner, JobCleaner, JobRescuer, Reindexer, JobScheduler, PeriodicJobEnqueuer, QueueMaintainer.
  • Leader election — Postgres advisory lock-based leader election.
  • Unique jobs — By args, period, queue, and state (SHA256 key generation).
  • Migrations — CLI tool (jobs) and programmatic migration API.
  • Dual drivers — PostgreSQL (production) and SQLite (in-memory testing).
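
Two of the features above lend themselves to a short sketch: the attempt^4 retry backoff and SHA256-based unique keys. Both helpers below are hypothetical and are not taken from the jobs source. retry_delay models only the base formula stated above (any jitter or cap the library applies is not modeled), and unique_key assumes one plausible key layout (kind, queue, period bucket, args) purely for illustration.

```crystal
require "digest/sha256"

# Hypothetical helper (not the jobs API): base retry delay of attempt^4 seconds.
def retry_delay(attempt : Int32) : Time::Span
  raise ArgumentError.new("attempt must be >= 1") if attempt < 1
  (attempt.to_i64 ** 4).seconds
end

# Hypothetical helper (not the jobs API): derive a uniqueness key by hashing
# the job kind, queue, encoded args, and the period bucket containing `now`.
# The field layout is an assumption made for this sketch.
def unique_key(kind : String, args_json : String, queue : String,
               now : Time, period : Time::Span) : String
  period_secs = period.total_seconds.to_i64
  raise ArgumentError.new("period must be at least one second") if period_secs < 1
  bucket = now.to_unix // period_secs # same bucket => same key
  Digest::SHA256.hexdigest("kind=#{kind}&queue=#{queue}&bucket=#{bucket}&args=#{args_json}")
end

# Attempts 1 through 5 wait 1s, 16s, 81s, 256s, and 625s.
(1..5).each do |attempt|
  puts "attempt #{attempt}: retry in #{retry_delay(attempt).total_seconds.to_i}s"
end

t = Time.unix(1_700_000_000)
a = unique_key("sort", %({"strings":["a"]}), "default", t, 1.hour)
b = unique_key("sort", %({"strings":["a"]}), "default", t + 100.seconds, 1.hour)
puts a == b # true: both timestamps fall in the same one-hour bucket
```

The fourth-power curve keeps early retries quick while spacing later ones out sharply: attempt 10 already waits 10,000 seconds, a little under three hours.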

Drivers

Driver       File                         Status
PostgreSQL   src/jobs/driver/real_pg.cr   Complete (993 lines)
SQLite       src/jobs/driver/sqlite.cr    Complete (48 methods, 25 TDD specs)

Development

make install    # Install dependencies
make format     # Format Crystal source
make lint       # Run ameba linter
make test       # Run Crystal specs
make clean      # Remove build artifacts

License

MIT — Port of River upstream code.
