grpc.cr
grpc.cr
A native Crystal gRPC library built on libnghttp2. This library was generated using an AI agent.
Installation
dependencies:
grpc:
github: kojix2/grpc
Install libnghttp2:
brew install nghttp2 # macOS
apt install libnghttp2-dev # Debian/Ubuntu
pacman -S nghttp2 # Arch
Testing
Run the default spec suite (auto-discovers spec/**/*_spec.cr):
crystal spec
Run grpcurl E2E tests explicitly via their dedicated entry file (named without the _spec suffix so it is excluded from auto-discovery):
crystal spec spec/grpcurl.cr
Run GripMock client E2E tests (requires GripMock at 127.0.0.1:4770 and admin API at 127.0.0.1:4771):
crystal spec spec/gripmock.cr
Code generation
crystal build src/protoc-gen-crystal-grpc_main.cr -o bin/protoc-gen-crystal-grpc
protoc --plugin=protoc-gen-crystal-grpc=bin/protoc-gen-crystal-grpc \
--crystal-grpc_out=. helloworld.proto
Server
require "grpc"
class GreeterImpl < Helloworld::GreeterService
def say_hello(req : Helloworld::HelloRequest, ctx : GRPC::ServerContext) : Helloworld::HelloReply
Helloworld::HelloReply.new(message: "Hello, #{req.name}!")
end
end
server = GRPC::Server.new
server.handle GreeterImpl.new
server.listen "0.0.0.0", 50051
Client
require "grpc"
channel = GRPC::Channel.new("localhost:50051")
client = Helloworld::GreeterClient.new(channel)
reply = client.say_hello(Helloworld::HelloRequest.new(name: "Alice"))
puts reply.message
channel.close
Metadata and deadlines
ctx = GRPC::ClientContext.new(
metadata: {"authorization" => "Bearer token"},
deadline: 5.seconds,
)
reply = client.say_hello(req, ctx: ctx)
Error handling
begin
reply = client.say_hello(req)
rescue ex : GRPC::StatusError
puts ex.code # e.g. GRPC::StatusCode::NOT_FOUND
puts ex.message
end
Server side:
def say_hello(req, ctx)
raise GRPC::StatusError.new(GRPC::StatusCode::NOT_FOUND, "user not found")
end
Streaming
Generated stubs expose typed stream objects for all four RPC variants. Client-streaming and bidirectional-streaming request messages are decoded and delivered to the server handler incrementally as HTTP/2 DATA frames arrive, without buffering the entire request body first.
# Server streaming — iterate typed responses
stream = client.range(Numbers::Number.new(5))
stream.each { |num| puts num.value }
puts stream.status # GRPC::Status after iteration
# Client streaming — send then receive
call = client.sum
(1..5).each { |i| call.send(Numbers::Number.new(i)) }
total = call.close_and_recv # flushes and returns the server response
# Bidirectional streaming
bidi = client.transform
[2, 3, 4].each { |i| bidi.send(Numbers::Number.new(i)) }
bidi.close_send
bidi.each { |num| puts num.value }
You can also cancel a stream early:
stream = client.range(req)
stream.cancel # sends RST_STREAM to the server
TLS
# Server
server = GRPC::Server.new
server.use_tls(cert: "server.crt", key: "server.key")
# Client — system CA
channel = GRPC::Channel.new("https://host:50443")
# Client — custom context
ctx = OpenSSL::SSL::Context::Client.new
ctx.ca_certificates = "ca.crt"
channel = GRPC::Channel.new("https://host:50443", tls_context: ctx)
Interceptors
class LoggingInterceptor < GRPC::ServerInterceptor
def call(request : GRPC::RequestEnvelope,
ctx : GRPC::ServerContext,
next_call : GRPC::UnaryServerCall) : GRPC::ResponseEnvelope
STDERR.puts "-> #{request.info.method_path}"
next_call.call(request.info.method_path, request, ctx)
end
end
server.intercept LoggingInterceptor.new
class AuthInterceptor < GRPC::ClientInterceptor
def call(request : GRPC::RequestEnvelope,
ctx : GRPC::ClientContext,
next_call : GRPC::UnaryClientCall) : GRPC::ResponseEnvelope
ctx.metadata.set("authorization", "Bearer #{@token}")
next_call.call(request.info.method_path, request, ctx)
end
end
channel = GRPC::Channel.new("localhost:50051",
interceptors: [AuthInterceptor.new(token)] of GRPC::ClientInterceptor)
RequestEnvelope includes #raw, #info, and #decode(T) so interceptors can decode payloads naturally when needed.
Streaming hooks can also be overridden and default to pass-through:
call_server_stream, call_live_client_stream, call_live_bidi_stream
Transport factory injection
Channel and Server accept optional factory procs so transport creation can be swapped without changing call sites.
Note: the client factory receives an EndpointConfig as its fifth argument.
client_factory = ->(host : String, port : Int32, use_tls : Bool,
tls_ctx : OpenSSL::SSL::Context::Client?,
config : GRPC::EndpointConfig) {
GRPC::Transport::Http2ClientConnection.new(host, port, use_tls, tls_ctx, config)
.as(GRPC::Transport::ClientTransport)
}
server_factory = ->(io : IO,
services : Hash(String, GRPC::Service),
interceptors : Array(GRPC::ServerInterceptor),
peer : String,
tls_sock : OpenSSL::SSL::Socket::Server?) {
GRPC::Transport::Http2ServerConnection.new(io, services, interceptors, peer, tls_sock)
.as(GRPC::Transport::ServerTransport)
}
channel = GRPC::Channel.new("localhost:50051", transport_factory: client_factory)
server = GRPC::Server.new(transport_factory: server_factory)
Channel configuration
EndpointConfig groups all transport and lifecycle settings for a channel. Pass it as endpoint_config: when creating a Channel.
Connect timeout
Fail fast when the server is unreachable:
config = GRPC::EndpointConfig.new(connect_timeout: 5.seconds)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)
TCP keepalive
Keep long-lived connections alive at the OS level:
config = GRPC::EndpointConfig.new(
tcp_keepalive: 30.seconds, # enable OS keepalive (idle hint)
keepalive: GRPC::KeepaliveParams.new(
interval: 30.seconds, # application-level hint
timeout: 10.seconds,
permit_without_calls: false,
),
)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)
Concurrency limit
Cap the number of in-flight RPCs. Excess callers block until a slot is free. The slot is released automatically when the RPC finishes (including streaming calls):
config = GRPC::EndpointConfig.new(concurrency_limit: 10)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)
Rate limit
Throttle request starts to at most n requests per period. Callers that exceed the limit sleep until the next available slot:
config = GRPC::EndpointConfig.new(
rate_limit: GRPC::RateLimitConfig.new(100_u64, 1.second), # 100 req/s
)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)
Combining options
config = GRPC::EndpointConfig.new(
connect_timeout: 5.seconds,
tcp_keepalive: 30.seconds,
concurrency_limit: 20,
rate_limit: GRPC::RateLimitConfig.new(200_u64, 1.second),
)
channel = GRPC::Channel.new("https://api.example.com:443", endpoint_config: config)
Endpoint struct
Address strings are parsed into GRPC::Endpoint automatically, but you can also build one explicitly:
endpoint = GRPC::Endpoint.new("api.example.com", 50051, tls: false)
channel = GRPC::Channel.new(endpoint)
Message compression
framed = GRPC::Codec.encode(bytes, compress: true)
decoded, _ = GRPC::Codec.decode(framed) # decompresses automatically
Examples
Helloworld — unary RPC
crystal run examples/helloworld/server.cr
crystal run examples/helloworld/client.cr -- localhost 50051 Alice
# Hello, Alice!
Streaming — all four RPC variants
crystal run examples/streaming/server.cr
crystal run examples/streaming/client.cr
# Square(7) = 49
# Range(5) = 1 2 3 4 5
# [server] sum recv: 1
# [server] sum recv: 2
# [server] sum recv: 3
# [server] sum recv: 4
# [server] sum recv: 5
# Sum(1..5) = 15
# Transform([2,3,4]²) = 4 9 16
Interceptors — logging + auth
crystal run examples/interceptors/server.cr
crystal run examples/interceptors/client.cr -- localhost 50053 Alice secret
# Hello, Alice!
TLS
Generate a self-signed certificate for local testing:
openssl req -x509 -newkey rsa:2048 -keyout server.key -out server.crt \
-days 365 -nodes -subj "/CN=localhost"
crystal run examples/tls/server.cr -- server.crt server.key
crystal run examples/tls/client.cr -- localhost 50443 Alice
# Hello over TLS, Alice!
License
MIT
grpc.cr
- 3
- 0
- 0
- 0
- 0
- 42 minutes ago
- April 15, 2026
MIT License
Thu, 16 Apr 2026 22:53:01 GMT