# couchdb.cr v0.2.1

A Crystal shard for CouchDB — local-first storage backed by SQLite3 that replicates to and from a remote CouchDB server. Inspired by PouchDB.

Queries are answered instantly from local SQLite storage. Replication syncs with a remote CouchDB instance in the background, making it well-suited for offline-capable applications.
## Features

- Local-first: all reads and writes go to a local SQLite3 database — no network required
- CouchDB replication: 7-step protocol with resumable checkpoints; sync to/from any CouchDB 3.x server
- Continuous sync: `Database.local_replica` sets up live bidirectional replication in the background; choose local or upstream write semantics
- Typed documents: subclass `CouchDB::Document` to add strongly-typed fields to your models
- Open schema: unknown fields are preserved transparently through `json_unmapped`
- Auto-routing: pass a file path for SQLite, pass an `http://` URL for a remote CouchDB
## Installation

Add to your `shard.yml`:

```yaml
dependencies:
  couchdb:
    github: threez/couchdb.cr
    version: ~> 0.2
```

Then run:

```sh
shards install
```
## Quick Start

```crystal
require "couchdb"

db = CouchDB::Database.new("notes.db")

# Create a document
doc = CouchDB::Document.new
doc.id = "hello"
doc["message"] = JSON::Any.new("world")
result = db.put(doc)
puts result[:rev] # => "1-5d41402abc4b2a76b9719d911017c592"

# Read it back
fetched = db.get("hello")
puts fetched["message"].as_s # => "world"
puts fetched.rev             # => "1-5d41402abc4b2a76b9719d911017c592"

# Update — provide the current rev
fetched["message"] = JSON::Any.new("updated")
fetched.rev = result[:rev]
updated = db.put(fetched)

# Delete — use the rev returned by the update, not the stale one
db.remove("hello", updated[:rev])
```
## Typed Documents

Subclass `CouchDB::Document` to define strongly-typed models. Subclass fields are serialized as top-level JSON keys — they live alongside `_id`, `_rev`, and any other dynamic fields.

```crystal
class Note < CouchDB::Document
  property title : String = ""
  property body : String = ""
  property tags : Array(String) = [] of String
end

db = CouchDB::Database.new("notes.db")

note = Note.new
note.id = "note-1"
note.title = "Shopping list"
note.body = "Milk, eggs, bread"
db.put(note)

# Retrieve as the typed subclass
fetched = db.get("note-1", as: Note)
puts fetched.title # => "Shopping list"
puts fetched.id    # => "note-1"
puts fetched.rev   # => "1-..."

# List all notes as typed objects
result = db.all_docs(as: Note)
result[:rows].each { |note| puts note.title }
```
Extra fields not declared on the subclass are still preserved in `json_unmapped` and round-trip through replication without loss.
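As a sketch of that round-trip, reusing the `Note` class defined above and an in-memory database (behavior as described here and in the Document API table; not a verified transcript):

```crystal
require "couchdb"
require "json"

db = CouchDB::Database.new(":memory:")

note = Note.new
note.id = "note-2"
note.title = "Open schema"
note["color"] = JSON::Any.new("red") # field not declared on Note

db.put(note)

fetched = db.get("note-2", as: Note)
fetched.title              # declared property, typed access
fetched["color"].as_s      # undeclared field, served from json_unmapped
fetched.json_unmapped.keys # the preserved extra keys, e.g. ["color"]
```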
## Document API

`CouchDB::Document` provides:

| Member | Type | Description |
|---|---|---|
| `id` | `String` | Maps to `_id` in JSON |
| `rev` | `String?` | Maps to `_rev`; `nil` for new documents |
| `deleted` | `Bool?` | Maps to `_deleted`; `nil` for normal documents |
| `deleted?` | `Bool` | Predicate — returns `true` when `deleted == true` |
| `next_rev` | `String` | Computes what the next revision string would be |
| `json_unmapped` | `Hash(String, JSON::Any)` | All fields not covered by declared properties |
| `doc["key"]` | `JSON::Any` | Hash-style read (routes `_id`/`_rev`/`_deleted` to typed fields) |
| `doc["key"] = v` | — | Hash-style write |
| `doc["key"]?` | `JSON::Any?` | Hash-style read, returns `nil` if absent |
## Database API

```crystal
db = CouchDB::Database.new(location)
```

`location` is auto-detected:

- `"http://..."` or `"https://..."` → remote CouchDB via HTTP
- anything else → local SQLite (`.db` extension appended if not present; `":memory:"` for in-memory)
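To make the routing rule concrete, a few constructor calls (the URL, credentials, and port are placeholders, not defaults of the library):

```crystal
require "couchdb"

CouchDB::Database.new(":memory:") # in-memory SQLite
CouchDB::Database.new("notes")    # local SQLite file "notes.db" (".db" appended)
CouchDB::Database.new("notes.db") # local SQLite file, extension already present
CouchDB::Database.new("http://admin:secret@localhost:5984/notes") # remote CouchDB adapter
```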
### CRUD

```crystal
db.get(id)            # => Document (raises NotFound)
db.get(id, as: MyDoc) # => MyDoc (typed subclass)
db.put(doc)           # => {ok:, id:, rev:} (raises Conflict)
db.remove(id, rev)    # => {ok:}
db.bulk_docs(docs)    # => [{id:, rev:, ok:}, ...]
db.bulk_docs(docs, new_edits: false) # replication write path
```

### Query

```crystal
db.all_docs                                           # all non-deleted docs
db.all_docs(include_docs: true, limit: 50, skip: 0)
db.all_docs(startkey: "a", endkey: "m")               # range [a, m] inclusive
db.all_docs(startkey: "note-", endkey: "note-\uffff") # prefix scan
db.all_docs(as: Note)                                 # typed rows — implies include_docs: true
db.all_docs(as: Note, limit: 50, startkey: "note-")   # typed + range/pagination

db.changes(since: "0")                                # changes feed (snapshot)
db.changes(since: seq, limit: 100, include_docs: true)
db.info                                               # => {db_name:, doc_count:, update_seq:}
```
## Changes Feed

`changes_feed` opens a persistent streaming connection, yielding each change to a block. Call `break` to stop.

```crystal
# Stream all changes from the beginning
db.changes_feed(since: "0") do |change|
  puts "#{change["id"]} changed (seq #{change["seq"]})"
  break if done?
end

# Pick up only changes after a known sequence, embedding full doc bodies
db.changes_feed(since: last_seq, include_docs: true) do |change|
  process(change["doc"])
  save_checkpoint(change["seq"].as_s)
  break if shutting_down?
end
```
| Parameter | Default | Description |
|---|---|---|
| `since` | `"0"` | Starting sequence (exclusive). `"0"` yields all changes. |
| `heartbeat` | `1000` | Polling interval in ms (SQLite) or CouchDB heartbeat in ms (HTTP). |
| `include_docs` | `false` | Embed full document bodies in each change entry. |
SQLite: polls `update_seq` in a loop, sleeping `heartbeat` ms between polls. HTTP: opens a `feed=continuous` connection to CouchDB and reads the response body line by line.
## Query (map/reduce)

`query` runs an in-memory map/reduce over all documents, PouchDB-style. Pass a block that calls `emit` for each key/value pair you want in the result; rows are sorted by key using CouchDB collation order (`null < false < true < numbers < strings < arrays < objects`).
Basic emit:

```crystal
result = db.query do |doc, emit|
  emit.call(JSON::Any.new(doc["type"].as_s), JSON::Any.new(1_i64))
end

result[:rows].each { |r| puts "#{r["key"]} → #{r["value"]}" }
# result[:total_rows] — total after filtering, before skip/limit
# result[:offset]     — the skip value used
```
Key filtering:

```crystal
# Exact key
db.query(key: JSON::Any.new("note")) { |doc, emit| ... }

# Multiple exact keys
db.query(keys: [JSON::Any.new("note"), JSON::Any.new("task")]) { |doc, emit| ... }

# Inclusive range
db.query(startkey: JSON::Any.new("b"), endkey: JSON::Any.new("d")) { |doc, emit| ... }
```
Pagination and ordering:

```crystal
db.query(limit: 10, skip: 20) { |doc, emit| ... }
db.query(descending: true) { |doc, emit| ... }

# descending with bounds — pass the higher key as startkey:
db.query(descending: true, startkey: JSON::Any.new("z"), endkey: JSON::Any.new("a")) { |doc, emit| ... }
```
Embedding full documents:

```crystal
result = db.query(include_docs: true) do |doc, emit|
  emit.call(JSON::Any.new(doc["type"].as_s), JSON::Any.new(nil))
end

result[:rows].each { |r| puts r["doc"]["title"] }
```
Reduce functions:

```crystal
# _count — total number of emitted rows
db.query(reduce: "_count") { |doc, emit| emit.call(..., ...) }
# => [{key: null, value: 42}]

# _count with grouping — one row per key
db.query(reduce: "_count", group: true) { |doc, emit| emit.call(JSON::Any.new(doc["type"].as_s), ...) }
# => [{key: "note", value: 10}, {key: "task", value: 5}]

# _sum — sums numeric values (raises ArgumentError on non-numeric)
db.query(reduce: "_sum") { |doc, emit| emit.call(JSON::Any.new(nil), JSON::Any.new(doc["score"].as_i64)) }

# _stats — returns sum/count/min/max/sumsq (raises ArgumentError on non-numeric)
db.query(reduce: "_stats") { |doc, emit| emit.call(JSON::Any.new(nil), JSON::Any.new(doc["score"].as_i64)) }
# => [{key: null, value: {sum: 60.0, count: 3, min: 2.0, max: 30.0, sumsq: 1400.0}}]
```
`group_level` with array keys — truncates composite keys to the first N elements before grouping:

```crystal
# Emit [year, month] keys, then group by year only
db.query(reduce: "_count", group_level: 1) do |doc, emit|
  key = JSON::Any.new([JSON::Any.new(doc["year"].as_i64), JSON::Any.new(doc["month"].as_i64)])
  emit.call(key, JSON::Any.new(nil))
end
# => [{key: [2024], value: 12}, {key: [2025], value: 3}]
```
`query` performs a full in-memory scan — it is suited to small-to-medium datasets and ad-hoc indexing. For very large databases, use `all_docs` range queries instead.
## Find (Mango selectors)

`find` runs an in-memory Mango-style selector query over all documents, PouchDB/CouchDB-style. Instead of a Crystal block, pass a JSON hash describing the conditions; `find` handles filtering, sorting, projection, and pagination.
Basic usage:

```crystal
result = db.find(JSON.parse(%({"type": "note"})))
result[:docs].each { |doc| puts doc["title"] }
# result[:docs]    — Array(JSON::Any) of matching documents
# result[:warning] — always present; full scan, no index used
```
Field projection — restrict keys returned per document:

```crystal
result = db.find(JSON.parse(%({"type": "note"})), fields: ["_id", "title", "author"])
result[:docs].first.as_h.keys # => ["_id", "title", "author"]
# Dot-notation paths are stored flat: "address.city" becomes a top-level key in the result
```
Sorting — pass an array of field names (ascending) or single-key hashes with `"asc"`/`"desc"`:

```crystal
# Ascending (bare string)
db.find(sel, sort: [JSON::Any.new("name")])

# Descending (single-key hash)
db.find(sel, sort: [JSON.parse(%({"score": "desc"}))])

# Multi-key: primary sort by group, secondary by rank
db.find(sel, sort: [JSON::Any.new("group"), JSON::Any.new("rank")])
```
Pagination:

```crystal
db.find(sel, limit: 10, skip: 20)
```
Operator reference:

| Operator | Description | Example condition |
|---|---|---|
| `$eq` | Equal (default for bare values) | `{"$eq": "note"}` |
| `$ne` | Not equal | `{"$ne": "deleted"}` |
| `$lt` | Less than | `{"$lt": 100}` |
| `$lte` | Less than or equal | `{"$lte": 100}` |
| `$gt` | Greater than | `{"$gt": 0}` |
| `$gte` | Greater than or equal | `{"$gte": 0}` |
| `$exists` | Field presence | `{"$exists": true}` |
| `$type` | JSON type check | `{"$type": "string"}` |
| `$in` | Value in set | `{"$in": ["a", "b"]}` |
| `$nin` | Value not in set | `{"$nin": ["x"]}` |
| `$all` | Array contains all | `{"$all": ["a", "b"]}` |
| `$size` | Array length | `{"$size": 3}` |
| `$mod` | Integer modulo | `{"$mod": [2, 0]}` (even) |
| `$regex` | String matches regex | `{"$regex": "^Al"}` |
| `$elemMatch` | Array element matches sub-selector | `{"$elemMatch": {"score": {"$gt": 5}}}` |
| `$not` | Negate field condition | `{"$not": {"$gt": 10}}` |
| `$and` | All sub-selectors match | `{"$and": [{"a": 1}, {"b": 2}]}` |
| `$or` | Any sub-selector matches | `{"$or": [{"type": "a"}, {"type": "b"}]}` |
| `$nor` | No sub-selector matches | `{"$nor": [{"deleted": true}]}` |
Valid `$type` values: `"null"`, `"boolean"`, `"number"`, `"string"`, `"array"`, `"object"`.

Comparisons (`$lt`, `$gt`, etc.) use the same CouchDB collation order as `query` (`null < false < true < numbers < strings < arrays < objects`), so mixed-type fields sort predictably.
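A small sketch of how that collation plays out with `find` on a mixed-type field (illustrative only; uses the selector syntax from the table above):

```crystal
require "couchdb"
require "json"

db = CouchDB::Database.new(":memory:")

# Store documents whose "v" field has a different JSON type each
{"d0" => JSON::Any.new(nil), "d1" => JSON::Any.new(false),
 "d2" => JSON::Any.new(5_i64), "d3" => JSON::Any.new("abc")}.each do |id, v|
  doc = CouchDB::Document.new
  doc.id = id
  doc["v"] = v
  db.put(doc)
end

# In CouchDB collation null < false < true < numbers < strings,
# so {"$lt": "zzz"} matches null, false, 5, and "abc" alike.
result = db.find(JSON.parse(%({"v": {"$lt": "zzz"}})))
result[:docs].size # all four documents match
```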
Note: `warning` is always present in the result because `find` always does a full scan — there is no index. The message prompts you to create an index if performance matters.
## Conflict Resolution

Register a hook on a `Database` instance to handle `put` or `remove` conflicts automatically instead of rescuing `Conflict` manually.

`on_conflict` — invoked when `put` raises `Conflict` (stale `_rev`):

```crystal
db.on_conflict do |existing, attempted|
  # existing  — the current document in the database (fresh rev)
  # attempted — the document you tried to write
  # Return a Document to retry with (rev is set automatically), or nil to re-raise.
  attempted # last-write-wins
end
```
`on_remove_conflict` — invoked when `remove` raises `Conflict`:

```crystal
db.on_remove_conflict do |existing, attempted_rev|
  # existing      — the current document in the database
  # attempted_rev — the stale rev you passed to remove
  # Return true to retry the delete with the current rev, or nil to re-raise.
  true
end
```
Field-merge example:

```crystal
db.on_conflict do |existing, attempted|
  merged = CouchDB::Document.new
  merged.id = existing.id
  merged["count"] = JSON::Any.new(existing["count"].as_i + attempted["count"].as_i)
  merged
end
```

- A second conflict on retry propagates without re-invoking the hook (no infinite loop).
- Raise inside the hook to propagate a custom exception.
- Hooks apply to `put` and `remove` only; `bulk_docs` and attachment methods are unaffected.
## Attachments

```crystal
# Store a binary attachment (creates a new document revision)
rev = db.put(doc)[:rev]
db.put_attachment("doc-id", "photo.jpg", rev, bytes, "image/jpeg") # => {ok:, id:, rev:}

# Retrieve raw bytes
att = db.get_attachment("doc-id", "photo.jpg") # => {data: Bytes, content_type:}
File.write("photo.jpg", att[:data])

# Delete an attachment (creates a new document revision)
db.delete_attachment("doc-id", "photo.jpg", att_rev) # => {ok:, id:, rev:}
```

Attachment metadata (content type, length) is stored as a stub in the document's `_attachments` field. Binary data is stored separately (in an `attachments` SQLite table for the local adapter, or via native CouchDB attachment endpoints for the HTTP adapter).
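A sketch of reading that stub back off the document. The stub key `content_type` follows CouchDB's standard `_attachments` format and is an assumption here, not confirmed API of this shard:

```crystal
require "couchdb"

db = CouchDB::Database.new(":memory:")

doc = CouchDB::Document.new
doc.id = "pic"
rev = db.put(doc)[:rev]

db.put_attachment("pic", "note.txt", rev, "hello".to_slice, "text/plain")

# After upload the document carries a metadata stub, not the bytes themselves
fetched = db.get("pic")
stub = fetched["_attachments"]["note.txt"]
stub["content_type"].as_s # "text/plain" (assumed stub field name)

# The bytes live in the separate attachments store
att = db.get_attachment("pic", "note.txt")
String.new(att[:data]) # "hello"
```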
## Replication

```crystal
local  = CouchDB::Database.new("myapp.db")
remote = CouchDB::Database.new("https://admin:secret@db.example.com/myapp")

local.sync(remote)           # pull then push — full bidirectional sync
local.replicate_from(remote) # pull only
local.replicate_to(remote)   # push only
```

`sync` and `replicate_*` return a `CouchDB::Replication::Session`:

```crystal
session = local.replicate_to(remote)
puts session.ok?          # true / false
puts session.docs_written # number of documents transferred
puts session.docs_read    # number of documents fetched from source
puts session.last_seq     # last sequence number processed
puts session.error        # error message if ok == false
```

Replication is resumable — a checkpoint is written to both source and target after every batch of 100 documents, so interrupted replications restart from where they left off.
## Continuous Sync

`Database.local_replica` creates a local SQLite database that continuously syncs bidirectionally with a remote CouchDB. Two background fibers (push and pull) run for the lifetime of the object. Checkpoints are stored on the remote only, so deleting and recreating the local file resumes from where replication left off.

```crystal
db = CouchDB::Database.local_replica("notes.db",
  "https://admin:secret@db.example.com/notes")

# Use db exactly like a regular Database — reads/writes go to local SQLite.
db.put(note)
db.get("note-1")

db.close # stops sync fibers and closes both adapters
```
### Write modes

Local writes (default) — `put`/`remove` are instant (local SQLite only). The background push fiber syncs them to the remote asynchronously.

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url)

result = db.put(note) # returns immediately — written to local SQLite
db.get(result[:id])   # available locally right away
# … push fiber syncs to remote in the background
```
Upstream writes (`write_upstream: true`) — `put`/`remove` write to the remote first, then block until the change has been replicated locally. Reads always come from local.

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url,
  write_upstream: true)

result = db.put(note) # writes to remote, then waits for local copy
db.get(result[:id])   # guaranteed to be present immediately
```

Use upstream mode when you need read-your-own-writes consistency across devices, or when the local store is treated as a cache rather than the source of truth.

Timeout: upstream `put`/`remove` raise `CouchDB::Error` if the change does not replicate locally within 5 seconds.
### Heartbeat

`heartbeat` controls how often (in ms) the sync fibers poll for new changes. Lower values mean lower latency but more frequent network/DB activity.

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url,
  heartbeat: 500) # poll every 500 ms (default: 2000)
```
### Error handling

Register `on_sync_error` to be notified when a background replication run fails (network drop, auth error, etc.). The callback receives the direction (`"push"` or `"pull"`) and the exception. Without a registered callback, sync errors are silently swallowed so the background fibers keep running.

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url)

db.on_sync_error do |direction, ex|
  case ex
  when CouchDB::Unauthorized
    puts "#{direction}: credentials rejected"
    db.bearer_token = refresh_token()
  else
    puts "#{direction} sync error: #{ex.message}"
  end
end
```
### Conflict resolution

Concurrent edits on both sides can produce `Conflict` errors during the push sync. Register `on_conflict` on the replica to resolve them automatically:

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url)

# Last-write-wins: the local (attempted) version always wins
db.on_conflict do |existing, attempted|
  attempted
end

# Field-merge: combine numeric fields from both sides
db.on_conflict do |existing, attempted|
  merged = attempted.dup
  merged["count"] = JSON::Any.new(
    existing["count"].as_i64 + attempted["count"].as_i64
  )
  merged
end
```
In upstream write mode, conflicts happen on the remote during `put`. The same `on_conflict` hook applies — returning a `Document` retries the remote write and then waits for the resolved document to replicate locally.

```crystal
db = CouchDB::Database.local_replica("notes.db", remote_url,
  write_upstream: true)

db.on_conflict { |existing, _attempted| existing } # remote always wins
```
## Error Handling

```crystal
begin
  db.get("missing")
rescue e : CouchDB::NotFound
  puts e.message # "Document not found: missing"
end

begin
  db.put(doc_with_wrong_rev)
rescue e : CouchDB::Conflict
  # Fetch the latest rev and retry
end
```
| Exception | When |
|---|---|
| `CouchDB::NotFound` | `get` on a non-existent or deleted document |
| `CouchDB::Conflict` | `put`/`remove` with a stale or missing revision |
| `CouchDB::Unauthorized` | HTTP 401 from remote CouchDB |
| `CouchDB::BadRequest` | Missing `_id`, missing `_rev` on replication write, etc. |
| `CouchDB::ReplicationError` | Unrecoverable failure during replication |

All exceptions inherit from `CouchDB::Error < Exception`.
## Architecture

```
CouchDB::Database                  public facade — auto-detects adapter
└── LocalReplica                   continuous bidirectional sync (local_replica)
        |
CouchDB::Adapter                   abstract interface
├── Adapter::SQLite                local storage via crystal-db + crystal-sqlite3
└── Adapter::HTTP                  remote CouchDB via HTTP::Client
        |
CouchDB::Replication::Replicator   7-step CouchDB protocol
├── Replication::Checkpoint        _local/ checkpoint read/write (remote only for LocalReplica)
└── Replication::Session           result object for one replication run
```
### SQLite schema

Five tables underpin the local adapter:

| Table | Purpose |
|---|---|
| `docs` | Every revision of every document (enables `revs_diff`) |
| `revs` | Parent-revision linkage tree |
| `local_docs` | `_local/` documents — checkpoints, never replicated |
| `update_seq` | Append-only sequence log; ROWID is the update_seq |
| `attachments` | Current binary attachment data keyed by (doc_id, name) |
The "winning" revision is the one with the highest `seq` for a given `id`. Deleted documents are soft-deleted (a `deleted=1` row is stored) so their revisions remain queryable for replication.
## Development

```sh
shards install
crystal spec # all specs run without a CouchDB instance
```

To run e2e tests against a live server (optional):

```sh
# Option A: locally installed goydb
make goydb # starts goydb on :7070 (foreground)
make e2e   # in another terminal

# Option B: Docker
docker run -d -p 7070:7070 ghcr.io/goydb/goydb:latest
make e2e
```

`COUCHDB_URL` defaults to `http://admin:secret@localhost:7070`. Override to point at any CouchDB-compatible server.
## Contributing

- Fork it (https://github.com/threez/couchdb.cr/fork)
- Create your feature branch (`git checkout -b my-new-feature`)
- Add specs for your change and make sure `crystal spec` passes
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Open a Pull Request
## Contributors

- Vincent Landgraf — creator and maintainer
## License

MIT License