Crystal Concurrent ML (CML)
A minimal, composable, and correct Concurrent ML runtime for Crystal - built from first principles using events, channels, and fibers.
Concurrent ML (CML) is a message-passing concurrency model introduced by John Reppy. It extends synchronous channels with first-class events that can be composed, chosen, or canceled safely.
Compatibility Note: CML works without compile flags in default fiber mode. Pass -Dpreview_mt -Dexecution_context only when you want multithreaded execution contexts and thread-safe Sync primitives.
1. Overview
This library provides a small but complete CML implementation in pure Crystal. It adds a higher-level event layer on top of Crystal's built-in channels and fibers.
Core features:
- Event(T) abstraction for synchronization
- Atomic commit cell (Pick) ensuring only one event in a choice succeeds
- Chan(T) supporting synchronous rendezvous communication
- Event combinators: choose, wrap, guard, nack, timeout
- Fully deterministic, non-blocking registration semantics
- Fiber-safe cancellation and cleanup
Design principles:
- One pick, one commit: Exactly one event in a choice succeeds
- Zero blocking in registration: try_register never blocks
- Deterministic behavior: Predictable regardless of scheduling
- Memory safe: No recursion in structs, proper cleanup
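The "one pick, one commit" principle can be illustrated with a compare-and-set cell. The following is a minimal sketch using only the Crystal standard library; `CommitCell` and `try_commit` are hypothetical names chosen for illustration and are not the library's `Pick` implementation.

```crystal
# Hypothetical stand-in for a commit cell; not the library's Pick class.
class CommitCell
  PENDING   = 0
  COMMITTED = 1

  def initialize
    @state = Atomic(Int32).new(PENDING)
  end

  # Returns true for exactly one caller: the one whose compare-and-set
  # moves the cell from PENDING to COMMITTED.
  def try_commit : Bool
    _old, swapped = @state.compare_and_set(PENDING, COMMITTED)
    swapped
  end

  def committed? : Bool
    @state.get == COMMITTED
  end
end

cell = CommitCell.new
results = Channel(Bool).new

# Three competing branches race to commit the same cell.
3.times do
  spawn { results.send(cell.try_commit) }
end

wins = 0
3.times { wins += 1 if results.receive }
puts wins # => 1
```

Because the state transition is a single atomic operation, losing branches can detect that they lost and clean up without any further coordination.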
2. Recent Stability Updates
Recent runtime fixes focused on kill-safe parity, timeout reliability, and lock contention reduction:
- TransactionId cleanup/cancel paths are race-safe and cleanup callbacks run at most once.
- cancel_transactions_for removes transactions under lock and performs cancel outside lock.
- TimeoutEvent uses per-transaction timer ids and TimerWheel scheduling, so cancellation cannot leak across concurrent waiters.
- TimerWheel runs callbacks outside its internal mutex to avoid callback-under-lock deadlocks and reduce contention.
- CVar#set! resumes only active waiters, preventing stale committed/cancelled transactions from being resumed.
- Blocking specs were hardened with explicit event timeouts to fail fast instead of hanging indefinitely.
- Distributed tuple transport now uses CML socket events (CML::Socket.send_evt/recv_evt) instead of direct TCPSocket reads/writes.
- A repository policy spec (spec/io_safety_policy_spec.cr) enforces that distributed tuple transport does not regress to direct socket IO calls.
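Several of the fixes above share one pattern: mutate shared state while holding the lock, then run user callbacks only after releasing it. The sketch below shows that pattern with the standard library `Mutex`. `CallbackRegistry` and its methods are hypothetical and do not reflect the runtime's actual classes.

```crystal
# Hypothetical registry; names are illustrative and not part of the CML API.
class CallbackRegistry
  def initialize
    @mutex = Mutex.new
    @callbacks = {} of Int32 => Proc(Nil)
  end

  def register(id : Int32, &block : ->) : Nil
    @mutex.synchronize { @callbacks[id] = block }
  end

  # Removes the entry while holding the lock, then runs it after the lock
  # is released. Only the caller that removed the entry runs the callback,
  # so each callback runs at most once.
  def cancel(id : Int32) : Bool
    callback = @mutex.synchronize { @callbacks.delete(id) }
    return false unless callback
    callback.call
    true
  end

  def size : Int32
    @mutex.synchronize { @callbacks.size }
  end
end

registry = CallbackRegistry.new
log = [] of String

# This callback re-enters the registry, which is only safe because
# `cancel` calls it outside the mutex.
registry.register(1) { log << "cancelled 1, #{registry.size} left" }
registry.register(2) { log << "cancelled 2" }

first = registry.cancel(1)  # removes the entry and runs its callback
second = registry.cancel(1) # entry already gone, nothing runs

puts first  # => true
puts second # => false
puts log    # => ["cancelled 1, 1 left"]
```

If `cancel` invoked the callback inside `synchronize`, the nested `registry.size` call would try to take a mutex the fiber already holds.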
2.1 Parallel IO Safety Policy
Crystal IO objects are not generally safe for concurrent access from multiple threads. In this codebase:
- Concurrent runtime IO paths must use CML event wrappers (CML::Socket.*, CML::PrimitiveIO.*) instead of direct IO/TCPSocket calls.
- Transport layers should treat each socket as owned by transport fibers and communicate via channels/mailboxes.
- If you add/modify distributed tuple transport, keep spec/io_safety_policy_spec.cr passing.
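The ownership rule can be sketched with plain Crystal channels: one fiber owns the IO object and every other fiber reaches it only through a channel. This is an illustration of the idea rather than the library's transport code; `IO::Memory` stands in for a socket, and the variable names are assumptions.

```crystal
# IO::Memory stands in for a socket in this sketch.
io = IO::Memory.new
outbox = Channel(String).new
owner_done = Channel(Nil).new

# Owner fiber: the only code that ever touches `io`.
spawn do
  while line = outbox.receive?
    io << line << '\n'
  end
  owner_done.send(nil)
end

# Producers never call IO methods directly; they only send messages.
producers_done = Channel(Nil).new
3.times do |i|
  spawn do
    outbox.send("message #{i}")
    producers_done.send(nil)
  end
end

3.times { producers_done.receive }
outbox.close       # lets the owner fiber drain and exit
owner_done.receive # wait until every write has been performed

puts io.to_s.lines.sort.join(", ") # => message 0, message 1, message 2
```

Since only the owner fiber performs writes, no two fibers or threads can interleave partial writes on the same IO object.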
3. Installation
Add this to your shard.yml:
dependencies:
  cml:
    github: your-username/cml.cr
Then run:
shards install
4. Quickstart
After/Timeout Helper
CML.after(1.second) { puts "Timeout reached!" }
Spawning a Worker and Waiting for Result
result_evt = CML.spawn_evt { compute_something() }
CML.sync(result_evt)
Pipeline with Channels
ch1 = CML::Chan(Int32).new
ch2 = CML::Chan(String).new
CML.after(0.seconds) { CML.sync(ch1.send_evt(42)) }
CML.after(0.seconds) { CML.sync(ch2.send_evt("done")) }
CML.sync(CML.choose([ch1.recv_evt, ch2.recv_evt]))
See the Cookbook for more idioms and patterns.
Basic Channel Communication
require "cml"
# Create a channel for integers
ch = CML::Chan(Int32).new
# Spawn a sender
spawn { CML.sync(ch.send_evt(99)) }
# Receiver waits synchronously
val = CML.sync(ch.recv_evt)
puts val # => 99
Racing Events with Timeout
# Use choose to race a receive against a timeout
evt = CML.choose([
ch.recv_evt,
CML.wrap(CML.timeout(1.second)) { |_t| "timeout" }
])
puts CML.sync(evt) # => "timeout" if no message arrives
Event Composition & DSL Helpers
string_evt = CML.wrap(ch.recv_evt) { |x| "Received: #{x}" }
lazy_evt = CML.guard { expensive_computation_evt }
safe_evt = CML.nack(ch.recv_evt) { puts "Event was cancelled!" }
# After/Timeout helper
CML.after(2.seconds) { puts "done after 2s" }
# Spawn a fiber and get result as event
evt = CML.spawn_evt { 123 }
CML.sync(evt) # => 123
IO & Socket Helpers
# Non-blocking reads/writes as events
line = CML.sync(CML.read_line_evt(STDIN))
CML.sync(CML.write_evt(STDOUT, "ok\n".to_slice))
# Channel-backed streams (in-process piping)
ch = CML.channel(String)
reader = CML.open_chan_in(ch)
writer = CML.open_chan_out(ch)
CML.sync(CML.write_line_evt(writer, "hello"))
puts CML.sync(CML.read_line_evt(reader)) # => "hello\n"
# TCP/UDP helpers with cancellation support
sock_evt = CML::Socket.connect_evt("example.com", 80)
resp = CML.sync(CML.choose([
CML.wrap(sock_evt) { |sock| CML.sync(CML::Socket.send_evt(sock, "ping".to_slice)) },
CML.wrap(CML.timeout(100.milliseconds)) { :timeout },
]))
5. Core API
Events
- CML.sync(evt) - Synchronize on an event
- CML.always(value) - Event that always succeeds
- CML.never - Event that never succeeds
- CML.timeout(duration) - Time-based event
- CML.after(span) { ... } - Run a block after a delay (helper)
- CML.spawn_evt { ... } - Run a block in a fiber, return result as event (helper)
Combinators
- CML.choose(events) - Race multiple events
- CML.wrap(evt, &block) - Transform event result
- CML.guard(&block) - Lazy event construction
- CML.nack(evt, &block) - Cancellation cleanup
Channels
- CML::Chan(T).new - Create a synchronous channel
- chan.send_evt(value) - Send event
- chan.recv_evt - Receive event
Optional helpers
- CML.after(span) { ... }, CML.spawn_evt { ... }, CML.sleep(span)
- IO helpers: read_evt, read_line_evt, read_all_evt, write_evt, flush_evt
- Socket helpers: TCP Socket.accept_evt/Socket.connect_evt/Socket.recv_evt/Socket.send_evt, UDP Socket::UDP.send_evt/Socket::UDP.recv_evt
- Channel-backed IO: open_chan_in, open_chan_out
- Linda tuple-space implementation in src/cml/tuple.cr (CML::TupleLib) with distributed join_tuple_space(local_port:, remote_hosts:) support using default port 7001; remote_hosts accepts host, host:port, and bracketed IPv6 forms such as [::1]:7001
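To make the accepted remote_hosts forms concrete, here is a small sketch of how such strings could be split into a host and a port. `parse_remote_host` is a hypothetical helper written for this example, not the parser used by CML::TupleLib. It assumes the default port 7001 mentioned above.

```crystal
DEFAULT_PORT = 7001

# Hypothetical helper, not the library's parser: splits "host", "host:port",
# "[v6]" and "[v6]:port" into a {host, port} tuple.
def parse_remote_host(spec : String, default_port : Int32 = DEFAULT_PORT) : {String, Int32}
  if spec.starts_with?('[')
    close = spec.index(']') || raise ArgumentError.new("missing ']' in #{spec}")
    host = spec[1...close]
    rest = spec[(close + 1)..]
    return {host, default_port} if rest.empty?
    raise ArgumentError.new("expected ':port' after ']' in #{spec}") unless rest.starts_with?(':')
    {host, rest[1..].to_i}
  elsif (colon = spec.rindex(':')) && spec.count(':') == 1
    # Exactly one colon: treat it as the host/port separator.
    {spec[0...colon], spec[(colon + 1)..].to_i}
  else
    # No colon, or an unbracketed IPv6 literal: use the default port.
    {spec, default_port}
  end
end

p parse_remote_host("node-a")      # => {"node-a", 7001}
p parse_remote_host("node-a:7100") # => {"node-a", 7100}
p parse_remote_host("[::1]:7001")  # => {"::1", 7001}
```

The brackets are what make an IPv6 address with a port unambiguous, since the address itself contains colons.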
6. Advanced Usage
Nested Choices
inner = CML.choose([evt1, evt2])
outer = CML.choose([inner, evt3])
result = CML.sync(outer)
Multiple Concurrent Channels
ch1 = CML::Chan(Int32).new
ch2 = CML::Chan(String).new
evt = CML.choose([
CML.wrap(ch1.recv_evt) { |x| "Number: #{x}" },
CML.wrap(ch2.recv_evt) { |s| "String: #{s}" }
])
Re-entrant Guards
evt = CML.guard do
if some_condition
CML.always(:ready)
else
CML.timeout(1.second)
end
end
7. Documentation
- Overview & Architecture - Deep dive into event semantics
- CML Manual (Crystal) - Detailed reference mirroring the SML/NJ CML docs
- Examples - Working code examples
- Cookbook - Common idioms and patterns
8. Running Tests
CRYSTAL_CACHE_DIR=$PWD/.crystal-cache crystal spec
# Optional: enable multithreaded execution-context specs
CRYSTAL_CACHE_DIR=$PWD/.crystal-cache crystal spec -Dpreview_mt -Dexecution_context
9. Contributing
See CONTRIBUTING.md for development guidelines and AGENTS.md for AI agent contribution rules.
10. License
MIT License - see LICENSE file for details.
11. Tracing & Instrumentation
CML includes a macro-based tracing system for debugging and performance analysis:
- Zero-overhead when disabled: Tracing is compiled out unless -Dtrace is passed.
- Event IDs: Every event and pick has a unique ID for correlation.
- Fiber context: Trace output includes the current fiber (or user-assigned fiber name).
- Outcome tracing: Commit/cancel outcomes are logged for key CML operations.
- User-defined tags: CML.trace accepts an optional tag: argument for grouping/filtering.
- Flexible output: Trace output can be redirected to any IO (file, pipe, etc) via CML::Tracer.set_output(io).
- Filtering: Tracer can filter by tag, event type, or fiber using set_filter_tags, set_filter_events, and set_filter_fibers.
Example:
CML.trace "Chan.register_send", value, pick, tag: "chan"
CML::Tracer.set_output(File.open("trace.log", "w"))
CML::Tracer.set_filter_tags(["chan", "pick"])
See src/trace_macro.cr for details.
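The "compiled out unless -Dtrace is passed" behavior is typically achieved with a macro that checks a compile-time flag. The sketch below shows the general technique. `demo_trace` and `TRACE_LOG` are hypothetical names, and the actual macro in src/trace_macro.cr may be structured differently.

```crystal
# Hypothetical macro for illustration; the library's own macro may differ.
TRACE_LOG = [] of String

macro demo_trace(message, tag = "untagged")
  {% if flag?(:trace) %}
    TRACE_LOG << "[" + {{tag}} + "] " + {{message}}
  {% end %}
end

# Without -Dtrace these calls expand to nothing, so no code is generated
# for them and the arguments are never evaluated at runtime.
demo_trace "Chan.register_send", tag: "chan"
demo_trace "Pick.commit"

# Prints 0 in a normal build and 2 when compiled with -Dtrace.
puts TRACE_LOG.size
```

Because the check happens during macro expansion, a disabled trace call costs nothing at runtime, unlike a runtime `if` on a global setting.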
12. Debugging Guide
See docs/debugging_guide.md for practical tips on using tracing, tags, and filtering to debug slow, stuck, or incorrect code in CML.