datastar.cr

Datastar SDK for Crystal. What else is there to know?

Datastar.cr

Standard Readme Crystal License: MIT

A Crystal SDK for the Datastar hypermedia framework.

Datastar is a lightweight (~10KB) framework that brings reactive UI updates to server-rendered applications using Server-Sent Events (SSE) and HTML data attributes. This SDK provides a type-safe Crystal API for streaming DOM updates, managing reactive signals, and executing scripts in the browser—all from your server-side code.

Table of Contents

Background

Datastar combines the simplicity of server-side rendering with the interactivity of modern frontend frameworks. Instead of sending JSON and rebuilding the UI in JavaScript, Datastar streams HTML fragments directly from the server using SSE.

This SDK implements the Datastar SSE protocol for Crystal, inspired by the official Ruby SDK.

Key features:

  • Stream real-time UI updates via SSE
  • Concurrent streaming with fiber-based concurrency
  • Automatic heartbeat and connection health monitoring
  • Built-in adapters for Kemal, Athena, and Blueprint frameworks
  • Pub/sub system for multi-session synchronization
  • Flexible rendering with the Renderable protocol

Examples

The examples/ directory contains working TodoMVC implementations:

  • kemal-todomvc - TodoMVC with Kemal, demonstrating pub/sub synchronization across browser sessions
  • athena-todomvc - TodoMVC with Athena framework and Blueprint components

Each example demonstrates the full feature set including SSE streaming, pub/sub for multi-session sync, and reactive UI updates.

Install

Add the dependency to your shard.yml:

dependencies:
  datastar:
    github: watzon/datastar.cr

Then run:

shards install

Usage

Quick Start

require "datastar"

def handle_events(request, response)
  sse = Datastar::ServerSentEventGenerator.new(request, response)

  sse.stream do |stream|
    stream.patch_elements(%(<div id="greeting">Hello, Datastar!</div>))
  end
end

Streaming Mode

Use stream for long-lived connections with multiple updates:

