grpc.cr

grpc.cr

Test

A native Crystal gRPC library built on libnghttp2. This library was generated using an AI agent.

Installation

dependencies:
  grpc:
    github: kojix2/grpc

Install libnghttp2:

brew install nghttp2          # macOS
apt install libnghttp2-dev    # Debian/Ubuntu
pacman -S nghttp2             # Arch

Testing

Run the default spec suite (auto-discovers spec/**/*_spec.cr):

crystal spec

Run the grpcurl E2E tests explicitly through their entry file, which is not picked up by the default suite's auto-discovery:

crystal spec spec/grpcurl.cr

Run the GripMock client E2E tests (requires a GripMock server at 127.0.0.1:4770 and its admin API at 127.0.0.1:4771):

crystal spec spec/gripmock.cr

Code generation

crystal build src/protoc-gen-crystal-grpc_main.cr -o bin/protoc-gen-crystal-grpc

protoc --plugin=protoc-gen-crystal-grpc=bin/protoc-gen-crystal-grpc \
       --crystal-grpc_out=. helloworld.proto

Server

require "grpc"

class GreeterImpl < Helloworld::GreeterService
  def say_hello(req : Helloworld::HelloRequest, ctx : GRPC::ServerContext) : Helloworld::HelloReply
    Helloworld::HelloReply.new(message: "Hello, #{req.name}!")
  end
end

server = GRPC::Server.new
server.handle GreeterImpl.new
server.listen "0.0.0.0", 50051

Client

require "grpc"

channel = GRPC::Channel.new("localhost:50051")
client  = Helloworld::GreeterClient.new(channel)
reply   = client.say_hello(Helloworld::HelloRequest.new(name: "Alice"))
puts reply.message
channel.close

Metadata and deadlines

ctx = GRPC::ClientContext.new(
  metadata: {"authorization" => "Bearer token"},
  deadline: 5.seconds,
)
reply = client.say_hello(req, ctx: ctx)

Error handling

begin
  reply = client.say_hello(req)
rescue ex : GRPC::StatusError
  puts ex.code    # e.g. GRPC::StatusCode::NOT_FOUND
  puts ex.message
end

Server side:

def say_hello(req, ctx)
  raise GRPC::StatusError.new(GRPC::StatusCode::NOT_FOUND, "user not found")
end

Streaming

Generated stubs expose typed stream objects for all four RPC variants. Client-streaming and bidirectional-streaming request messages are decoded and delivered to the server handler incrementally as HTTP/2 DATA frames arrive, without buffering the entire request body first.

# Server streaming — iterate typed responses
stream = client.range(Numbers::Number.new(5))
stream.each { |num| puts num.value }
puts stream.status   # GRPC::Status after iteration

# Client streaming — send then receive
call = client.sum
(1..5).each { |i| call.send(Numbers::Number.new(i)) }
total = call.close_and_recv   # flushes and returns the server response

# Bidirectional streaming
bidi = client.transform
[2, 3, 4].each { |i| bidi.send(Numbers::Number.new(i)) }
bidi.close_send
bidi.each { |num| puts num.value }

You can also cancel a stream early:

stream = client.range(req)
stream.cancel   # sends RST_STREAM to the server

TLS

# Server
server = GRPC::Server.new
server.use_tls(cert: "server.crt", key: "server.key")

# Client — system CA
channel = GRPC::Channel.new("https://host:50443")

# Client — custom context
ctx = OpenSSL::SSL::Context::Client.new
ctx.ca_certificates = "ca.crt"
channel = GRPC::Channel.new("https://host:50443", tls_context: ctx)

Interceptors

class LoggingInterceptor < GRPC::ServerInterceptor
  def call(request : GRPC::RequestEnvelope,
           ctx : GRPC::ServerContext,
           next_call : GRPC::UnaryServerCall) : GRPC::ResponseEnvelope
    STDERR.puts "-> #{request.info.method_path}"
    next_call.call(request.info.method_path, request, ctx)
  end
end

server.intercept LoggingInterceptor.new

Client side:

class AuthInterceptor < GRPC::ClientInterceptor
  def call(request : GRPC::RequestEnvelope,
           ctx : GRPC::ClientContext,
           next_call : GRPC::UnaryClientCall) : GRPC::ResponseEnvelope
    ctx.metadata.set("authorization", "Bearer #{@token}")
    next_call.call(request.info.method_path, request, ctx)
  end
end

channel = GRPC::Channel.new("localhost:50051",
  interceptors: [AuthInterceptor.new(token)] of GRPC::ClientInterceptor)

RequestEnvelope includes #raw, #info, and #decode(T) so interceptors can decode payloads naturally when needed.

