lake
Lake is a generic connection pooling shard for the Crystal programming language. With Lake, you can create a generically typed Lake(T) for whatever type of connection or object you want to work with.
Overview
When you want to use a connection or object in the pool, call #dip to asynchronously get a reference to a free connection and use it within the block you provide, or call #dip_sync with the same parameters for a synchronous version of #dip.
lake = Lake(Redis).new
lake.dip { |redis| puts redis.get("my-key") }
lake.dip { |redis| redis.set("my-key", "cool") }
# no guarantee on run order since `#dip` is asynchronous
And using #dip_sync:
lake = Lake(Redis).new
lake.dip_sync { |redis| redis.set("my-key", "hello") }
val = nil
lake.dip_sync { |redis| val = redis.get("my-key") }
val.should eq "hello"
With clients such as Redis, certain connection operations (pub/sub, for example) make the connection unusable for a period of time. In those cases you can use #leak, which removes a connection from the pool, returns it to you, and safely replaces it in the pool with a new one.
lake = Lake(Redis).new
redis = lake.leak
spawn do
  redis.subscribe("my-key") do |on|
    ...
Connections are returned by dip and dip_sync on a least-recently-used basis, which minimizes the chance that another operation is still in progress on the returned connection.
Lake maintains a channel for each connection in the pool, which it uses to buffer incoming dip and dip_sync requests.
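The least-recently-used selection described above can be sketched in a few lines. This is a minimal illustration, not Lake's source: the LruRotation class and its take method are hypothetical names, and the sketch assumes "use" simply means "was handed out".

```crystal
# Hypothetical helper for illustration only; not part of Lake's API.
class LruRotation(T)
  @order : Deque(T)

  def initialize(items : Array(T))
    # Front of the deque = least recently used, back = most recently used.
    @order = Deque(T).new(items)
  end

  # Hands out the least-recently-used item and marks it most recently used.
  def take : T
    item = @order.shift
    @order.push(item)
    item
  end
end

rotation = LruRotation(String).new(["conn-a", "conn-b", "conn-c"])
picks = (1..4).map { rotation.take }
puts picks.join(", ")
```

Every item is handed out once before any item is handed out a second time, which gives each connection the longest possible window to finish its previous job.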
By default, the constructor calls .new on whatever type you have chosen as the pool entry type. You can override this behavior, and you can also specify the size of the lake.
lake = Lake(Redis).new(50)
The second optional parameter lets you override the default "factory" (T.new) for newly created pool objects by passing a ->{ } proc that returns a T.
lake = Lake(MyClass).new(25, ->{ MyClass.new("some_arg") })
Pool objects are initialized at pool creation time using this proc, and a new object is also initialized each time you call lake.leak.
If you do not specify a pool size, the default is 24 (Lake::DEFAULT_CAPACITY).
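To make the constructor parameters concrete, here is a minimal sketch of how a default capacity and a factory proc could fit together. It is a stand-in written for illustration, not Lake's implementation: TinyPool, its objects getter, and the Greeter class are all hypothetical, and only the default of 24 and the T.new fallback come from the description above.

```crystal
# Hypothetical stand-in for illustration only; this is not Lake's source.
class TinyPool(T)
  DEFAULT_CAPACITY = 24

  @factory : Proc(T)
  getter objects : Array(T)

  # With no arguments, the pool holds DEFAULT_CAPACITY objects built by T.new.
  def initialize(capacity : Int32 = DEFAULT_CAPACITY, factory : Proc(T) = ->{ T.new })
    @factory = factory
    # Every pool object is created up front by calling the factory.
    @objects = Array(T).new(capacity) { factory.call }
  end

  # Removes one object from the pool and creates a replacement with the factory.
  def leak : T
    leaked = @objects.shift
    @objects << @factory.call
    leaked
  end
end

# Hypothetical pool entry type.
class Greeter
  getter name : String

  def initialize(@name : String = "default")
  end
end

defaults = TinyPool(Greeter).new
custom = TinyPool(Greeter).new(25, ->{ Greeter.new("some_arg") })
leaked = custom.leak

puts defaults.objects.size
puts custom.objects.size
puts leaked.name
```

Because the replacement object in leak comes from the same factory, the pool keeps its size and every entry is built the same way, whether it was created at startup or after a leak.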
How it Works
A thread-safe queue (channel) is maintained for each object in the pool. When you pass a block that takes a pool object to dip_sync or dip, the block is sent over the appropriate channel and processed once the pool object has finished any other pending jobs that were already queued for it specifically. Pool objects are accessed via a least-recently-used pattern to minimize the chances that you are given a pool object that is still busy.
An event loop is also created for each pool object. The event loop will run until the object is taken out of the pool via leak or until clear is called.
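The channel-plus-event-loop arrangement can be sketched for a single pool object as follows. This is an illustration under stated assumptions, not Lake's code: the Worker class and its submit, submit_sync, and stop methods are hypothetical names, and the buffer size of 32 is arbitrary.

```crystal
# Hypothetical illustration of one pool object with its own job channel.
# The names Worker, submit, submit_sync, and stop are not Lake's API.
class Worker(T)
  @object : T
  @jobs : Channel(Proc(T, Nil))

  def initialize(object : T)
    @object = object
    @jobs = Channel(Proc(T, Nil)).new(32) # buffered queue of pending blocks
    spawn do
      # Event loop: run queued blocks one at a time until the channel is closed.
      while job = @jobs.receive?
        job.call(@object)
      end
    end
  end

  # Asynchronous: enqueue the block and return without waiting for it to run.
  def submit(&block : T -> Nil) : Nil
    @jobs.send(block)
  end

  # Synchronous: enqueue the block, then wait until the event loop has run it.
  def submit_sync(&block : T -> Nil) : Nil
    done = Channel(Nil).new
    submit do |obj|
      block.call(obj)
      done.send(nil)
    end
    done.receive
  end

  # Closing the channel ends the event loop once queued jobs are drained.
  def stop : Nil
    @jobs.close
  end
end

log = [] of Int32
worker = Worker(Array(Int32)).new(log)
worker.submit { |a| a << 1 }
worker.submit { |a| a << 2 }
worker.submit_sync { |a| a << 3 } # returns only after all three jobs have run
worker.stop
puts log.inspect
```

Because a single fiber drains the channel, jobs for one object never overlap and run in the order they were queued, which is why the synchronous call can double as a barrier for earlier asynchronous ones on the same object.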
- June 20, 2020
MIT License