sse.stream do |stream|
  10.times do |i|
    sleep 1.second
    stream.patch_elements(%(<div id="count">#{i}</div>))
  end
end

The stream block sets SSE headers, manages concurrency, and handles cleanup automatically.

One-Off Mode

For single updates without persistent connections:

sse.patch_elements(%(<div id="notification">Task completed!</div>))
sse.finish

API

DOM Manipulation

#patch_elements

Patch HTML fragments into the DOM:

# Basic usage
sse.patch_elements(%(<div id="message">Hello!</div>))

# Target a specific element
sse.patch_elements(%(<p>Updated</p>), selector: "#target")

# Append to a list
sse.patch_elements(%(<li>New item</li>), selector: "#list", mode: Datastar::FragmentMergeMode::Append)

# Multiple fragments
sse.patch_elements([%(<div id="a">A</div>), %(<div id="b">B</div>)])

Merge modes: Outer (default), Inner, Replace, Prepend, Append, Before, After, Remove

#remove_elements

sse.remove_elements("#notification")

Signal Management

#patch_signals

Update reactive signals:

sse.patch_signals(count: 42, user: {name: "Alice"})
sse.patch_signals({enabled: true}, only_if_missing: true)

#remove_signals

sse.remove_signals(["user.name", "user.email"])

Reading Signals

# As JSON::Any
signals = sse.signals
count = signals["count"].as_i

# As typed struct
user = sse.signals(UserSignals)

Script Execution

#execute_script

sse.execute_script(%(console.log("Hello!")))
sse.execute_script("initWidget()", auto_remove: false)
sse.execute_script(%(import('./mod.js')), attributes: {"type" => "module"})

#redirect

sse.redirect("/dashboard")

Connection Management

# Lifecycle callbacks
sse.on_connect { puts "Connected" }
sse.on_client_disconnect { puts "Client left" }
sse.on_server_disconnect { puts "Done streaming" }
sse.on_error { |ex| Log.error { ex.message } }

# Manual connection check
sse.check_connection!  # Raises IO::Error if closed

# Check connection state
sse.closed?

Framework Integration

Kemal

require "kemal"
require "datastar/adapters/kemal"

# Streaming endpoint
get "/events" do |env|
  env.datastar_stream do |sse|
    10.times do |i|
      sleep 1.second
      sse.patch_elements(%(<div id="count">#{i}</div>))
    end
  end
end

# HTML response
get "/" do |env|
  env.datastar_render("<h1>Hello, Datastar!</h1>")
end

# Check if request is from Datastar
get "/page" do |env|
  if env.datastar_request?
    env.datastar_render("<div>Fragment</div>")
  else
    env.datastar_render("<html><body>Full page</body></html>")
  end
end

# Broadcast to all subscribed clients
post "/update" do |env|
  env.datastar_broadcast("my-topic") do |sse|
    sse.patch_elements("<div id='content'>Updated!</div>")
  end
end

Kemal.run

Athena

require "athena"
require "datastar"
require "datastar/adapters/athena"

class EventsController < ATH::Controller
  include Datastar::Athena::LiveController

  @[ARTA::Get("/events")]
  def stream_events(request : ATH::Request) : ATH::StreamedResponse
    datastar_stream(request) do |stream|
      10.times do |i|
        sleep 1.second
        stream.patch_elements(%(<div id="count">#{i}</div>))
      end
    end
  end
end

The LiveController mixin also provides datastar_render for HTML responses and datastar_broadcast for pub/sub:

@[ARTA::Get("/")]
def index : ATH::Response
  datastar_render("<h1>Hello</h1>")
end

@[ARTA::Post("/update")]
def update : ATH::Response
  datastar_broadcast("my-topic") do |sse|
    sse.patch_elements("<div id='content'>Updated!</div>")
  end
  ATH::Response.new(status: :ok)
end

Blueprint

Use Blueprint components with Datastar:

require "datastar"
require "datastar/adapters/blueprint"

class GreetingCard
  include Blueprint::HTML

  def initialize(@name : String); end

  def blueprint
    div id: "greeting" do
      h1 { "Hello, #{@name}!" }
    end
  end
end

sse.patch_elements(GreetingCard.new("World"))

Request Detection

Use Datastar::RequestDetection to tell whether a request came from Datastar:

request = HTTP::Request.new("GET", "/?datastar=%7B%7D")
Datastar.datastar_request?(request) # => true

The Athena adapter exposes the same helper:

if datastar_request?(request)
  datastar_render("<div>Datastar response</div>")
else
  datastar_render("<html>Full page</html>")
end

Custom Components

Implement Datastar::Renderable:

class MyComponent
  include Datastar::Renderable

  def initialize(@title : String); end

  def to_datastar_html : String
    %(<h1>#{@title}</h1>)
  end
end

sse.patch_elements(MyComponent.new("Hello"))

Pub/Sub for Multi-Session Sync

Enable real-time synchronization across multiple browser sessions. When one client makes a change, all clients subscribed to the same topic receive updates automatically.

Setup

require "datastar/pubsub"

# Configure at app startup
Datastar::PubSub.configure

# With lifecycle callbacks
Datastar::PubSub.configure do |config|
  config.on_subscribe do |topic, conn_id|
    Log.info { "Client #{conn_id} joined #{topic}" }
  end
  config.on_unsubscribe do |topic, conn_id|
    Log.info { "Client #{conn_id} left #{topic}" }
  end
end

Subscribe to Topics

get "/subscribe/:list_id" do |env|
  list_id = env.params.url["list_id"]

  env.datastar_stream do |sse|
    # Subscribe to receive broadcasts for this list
    sse.subscribe("todos:#{list_id}")

    # Send initial state (fragment includes its own ID)
    sse.patch_elements(render_todos(list_id))

    # Connection stays open, broadcasts arrive automatically
  end
end

Broadcast Updates

post "/todos/:list_id" do |env|
  list_id = env.params.url["list_id"]
  todo = create_todo(env.params.json)

  # All subscribed clients receive this update
  Datastar::PubSub.broadcast("todos:#{list_id}") do |sse|
    sse.patch_elements(render_todos(list_id))
  end

  env.response.status_code = 201
end

Custom Backend

For multi-server deployments, implement a custom backend:

class RedisBackend < Datastar::PubSub::Backend
  def initialize(@redis : Redis::PooledClient)
  end

  def publish(topic : String, payload : String) : Nil
    @redis.publish("datastar:#{topic}", payload)
  end

  def subscribe(topic : String, &block : String ->) : String
    id = UUID.random.to_s
    spawn do
      @redis.subscribe("datastar:#{topic}") do |on|
        on.message { |_, msg| block.call(msg) }
      end
    end
    id
  end

  def unsubscribe(subscription_id : String) : Nil
    # Cancel the subscription fiber
  end
end

Datastar::PubSub.configure(backend: RedisBackend.new(redis))

Configuration

Global

Datastar.configure do |config|
  config.heartbeat = 5.seconds
  config.on_error = ->(ex : Exception) { Log.error { ex.message } }
end

Per-Instance

sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: 10.seconds)
sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: false)

Maintainers

@watzon

Contributing

PRs accepted.

  1. Fork it (https://github.com/watzon/datastar.cr/fork)
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Write tests for your changes
  4. Ensure all tests pass (crystal spec)
  5. Format your code (crystal tool format)
  6. Commit your changes (git commit -am 'Add some feature')
  7. Push to the branch (git push origin my-new-feature)
  8. Create a new Pull Request

See the Datastar documentation for more information about the protocol.

License

MIT © Chris Watson

Repository

datastar.cr

Owner
Statistic
  • 4
  • 0
  • 0
  • 0
  • 2
  • about 8 hours ago
  • December 24, 2025
License

MIT License

Links
Synced at

Wed, 24 Dec 2025 22:02:13 GMT

Languages