Crystal Concurrent ML (CML)

A minimal, composable, and correct Concurrent ML runtime for Crystal - built from first principles using events, channels, and fibers.

Concurrent ML (CML) is a message-passing concurrency model introduced by John Reppy. It extends synchronous channels with first-class events that can be composed, chosen, or canceled safely.

Compatibility Note: CML works without compile flags in default fiber mode. Pass -Dpreview_mt -Dexecution_context only when you want multithreaded execution contexts and thread-safe Sync primitives.


1. Overview

This library provides a small but complete CML implementation in pure Crystal. It adds a higher-level event layer on top of Crystal's built-in channels and fibers.
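For orientation, the built-in baseline that such an event layer sits on can be sketched with the standard library alone. This is plain Crystal and uses none of this library's API: `select` races a receive against a timeout, but the racing branches are syntax rather than values, so they cannot be stored, wrapped, or passed to other code.

```crystal
# Standard library only; no CML calls are used here.
ch = Channel(Int32).new

# The sender runs in its own fiber and blocks until a receiver is ready.
spawn { ch.send(99) }

# select commits to whichever branch becomes ready first.
result = ""
select
when v = ch.receive
  result = "got #{v}"
when timeout(1.second)
  result = "timeout"
end

puts result # => got 99
```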

Core features:

  • Event(T) abstraction for synchronization
  • Atomic commit cell (Pick) ensuring only one event in a choice succeeds
  • Chan(T) supporting synchronous rendezvous communication
  • Event combinators: choose, wrap, guard, nack, timeout
  • Fully deterministic, non-blocking registration semantics
  • Fiber-safe cancellation and cleanup

Design principles:

  • One pick, one commit: Exactly one event in a choice succeeds
  • Zero blocking in registration: try_register never blocks
  • Deterministic behavior: Predictable regardless of scheduling
  • Memory safe: No recursion in structs, proper cleanup
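The "one pick, one commit" rule can be illustrated with a small compare-and-set cell. This is a minimal sketch under stated assumptions, not the library's Pick: the `CommitCell` class and its `try_commit` method are hypothetical names, and only the standard library's `Atomic` is used.

```crystal
# Hypothetical stand-in for a commit cell; names are illustrative only.
class CommitCell(T)
  @state = Atomic(Int32).new(0) # 0 = open, 1 = committed
  @value : T? = nil

  # Never blocks. Returns true only for the first caller; every later
  # caller loses the race and must abandon its branch.
  def try_commit(value : T) : Bool
    _, won = @state.compare_and_set(0, 1)
    # Simplification: a real multithreaded cell would also need to publish
    # the value safely to readers on other threads.
    @value = value if won
    won
  end

  def value : T?
    @value
  end
end

cell = CommitCell(String).new
first = cell.try_commit("recv")     # wins
second = cell.try_commit("timeout") # loses, the cell is already committed

puts first      # => true
puts second     # => false
puts cell.value # => recv
```

Each branch of a choice would call `try_commit` when it becomes ready, and only the winner delivers its result to the waiting fiber.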

2. Recent Stability Updates

Recent runtime fixes focused on kill-safe parity, timeout reliability, and lock contention reduction:

  • TransactionId cleanup/cancel paths are race-safe and cleanup callbacks run at most once.
  • cancel_transactions_for removes transactions while holding the lock and cancels them after releasing it.
  • TimeoutEvent uses per-transaction timer ids and TimerWheel scheduling, so cancellation cannot leak across concurrent waiters.
  • TimerWheel runs callbacks outside its internal mutex to avoid callback-under-lock deadlocks and reduce contention.
  • CVar#set! resumes only active waiters, preventing stale committed/cancelled transactions from being resumed.
  • Blocking specs were hardened with explicit event timeouts to fail fast instead of hanging indefinitely.
  • Distributed tuple transport now uses CML socket events (CML::Socket.send_evt and CML::Socket.recv_evt) instead of direct TCPSocket reads/writes.
  • A repository policy spec (spec/io_safety_policy_spec.cr) enforces that distributed tuple transport does not regress to direct socket IO calls.
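The "remove under lock, run outside lock" pattern described for cancel_transactions_for and TimerWheel can be sketched as follows. The `CallbackRegistry` class is hypothetical and mirrors only the locking pattern, not the library's types.

```crystal
# Hypothetical registry; it mirrors the pattern only, not the library's types.
class CallbackRegistry
  @mutex = Mutex.new
  @callbacks = {} of Int32 => Proc(Nil)

  def register(id : Int32, &block : -> Nil) : Nil
    @mutex.synchronize { @callbacks[id] = block }
  end

  # Detach every callback while holding the lock, then run them after
  # releasing it, so a callback may safely call back into the registry.
  def cancel_all : Int32
    detached = @mutex.synchronize do
      taken = @callbacks.values
      @callbacks.clear
      taken
    end
    detached.each &.call
    detached.size
  end

  def size : Int32
    @mutex.synchronize { @callbacks.size }
  end
end

registry = CallbackRegistry.new
log = [] of String

registry.register(1) do
  log << "cancel 1"
  # Re-entering the registry is safe here only because the lock has
  # already been released by the time callbacks run.
  registry.register(2) { log << "cancel 2"; nil }
  nil
end

puts registry.cancel_all # => 1
puts registry.size       # => 1
```

Had the callbacks run inside `synchronize`, the nested `register` call would have tried to take a mutex its own fiber already holds.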

2.1 Parallel IO Safety Policy

Crystal IO objects are not generally safe for concurrent access from multiple threads. In this codebase:

  • Concurrent runtime IO paths must use CML event wrappers (CML::Socket.*, CML::PrimitiveIO.*) instead of direct IO/TCPSocket calls.
  • Transport layers should treat each socket as owned by transport fibers and communicate via channels/mailboxes.
  • If you add/modify distributed tuple transport, keep spec/io_safety_policy_spec.cr passing.
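The ownership rule in the second point can be sketched with the standard library alone. In this sketch an `IO::Memory` stands in for a socket and plain `Channel`s stand in for the library's channels and mailboxes; none of the names below come from the codebase.

```crystal
# Standard library only. IO::Memory stands in for a socket.
io = IO::Memory.new
outbox = Channel(String).new
writer_done = Channel(Nil).new
producers_done = Channel(Nil).new

# The owner fiber is the only code that ever touches `io`.
spawn do
  while line = outbox.receive?
    io << line << '\n'
  end
  writer_done.send(nil)
end

# Producers never write to `io`; they hand messages to the owner.
3.times do |i|
  spawn do
    outbox.send("msg #{i}")
    producers_done.send(nil)
  end
end

3.times { producers_done.receive }
outbox.close # receive? now returns nil, which ends the owner loop
writer_done.receive

lines = io.to_s.lines
puts lines.size # => 3
```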

3. Installation

Add this to your shard.yml:

dependencies:
  cml:
    github: your-username/cml.cr

Then run:

shards install

4. Quickstart

After/Timeout Helper

CML.after(1.second) { puts "Timeout reached!" }

Spawning a Worker and Waiting for Result

result_evt = CML.spawn_evt { compute_something() }
CML.sync(result_evt)

Pipeline with Channels

ch1 = CML::Chan(Int32).new
ch2 = CML::Chan(String).new

CML.after(0.seconds) { CML.sync(ch1.send_evt(42)) }
CML.after(0.seconds) { CML.sync(ch2.send_evt("done")) }

CML.sync(CML.choose([ch1.recv_evt, ch2.recv_evt]))

See the Cookbook for more idioms and patterns.

Basic Channel Communication

require "cml"

# Create a channel for integers
ch = CML::Chan(Int32).new

# Spawn a sender
spawn { CML.sync(ch.send_evt(99)) }

# Receiver waits synchronously
val = CML.sync(ch.recv_evt)
puts val  # => 99

Racing Events with Timeout

# Use choose to race a receive against a timeout
evt = CML.choose([
  ch.recv_evt,
  CML.wrap(CML.timeout(1.second)) { |_t| "timeout" }
])
puts CML.sync(evt)  # => "timeout" if no message arrives

Event Composition & DSL Helpers

string_evt = CML.wrap(ch.recv_evt) { |x| "Received: #{x}" }
lazy_evt = CML.guard { expensive_computation_evt }
safe_evt = CML.nack(ch.recv_evt) { puts "Event was cancelled!" }

# After/Timeout helper
CML.after(2.seconds) { puts "done after 2s" }

# Spawn a fiber and get result as event
evt = CML.spawn_evt { 123 }
CML.sync(evt) # => 123

IO & Socket Helpers

# Non-blocking reads/writes as events
line = CML.sync(CML.read_line_evt(STDIN))
CML.sync(CML.write_evt(STDOUT, "ok\n".to_slice))

# Channel-backed streams (in-process piping)
ch = CML.channel(String)
reader = CML.open_chan_in(ch)
writer = CML.open_chan_out(ch)
CML.sync(CML.write_line_evt(writer, "hello"))
puts CML.sync(CML.read_line_evt(reader)) # => "hello\n"

# TCP/UDP helpers with cancellation support
sock_evt = CML::Socket.connect_evt("example.com", 80)
resp = CML.sync(CML.choose([
  CML.wrap(sock_evt) { |sock| CML.sync(CML::Socket.send_evt(sock, "ping".to_slice)) },
  CML.wrap(CML.timeout(100.milliseconds)) { :timeout },
]))

5. Core API

Events

  • CML.sync(evt) - Synchronize on an event
  • CML.always(value) - Event that always succeeds
  • CML.never - Event that never succeeds
  • CML.timeout(duration) - Time-based event
  • CML.after(span) { ... } - Run a block after a delay (helper)
  • CML.spawn_evt { ... } - Run a block in a fiber, return result as event (helper)

Combinators

  • CML.choose(events) - Race multiple events
  • CML.wrap(evt, &block) - Transform event result
  • CML.guard(&block) - Lazy event construction
  • CML.nack(evt, &block) - Cancellation cleanup

Channels

  • CML::Chan(T).new - Create a synchronous channel
  • chan.send_evt(value) - Send event
  • chan.recv_evt - Receive event

Optional helpers

  • CML.after(span) { ... }, CML.spawn_evt { ... }, CML.sleep(span)
  • IO helpers: read_evt, read_line_evt, read_all_evt, write_evt, flush_evt
  • Socket helpers: TCP (Socket.accept_evt, Socket.connect_evt, Socket.recv_evt, Socket.send_evt) and UDP (Socket::UDP.send_evt, Socket::UDP.recv_evt)
  • Channel-backed IO: open_chan_in, open_chan_out
  • Linda tuple-space implementation in src/cml/tuple.cr (CML::TupleLib), with distributed support through join_tuple_space(local_port:, remote_hosts:). The default port is 7001. remote_hosts accepts host, host:port, and bracketed IPv6 forms such as [::1]:7001
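The three accepted remote_hosts forms can be illustrated with a small parser. This is a hypothetical helper written for this README, not the library's own parsing code; the `parse_host` name and its behavior for edge cases are assumptions.

```crystal
DEFAULT_PORT = 7001

# Hypothetical helper, not the library's parser. Returns {host, port}.
def parse_host(spec : String, default_port : Int32 = DEFAULT_PORT) : {String, Int32}
  if spec.starts_with?('[')
    # Bracketed IPv6, with or without a port: [::1] or [::1]:7001
    close = spec.index(']') || raise ArgumentError.new("missing ']' in #{spec}")
    host = spec[1...close]
    rest = spec[(close + 1)..]
    port = rest.empty? ? default_port : rest.lchop(':').to_i
    {host, port}
  elsif spec.count(':') == 1
    # host:port
    host, port = spec.split(':', 2)
    {host, port.to_i}
  else
    # Bare host name: fall back to the default port
    {spec, default_port}
  end
end

p parse_host("node1")      # => {"node1", 7001}
p parse_host("node1:9000") # => {"node1", 9000}
p parse_host("[::1]:7001") # => {"::1", 7001}
```

The brackets are what make an IPv6 address with a port unambiguous, since the address itself contains colons.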

6. Advanced Usage

Nested Choices

inner = CML.choose([evt1, evt2])
outer = CML.choose([inner, evt3])
result = CML.sync(outer)

Multiple Concurrent Channels

ch1 = CML::Chan(Int32).new
ch2 = CML::Chan(String).new

evt = CML.choose([
  CML.wrap(ch1.recv_evt) { |x| "Number: #{x}" },
  CML.wrap(ch2.recv_evt) { |s| "String: #{s}" }
])

Re-entrant Guards

evt = CML.guard do
  if some_condition
    CML.always(:ready)
  else
    CML.timeout(1.second)
  end
end

7. Documentation


8. Running Tests

CRYSTAL_CACHE_DIR=$PWD/.crystal-cache crystal spec
# Optional: enable multithreaded execution-context specs
CRYSTAL_CACHE_DIR=$PWD/.crystal-cache crystal spec -Dpreview_mt -Dexecution_context

9. Contributing

See CONTRIBUTING.md for development guidelines and AGENTS.md for AI agent contribution rules.


10. License

MIT License - see LICENSE file for details.


11. Tracing & Instrumentation

CML includes a macro-based tracing system for debugging and performance analysis:

  • Zero-overhead when disabled: Tracing is compiled out unless -Dtrace is passed.
  • Event IDs: Every event and pick has a unique ID for correlation.
  • Fiber context: Trace output includes the current fiber (or user-assigned fiber name).
  • Outcome tracing: Commit/cancel outcomes are logged for key CML operations.
  • User-defined tags: CML.trace accepts an optional tag: argument for grouping/filtering.
  • Flexible output: Trace output can be redirected to any IO (file, pipe, etc.) via CML::Tracer.set_output(io).
  • Filtering: Tracer can filter by tag, event type, or fiber using set_filter_tags, set_filter_events, and set_filter_fibers.

Example:

CML.trace "Chan.register_send", value, pick, tag: "chan"
CML::Tracer.set_output(File.open("trace.log", "w"))
CML::Tracer.set_filter_tags(["chan", "pick"])

See src/trace_macro.cr for details.
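The zero-overhead claim rests on a general Crystal technique: a macro that expands to nothing unless a compile-time flag is set. The sketch below shows that technique in isolation. `demo_trace` and `TRACE_LOG` are hypothetical names, and the library's own macro in src/trace_macro.cr may be structured differently.

```crystal
# Hypothetical macro for illustration; it is not the library's CML.trace.
TRACE_LOG = [] of String

macro demo_trace(message)
  {% if flag?(:trace) %}
    TRACE_LOG << {{message}}
  {% end %}
end

# Without -Dtrace this call expands to no code at all, so the argument
# is never evaluated and nothing is appended.
demo_trace "Chan.register_send"

puts TRACE_LOG.size # 0 without -Dtrace, 1 with it
```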


12. Debugging Guide

See docs/debugging_guide.md for practical tips on using tracing, tags, and filtering to debug slow, stuck, or incorrect code in CML.
