datastar.cr
Datastar.cr
A Crystal SDK for the Datastar hypermedia framework.
Datastar is a lightweight (~10KB) framework that brings reactive UI updates to server-rendered applications using Server-Sent Events (SSE) and HTML data attributes. This SDK provides a type-safe Crystal API for streaming DOM updates, managing reactive signals, and executing scripts in the browser—all from your server-side code.
Table of Contents
- Datastar.cr
Background
Datastar combines the simplicity of server-side rendering with the interactivity of modern frontend frameworks. Instead of sending JSON and rebuilding the UI in JavaScript, Datastar streams HTML fragments directly from the server using SSE.
This SDK implements the Datastar SSE protocol for Crystal, inspired by the official Ruby SDK.
Key features:
- Stream real-time UI updates via SSE
- Concurrent streaming with fiber-based concurrency
- Automatic heartbeat and connection health monitoring
- Built-in adapters for Kemal, Athena, and Blueprint frameworks
- Pub/sub system for multi-session synchronization
- Flexible rendering with the
Renderableprotocol
Examples
The examples/ directory contains working TodoMVC implementations:
- kemal-todomvc - TodoMVC with Kemal, demonstrating pub/sub synchronization across browser sessions
- athena-todomvc - TodoMVC with Athena framework and Blueprint components
Each example demonstrates the full feature set including SSE streaming, pub/sub for multi-session sync, and reactive UI updates.
Install
Add the dependency to your shard.yml:
dependencies:
datastar:
github: watzon/datastar.cr
Then run:
shards install
Usage
Quick Start
require "datastar"
def handle_events(request, response)
sse = Datastar::ServerSentEventGenerator.new(request, response)
sse.stream do |stream|
stream.patch_elements(%(<div id="greeting">Hello, Datastar!</div>))
end
end
Streaming Mode
Use stream for long-lived connections with multiple updates:
sse.stream do |stream|
10.times do |i|
sleep 1.second
stream.patch_elements(%(<div id="count">#{i}</div>))
end
end
The stream block sets SSE headers, manages concurrency, and handles cleanup automatically.
One-Off Mode
For single updates without persistent connections:
sse.patch_elements(%(<div id="notification">Task completed!</div>))
sse.finish
API
DOM Manipulation
#patch_elements
Patch HTML fragments into the DOM:
# Basic usage
sse.patch_elements(%(<div id="message">Hello!</div>))
# Target a specific element
sse.patch_elements(%(<p>Updated</p>), selector: "#target")
# Append to a list
sse.patch_elements(%(<li>New item</li>), selector: "#list", mode: Datastar::FragmentMergeMode::Append)
# Multiple fragments
sse.patch_elements([%(<div id="a">A</div>), %(<div id="b">B</div>)])
Merge modes: Outer (default), Inner, Replace, Prepend, Append, Before, After, Remove
#remove_elements
sse.remove_elements("#notification")
Signal Management
#patch_signals
Update reactive signals:
sse.patch_signals(count: 42, user: {name: "Alice"})
sse.patch_signals({enabled: true}, only_if_missing: true)
#remove_signals
sse.remove_signals(["user.name", "user.email"])
Reading Signals
# As JSON::Any
signals = sse.signals
count = signals["count"].as_i
# As typed struct
user = sse.signals(UserSignals)
Script Execution
#execute_script
sse.execute_script(%(console.log("Hello!")))
sse.execute_script("initWidget()", auto_remove: false)
sse.execute_script(%(import('./mod.js')), attributes: {"type" => "module"})
#redirect
sse.redirect("/dashboard")
Connection Management
# Lifecycle callbacks
sse.on_connect { puts "Connected" }
sse.on_client_disconnect { puts "Client left" }
sse.on_server_disconnect { puts "Done streaming" }
sse.on_error { |ex| Log.error { ex.message } }
# Manual connection check
sse.check_connection! # Raises IO::Error if closed
# Check connection state
sse.closed?
Framework Integration
Kemal
require "kemal"
require "datastar/adapters/kemal"
# Streaming endpoint
get "/events" do |env|
env.datastar_stream do |sse|
10.times do |i|
sleep 1.second
sse.patch_elements(%(<div id="count">#{i}</div>))
end
end
end
# HTML response
get "/" do |env|
env.datastar_render("<h1>Hello, Datastar!</h1>")
end
# Check if request is from Datastar
get "/page" do |env|
if env.datastar_request?
env.datastar_render("<div>Fragment</div>")
else
env.datastar_render("<html><body>Full page</body></html>")
end
end
# Broadcast to all subscribed clients
post "/update" do |env|
env.datastar_broadcast("my-topic") do |sse|
sse.patch_elements("<div id='content'>Updated!</div>")
end
end
Kemal.run
Athena
require "athena"
require "datastar"
require "datastar/adapters/athena"
class EventsController < ATH::Controller
include Datastar::Athena::LiveController
@[ARTA::Get("/events")]
def stream_events(request : ATH::Request) : ATH::StreamedResponse
datastar_stream(request) do |stream|
10.times do |i|
sleep 1.second
stream.patch_elements(%(<div id="count">#{i}</div>))
end
end
end
end
The LiveController mixin also provides datastar_render for HTML responses and datastar_broadcast for pub/sub:
@[ARTA::Get("/")]
def index : ATH::Response
datastar_render("<h1>Hello</h1>")
end
@[ARTA::Post("/update")]
def update : ATH::Response
datastar_broadcast("my-topic") do |sse|
sse.patch_elements("<div id='content'>Updated!</div>")
end
ATH::Response.new(status: :ok)
end
Blueprint
Use Blueprint components with Datastar:
require "datastar"
require "datastar/adapters/blueprint"
class GreetingCard
include Blueprint::HTML
def initialize(@name : String); end
def blueprint
div id: "greeting" do
h1 { "Hello, #{@name}!" }
end
end
end
sse.patch_elements(GreetingCard.new("World"))
Request Detection
Use Datastar::RequestDetection to tell whether a request came from Datastar:
request = HTTP::Request.new("GET", "/?datastar=%7B%7D")
Datastar.datastar_request?(request) # => true
The Athena adapter exposes the same helper:
if datastar_request?(request)
datastar_render("<div>Datastar response</div>")
else
datastar_render("<html>Full page</html>")
end
Custom Components
Implement Datastar::Renderable:
class MyComponent
include Datastar::Renderable
def initialize(@title : String); end
def to_datastar_html : String
%(<h1>#{@title}</h1>)
end
end
sse.patch_elements(MyComponent.new("Hello"))
Pub/Sub for Multi-Session Sync
Enable real-time synchronization across multiple browser sessions. When one client makes a change, all clients subscribed to the same topic receive updates automatically.
Setup
require "datastar/pubsub"
# Configure at app startup
Datastar::PubSub.configure
# With lifecycle callbacks
Datastar::PubSub.configure do |config|
config.on_subscribe do |topic, conn_id|
Log.info { "Client #{conn_id} joined #{topic}" }
end
config.on_unsubscribe do |topic, conn_id|
Log.info { "Client #{conn_id} left #{topic}" }
end
end
Subscribe to Topics
get "/subscribe/:list_id" do |env|
list_id = env.params.url["list_id"]
env.datastar_stream do |sse|
# Subscribe to receive broadcasts for this list
sse.subscribe("todos:#{list_id}")
# Send initial state (fragment includes its own ID)
sse.patch_elements(render_todos(list_id))
# Connection stays open, broadcasts arrive automatically
end
end
Broadcast Updates
post "/todos/:list_id" do |env|
list_id = env.params.url["list_id"]
todo = create_todo(env.params.json)
# All subscribed clients receive this update
Datastar::PubSub.broadcast("todos:#{list_id}") do |sse|
sse.patch_elements(render_todos(list_id))
end
env.response.status_code = 201
end
Custom Backend
For multi-server deployments, implement a custom backend:
class RedisBackend < Datastar::PubSub::Backend
def initialize(@redis : Redis::PooledClient)
end
def publish(topic : String, payload : String) : Nil
@redis.publish("datastar:#{topic}", payload)
end
def subscribe(topic : String, &block : String ->) : String
id = UUID.random.to_s
spawn do
@redis.subscribe("datastar:#{topic}") do |on|
on.message { |_, msg| block.call(msg) }
end
end
id
end
def unsubscribe(subscription_id : String) : Nil
# Cancel the subscription fiber
end
end
Datastar::PubSub.configure(backend: RedisBackend.new(redis))
Configuration
Global
Datastar.configure do |config|
config.heartbeat = 5.seconds
config.on_error = ->(ex : Exception) { Log.error { ex.message } }
end
Per-Instance
sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: 10.seconds)
sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: false)
Maintainers
Contributing
PRs accepted.
- Fork it (https://github.com/watzon/datastar.cr/fork)
- Create your feature branch (
git checkout -b my-new-feature) - Write tests for your changes
- Ensure all tests pass (
crystal spec) - Format your code (
crystal tool format) - Commit your changes (
git commit -am 'Add some feature') - Push to the branch (
git push origin my-new-feature) - Create a new Pull Request
See the Datastar documentation for more information about the protocol.
License
MIT © Chris Watson
datastar.cr
- 4
- 0
- 0
- 0
- 2
- about 8 hours ago
- December 24, 2025
MIT License
Wed, 24 Dec 2025 22:02:13 GMT