dashbitco / nimble_pool
A tiny resource-pool implementation for Elixir
I was thinking about how to implement a "least loaded" pooling strategy for sneako/finch#8, and didn't see a way of grabbing the queue length. Is that something that would be worth adding to nimble_pool?
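For what it's worth, the pending queue is visible in the pool's internal state, so it can be inspected ad hoc while debugging. This is a sketch only: the state layout is private and unsupported, and MyPool is a placeholder name.

```elixir
# Debugging-only: NimblePool's internal state is private and may change
# between versions; do not rely on this in production code.
state = :sys.get_state(MyPool)

# The pending checkout requests live in an Erlang :queue under the
# :queue key (see the :sys.get_state dump in the issue below).
waiting = :queue.len(state.queue)
IO.inspect(waiting, label: "clients waiting for a resource")
```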
Any chance of getting a new version up on Hex?
Specifically, commit ea62a62 changes how it works quite a bit.
Hi
I've been working on a resource-pool package for Gleam for some time, and this is what I have right now.
Currently it accepts a pool size and a function that creates a resource (which can be anything), and it spawns that many processes, each holding a resource as its state.
Users can then call an apply function, which takes a function that acts on the resource and returns a result. apply checks out a process and sends it a message containing the function, so the process can run it against the resource and send the result back to the manager process, which checks the process back in and returns the result to the user.
But right now this approach does not handle the situation in which either the worker process or the user process crashes, and it seems to me that I can't handle those situations while still providing parallel resource usage with this design.
So I've been trying to understand how nimble_pool works so that I can implement a similar approach. But I'm having a hard time following what's going on in the code (I'm a Node.js developer by day and do BEAM stuff as side projects, so I'm not really proficient in OTP).
The only thing I'm sure about right now is that nimble_pool gives the resource directly to the user. It also seems like there is a timeout after which the resource is simply checked back in, regardless of what the user has done with it, but I'm not really sure about that or anything else.
I'd appreciate any guidance on this, thanks!
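For reference, the key difference from the apply-style design described above is that NimblePool hands the resource to the caller's own process rather than running the function inside a worker. A minimal sketch of the client side (MyPool and do_something_with/1 are placeholder names):

```elixir
# The anonymous function runs in the *calling* process, not in a worker.
# It returns {result, client_state}: result goes back to the caller, and
# the client state is passed to the worker's handle_checkin callback.
result =
  NimblePool.checkout!(MyPool, :checkout, fn _from, resource ->
    {do_something_with(resource), resource}
  end, 5_000)
```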
I made a library that helps with using Python and Ruby in Elixir applications:
https://github.com/nallwhy/doumi_port
But it errors infinitely when it fails to initialize a worker in the init_worker/1 callback of NimblePool. For example, it crashes with a MatchError if there is no python installed.
lib/doumi/port/pool.ex:63
@impl NimblePool
def init_worker({port_module, port_opts} = pool_state) do
{:ok, port} = port_module.start(port_opts) # <-- here
{:ok, %{port_module: port_module, port: port}, pool_state}
end
init_worker/1 only allows returning {:ok, _, _} or {:async, _}; there is no error case.
I want the NimblePool supervisor to crash, and for the crash to propagate and terminate the application if worker initialization keeps failing. How can I implement this?
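Lacking an error return, one workaround (a sketch, assuming the doumi_port shape shown above) is to raise inside init_worker/1 itself: the pool process then exits, and repeated failures exhaust its supervisor's restart intensity, which propagates the exit upward.

```elixir
@impl NimblePool
def init_worker({port_module, port_opts} = pool_state) do
  case port_module.start(port_opts) do
    {:ok, port} ->
      {:ok, %{port_module: port_module, port: port}, pool_state}

    {:error, reason} ->
      # Crashing here brings the pool down; if init_worker keeps failing,
      # the supervisor's restart limit is exceeded and the exit propagates,
      # terminating the application.
      raise "failed to start port: #{inspect(reason)}"
  end
end
```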
It seems that state.queue is not immediately cleared after a client times out (while state.requests is).
defmodule BadPool do
@behaviour NimblePool
def start_link(_args \\ []) do
NimblePool.start_link(
worker: {__MODULE__, nil},
pool_size: 1,
name: __MODULE__
)
end
def command(sleep \\ 5_000) do
  NimblePool.checkout!(
    __MODULE__,
    :checkout,
    fn _from, client_state ->
      Process.sleep(sleep)
      # checkout!/4 expects the function to return {result, client_state}
      {:ok, client_state}
    end,
    1000
  )
end
@impl NimblePool
def init_worker(_) do
{:ok, nil, nil}
end
@impl NimblePool
def handle_checkout(:checkout, _, _, _) do
{:ok, nil, nil, nil}
end
end
It seems the queue will build up until the connection is checked back in.
> {:ok, pool} = BadPool.start_link()
> Task.start(fn -> BadPool.command(60_000) end) # do this a few times
> :sys.get_state(pool)
%{
async: %{},
lazy: nil,
max_idle_pings: -1,
monitors: %{
#Reference<0.33670699.3888644103.189962> => #Reference<0.33670699.3888644103.189961>
},
# queue builds up despite...
queue: {[
{#PID<0.238.0>, #Reference<0.33670699.3888644097.191297>},
{#PID<0.236.0>, #Reference<0.33670699.3888644097.191289>},
{#PID<0.234.0>, #Reference<0.33670699.3888644097.191281>},
{#PID<0.232.0>, #Reference<0.33670699.3888644097.191273>}
], [{#PID<0.230.0>, #Reference<0.33670699.3888644097.191265>}]},
# ... timed-out requests having been removed here
requests: %{
#Reference<0.33670699.3888644103.189961> => {#PID<0.227.0>,
#Reference<0.33670699.3888644103.189962>, :state, nil}
},
resources: {[], []},
state: nil,
worker: BadPool,
worker_idle_timeout: nil
}
I tried the same thing with a pool size of 2 and two initial requests, one sleeping for :infinity and the other for 60 seconds. The pool cleared as soon as the 60-second request finished.
So it seems that if all connections are checked out, the queue will grow indefinitely even though clients are timing out.
I was just wondering: is this the desired behaviour?
Sometimes the checkout!/4 function does not suit the needs of a program that wants to check a resource in and out manually. Functions like checkin/4 and checkout/3 could solve this problem.
For example, say you want to take a resource and create a Stream that returns the resource to the pool upon completion.
With functions like checkin/4 and checkout/3, this would be possible:
Stream.resource(
fn -> NimblePool.checkout(pool, :checkout, timeout) end,
fn resource -> get_part(resource) end,
fn resource -> NimblePool.checkin(pool, :checkin, resource, timeout) end
)
Hello 👋🏼
I would like to caveat this issue by saying that I may have a use case that is not a good fit for NimblePool. I'm posting on the chance that it is, and that there is simply an API change that would help.
I am building a small internal library for publishing analytics events through AMQP. I would like to minimize the number of open AMQP connections by multiplexing through AMQP Channels. At application startup, I would like to open some number of connections, each with a factor multiple of channels. Then application publishers can make use of the active channels.
init_pool/1
@impl NimblePool
def init_pool(opts) do
case AMQP.Connection.open(opts) do
{:ok, conn} ->
Process.monitor(conn.pid)
{:ok, conn}
{:error, error} ->
{:stop, error}
end
end
@impl NimblePool
def init_worker(conn) do
async = fn ->
{:ok, channel} = AMQP.Channel.open(conn)
channel
end
{:async, async, conn}
end
def publish(value) do
NimblePool.checkout!(MyPool, :checkout, fn _pool, channel ->
response = AMQP.Basic.publish(channel, "exchange", "", value)
{response, channel}
end)
end
init_pool/1 blocks the init/1 callback of the pool.
From the docs:
"NimblePool may not be a good option to manage processes. Also avoid using NimblePool to manage resources that support multiplexing, such as HTTP 2 connections (in fact, pools are not a good option to manage resources with multiplexing in general)."
Thank you :)
It would be nice to be able to get some telemetry from NimblePool.
In particular, it would be nice to be able to track pool utilization (number of checkouts vs. number of available workers) and the time spent waiting for a resource.
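To make the request concrete, here is a hypothetical sketch of what consuming such events could look like. NimblePool does not currently emit these events; the event name [:nimble_pool, :checkout, :stop] and the measurement/metadata keys are invented purely for illustration.

```elixir
# Hypothetical handler for telemetry events NimblePool might emit.
# :telemetry.attach/4 takes a handler id, an event name, a handler
# function, and a config term.
:telemetry.attach(
  "pool-utilization-logger",
  [:nimble_pool, :checkout, :stop],
  fn _event, %{duration: duration}, %{pool: pool}, _config ->
    IO.inspect({pool, duration}, label: "time waiting for a resource")
  end,
  nil
)
```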