amqp-ng
amqp
AMQP 0-9-1 client for Crystal, written for RabbitMQ/LavinMQ and for real worker-service integration. The project is no longer pre-implementation: it has a working client under src/, executable specs under spec/, a local benchmark harness, and a v0.1.0 release branch.
Current status: v0.1.0 local release branch. It is usable as a path dependency today. Before publishing a public shard release, re-run the TLS live-broker, manual release-gate, and performance checks listed below.
Why This Exists
The existing Crystal AMQP clients either lag current Crystal internals or expose migration friction for this codebase. This shard keeps the v0 surface small:
- AMQP 0-9-1 over TCP or amqps://.
- RabbitMQ 3.13+ and LavinMQ 2.x as target brokers.
- Publisher confirms, mandatory returns, typed errors, and heartbeats.
- Queue/exchange topology operations and opt-in recovery.
- Practical amqp-client.cr migration helpers.
- No runtime dependencies outside Crystal stdlib.
AMQP 1.0 is planned as a future side-by-side protocol; it is not part of the v0.1.0 runtime.
Install
For local integration:
dependencies:
  amqp:
    path: ../amqp-ng
For a Git dependency after pushing this branch:
dependencies:
  amqp:
    github: skuznetsov/amqp-ng
    branch: main
Then:
shards install
Quick Start
require "amqp"

Amqp.connect("amqp://guest:guest@localhost:5672") do |conn|
  ch = conn.channel
  q = ch.queue_declare("", exclusive: true, auto_delete: true)
  ch.publish("", q.name, "hello".to_slice)
  msg = ch.get(q.name)
  puts String.new(msg.not_nil!.body)
end
Durable Publish With Confirms
Amqp.connect(ENV["AMQP_URL"]? || "amqp://guest:guest@localhost:5672") do |conn|
  ch = conn.channel
  ch.confirm_select
  ch.queue_declare("jobs.analyze", durable: true)

  props = Amqp::Properties.new(
    content_type: "application/json",
    persistence: Amqp::Persistence::Persistent,
    message_id: "job-123",
  )

  ok = ch.publish_confirm(
    Amqp::Message.new(%({"id":123}), props),
    "",
    "jobs.analyze",
    mandatory: true,
    timeout: 5.seconds,
  )
  raise "publish was not acked" unless ok
end
For higher-throughput producer paths, publish confirmed messages in bounded windows instead of waiting for one broker round trip per message:
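# `ch` and `props` are set up as in the previous example;
# `jobs` is your own collection of JSON-serializable jobs.
messages = jobs.map do |job|
  Amqp::Message.new(job.to_json, props)
end

ok = ch.publish_confirm_batch(
  messages,
  "",
  "jobs.analyze",
  window_size: 100,
  timeout: 10.seconds,
)
raise "confirm window failed" unless ok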
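A rough way to see why windows help is to count how often the producer has to stop and wait. The sketch below does not use the shard at all; confirm_waits is a hypothetical helper that only counts blocking waits, assuming one wait per message in the unwindowed case and one wait per window otherwise.

```crystal
# Hypothetical helper (not part of the shard): counts how many times a
# producer blocks waiting for broker confirms when it publishes `total`
# messages in windows of `window_size`.
def confirm_waits(total : Int32, window_size : Int32) : Int32
  raise ArgumentError.new("window_size must be positive") if window_size <= 0
  waits = 0
  (0...total).each_slice(window_size) do |_window|
    # A real producer would publish every message in the window here,
    # then wait once for the broker to confirm all of them.
    waits += 1
  end
  waits
end

puts confirm_waits(900, 1)   # one wait per message
puts confirm_waits(900, 100) # one wait per window of 100
```

With 900 messages this is 900 waits versus 9, which is the effect the window_size argument is meant to exploit. Actual throughput still depends on the broker and the network.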
Worker Consumer
Amqp.connect(ENV["AMQP_URL"]? || "amqp://guest:guest@localhost:5672") do |conn|
  ch = conn.channel
  ch.prefetch(1_u16)

  ch.consume("jobs.analyze", auto_ack: false) do |msg|
    begin
      # process message
      ch.ack(msg.delivery_tag)
    rescue
      ch.nack(msg.delivery_tag, requeue: true)
    end
  end
end
Queue/Exchange Wrappers
Amqp.connect("amqp://guest:guest@localhost:5672") do |conn|
  ch = conn.channel
  ex = ch.direct_exchange("amq.direct")
  q = ch.queue("jobs.analyze", durable: true)
  q.bind(ex.name, "jobs.analyze")
  ex.publish(%({"kind":"analyze"}), "jobs.analyze")
end
Implemented Surface
| Area | Status |
|---|---|
| Connection lifecycle | connect, block form, close, server properties, blocked?, blocked/unblocked callbacks |
| Channel lifecycle | open/close, broker close mapping, channel.flow, close callbacks |
| Publishing | fire-and-forget, batch publish, sync confirms, windowed confirms, async confirms, mandatory returns |
| Consuming | get, consume, subscribe, basic_consume(work_pool:), ack, nack, reject, qos |
| Topology | queue/exchange declare, bind, unbind, purge, delete, wrapper objects |
| Transactions | tx_select, tx_commit, tx_rollback, transaction helper |
| Recovery | opt-in topology, consumer, and unconfirmed-publish replay; new operations fail fast during recovery |
| TLS | amqps:// via OpenSSL::SSL::Context::Client; live spec gated by env vars |
| Compatibility | basic_* aliases, queue/exchange wrappers, IO publish overloads, return/cancel/close callbacks |
| Observability | reduced Connection#stats counters plus typed exceptions |
Broker Compatibility
| Broker | Current evidence |
|---|---|
| RabbitMQ 3.13.x | Default suite green locally; manual CI covers plain-AMQP, TLS, backpressure, and Docker chaos gates |
| LavinMQ 2.4.0 | Default suite green locally; manual CI covers plain-AMQP, TLS, backpressure, and Docker chaos gates |
The default suite leaves broker-destructive or environment-specific specs pending unless you set the corresponding env vars:
AMQP_TLS_URL='amqps://guest:guest@localhost:5671/' \
AMQP_TLS_CA_CERT='.tmp/rabbitmq_tls/certs/ca_certificate.pem' \
AMQP_BACKPRESSURE_LIVE=1 \
AMQP_CHAOS_DOCKER_CONTAINER=amqp-ng-rabbit \
crystal spec --error-trace
Benchmarks
tools/perf_publish.cr is a checked-in local witness harness. The manual Perf Smoke workflow executes it on RabbitMQ/LavinMQ and validates that required JSON lanes are present with positive medians via tools/perf_smoke_assert.cr. This is still not a normative performance contract.
tools/perf_threshold_assert.cr can compare a saved benchmark JSON file against an explicit threshold profile. This is useful for local release gates and host-specific baselines, but docs/14-performance-contract.md remains a roadmap until the default-off spec/perf/ harness grows into a complete reproducible benchmark suite.
tools/perf_compare.cr <baseline.json> <current.json> compares two saved benchmark artifacts lane-by-lane across metrics and stages. It prints regressions, improvements, missing lanes, and new lanes. By default it only reports; the following env vars turn findings into a nonzero exit:
- AMQP_BENCH_COMPARE_FAIL_REGRESSION_PCT: fail when a matching lane regresses past a local threshold.
- AMQP_BENCH_COMPARE_FAIL_MISSING_CURRENT=1: fail when a baseline lane is absent from the current artifact.
- AMQP_BENCH_COMPARE_FAIL_NEW_LANE=1: require explicit review for current-only lanes.
- AMQP_BENCH_COMPARE_FAIL_METADATA=1: turn metadata warnings into a nonzero exit for strict local gates.
- AMQP_BENCH_COMPARE_STRICT=1: fail on metadata drift, missing baseline lanes, and new current-only lanes in one switch.
New benchmark artifacts include the benchmark schema version, per-lane units, Crystal description, release/threading compile flags, and workload-shape fields such as message counts, body size, stage iterations, route fanout, and confirm windows. The compare tool prints warnings when those stable context fields differ or when matching lanes use incompatible units.
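The lane-by-lane comparison described above can be sketched in a few lines of stdlib Crystal. This is an illustration of the idea, not the code in tools/perf_compare.cr: the Lanes alias, the lane names, the numbers, and the rule "fail when the drop is at least fail_pct percent" are all assumptions made for the example.

```crystal
# Hypothetical lane medians in msg/s; the real artifacts are JSON files with
# more fields (units, stages, metadata) than this sketch models.
alias Lanes = Hash(String, Float64)

# Percent change of `current` relative to `baseline` (negative = slower).
def change_pct(baseline : Float64, current : Float64) : Float64
  (current - baseline) / baseline * 100.0
end

# Returns human-readable findings plus whether the gate should fail.
def compare(baseline : Lanes, current : Lanes, fail_pct : Float64) : {Array(String), Bool}
  findings = [] of String
  failed = false
  baseline.each do |lane, base|
    if cur = current[lane]?
      pct = change_pct(base, cur)
      if pct <= -fail_pct
        findings << "regression #{lane}: #{pct.round(1)}%"
        failed = true
      end
    else
      findings << "missing lane #{lane}"
    end
  end
  # Lanes that exist only in the current artifact.
  (current.keys - baseline.keys).each { |lane| findings << "new lane #{lane}" }
  {findings, failed}
end

baseline = {"publish_single" => 80_000.0, "confirm_sync" => 2_500.0}
current = {"publish_single" => 70_000.0, "publish_batch" => 85_000.0}
findings, failed = compare(baseline, current, 10.0)
findings.each { |line| puts line }
puts "exit nonzero" if failed
```

In this sketch only the regression flips the failure flag; missing and new lanes are reported but do not fail, mirroring the idea that each kind of finding has its own opt-in switch.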
Broad paired local release-compiler run:
- Crystal: /opt/homebrew/bin/crystal 1.20.1 --release
- Broker: local RabbitMQ 3.13.x
- Body: 256 bytes
- Shape: AMQP_BENCH_PUBLISH_N=9000, AMQP_BENCH_CONFIRM_N=900, AMQP_BENCH_SAMPLES=3
| Workload | amqp-ng | amqp-client.cr | Notes |
|---|---|---|---|
| single fire-and-forget publish | ~83.6k msg/s | ~83.0k msg/s | same queue |
| batch publish | ~85.1k msg/s | n/a | publish_batch, batch size 100 |
| 2 connections, separate queues | ~705k msg/s | ~682k msg/s | queue-sharded producers |
| 4 connections, separate queues | ~625k msg/s | ~621k msg/s | queue-sharded producers |
| sync confirm per publish | ~2.48k msg/s | ~2.52k msg/s | one round trip per publish |
Latest paired local release-compiler runs after the confirm batching and heartbeat-stamp hot-path cleanup:
- Crystal: /opt/homebrew/bin/crystal 1.20.1 --release
- Brokers: local RabbitMQ 3.13.x and LavinMQ 2.4.0
- Body: 256 bytes
- Shape: AMQP_BENCH_PUBLISH_N=15000, AMQP_BENCH_CONFIRM_N=1500, AMQP_BENCH_SAMPLES=5, channels/connections 1,2,4
| Broker | Workload | amqp-ng | amqp-client.cr | Delta |
|---|---|---|---|---|
| RabbitMQ | single fire-and-forget publish | ~82.3k msg/s | ~72.6k msg/s | +13.5% |
| RabbitMQ | same-connection 4-channel publish | ~77.0k msg/s | ~67.1k msg/s | +14.8% |
| RabbitMQ | sync confirm per publish | ~2.68k msg/s | ~2.52k msg/s | +6.4% |
| RabbitMQ | windowed confirm, size 500 | ~65.0k msg/s | n/a | n/a |
| LavinMQ | single fire-and-forget publish | ~744.6k msg/s | ~771.5k msg/s | -3.5% |
| LavinMQ | same-connection 2-channel publish | ~822.4k msg/s | ~722.8k msg/s | +13.8% |
| LavinMQ | 4 connections, separate queues | ~694.0k msg/s | ~678.7k msg/s | +2.2% |
| LavinMQ | sync confirm per publish | ~3.46k msg/s | ~3.37k msg/s | +2.7% |
| LavinMQ | batch confirm + wait | ~33.6k msg/s | n/a | n/a |
Run the local harness:
AMQP_BENCH_PUBLISH_N=9000 \
AMQP_BENCH_CONFIRM_N=900 \
AMQP_BENCH_BATCH_SIZE=100 \
AMQP_BENCH_SAMPLES=3 \
AMQP_BENCH_CHANNELS=1,2,4 \
AMQP_BENCH_CONNECTIONS=1,2,4 \
crystal run tools/perf_publish.cr --release --error-trace
The largest measured throughput lane is multi-connection publishing to separate queues. Single-connection multi-channel publishing is bounded by the intentional connection write mutex.
The harness also emits publish_batch_bytes for the public publish_batch(Array(Bytes)) overload. That lane is measured separately from publish_batch, which uses prebuilt Amqp::Message values.

It also emits synthetic stage-attribution lanes such as encode_empty_publish_frames, parse_basic_ack_frame_generic, and parse_basic_ack_frame_direct; set AMQP_BENCH_STAGE_N to control the number of synthetic stage iterations. These stage lanes are diagnostic triggers, not live throughput claims.

For live single-publish attribution, the harness also emits publish_single_empty_body, publish_single_bytes_empty_body, publish_prepared_empty_body, publish_single_alternating_routes, and publish_single_repeat. These lanes help separate body-size cost, public API wrapper cost, prepared-route reuse, route-cache stability, and lane-order noise. Set AMQP_BENCH_ROUTE_COUNTS to add round-robin route-fanout lanes and AMQP_BENCH_BODY_SIZES to sweep live single-publish body sizes plus matching synthetic encode_empty_publish_frames_body_<n> stages.

Set AMQP_BENCH_CONFIRM_WINDOWS to sweep confirm-window sizes; the tool emits confirm_window_<n> lanes that publish n messages, wait for confirms, and repeat. These lanes show the transport benefit of a ladder/windowed confirm style without changing application code. The harness also emits confirm_async and confirm_async_bytes so the ordinary Amqp::Message async-confirm path and the direct Bytes async-confirm path remain separately measurable.

After a run, tools/perf_recommend.cr <bench.json> prints conservative run-local guidance, such as when batch publishing, prepared fixed-route publishers, route sharding, or body-size reduction are the highest leverage follow-ups. When comparing a branch against a saved baseline, run tools/perf_compare.cr <baseline.json> <current.json> first; it is a longitudinal drift detector, not a substitute for paired broker runs.
Metadata warnings from that command mean the run is not an apples-to-apples throughput comparison until the compiler/build-mode/workload/lane-unit difference is explained. Use AMQP_BENCH_COMPARE_STRICT=1 for strict local release gates that must reject mixed contexts and lane schema drift in one switch. Use AMQP_BENCH_COMPARE_FAIL_METADATA=1 when a release gate only needs to reject mixed contexts instead of warning. Use AMQP_BENCH_COMPARE_FAIL_MISSING_CURRENT=1 when a release gate must reject disappeared baseline lanes. Use AMQP_BENCH_COMPARE_FAIL_NEW_LANE=1 when a release gate must reject unexpected benchmark schema expansion until the new lane is reviewed.
Executable spec/perf/ carriers are default-off.
Plain and TLS handshake p99 latency:
AMQP_PERF_LIVE=1 \
AMQP_PERF_HANDSHAKE_CONNECTIONS=1000 \
AMQP_PERF_HANDSHAKE_P99_MS=10 \
AMQP_PERF_TLS_HANDSHAKE_P99_MS=50 \
crystal spec spec/perf/t_perf_handshake_001_spec.cr --release --error-trace
Set AMQP_TLS_URL and optionally AMQP_TLS_CA_CERT to include the TLS variant; otherwise the TLS example remains pending.
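For readers unfamiliar with the p99 gate, the sketch below shows one common way to derive a 99th percentile from latency samples and check it against a budget. It uses the nearest-rank definition on synthetic data; the spec itself may compute p99 differently, and the percentile helper and sample values are assumptions made for illustration.

```crystal
# Nearest-rank percentile over latency samples in milliseconds.
# `pct` is a whole-number percentile between 1 and 100.
def percentile(samples : Array(Float64), pct : Int32) : Float64
  raise ArgumentError.new("no samples") if samples.empty?
  sorted = samples.sort
  # Integer ceiling of pct/100 * n, which avoids float rounding surprises.
  rank = (pct * sorted.size + 99) // 100
  sorted[rank.clamp(1, sorted.size) - 1]
end

# Synthetic data: 1000 handshakes, 990 fast ones and 10 slow outliers.
latencies = Array(Float64).new(1000) { |i| i < 990 ? 2.0 : 40.0 }
p99 = percentile(latencies, 99)

budget_ms = 10.0 # same role as AMQP_PERF_HANDSHAKE_P99_MS=10
puts "p99 = #{p99} ms"
puts(p99 <= budget_ms ? "within budget" : "over budget")
```

A p99 gate tolerates up to 1% slow outliers, so a 1000-connection run can contain ten slow handshakes and still pass; an eleventh would move the percentile onto the slow samples.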
Fire-and-forget single-channel throughput:
AMQP_PERF_LIVE=1 \
AMQP_PERF_WINDOW_SECONDS=10 \
AMQP_PERF_WARMUP_SECONDS=2 \
AMQP_PERF_PUB_001_MIN=200000 \
crystal spec spec/perf/t_perf_pub_001_spec.cr --release --error-trace
T-PERF-PUB-002 has the same default-off shape for async publisher-confirm throughput:
AMQP_PERF_LIVE=1 \
AMQP_PERF_WINDOW_SECONDS=10 \
AMQP_PERF_WARMUP_SECONDS=2 \
AMQP_PERF_PUB_002_MIN=50000 \
crystal spec spec/perf/t_perf_pub_002_spec.cr --release --error-trace
Same-connection multi-channel publish throughput:
AMQP_PERF_LIVE=1 \
AMQP_PERF_WINDOW_SECONDS=10 \
AMQP_PERF_WARMUP_SECONDS=2 \
AMQP_PERF_MULTICHAN_COUNT=8 \
AMQP_PERF_MULTICHAN_001_MIN=800000 \
crystal spec spec/perf/t_perf_multichan_001_spec.cr --release --error-trace
Preloaded manual-ack consume throughput:
AMQP_PERF_LIVE=1 \
AMQP_PERF_CONS_MESSAGES=1000000 \
AMQP_PERF_CONS_WARMUP_MESSAGES=10000 \
AMQP_PERF_CONS_PREFETCH=1000 \
AMQP_PERF_CONS_001_MIN=100000 \
crystal spec spec/perf/t_perf_cons_001_spec.cr --release --error-trace
Stats snapshot overhead:
AMQP_PERF_LIVE=1 \
AMQP_PERF_STATS_READS=1000000 \
AMQP_PERF_STATS_001_MAX_US=1 \
crystal spec spec/perf/t_perf_stats_001_spec.cr --release --error-trace
Idle connection and buffered-delivery heap deltas:
AMQP_PERF_LIVE=1 \
AMQP_PERF_MEM_CONNECTIONS=100 \
AMQP_PERF_MEM_DELIVERIES=100 \
AMQP_PERF_MEM_001_MAX_BYTES=65536 \
AMQP_PERF_MEM_002_MAX_BYTES=640 \
crystal spec spec/perf/t_perf_mem_001_002_spec.cr --release --error-trace
Fire-and-forget publish allocation pressure:
AMQP_PERF_LIVE=1 \
AMQP_PERF_GC_MESSAGES=1000000 \
AMQP_PERF_GC_001_MAX_BYTES_PER_MESSAGE=512 \
crystal spec spec/perf/t_perf_gc_001_spec.cr --release --error-trace
Broker-restart recovery dead-window median:
AMQP_PERF_LIVE=1 \
AMQP_CHAOS_DOCKER_CONTAINER=rabbitmq \
AMQP_PERF_RECOV_SAMPLES=5 \
AMQP_PERF_RECOV_001_MAX_MS=2000 \
crystal spec spec/perf/t_perf_recov_001_spec.cr --release --error-trace
It writes local result artifacts under spec/perf/results/, which are ignored by git. Keep AMQP_PERF_*_MIN values host-specific until perf CI owns a normalized baseline.
Latest cross-broker release-compiler smoke after the bytes-batch fast path:
- Crystal: /opt/homebrew/bin/crystal 1.20.1 --release
- Brokers: rabbitmq:3.13-management on 5672, cloudamqp/lavinmq:2.4.0 on 5673
- Full shape: AMQP_BENCH_PUBLISH_N=30000, AMQP_BENCH_CONFIRM_N=3000, AMQP_BENCH_SAMPLES=3, batch size 100, channels/connections 1,2,4
- Batch-focused shape: same counts with AMQP_BENCH_SAMPLES=7, channels/connections 1
| Workload | RabbitMQ | LavinMQ | Notes |
|---|---|---|---|
| single fire-and-forget publish | ~79.3k msg/s | ~754.0k msg/s | batch-focused run |
| prebuilt-message batch | ~87.5k msg/s | ~736.9k msg/s | Array(Amqp::Message), batch-focused run |
| bytes batch | ~85.4k msg/s | ~845.2k msg/s | public Array(Bytes) overload, batch-focused run |
| 4 connections, separate queues | ~795.5k msg/s | ~777.2k msg/s | queue-sharded producers, full run |
| sync confirm per publish | ~2.96k msg/s | ~4.29k msg/s | one round trip per publish, batch-focused run |
| batch confirm + wait | ~74.1k msg/s | ~53.7k msg/s | batch size 100, batch-focused run |
The bytes-batch fast path is a lower-allocation public API path, not a proved live-throughput win over callers that already prebuild Amqp::Message batches. On these local runs, socket/broker behavior dominates the publish lanes. Batch publish also reuses a prebuilt basic.publish method frame for every message in a batch, guarded by a wire-level byte-equivalence spec.
Verification
Current default local gate:
crystal spec --error-trace
Latest no-broker result: 229 examples, 0 failures, 0 errors, 92 pending. Most pending examples are live-broker specs that intentionally skip when AMQP_URL is unreachable. Checked-in no-broker CI runs this gate, format checks, and local tool type-checks on pinned Crystal 1.19.2 and 1.20.2. Checked-in plain-AMQP broker CI also runs the live spec surface against RabbitMQ 3.13.7 and LavinMQ 2.4.0 on Crystal 1.20.2. A manual Perf Smoke workflow runs the benchmark harness with tiny default counts, validates required positive JSON lanes, and stores JSON artifacts; it is a harness health check, not a throughput contract.
Latest focused LavinMQ publish/confirm smoke: 72 examples, 0 failures, 0 errors, 0 pending for spec/message_spec.cr, spec/api_surface_spec.cr, spec/channel_spec.cr, and spec/confirms_spec.cr. Latest focused LavinMQ no-wait/API smoke: 39 examples, 0 failures, 0 errors, 0 pending for spec/wire/no_wait_methods_spec.cr, spec/api_surface_spec.cr, and spec/channel_spec.cr.
Additional gates used for this branch:
crystal tool format --check src spec tools/perf_publish.cr tools/capture_proxy.cr tools/perf_smoke_assert.cr tools/perf_smoke_assertions.cr tools/perf_thresholds.cr tools/perf_threshold_assert.cr tools/perf_recommendations.cr tools/perf_recommend.cr tools/perf_compare_report.cr tools/perf_compare.cr
crystal build tools/perf_publish.cr --no-codegen --error-trace
crystal build tools/capture_proxy.cr --no-codegen --error-trace
crystal build tools/perf_smoke_assert.cr --no-codegen --error-trace
crystal build tools/perf_smoke_assertions.cr --no-codegen --error-trace
crystal build tools/perf_threshold_assert.cr --no-codegen --error-trace
crystal build tools/perf_recommend.cr --no-codegen --error-trace
crystal build tools/perf_compare.cr --no-codegen --error-trace
git diff --check
With a plain AMQP broker available, the remaining default pending specs are environment gated:
- TLS broker handshake via AMQP_TLS_URL.
- Subscription backpressure timing via AMQP_BACKPRESSURE_LIVE.
- Broker pause/restart chaos via AMQP_CHAOS_DOCKER_CONTAINER.
The checked-in broker CI covers the plain-AMQP live surface. Manual Release Gates workflow jobs cover TLS, backpressure, and Docker chaos gates for RabbitMQ/LavinMQ. The manual Perf Smoke workflow covers benchmark harness execution plus positive JSON lane validation. Thresholded performance profile checks are available locally through tools/perf_threshold_assert.cr, but normative reproducible benchmark runners remain opt-in/local release checks.
URI And Config
Supported URI query keys in v0:
- heartbeat
- channel_max
- frame_max
- max_body_size
- max_inflight_body_bytes
- max_subscription_mailbox_bytes
- connect_timeout
- recovery
- product
- information
- tcp_nodelay
- buffer_size
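As a quick illustration of where these keys sit in a connection URI, the sketch below splits one with Crystal's stdlib URI type. It does not show how the shard itself interprets the values; the specific values (heartbeat=30, frame_max=131072, tcp_nodelay=true) are placeholders chosen for the example, so check the shard docs for accepted units and ranges.

```crystal
require "uri"
require "uri/params"

# Illustrative URI only: the query values are placeholders, not recommendations.
uri = URI.parse("amqp://guest:guest@localhost:5672/?heartbeat=30&frame_max=131072&tcp_nodelay=true")
params = uri.query_params

puts uri.host
puts uri.port
puts params["heartbeat"]
puts params["frame_max"]
# Keys that are absent from the URI simply are not present in the params.
puts params["connect_timeout"]? || "(not set)"
```

Because the options travel in the URI, the same AMQP_URL environment variable used in the examples above can carry per-deployment tuning without code changes.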
TLS policy is supplied by the caller through OpenSSL::SSL::Context::Client. SASL EXTERNAL, AMQPLAIN, OAuth, and TLS client-certificate auth are deferred.
Migration From amqp-client.cr
Implemented compatibility helpers include:
- basic_publish, callback publish, basic_publish_confirm
- basic_get, basic_consume, basic_cancel
- basic_ack, basic_reject, basic_nack, basic_qos, basic_recover
- Channel#queue, #exchange, #default_exchange, #direct_exchange, #topic_exchange, #fanout_exchange, #header_exchange
- Amqp::Queue and Amqp::Exchange wrappers
- on_return, on_cancel, on_close
- IO + explicit byte-size publish overloads
Compatibility notes:
- no_wait compatibility overloads on queue declare, queue bind, exchange bind/unbind, and basic cancel emit broker no-wait frames.
- NamedTuple args sugar is accepted for topology/consumer helpers and coerced into Amqp::Arguments; use explicit Amqp::Arguments when you need to make AMQP field widths visually obvious.
- WebSocket transport is a future transport project, not a v0 Crystal-shard parity blocker.
Design Tradeoffs
- The connection has one write mutex. This keeps publish frame groups atomic and simple. For high aggregate publish throughput, use multiple connections.
- Recovery is opt-in. During recovery, new caller operations fail fast with RecoveryInProgress; the client does not queue arbitrary new work behind a reconnect.
- Consumer backpressure is per-channel first. If a subscription inbox fills, that channel handler stalls; whole-connection pressure requires the channel frame inbox to fill too.
- The stdlib-only constraint is intentional. The shard avoids reaching into Crystal stdlib internals.
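The write-mutex tradeoff can be illustrated without a broker. In the sketch below, FakeConnection is a hypothetical stand-in rather than the shard's internals: it writes a three-part frame group to a shared in-memory IO, and yields between parts so that, without the lock, another fiber could interleave its own frames.

```crystal
# Stand-in for a connection: one shared IO guarded by one write mutex.
# Hypothetical class for illustration, not the shard's implementation.
class FakeConnection
  getter io = IO::Memory.new
  @write_lock = Mutex.new

  # A publish is a group of frames (method, header, body) that must not
  # interleave with frames written by other fibers on the same socket.
  def publish(channel_id : Int32, body : String) : Nil
    @write_lock.synchronize do
      @io << "[#{channel_id}:method]"
      Fiber.yield # give other fibers a chance to run mid-group
      @io << "[#{channel_id}:header]"
      Fiber.yield
      @io << "[#{channel_id}:body=#{body}]"
    end
  end
end

conn = FakeConnection.new
done = Channel(Nil).new

# Two "channels" publishing concurrently on the same connection.
2.times do |i|
  spawn do
    conn.publish(i + 1, "hello")
    done.send(nil)
  end
end
2.times { done.receive }

puts conn.io.to_s
```

Each frame group comes out contiguous because the second fiber waits for the lock. That same wait is why same-connection multi-channel publishing does not scale like separate connections do.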
Deferred
- AMQP 1.0 runtime. See docs/22-amqp-1-0-sdd.md.
- SASL EXTERNAL and other non-PLAIN auth mechanisms.
- Complete spec/perf/ reproducible benchmark suite.
- Checked-in CI for normative thresholded performance gates.
- Broader reliability transcript corpus.
- WebSocket transport.
Documentation Map
- docs/02-public-api.md - public API contract.
- docs/05-wire-0-9-1/ - AMQP 0-9-1 wire codec.
- docs/07-channel-lifecycle.md - channel state, confirms, flow, close semantics.
- docs/08-publisher-confirms.md - confirm tracker and return/nack behavior.
- docs/09-consumer.md - consume/subscribe/get and backpressure.
- docs/12-recovery.md - opt-in recovery contract.
- docs/14-performance-contract.md - non-normative performance roadmap.
- docs/16-falsifier-matrix.md - falsifier/test matrix.
- TODO.md and LANDMARKS.md - current working ledger and verified anchors.
- May 14, 2026
Sat, 23 May 2026 22:42:31 GMT