crystal-es v0.6.6
An event sourcing library for Crystal
Installation
- Add the dependency to your shard.yml:

    dependencies:
      crystal-es:
        github: tristanholl/crystal-es

- Run shards install
Overview
crystal-es is an event sourcing foundation for Crystal applications. It is optimized for PostgreSQL as event store, queue, and projection database, and was originally extracted from an open-source core-banking project.
The library provides:
- Aggregates — reconstruct domain state by replaying events
- Commands — enforce business logic and emit events
- Events — immutable facts with a type-safe DSL
- Projections — read models built from event streams, with a schema DSL and schema drift detection
- Event Bus — fan-out published events to registered handlers
- Adapters — PostgreSQL and in-memory implementations for event stores and queues
A complete working example lives in ./examples/financial-transaction.
Components
Event
ES::Event is the base class for all domain events. Each event carries a Header (metadata) and a Body (event-specific payload).
class OrderPlaced < ES::Event
  aggregate_type "order"
  event_handle "order_placed"

  class Body
    include JSON::Serializable

    getter order_id : UUID
    getter amount : Int64

    def initialize(@order_id, @amount); end
  end
end
Event DSL
The define_event macro removes the boilerplate. It generates the full event class, the Body struct with JSON serialization, and registers the handle automatically.
define_event("order", "order_placed") do
  attribute :order_id, UUID
  attribute :amount, Int64
  attribute :currency, String, "EUR" # optional default
end
attribute(name, type, default?) — declare a typed field on the event body. A default value makes the field optional at construction time.
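To illustrate what a default means for the body, the following is a hand-written sketch of a body shaped like the DSL example above. It is an assumption about the general shape of such a type, not the macro's actual output, and the name OrderPlacedBodySketch is hypothetical.

```crystal
require "json"
require "uuid"
require "uuid/json"

# Hypothetical hand-written body; the code generated by define_event may differ.
struct OrderPlacedBodySketch
  include JSON::Serializable

  getter order_id : UUID
  getter amount : Int64
  getter currency : String = "EUR" # used when the JSON key is missing

  # The default also makes the argument optional at construction time.
  def initialize(@order_id : UUID, @amount : Int64, @currency : String = "EUR")
  end
end

# Construction without a currency falls back to the default.
body = OrderPlacedBodySketch.new(UUID.random, 4_200_i64)
puts body.currency

# Deserialization without a currency key falls back to the same default.
decoded = OrderPlacedBodySketch.from_json(%({"order_id": "#{UUID.random}", "amount": 10}))
puts decoded.currency
```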
Aggregate
ES::Aggregate reconstructs domain state from its event history. Each aggregate defines a State struct and a set of apply overloads — one per event type.
class Order < ES::Aggregate
  struct State
    property placed : Bool = false
    property amount : Int64 = 0_i64
  end

  def apply(event : OrderPlaced)
    @state.placed = true
    @state.amount = event.body.amount
  end
end
Call Order.hydrate(aggregate_id, event_store) to reconstruct an aggregate from persisted events.
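The replay idea behind hydration can be sketched in plain Crystal without the library. Everything below (SketchEvent, SketchOrder, the replay method, and the version counter) is a hypothetical stand-in chosen for illustration, and the real ES::Aggregate may work differently internally.

```crystal
# Hypothetical stand-ins for illustration; these are not the crystal-es classes.
abstract class SketchEvent
end

class SketchOrderPlaced < SketchEvent
  getter amount : Int64

  def initialize(@amount : Int64)
  end
end

class SketchOrderCancelled < SketchEvent
end

class SketchOrder
  struct State
    property placed : Bool = false
    property cancelled : Bool = false
    property amount : Int64 = 0_i64
  end

  getter state : State = State.new
  getter version : Int32 = 0

  # Rebuild state by applying every event in order, starting from an empty state.
  def self.hydrate(history : Array(SketchEvent)) : SketchOrder
    order = new
    history.each { |event| order.replay(event) }
    order
  end

  # Dispatch to the matching apply overload and count the event.
  def replay(event : SketchEvent) : Nil
    apply(event)
    @version += 1
  end

  def apply(event : SketchOrderPlaced)
    @state.placed = true
    @state.amount = event.amount
  end

  def apply(event : SketchOrderCancelled)
    @state.cancelled = true
  end
end

history = [SketchOrderPlaced.new(1_500_i64), SketchOrderCancelled.new] of SketchEvent
order = SketchOrder.hydrate(history)
puts order.state.amount # => 1500
puts order.version      # => 2
```

Because state is derived only from the event list, hydrating the same history twice yields the same state.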
Command
ES::Command encapsulates a single business operation. It hydrates the relevant aggregate, evaluates business rules, and appends new events.
class PlaceOrder < ES::Command
  def call
    order = Order.hydrate(@aggregate_id, event_store)
    raise ES::InvalidState.new("already placed") if order.state.placed

    append OrderPlaced.new(
      aggregate_id: @aggregate_id,
      body: OrderPlaced::Body.new(order_id: @aggregate_id, amount: @amount)
    )
  end
end
Projection
ES::Projection maintains a read model by consuming events in order. It can be replayed from scratch at any time.
Projection DSL
define_projection generates the full projection class — table creation, column definitions, index setup, and event handlers — from a concise block.
define_projection("ledger", "postings") do
  column :id, UUID, primary_key: true
  column :account_id, UUID
  column :amount, Int64
  column :posted_at, Time

  index [:account_id]
  index [:id], unique: true

  apply(OrderPlaced) do |event|
    # insert into postings table
  end
end
column(name, type, **options) maps Crystal types to PostgreSQL column types:
| Crystal type | PostgreSQL type |
|---|---|
| String | TEXT |
| Int64 | BIGINT |
| UUID | UUID |
| Time | TIMESTAMPTZ |
| Bool | BOOLEAN |
index(columns, unique: false, name: nil) — add an index to the projection table.
apply(EventClass) { |event| ... } — handle an event to update the read model.
Schema Drift Detection
Every projection schema is immutable. When setup_table is called, the library computes a SHA-256 fingerprint of the compiled schema (columns, types, nullability, defaults, indexes) and compares it against the fingerprint stored in _crystal_es_projection_metadata. If they diverge, an ES::Exception::SchemaDrift exception is raised before the projection can run.
Breaking changes (raise SchemaDrift):
- Column added, removed, or reordered
- Column type or Crystal type changed
- Nullability or default value changed
- Primary key changed
Non-breaking changes (logged as a warning, metadata updated):
- Index added, removed, or modified
The error message tells you exactly what changed:
Schema drift detected for 'Ledger' (table: finance.ledger).
Stored fingerprint: abc123...
Compiled fingerprint: def456...
Changes:
breaking column_type_changed: column "amount" type changed from TEXT to BIGINT
Projection schemas are immutable. Define a new projection class with a new table
name, populate it from the event store, then rewire the application to the new projection.
To evolve a projection, create a new projection class targeting a new table name, replay the event store into it, then cut the application over. There is no in-place migration path — this is by design.
You can also inspect drift status without triggering an exception:
status = Ledger.drift_status(db)
# => ES::ProjectionMeta::DriftStatus with fingerprints and list of SchemaChange objects
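The fingerprint comparison described in this section can be sketched with Crystal's standard library. This is a minimal illustration under stated assumptions: SketchColumn, schema_fingerprint, and the canonical string format are hypothetical, the sketch covers columns only (no defaults or indexes), and the library's real fingerprint input is not documented here.

```crystal
require "digest/sha256"

# Hypothetical column description; crystal-es's internal representation may differ.
record SketchColumn, name : String, sql_type : String, nullable : Bool = false

# Serialize the columns in declaration order and hash the result.
# Order is part of the input, so reordering columns changes the fingerprint.
def schema_fingerprint(columns : Array(SketchColumn)) : String
  canonical = columns.map { |col| "#{col.name}:#{col.sql_type}:#{col.nullable}" }.join("|")
  Digest::SHA256.hexdigest(canonical)
end

stored = schema_fingerprint([
  SketchColumn.new("id", "UUID"),
  SketchColumn.new("amount", "TEXT"),
])

compiled = schema_fingerprint([
  SketchColumn.new("id", "UUID"),
  SketchColumn.new("amount", "BIGINT"),
])

puts "drift detected" if stored != compiled
```

Hashing a canonical serialization keeps the stored metadata small while still catching any change to a column's name, type, nullability, or position.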
Event Bus
ES::EventBus fans out published events to all registered handlers (commands and projections).
bus = ES::EventBus(ES::Command | ES::Projection).new
bus.subscribe(OrderPlaced, PlaceOrderHandler)
bus.subscribe(OrderPlaced, OrdersProjection)
bus.publish(event)
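As a rough mental model of the fan-out, here is a library-free sketch. It keys subscriptions by a handle string to stay small, whereas the bus above subscribes by event class; BusEvent, OrderPlacedSketch, and SketchBus are hypothetical names and not part of crystal-es.

```crystal
# Hypothetical stand-ins; the real ES::EventBus subscribes handler classes by event class.
abstract class BusEvent
  abstract def handle : String
end

class OrderPlacedSketch < BusEvent
  getter amount : Int64

  def initialize(@amount : Int64)
  end

  def handle : String
    "order_placed"
  end
end

class SketchBus
  alias Handler = Proc(BusEvent, Nil)

  def initialize
    @handlers = Hash(String, Array(Handler)).new
  end

  # Register a handler for one event handle; several handlers may share a handle.
  def subscribe(handle : String, &handler : BusEvent -> Nil) : Nil
    (@handlers[handle] ||= [] of Handler) << handler
  end

  # Deliver the event to every handler registered for its handle, in subscription order.
  def publish(event : BusEvent) : Nil
    @handlers.fetch(event.handle, [] of Handler).each(&.call(event))
  end
end

bus = SketchBus.new
log = [] of String

bus.subscribe("order_placed") { |e| log << "command saw #{e.handle}" }
bus.subscribe("order_placed") { |e| log << "projection saw #{e.handle}" }

bus.publish(OrderPlacedSketch.new(250_i64))
puts log.size # => 2
```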
Event Store
ES::EventStore is the persistence layer for events. Two implementations are provided:
- ES::Adapters::EventStores::Postgres: stores events as JSONB rows with unique (aggregate_id, version) constraints. Provides a flattened view for stream queries and cursor-based pagination for batch replay.
- ES::Adapters::EventStores::InMemory: lightweight implementation for tests.
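The effect of a unique (aggregate_id, version) constraint can be illustrated with a small in-memory sketch. StoredEvent, VersionConflict, and SketchEventStore are hypothetical names, and this is not the InMemory adapter shipped with the library.

```crystal
require "uuid"

# Hypothetical row shape; the real adapters store a header and a JSONB body.
record StoredEvent, aggregate_id : UUID, version : Int32, handle : String, payload : String

class VersionConflict < Exception
end

class SketchEventStore
  def initialize
    @rows = [] of StoredEvent
    @keys = Set(Tuple(UUID, Int32)).new
  end

  # Reject a second event with the same (aggregate_id, version) pair,
  # mimicking what a unique constraint does in PostgreSQL.
  def append(event : StoredEvent) : Nil
    unless @keys.add?({event.aggregate_id, event.version})
      raise VersionConflict.new("version #{event.version} already exists for #{event.aggregate_id}")
    end
    @rows << event
  end

  # Return one aggregate's events ordered by version, ready for replay.
  def stream(aggregate_id : UUID) : Array(StoredEvent)
    @rows.select { |e| e.aggregate_id == aggregate_id }.sort_by(&.version)
  end
end

id = UUID.random
store = SketchEventStore.new
store.append(StoredEvent.new(id, 2, "order_cancelled", "{}"))
store.append(StoredEvent.new(id, 1, "order_placed", %({"amount": 100})))

conflict = false
begin
  store.append(StoredEvent.new(id, 1, "order_placed", %({"amount": 999})))
rescue VersionConflict
  conflict = true
end

puts store.stream(id).map(&.handle).join(", ")
```

Two writers that hydrate the same aggregate at the same version cannot both append the next event, so the constraint acts as an optimistic concurrency check.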
Queue
ES::Queue provides asynchronous command processing. Two implementations:
- ES::Adapters::Queues::Postgres: durable queue backed by a PostgreSQL table.
- ES::Adapters::Queues::InMemory: for tests and simple scenarios.
Configuration
ES::Config is a global singleton that wires dependencies together:
ES::Config.configure do |c|
  c.event_store = ES::Adapters::EventStores::Postgres.new(db)
  c.queue = ES::Adapters::Queues::Postgres.new(db)
  c.event_bus = ES::EventBus(ES::Command | ES::Projection).new
end
Project Structure
For larger projects, the following vertical-slice layout works well:
src/
  domains/
    orders/
      aggregates/
        order.cr
      commands/
        place_order.cr
        cancel_order.cr
      events/
        order_placed.cr
        order_cancelled.cr
      projections/
        orders_list.cr
    payments/
      ...
  shared/
    ...
Example: Financial Transaction
Below is an abridged version of the financial-transaction example that shows the full event sourcing flow.
1. Define Events
# events/transaction_initiated.cr
define_event("transaction", "transaction_initiated") do
  attribute :amount, Int64
  attribute :creditor_account, UUID
  attribute :debtor_account, UUID
end

# events/transaction_accepted.cr
define_event("transaction", "transaction_accepted") do
end

# events/transaction_rejected.cr
define_event("transaction", "transaction_rejected") do
end
2. Define the Aggregate
# aggregates/transaction.cr
class Transaction < ES::Aggregate
  struct State
    property amount : Int64 = 0_i64
    property creditor_account : UUID? = nil
    property debtor_account : UUID? = nil
    property accepted : Bool = false
    property rejected : Bool = false
  end

  def apply(event : TransactionInitiated)
    @state.amount = event.body.amount
    @state.creditor_account = event.body.creditor_account
    @state.debtor_account = event.body.debtor_account
  end

  def apply(event : TransactionAccepted)
    @state.accepted = true
  end

  def apply(event : TransactionRejected)
    @state.rejected = true
  end
end
3. Define a Command
# commands/process_transaction.cr
class ProcessTransaction < ES::Command
  LIMIT = 10_000_i64

  def call
    tx = Transaction.hydrate(@aggregate_id, event_store)

    # Accept transactions up to the limit, reject anything above it.
    if tx.state.amount <= LIMIT
      append TransactionAccepted.new(aggregate_id: @aggregate_id)
    else
      append TransactionRejected.new(aggregate_id: @aggregate_id)
    end
  end
end
4. Define a Projection
# projections/ledger.cr
define_projection("finance", "ledger") do
  column :id, UUID, primary_key: true
  column :transaction_id, UUID
  column :creditor_account, UUID
  column :debtor_account, UUID
  column :amount, Int64
  column :accepted_at, Time, nullable: true
  column :rejected_at, Time, nullable: true

  index [:transaction_id], unique: true

  apply(TransactionInitiated) do |event|
    db.exec(
      "INSERT INTO finance.ledger (id, transaction_id, creditor_account, debtor_account, amount)
       VALUES ($1, $2, $3, $4, $5)",
      UUID.random, event.header.aggregate_id,
      event.body.creditor_account, event.body.debtor_account, event.body.amount
    )
  end

  apply(TransactionAccepted) do |event|
    db.exec(
      "UPDATE finance.ledger SET accepted_at = $1 WHERE transaction_id = $2",
      Time.utc, event.header.aggregate_id
    )
  end

  apply(TransactionRejected) do |event|
    db.exec(
      "UPDATE finance.ledger SET rejected_at = $1 WHERE transaction_id = $2",
      Time.utc, event.header.aggregate_id
    )
  end
end
5. Wire Everything Together
require "crystal-es"
require "db"
require "pg"

db = DB.open(ENV["DATABASE_URL"])

ES::Config.configure do |c|
  c.event_store = ES::Adapters::EventStores::Postgres.new(db)
  c.queue = ES::Adapters::Queues::Postgres.new(db)
  c.event_bus = ES::EventBus(ES::Command | ES::Projection).new
  c.event_handlers = ES::EventHandlers.new
end

# Register event types for deserialization
ES::Config.event_handlers.register(TransactionInitiated)
ES::Config.event_handlers.register(TransactionAccepted)
ES::Config.event_handlers.register(TransactionRejected)

# Subscribe handlers to events
bus = ES::Config.event_bus
bus.subscribe(TransactionInitiated, ProcessTransaction)
bus.subscribe(TransactionInitiated, Ledger)
bus.subscribe(TransactionAccepted, Ledger)
bus.subscribe(TransactionRejected, Ledger)

# Initiate a transaction
aggregate_id = UUID.random
event = TransactionInitiated.new(
  aggregate_id: aggregate_id,
  body: TransactionInitiated::Body.new(
    amount: 5_000_i64,
    creditor_account: UUID.random,
    debtor_account: UUID.random
  )
)

# Persist the event first, then fan it out to the subscribed handlers
ES::Config.event_store.append(event)
bus.publish(event)
Development
Start the development environment with Docker:
docker-compose up -d # starts PostgreSQL
make test # run the spec suite
Contributing
- Fork it (https://github.com/tristanholl/crystal-es/fork)
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
Contributors
- Tristan Holl - creator and maintainer
MIT License