Streaming hooks can also be overridden and default to pass-through:

  • call_server_stream
  • call_live_client_stream
  • call_live_bidi_stream

Transport factory injection

Channel and Server accept optional factory procs so transport creation can be swapped without changing call sites.

Note: the client factory receives an EndpointConfig as its fifth argument.

client_factory = ->(host : String, port : Int32, use_tls : Bool,
                    tls_ctx : OpenSSL::SSL::Context::Client?,
                    config : GRPC::EndpointConfig) {
  GRPC::Transport::Http2ClientConnection.new(host, port, use_tls, tls_ctx, config)
    .as(GRPC::Transport::ClientTransport)
}

server_factory = ->(io : IO,
                    services : Hash(String, GRPC::Service),
                    interceptors : Array(GRPC::ServerInterceptor),
                    peer : String,
                    tls_sock : OpenSSL::SSL::Socket::Server?) {
  GRPC::Transport::Http2ServerConnection.new(io, services, interceptors, peer, tls_sock)
    .as(GRPC::Transport::ServerTransport)
}

channel = GRPC::Channel.new("localhost:50051", transport_factory: client_factory)
server = GRPC::Server.new(transport_factory: server_factory)

Channel configuration

EndpointConfig groups all transport and lifecycle settings for a channel. Pass it as endpoint_config: when creating a Channel.

Connect timeout

Fail fast when the server is unreachable:

config = GRPC::EndpointConfig.new(connect_timeout: 5.seconds)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)

TCP keepalive

Keep long-lived connections alive at the OS level:

config = GRPC::EndpointConfig.new(
  tcp_keepalive: 30.seconds,           # enable OS keepalive (idle hint)
  keepalive: GRPC::KeepaliveParams.new(
    interval: 30.seconds,              # application-level hint
    timeout: 10.seconds,
    permit_without_calls: false,
  ),
)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)

Concurrency limit

Cap the number of in-flight RPCs. Excess callers block until a slot is free. The slot is released automatically when the RPC finishes (including streaming calls):

config = GRPC::EndpointConfig.new(concurrency_limit: 10)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)

Rate limit

Throttle the rate at which requests start to at most n per period. Callers that exceed the limit sleep until the next slot becomes available:

config = GRPC::EndpointConfig.new(
  rate_limit: GRPC::RateLimitConfig.new(100_u64, 1.second),  # 100 req/s
)
channel = GRPC::Channel.new("localhost:50051", endpoint_config: config)

Combining options

config = GRPC::EndpointConfig.new(
  connect_timeout:   5.seconds,
  tcp_keepalive:     30.seconds,
  concurrency_limit: 20,
  rate_limit:        GRPC::RateLimitConfig.new(200_u64, 1.second),
)
channel = GRPC::Channel.new("https://api.example.com:443", endpoint_config: config)

Endpoint struct

Address strings are parsed into GRPC::Endpoint automatically, but you can also build one explicitly:

endpoint = GRPC::Endpoint.new("api.example.com", 50051, tls: false)
channel  = GRPC::Channel.new(endpoint)

Message compression

framed  = GRPC::Codec.encode(bytes, compress: true)
decoded, _ = GRPC::Codec.decode(framed)   # decompresses automatically

Examples

Helloworld — unary RPC

crystal run examples/helloworld/server.cr
crystal run examples/helloworld/client.cr -- localhost 50051 Alice
# Hello, Alice!

Streaming — all four RPC variants

crystal run examples/streaming/server.cr
crystal run examples/streaming/client.cr
# Square(7)    = 49
# Range(5)     = 1 2 3 4 5
# [server] sum recv: 1
# [server] sum recv: 2
# [server] sum recv: 3
# [server] sum recv: 4
# [server] sum recv: 5
# Sum(1..5)    = 15
# Transform([2,3,4]²) = 4 9 16

Interceptors — logging + auth

crystal run examples/interceptors/server.cr
crystal run examples/interceptors/client.cr -- localhost 50053 Alice secret
# Hello, Alice!

TLS

Generate a self-signed certificate for local testing:

openssl req -x509 -newkey rsa:2048 -keyout server.key -out server.crt \
        -days 365 -nodes -subj "/CN=localhost"
crystal run examples/tls/server.cr -- server.crt server.key
crystal run examples/tls/client.cr -- localhost 50443 Alice
# Hello over TLS, Alice!

License

MIT

Repository

grpc.cr

Owner
Statistic
  • 3
  • 0
  • 0
  • 0
  • 0
  • 42 minutes ago
  • April 15, 2026
License

MIT License

Links
Synced at

Thu, 16 Apr 2026 22:53:01 GMT

Languages