crafka v0.7.0
Detailed documentation: https://crystaldoc.info/github/BT-OpenSource/crafka/main/index.html
Installation
Add this to your application's shard.yml:
dependencies:
  crafka:
    github: bt-opensource/crafka
Usage
require "crafka"
Producing
producer = Kafka::Producer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"})
producer.produce(topic: "topic_name", payload: "my message".to_slice)
# Optionally
producer.poll # Serves queued callbacks
producer.flush # Wait for outstanding produce requests to complete
All available args to #produce: topic, payload, key, timestamp.
Auto Polling
librdkafka recommends calling rd_kafka_poll at regular intervals to serve queued callbacks. This functionality is built into Crafka.
By default after each #produce, a Kafka::Producer will call poll if it hasn't polled in the last 5 seconds.
You can configure this with the poll_interval argument:
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"},
  poll_interval: 30
)
To disable auto polling, set poll_interval to 0.
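The auto-polling rule described above can be sketched in a few lines of plain Crystal. This is only an illustration, not crafka's actual source: the AutoPoller class, its after_produce method, and the poll_count counter are hypothetical names, the interval is assumed to be in seconds, and a counter stands in for the real poll call.

```crystal
# Hypothetical sketch of the auto-polling rule; not taken from crafka itself.
class AutoPoller
  # Number of times the stand-in "poll" has run.
  getter poll_count = 0

  # poll_interval is assumed to be in seconds; 0 disables auto polling.
  def initialize(@poll_interval : Int32 = 5, @last_poll : Time = Time.utc)
  end

  # Intended to be called after each produce.
  # Returns true if a poll was performed, false otherwise.
  def after_produce(now : Time = Time.utc) : Bool
    return false if @poll_interval == 0
    return false if (now - @last_poll) < @poll_interval.seconds

    @poll_count += 1 # stand-in for the real call to poll
    @last_poll = now
    true
  end
end
```

The timestamps are passed in explicitly so the behaviour can be checked without waiting; a real implementation would more likely read a monotonic clock.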
Debug Statistics
To capture librdkafka's statistics (documented in librdkafka's STATISTICS.md), pass a stats_path argument to Kafka::Producer.new containing the path of the file to write to.
Also set statistics.interval.ms in your producer config; librdkafka emits statistics only when this interval is non-zero.
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "statistics.interval.ms" => "5000"},
  stats_path: "/some/directory/librdkafka_stats.json"
)
Consuming
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "group.id" => "consumer_group_name"})
consumer.subscribe("topic_name")
consumer.each do |message|
  # message is an instance of Kafka::Message
  puts "#{String.new(message.topic)} -> #{String.new(message.payload)}"
end
consumer.close
Subscribing to multiple topics
consumer.subscribe("topic_name", "another_topic", "more_and_more")
consumer.subscribe("^starts_with") # subscribe to multiple with a regex
Development
Running Tests
make setup
crystal spec
Releasing
- Update shard.yml and src/crafka.cr with new version number
- Update CHANGELOG.md with changes
- Commit and tag commit
 
Credits
Originally forked from: https://github.com/CloudKarafka/kafka.cr
MIT License