crystal-concurrency-demo

Crystal 并发编程示例

这个 demo 展示了 Crystal 的并发编程特性,包括 Fiber、Channel 和并行处理。

概念

Crystal 使用 Fiber(纤程)实现轻量级并发,配合 Channel 实现消息传递。

main.cr

require "channel"

# ===== 1. 使用 Fiber =====
puts "===== 1. 使用 Fiber ====="

# 创建 Fiber
fiber1 = Fiber.new do
  5.times do |i|
    puts "Fiber 1: #{i}"
    Fiber.yield  # 让出执行权
  end
end

fiber2 = Fiber.new do
  5.times do |i|
    puts "Fiber 2: #{i}"
    Fiber.yield
  end
end

# 调度执行
while fiber1.alive? || fiber2.alive?
  fiber1.resume
  fiber2.resume
end

# ===== 2. 使用 Channel 进行通信 =====
puts "\n===== 2. 使用 Channel 进行通信 ====="

# 创建 Channel
channel = Channel(Int32).new

# 生产者 Fiber
producer = Fiber.new do
  5.times do |i|
    channel.send(i * 10)  # 发送数据
    puts "发送: #{i * 10}"
  end
  channel.close  # 关闭 Channel
end

# 消费者 Fiber
consumer = Fiber.new do
  while value = channel.receive?  # 非阻塞接收
    puts "接收: #{value}"
  end
end

# 执行
producer.resume
consumer.resume

# ===== 3. 多个 Channel =====
puts "\n===== 3. 多个 Channel ====="

channel1 = Channel(String).new
channel2 = Channel(String).new

# 两个生产者
Fiber.new do
  ["Apple", "Banana", "Cherry"].each do |fruit|
    channel1.send(fruit)
  end
end.resume

Fiber.new do
  ["Red", "Green", "Blue"].each do |color|
    channel2.send(color)
  end
end.resume

# 消费者 - 公平地从两个 Channel 接收
3.times do
  Fiber.yield  # 让生产者先运行
end

# 接收所有消息
while !channel1.closed? || !channel2.closed?
  select
  when value = channel1.receive?
    puts "Channel1: #{value}"
  when value = channel2.receive?
    puts "Channel2: #{value}"
  else
    break
  end
end

# ===== 4. 并行执行任务 =====
puts "\n===== 4. 并行执行任务 ====="

# 使用 spawn 创建并行任务
results = Channel(Int32).new

3.times do |i|
  spawn do
    sleep rand(0.1..0.5)  # 模拟耗时操作
    results.send(i * i)
  end
end

# 收集结果
sum = 0
3.times do
  sum += results.receive
end

puts "平方和: 0^2 + 1^2 + 2^2 = #{sum}"

# ===== 5. 并发累加器 =====
puts "\n===== 5. 并发累加器 ====="

counter = 0
mutex = Mutex.new
counter_channel = Channel(Int32).new

# 多个线程同时增加计数器
10.times do
  spawn do
    1000.times do
      mutex.lock
      counter += 1
      mutex.unlock
    end
  end
end

# 等待所有 Fiber 完成
sleep 1

puts "计数器最终值: #{counter} (预期: 10000)"
Repository

crystal-concurrency-demo

Owner
Statistic
  • 0
  • 0
  • 0
  • 0
  • 0
  • about 1 month ago
  • March 16, 2026
License

Links
Synced at

Mon, 16 Mar 2026 00:33:07 GMT

Languages