koudelka / honeydew
Job Queue for Elixir. Clustered or Local. Straight BEAM. Optional Ecto. 💪🍈
License: MIT License
Hey,
I often get:
** (stop) exited in: GenServer.call(#PID<0.459.0>, {:enqueue, %Honeydew.Job{by: nil, completed_at: nil, enqueued_at: 1548523091425, failure_private: nil, from: nil, job_monitor: nil, private: nil, queue: :vnc_screenshot, result: nil, started_at: nil, task: {:screenshot, [{"x", xxxx}]}}}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:989: GenServer.call/3
(honeydew) lib/honeydew.ex:71: Honeydew.async/3
(elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Is there any way to increase it to 50 seconds?
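The 5000 at the end of the GenServer.call/3 in that trace is the call's timeout in milliseconds (the stdlib default), and it is chosen by the caller, here Honeydew.async/3 at lib/honeydew.ex:71. So a 50-second limit would have to be passed at that call site. A minimal stdlib sketch of where the value goes; SlowQueue is a hypothetical stand-in for the queue process, not Honeydew code:

```elixir
defmodule SlowQueue do
  # Hypothetical stand-in for a queue process whose enqueue is slow.
  use GenServer

  def start_link(delay_ms), do: GenServer.start_link(__MODULE__, delay_ms)

  # The third argument of GenServer.call/3 is the caller-side timeout in ms.
  # It defaults to 5000, the value that shows up in the trace above.
  def enqueue(pid, job, timeout \\ 50_000) do
    GenServer.call(pid, {:enqueue, job}, timeout)
  end

  @impl true
  def init(delay_ms), do: {:ok, delay_ms}

  @impl true
  def handle_call({:enqueue, job}, _from, delay_ms) do
    # Simulate a slow enqueue.
    Process.sleep(delay_ms)
    {:reply, {:ok, job}, delay_ms}
  end
end
```

If the server takes longer than the timeout, the caller exits with `{:timeout, {GenServer, :call, _}}`, which is the `** (EXIT) time out` seen above.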
I want to select all items that have been successfully processed by Honeydew. From reading the source code, I guessed that I can use where([x], is_nil(x.honeydew_whatever_lock))
to do this, but that couples my code tightly to Honeydew's implementation details. I'd like a more "official" or "public" way to achieve this.
See honeydew/lib/honeydew/sources/ecto_source.ex, lines 8 to 14 (commit b60b639).
A documented way to select jobs in a different state would be nice too, but this is not something I'm struggling with currently.
Hi, I found Honeydew while looking for a solution for background processing in a Phoenix app that uses Ecto. I followed the example and ran into the following issue:
your repo's ecto adapter, Ecto.Adapters.MyXQL, isn't currently supported, but it's probably not hard to implement, open an issue and we'll chat!
I had a look at the PG.Adapter, and it seems possible to implement it for MySQL. Any pointers on what I should look out for? Has anybody already tried and failed?
@koudelka what do you think about adding Delayed Jobs support to Honeydew?
The primary use case I envision is delayed retries: when a job fails because some third-party API is temporarily unavailable, it makes no sense to retry immediately, since that would exhaust all retry attempts too fast.
Would this be a nice feature for Honeydew? Maybe it's already on the roadmap?
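One common way to space out such retries is exponential backoff: wait base^attempt seconds before retry number attempt, with a cap so late retries don't wait forever. A minimal sketch of just the delay calculation; Backoff and its defaults (base 2, cap of one hour) are hypothetical choices, not part of Honeydew:

```elixir
defmodule Backoff do
  @moduledoc "Hypothetical helper: seconds to wait before retry number `attempt` (1-based)."

  # delay = base^attempt, capped at max_secs.
  def delay_secs(attempt, base \\ 2, max_secs \\ 3_600)
      when is_integer(attempt) and attempt >= 1 and is_integer(base) and base >= 2 do
    min(Integer.pow(base, attempt), max_secs)
  end

  # Full schedule for `times` retries, handy for sanity-checking the total wait.
  def schedule(times, base \\ 2, max_secs \\ 3_600) when is_integer(times) and times >= 1 do
    Enum.map(1..times, &delay_secs(&1, base, max_secs))
  end
end
```

With the defaults, four retries wait 2, 4, 8 and 16 seconds, so a short outage of the third-party API no longer burns through every attempt at once.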
Edit: everything below was written under the misapprehension that this project already supported Postgres, which it doesn't. I'm working on a PR to add that side by side with Cockroach, and to make it easier to add support and tests for other databases with an adapter model. Ideally the Postgres adapter would just be ANSI SQL, but I'm not sure reserve can be implemented with the same semantics without using CTEs, which is how I currently have it working.
honeydew_fields seems to be generating invalid SQL. In the example project I get a Postgres syntax error when I run ecto.migrate.
I'm using Postgres 10.3, but none of this has changed recently, so I'm a bit mystified that you didn't get the same result.
The failure is caused by the syntax of EXTRACT being wrong; it should be EXTRACT('millisecond' FROM NOW()). This is the generated fragment:
(EXTRACT('millisecond', (CAST('1994-03-26 04:20:00' AS TIMESTAMP))) + CAST((CAST('1994-03-26 04:20:00' AS TIMESTAMP)) AS INT) * 1000)
I fixed that in the macro, but then it complains that a datetime cannot be cast to an int (the second part). I think what you are after with that cast is what EXTRACT(EPOCH FROM ...) provides.
[] ~/repos/elixir/honeydew/examples/ecto_poll_queue <master> ✗ mix ecto.create
Compiling 6 files (.ex)
Generated ecto_poll_queue_example app
The database for EctoPollQueueExample.Repo has been created
[] ~/repos/elixir/honeydew/examples/ecto_poll_queue <master> ✗ mix ecto.migrate
17:53:01.570 [info] == Running EctoPollQueueExample.Repo.Migrations.CreatePhotosAndUsers.change/0 forward
17:53:01.572 [info] create table photos
** (Postgrex.Error) ERROR 42601 (syntax_error): syntax error at or near ","
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
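For checking the SQL by hand: the value the fragment appears to be building is milliseconds since the Unix epoch, which PostgreSQL gives as EXTRACT(EPOCH FROM ts) * 1000. The same quantity computed in Elixir for the literal in the fragment (treated as UTC); EpochMs is a hypothetical helper:

```elixir
defmodule EpochMs do
  # Milliseconds since 1970-01-01T00:00:00Z for a naive timestamp assumed to be UTC.
  # PostgreSQL equivalent: (EXTRACT(EPOCH FROM ts) * 1000)::bigint
  def from_naive(%NaiveDateTime{} = ndt) do
    ndt
    |> DateTime.from_naive!("Etc/UTC")
    |> DateTime.to_unix(:millisecond)
  end
end

EpochMs.from_naive(~N[1994-03-26 04:20:00])
# => 764655600000
```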
I'd love to be able to kill a job if the cast that creates it times out. I've hacked away at work_queue.ex and kind of got something working but I thought I'd check in here and see if there's an easier non-hacky way.
I'm quite new to Elixir, but I feel like there's some way I can achieve this via process linking maybe?
Thanks for your help!
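Linking is the right instinct. Outside of Honeydew, the usual stdlib pattern for "stop the work when the caller gives up" is Task.async/1 plus Task.yield/2 and Task.shutdown/2. A minimal sketch; Deadline is a hypothetical module name, and wiring it into Honeydew's worker is not shown:

```elixir
defmodule Deadline do
  @doc """
  Runs `fun` in a Task linked to the caller and waits up to `timeout_ms`.
  If it hasn't finished by then, the task is killed, so the work does not
  keep running after the caller has given up.
  """
  def run(fun, timeout_ms) when is_function(fun, 0) do
    task = Task.async(fun)

    # Task.yield/2 returns nil on timeout instead of exiting like Task.await/2;
    # Task.shutdown/2 then kills the task and waits for it to go down.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      # Only reachable if the caller traps exits; otherwise the link propagates the crash.
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

Because Task.async/1 links, a crash inside `fun` still takes the caller down; Task.Supervisor.async_nolink is the variant to use if that is not wanted.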
The changes that added success mode support introduced some calls to :erlang.system_time(:millisecond), which is only supported in OTP 19.1+. Before OTP 19.1, the call was :erlang.system_time(:milli_seconds), which is now deprecated.
If you do want to maintain compatibility with earlier OTP versions, you could call the Elixir version: System.system_time(:milliseconds). If not, it would be worth mentioning in the README that the minimum OTP version is 19.1.
It looks like the changelog file is a little out of date.
Honeydew expects just a single primary key, and it's not possible to use composite primary keys, like this:
defmodule HelloPhoenix.Player do
use HelloPhoenix.Web, :model
@primary_key false
schema "players" do
field :first_name, :string, primary_key: true
field :last_name, :string, primary_key: true
field :position, :string
field :number, :integer
. . .
I was going to take a stab at a PR, but the Ecto integration test is also the example, so I'm not sure where to fit a test for composite primary keys.
It also looks like Honeydew expects the primary key to be called :id, rather than fetching the primary key column name, here:
honeydew/lib/honeydew/sources/ecto_source.ex, line 194 (commit 3793723)
I have a job that was retried multiple times, but it got stuck in the queue.
Here is the job:
iex(smsc@smsc03)1> Honeydew.filter("my_queue", fn a-> true end)
[
%Honeydew.Job{
by: nil,
completed_at: nil,
delay_secs: 255,
enqueued_at: 1581708566962,
failure_private: 9,
from: nil,
job_monitor: nil,
private: -576460752303404335,
queue: "my_queue",
result: nil,
started_at: nil,
task: {:send_msg,
[
%Smsc.Mt.Payload{
delivery_method: "HTTP",
delivery_url: "https://tata.com/sms.php",
dest: "15142875555",
login: "my_queue",
msg: "Bla bla",
msg_id: 8176286,
orig: "15148095555",
smsc_id: "e-tx"
}
]}
}
]
My workers seem to continue processing other jobs on that queue.
Here is my queue status:
iex(smsc@smsc03)1> Honeydew.status("my_queue")
%{
queue: %{
count: 1,
in_progress: 0,
mnesia: %{
"honeydew_\"my_queue\"": [
access_mode: :read_write,
active_replicas: [:smsc@smsc03],
all_nodes: [:smsc@smsc03],
arity: 3,
attributes: [:key, :job],
checkpoints: [],
commit_work: [],
cookie: {{1581628211666457440, -576460752303418191, 1}, :smsc@smsc03},
cstruct: {:cstruct, :"honeydew_\"my_queue\"", :ordered_set, [],
[:smsc@smsc03], [], [], 0, :read_write, false, [], [], false,
:wrapped_job, [:key, :job], [], [], [],
{{1581628211666457440, -576460752303418191, 1}, :smsc@smsc03},
{{2, 0}, []}},
disc_copies: [:smsc@smsc03],
disc_only_copies: [],
external_copies: [],
frag_properties: [],
index: [],
index_info: {:index, :ordered_set, []},
load_by_force: false,
load_node: :smsc@smsc03,
load_order: 0,
load_reason: :local_only,
local_content: false,
majority: false,
master_nodes: [],
memory: 242,
ram_copies: [],
record_name: :wrapped_job,
record_validation: {:wrapped_job, 3, :ordered_set},
size: 1,
snmp: [],
storage_properties: [],
storage_type: :disc_copies,
subscribers: [],
type: :ordered_set,
user_properties: [],
version: {{2, 0}, []},
where_to_commit: [smsc@smsc03: :disc_copies],
where_to_read: :smsc@smsc03,
where_to_wlock: {[:smsc@smsc03], false},
where_to_write: [:smsc@smsc03],
wild_pattern: {:wrapped_job, :_, :_}
],
"honeydew_\"my_queue\"_in_progress": [
access_mode: :read_write,
active_replicas: [:smsc@smsc03],
all_nodes: [:smsc@smsc03],
arity: 3,
attributes: [:key, :job],
checkpoints: [],
commit_work: [],
cookie: {{1581628211669843292, -576460752303418143, 1}, :smsc@smsc03},
cstruct: {:cstruct, :"honeydew_\"my_queue\"_in_progress", :set, [],
[:smsc@smsc03], [], [], 0, :read_write, false, [], [], false,
:wrapped_job, [:key, :job], [], [], [],
{{1581628211669843292, -576460752303418143, 1}, :smsc@smsc03},
{{2, 0}, []}},
disc_copies: [:smsc@smsc03],
disc_only_copies: [],
external_copies: [],
frag_properties: [],
index: [],
index_info: {:index, :set, []},
load_by_force: false,
load_node: :smsc@smsc03,
load_order: 0,
load_reason: :local_only,
local_content: false,
majority: false,
master_nodes: [],
memory: 305,
ram_copies: [],
record_name: :wrapped_job,
record_validation: {:wrapped_job, 3, :set},
size: 0,
snmp: [],
storage_properties: [],
storage_type: :disc_copies,
subscribers: [],
type: :set,
user_properties: [],
version: {{2, 0}, []},
where_to_commit: [smsc@smsc03: :disc_copies],
where_to_read: :smsc@smsc03,
where_to_wlock: {[:smsc@smsc03], false},
where_to_write: [:smsc@smsc03],
wild_pattern: {:wrapped_job, :_, :_}
]
},
suspended: false
},
workers: %{#PID<0.4271.0> => nil}
}
Hi,
It's really strange: at home the example works, but at work it doesn't.
I'm using Honeydew 1.4.1 with the example below. If I call start and then work,
I receive a list of jobs, but the jobs are never actually executed.
defmodule MyWorker do
def do_a_thing(test) do
IO.puts("doing a thing#{test}!")
end
def start do
:ok = Honeydew.start_queue(:my_queue, queue: {Honeydew.Queue.ErlangQueue, []})
:ok = Honeydew.start_workers(:my_queue, MyWorker)
end
def test do
{:do_a_thing, ["Test2"]} |> Honeydew.async(:my_queue)
end
def work do
for i <- 1..10000 do
{:do_a_thing, [Integer.to_string(i)]} |> Honeydew.async(:my_queue)
:ok
end
end
end
The jobs are like this:
%Honeydew.Job{
by: nil,
completed_at: nil,
delay_secs: 0,
enqueued_at: 1559028347918,
failure_private: nil,
from: nil,
job_monitor: nil,
private: -576460752303423068,
queue: :my_queue,
result: nil,
started_at: nil,
task: {:do_a_thing, ["1"]}
},
I don't know why it works at home and not at work.
At work, I'm using Windows with this version: Elixir 1.8.0 and OTP 20.2
At home, I'm using Ubuntu and Elixir 1.8.2 and OTP 22.0.1.
It's really strange; it's a fresh Elixir project without other dependencies,
only {:honeydew, "~> 1.4.1"}.
Thanks,
Werner.
So I am using Honeydew + Ecto, and I can't find a way to restart / re-run a job.
In my case, I have a job that I want to run every time a record is created in my database, and this works great.
However, I would also like to schedule a job when the record is updated.
If I understand correctly, there is currently no way to do this short of manually resetting the record's honeydew fields to the default values they had when the record was created.
But this puts me at risk of a race condition if the record is updated while the initial job is running, or when multiple updates happen within a short period.
What is the correct way to handle running a job on record updates?
My current thinking is to create an associated record that represents the record update, plus a corresponding job. So, if I have Photo and I want to run the ClassifyPhoto job after each update, I insert a record for each photo update, say PhotoUpdated, with a timestamp and honeydew fields, and hook the ClassifyPhoto job up to this PhotoUpdated schema rather than to the Photo schema.
Is my thinking correct, or am I missing some smart functionality Honeydew provides for this?
Hello, I created a custom Mnesia queue that does some things specific to my use case.
The issue is that my queue is slower than the built-in Mnesia queue, so sometimes it takes more than 5 seconds to enqueue a new job, which makes GenServer.call(queue_process, {:enqueue, job}) time out and crash.
The obvious solution is to bump the timeout value, but there is no way to configure that inside Honeydew.
I tried creating a fork to see if there is an easy way to pass the timeout value to all GenServer calls, but there doesn't seem to be one, since these functions are called in so many places.
For me, an easy fix would be to simply change all the calls to use :infinity as the timeout, since that is pretty safe and only creates a problem if you have a deadlock, but I'm not sure if that is OK for you.
Do you have any suggestions?
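One way a fork could thread a configurable timeout through every call without changing each call site's signature is to route the calls through a single helper that reads the value from application config. A minimal sketch; CallTimeout and the :my_app / :call_timeout config keys are hypothetical, not Honeydew options:

```elixir
defmodule CallTimeout do
  @moduledoc """
  Hypothetical helper: every GenServer.call goes through call/2, so the
  timeout is read from one place instead of being hard-coded per call site.
  """

  # Same as the GenServer.call/3 default.
  @default 5_000

  # Reads e.g. `config :my_app, call_timeout: :infinity` (:my_app is a placeholder).
  def timeout, do: Application.get_env(:my_app, :call_timeout, @default)

  def call(server, request), do: GenServer.call(server, request, timeout())
end
```

With `call_timeout: :infinity`, every routed call waits indefinitely, which is the trade-off described above: safe unless something deadlocks.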
From the examples it's not clear to me how to start Honeydew in a Phoenix project. Wherever I try to add the startup code, I get this error:
no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
I tried placing it in the existing Application.start/2
callback and in a GenServer module inside the supervision tree of the existing Application, and I always get that error. Could you point me in the right direction, please?
I found this example for clearing out a job when I know the exact arguments.
Is there any way to just whack every job in the entire queue? I've tried matching on just an empty %{}, but it gives me an ArgumentError. In fact, the example below gives me that as well, even though it won't match my own arguments. I would have expected it to just return an empty list rather than raise.
I'm using an ErlangQueue.
# find the job that would run with the argument `10`, as it won't have started yet
# and cancel it
:ok =
Honeydew.filter(:my_queue, %{task: {:run, [10]}})
|> List.first
|> Honeydew.cancel
Otherwise this is hard to gracefully handle errors when starting queues.
Problem:
I'd like to be able to scale workers and global queues as new nodes join a cluster. For example, say I'm getting an unusually large number of jobs; I'd like to be able to add another node and start more workers on it.
From what I can tell from the documentation and reading the code, there is no way currently to start more workers for a global queue which already has workers available. If I have started the queue and some workers and try to start more workers on a new node, I get "error, already started".
I'm currently using global queues backed by Mnesia.
I don't know if there is a way currently to do this. I'd like to be able, when a new node joins the cluster, to start more workers for a queue on that node and possibly make the new node also be a "disc copy" of the queue as well. But I'd take just being able to start more workers.
The only error we have is this one:
Job failed due unexpected exit. %Honeydew.Job{by: :nonode@nohost, completed_at: nil, delay_secs: 0, enqueued_at: 1580554352191, failure_private: nil, from: nil, job_monitor: #PID<0.9131.53>, private: -576460752298270142, queue: :new_block, result: {:error, %Honeydew.Crash{reason: {:timeout, {Task, :await, [%Task{owner: #PID<0.9133.53>, pid: #PID<0.4273.53>, ref: #Reference<0.3764113133.549453826.23758>}, 50000]}}, stacktrace: [], type: :exit}}, started_at: nil, task: {:new, ["000000000000000000063f666d006acc3c83013f38be1662986639a7bfc19d79"]}}",
** (exit) exited in: Task.await(%Task{owner: #PID<0.9133.53>, pid: #PID<0.4273.53>, ref: #Reference<0.3764113133.549453826.23758>}, 50000)
** (EXIT) time out
After this error, no more jobs are processed.
We run honeydew with:
:ok = Honeydew.start_queue(:new, failure_mode: {Honeydew.FailureMode.Retry, times: 10})
:ok = Honeydew.start_workers(:new, App.BlockWorker, num: 1)
Hey!
I've been looking through the various job queue + worker libraries and I really like what I see here. My primary concern at the moment has to do with the infinite timeout you're using at https://github.com/koudelka/honeydew/blob/master/lib/honeydew/worker.ex#L45 for when a worker is waiting on a job.
While I understand why you're going that route, it's in general best to avoid these. This is particularly true when it blocks a GenServer. The worker process will be unable to respond to any debug or system messages it receives during that time, and its process inbox could fill up indefinitely.
As an alternative, have you considered the general approach taken by the proposed GenRouter architecture? Essentially, each worker would notify either your queue or perhaps an additional Scheduler process that it is available. Then, when a task comes in, the queue would just grab a pid from its list of available worker pids and send the task off.
Honestly, I do think there's value in doing it in another process. In that scenario, your queue would also notify the scheduler when a new item arrives, and the scheduler would maintain a count of those items. Then, if the count is > 0 and a worker says it's free, the scheduler can match the job with the worker immediately. This way you avoid blocking any GenServers for an extended period of time.
Thoughts?
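The matchmaking step of this proposal fits in a small pure data structure: keep a FIFO of idle workers and a FIFO of pending jobs, and pair them whenever both are non-empty. This illustrates the suggestion, not Honeydew's dispatcher; Scheduler and its functions are hypothetical, and workers and jobs are plain terms here rather than pids and job structs:

```elixir
defmodule Scheduler do
  @moduledoc """
  Pairs idle workers with pending jobs as soon as both exist.
  Each function returns {dispatches, new_state}; sending the jobs is left to
  the process that owns this state.
  """
  defstruct idle: :queue.new(), pending: :queue.new()

  def new, do: %__MODULE__{}

  # A worker reports it is free: give it the oldest pending job, or park it.
  def worker_ready(%__MODULE__{} = s, worker) do
    case :queue.out(s.pending) do
      {{:value, job}, pending} -> {[{worker, job}], %{s | pending: pending}}
      {:empty, _} -> {[], %{s | idle: :queue.in(worker, s.idle)}}
    end
  end

  # A job arrives: hand it to the longest-idle worker, or keep it pending.
  def job_arrived(%__MODULE__{} = s, job) do
    case :queue.out(s.idle) do
      {{:value, worker}, idle} -> {[{worker, job}], %{s | idle: idle}}
      {:empty, _} -> {[], %{s | pending: :queue.in(job, s.pending)}}
    end
  end
end
```

Because each call returns the dispatches to perform instead of blocking, the process owning this state stays responsive to system messages, which is the concern raised above.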
I'm having problems booting the app. I've manually created the mnesia schema, but when I try to start I get this:
eggl_1 | 16:20:51.643 [info] Starting with Honeydew nodes: [:"eggl_and_rested@${HOSTNAME}"]
eggl_1 |
eggl_1 | =CRASH REPORT==== 8-Jan-2018::16:20:51 ===
eggl_1 | crasher:
eggl_1 | initial call: Elixir.Honeydew.Queue:init/1
eggl_1 | pid: <0.2499.0>
eggl_1 | registered_name: []
eggl_1 | exception error: no case clause matching
eggl_1 | {aborted,
eggl_1 | {bad_type,'honeydew_:eggl_default_queue',
eggl_1 | disc_copies,'eggl_and_rested@${HOSTNAME}'}}
eggl_1 | in function 'Elixir.Honeydew.Queue.Mnesia':init/2 (lib/honeydew/queue/mnesia.ex, line 49)
eggl_1 | in call from 'Elixir.Honeydew.Queue':init/1 (lib/honeydew/queue.ex, line 71)
eggl_1 | in call from gen_server:init_it/2 (gen_server.erl, line 365)
eggl_1 | in call from gen_server:init_it/6 (gen_server.erl, line 333)
eggl_1 | ancestors: ['honeydew.queue_supervisor.eggl_default_queue',
eggl_1 | 'Elixir.Eggl.Workers.Supervisor','Elixir.Eggl.Supervisor',
eggl_1 | <0.2381.0>]
eggl_1 | message_queue_len: 0
eggl_1 | messages: []
eggl_1 | links: [<0.2498.0>]
eggl_1 | dictionary: []
eggl_1 | trap_exit: false
eggl_1 | status: running
eggl_1 | heap_size: 2586
eggl_1 | stack_size: 27
eggl_1 | reductions: 649
eggl_1 | neighbours:
Any ideas?
Hi,
I'd like to know if there is a way to be notified and take an action when a job fails after all retries. For example, I'd like to retry the job many times and, if it fails every time, send an email at the end notifying about the failure. I can't seem to find an example that accomplishes this; would I have to write my own FailureMode implementation?
Thanks.
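Honeydew's Retry failure mode appears to take a :finally option (a `finally: Honeydew.FailureMode.Move` configuration comes up in another report in this thread), which would be the natural hook for a give-up action. Independent of Honeydew, the retry-then-notify logic looks like this; RetryThenNotify is hypothetical, and `on_give_up` is where the email would be sent:

```elixir
defmodule RetryThenNotify do
  @moduledoc """
  Hypothetical sketch: one attempt plus up to `times` retries. `fun` signals
  failure by returning {:error, reason}; if every attempt fails, the last
  reason is handed to `on_give_up`.
  """
  def run(fun, times, on_give_up)
      when is_function(fun, 0) and is_integer(times) and times >= 0 and
             is_function(on_give_up, 1) do
    case fun.() do
      {:ok, result} ->
        {:ok, result}

      # No retries left: notify (e.g. send the email) and stop.
      {:error, reason} when times == 0 ->
        on_give_up.(reason)
        {:error, :gave_up}

      {:error, _reason} ->
        run(fun, times - 1, on_give_up)
    end
  end
end
```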
Hi, I'm not sure if this is a bug or if I'm doing something wrong. I create a global queue and connect two nodes with 10 workers each, and then see the following: node 2 receives only 10 tasks, and after that only node 1 executes tasks. The scheduler that sends tasks to the queue runs on node 1.
I prepared a test project to demonstrate the behaviour: https://github.com/dkataskin/honeydew_nodelru
How to run it: start two nodes and connect them. Node names should follow the format "node{num}@127.0.0.1", as only the node named "node1@127.0.0.1" starts the scheduler. You will see that the second node executes only 10 tasks and then stops processing further jobs. The job id is a monotonically increasing int.
More instructions are in the repository's README. Here are the console logs, edited for clarity:
Node 1:
11:52:05.962 [info] app starting...
11:52:05.971 [info] test scheduler is starting...
11:52:05.971 [info] app started...
11:52:05.973 [debug] node1@127.0.0.1: processed job id=1
11:52:06.972 [debug] node1@127.0.0.1: processed job id=2
11:52:07.973 [debug] node1@127.0.0.1: processed job id=3
...
11:52:36.001 [debug] node1@127.0.0.1: processed job id=31
11:52:36.621 [info] [Honeydew] Connection to node2@127.0.0.1 established, looking for workers...
11:52:37.002 [debug] node1@127.0.0.1: processed job id=32
11:52:39.004 [debug] node1@127.0.0.1: processed job id=34
11:52:41.006 [debug] node1@127.0.0.1: processed job id=36
11:52:43.008 [debug] node1@127.0.0.1: processed job id=38
11:52:45.010 [debug] node1@127.0.0.1: processed job id=40
11:52:47.012 [debug] node1@127.0.0.1: processed job id=42
11:52:49.014 [debug] node1@127.0.0.1: processed job id=44
11:52:51.016 [debug] node1@127.0.0.1: processed job id=46
11:52:53.018 [debug] node1@127.0.0.1: processed job id=48
11:52:55.020 [debug] node1@127.0.0.1: processed job id=50
11:52:57.022 [debug] node1@127.0.0.1: processed job id=52
11:52:58.023 [debug] node1@127.0.0.1: processed job id=53
11:52:59.024 [debug] node1@127.0.0.1: processed job id=54
11:53:00.025 [debug] node1@127.0.0.1: processed job id=55
...
Notice that 10 jobs from the interval 32-52 were processed on the just-joined Node 2, and all further tasks were processed on Node 1.
Node 2:
11:52:27.193 [info] app starting...
11:52:27.203 [info] app started...
iex(node2@127.0.0.1)1> Node.connect(:'node1@127.0.0.1')
11:52:36.621 [info] [Honeydew] Connection to node1@127.0.0.1 established, looking for workers...
11:52:38.004 [debug] node2@127.0.0.1: processed job id=33
11:52:40.006 [debug] node2@127.0.0.1: processed job id=35
11:52:42.008 [debug] node2@127.0.0.1: processed job id=37
11:52:44.009 [debug] node2@127.0.0.1: processed job id=39
11:52:46.012 [debug] node2@127.0.0.1: processed job id=41
11:52:48.013 [debug] node2@127.0.0.1: processed job id=43
11:52:50.015 [debug] node2@127.0.0.1: processed job id=45
11:52:52.018 [debug] node2@127.0.0.1: processed job id=47
11:52:54.019 [debug] node2@127.0.0.1: processed job id=49
11:52:56.022 [debug] node2@127.0.0.1: processed job id=51
...
Honeydew.status({:global, :test_queue})
%{queue: %{count: 0, in_progress: 0, suspended: false},
workers: %{#PID<18569.165.0> => nil, #PID<0.165.0> => nil,
#PID<18569.166.0> => nil, #PID<0.166.0> => nil, #PID<18569.167.0> => nil,
#PID<0.167.0> => nil, #PID<18569.168.0> => nil, #PID<0.168.0> => nil,
#PID<18569.169.0> => nil, #PID<0.169.0> => nil, #PID<18569.170.0> => nil,
#PID<0.170.0> => nil, #PID<18569.171.0> => nil, #PID<0.171.0> => nil,
#PID<18569.172.0> => nil, #PID<0.172.0> => nil, #PID<18569.173.0> => nil,
#PID<0.173.0> => nil, #PID<18569.174.0> => nil, #PID<0.174.0> => nil}}
When I run my app on my local computer I don't have any issues, but when I run the release version on the prod server I get the following error message:
{:error,
{{{:case_clause,
{:aborted,
{:bad_type, :"honeydew_\"swiftvox\"", :disc_copies, :smsc@smsc01}}},
[
{Honeydew.Queue.Mnesia, :"-init/2-fun-0-", 2,
[file: 'lib/honeydew/queue/mnesia.ex', line: 79]},
{Enum, :"-each/2-fun-0-", 3, [file: 'lib/enum.ex', line: 789]},
{:maps, :fold_1, 3, [file: 'maps.erl', line: 232]},
{Enum, :each, 2, [file: 'lib/enum.ex', line: 2127]},
{Honeydew.Queue.Mnesia, :init, 2,
[file: 'lib/honeydew/queue/mnesia.ex', line: 76]},
{Honeydew.Queue, :init, 1, [file: 'lib/honeydew/queue.ex', line: 97]},
{:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]},
{:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]}
]},
{:child, :undefined, "swiftvox",
{Honeydew.Queue, :start_link,
[
[
"swiftvox",
Honeydew.Queue.Mnesia,
[disc_copies: [:smsc@smsc01]],
{Honeydew.Dispatcher.LRU, []},
{Honeydew.FailureMode.Abandon, []},
nil,
false
]
]}, :transient, 5000, :worker, [Honeydew.Queue]}}}
I am wondering what could be the issue.
Here is the sample code (based on the mnesia example):
defmodule Worker do
@behaviour Honeydew.Worker
def hello(thing) do
IO.puts "Hello #{thing}!"
:timer.sleep(5000)
end
end
defmodule App do
def start do
nodes = [node()]
:ok = Honeydew.start_queue(:swiftvox, queue: {Honeydew.Queue.Mnesia, [disc_copies: nodes]})
:ok = Honeydew.start_workers(:swiftvox, Worker, num: 1)
end
end
App.start
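A hedged reading of both bad_type reports: Mnesia raises `{:aborted, {:bad_type, table, :disc_copies, node}}` when asked for a :disc_copies table on a node whose schema lives only in RAM, which is Mnesia's default unless a disc schema was created first. A minimal sketch of creating that schema before Mnesia starts; DiscSchema is a hypothetical module, and the directory it writes to is the :mnesia application's :dir setting:

```elixir
defmodule DiscSchema do
  @moduledoc """
  Hypothetical helper. A :disc_copies table needs an on-disk Mnesia schema on
  the node; with only the default RAM schema, creating one fails with bad_type.
  """

  # Call before :mnesia.start/0 (Mnesia must not be running yet).
  def ensure!(nodes \\ [node()]) do
    case :mnesia.create_schema(nodes) do
      :ok -> :ok
      # A schema left over from a previous boot is fine to reuse.
      {:error, {_node, {:already_exists, _}}} -> :ok
      other -> raise "could not create Mnesia schema: #{inspect(other)}"
    end

    :ok = :mnesia.start()
  end
end
```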
Hey,
I have a problem where I can't filter a job. I tried the queue + cancel example, and tried passing a fn that returns true to .filter to get all jobs, but I always get an ArgumentError.
#enqueue a job
{:receive_work, [stages]} |> Honeydew.async({:global, :my_queue})
# Status
Honeydew.status({:global, :my_queue})
|> Map.get(:queue)
|> IO.inspect
# find the job and cancel it
# :ok =
Honeydew.filter({:global, :my_queue}, %{task: {:receive_work, [stages]}})
|> List.first
|> Honeydew.cancel
The error:
** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
(honeydew) lib/honeydew.ex:191: Honeydew.filter/2
(queue) lib/server/router.ex:133: anonymous fn/2 in App.Router.do_match/4
(queue) lib/server/router.ex:2: App.Router.plug_builder_call/2
(queue) lib/plug/error_handler.ex:64: App.Router."call (overridable 3)"/2
(queue) lib/plug/debugger.ex:122: App.Router.call/2
(plug_cowboy) lib/plug/cowboy/handler.ex:18: Plug.Adapters.Cowboy.Handler.upgrade/4
(cowboy) src/cowboy_protocol.erl:442: :cowboy_protocol.execute/4
Thanks in advance!
Not sure if this is something the library doesn't support, or if I have missed something.
I am trying to have the node automatically retry jobs that were running on a node that died. I'm using mnesia storage.
I boot a node and start a bunch of long running jobs, then ctrl-c the iex session.
I boot a new node with the same name. I can see all the jobs I just started using Honeydew.status(:my_queue)
but they just show as in_progress
forever.
Is there a way to have those automatically retried by the new node?
First of all, many thanks for a great library.
I am pretty new to Elixir, so this took me quite some time to debug: the application would not boot when released with mix release,
failing with:
11:55:15.110 [info] Application workplace2slack exited: exited in: Workplace2slack.Application.start(:normal, [])
** (EXIT) an exception was raised:
** (MatchError) no match of right hand side value: {:error, {{:undef, [{:mnesia, :create_table, [:"honeydew_:slack", [ram_copies: [:workplace2slack@bobek], attributes: [:key, :job], record_name: :wrapped_job, type: :ordered_set]], []}, {Honeydew.Queue.Mnesia, :"-init/2-fun-0-", 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 79]}, {Enum, :"-each/2-fun-0-", 3, [file: 'lib/enum.ex', line: 789]}, {:maps, :fold_1, 3, [file: 'maps.erl', line: 232]}, {Enum, :each, 2, [file: 'lib/enum.ex', line: 1964]}, {Honeydew.Queue.Mnesia, :init, 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 76]}, {Honeydew.Queue, :init, 1, [file: 'lib/honeydew/queue.ex', line: 97]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}]}, {:child, :undefined, :slack, {Honeydew.Queue, :start_link, [[:slack, Honeydew.Queue.Mnesia, [ram_copies: [:workplace2slack@bobek]], {Honeydew.Dispatcher.LRU, []}, {Honeydew.FailureMode.Abandon, []}, nil, false]]}, :transient, 5000, :worker, [Honeydew.Queue]}}}
(workplace2slack) lib/workplace2slack/application.ex:22: Workplace2slack.Application.start/2
(kernel) application_master.erl:277: :application_master.start_it_old/4
{"Kernel pid terminated",application_controller,"{application_start_failure,workplace2slack,{bad_return,{{'Elixir.Workplace2slack.Application',start,[normal,[]]},{'EXIT',{{badmatch,{error,{{undef,[{mnesia,create_table,['honeydew_:slack',[{ram_copies,[workplace2slack@bobek]},{attributes,[key,job]},{record_name,wrapped_job},{type,ordered_set}]],[]},{'Elixir.Honeydew.Queue.Mnesia','-init/2-fun-0-',2,[{file,\"lib/honeydew/queue/mnesia.ex\"},{line,79}]},{'Elixir.Enum','-each/2-fun-0-',3,[{file,\"lib/enum.ex\"},{line,789}]},{maps,fold_1,3,[{file,\"maps.erl\"},{line,232}]},{'Elixir.Enum',each,2,[{file,\"lib/enum.ex\"},{line,1964}]},{'Elixir.Honeydew.Queue.Mnesia',init,2,[{file,\"lib/honeydew/queue/mnesia.ex\"},{line,76}]},{'Elixir.Honeydew.Queue',init,1,[{file,\"lib/honeydew/queue.ex\"},{line,97}]},{gen_server,init_it,2,[{file,\"gen_server.erl\"},{line,374}]}]},{child,undefined,slack,{'Elixir.Honeydew.Queue',start_link,[[slack,'Elixir.Honeydew.Queue.Mnesia',[{ram_copies,[workplace2slack@bobek]}],{'Elixir.Honeydew.Dispatcher.LRU',[]},{'Elixir.Honeydew.FailureMode.Abandon',[]},nil,false]]},transient,5000,worker,['Elixir.Honeydew.Queue']}}}},[{'Elixir.Workplace2slack.Application',start,2,[{file,\"lib/workplace2slack/application.ex\"},{line,22}]},{application_master,start_it_old,4,[{file,\"application_master.erl\"},{line,277}]}]}}}}}"}
Kernel pid terminated (application_controller) ({application_start_failure,workplace2slack,{bad_return,{{'Elixir.Workplace2slack.Application',start,[normal,[]]},{'EXIT',{{badmatch,{error,{{undef,[{mne
Crash dump is being written to: erl_crash.dump...done
I finally figured out that I need to add :mnesia to :extra_applications in mix.exs:
def application do
[
extra_applications: [:logger, :mnesia],
mod: {MyApp.Application, []}
]
end
In my case, :finally
had an invalid configuration, finally: Honeydew.FailureMode.Move,
and this led to an interesting behaviour where Honeydew retried the job forever. I had log entries like this:
..retrying 0 more times, job: %Honeydew.Job{...}
and then
..retrying -1 more times, job: %Honeydew.Job{...}
..retrying -2 more times, job: %Honeydew.Job{...}
and so on.
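Whatever the retry logic does internally, a malformed option like this is easiest to catch when the queue starts. A minimal validation sketch; FailureOpts is hypothetical, and the {module, args} shape it expects for :finally is an assumption, not taken from Honeydew's documentation:

```elixir
defmodule FailureOpts do
  @moduledoc """
  Hypothetical validation for a retry option list: reject a bare module in
  :finally and a non-integer or negative :times, so misconfiguration fails
  at startup instead of producing "retrying -1 more times".
  """
  def validate!(opts) when is_list(opts) do
    times = Keyword.fetch!(opts, :times)

    unless is_integer(times) and times >= 0 do
      raise ArgumentError, ":times must be a non-negative integer, got: #{inspect(times)}"
    end

    case Keyword.get(opts, :finally) do
      nil -> :ok
      # e.g. {SomeFailureMode, [queue: :dead]}
      {module, args} when is_atom(module) and is_list(args) -> :ok
      other -> raise ArgumentError, ":finally must be {module, args}, got: #{inspect(other)}"
    end

    opts
  end
end
```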
Hello.
I was playing with the great Ecto Poll Queue feature and at some point noticed that you don't support MySQL.
Are there any plans for that?
Thanks
We're on Ecto 3.4.5, which demands more of you than the 3.0.7 in your mix.lock
does:
warning: function embed_as/1 required by behaviour Ecto.Type is not implemented (in module Honeydew.EctoSource.ErlangTerm)
lib/honeydew/sources/ecto/erlang_term.ex:2: Honeydew.EctoSource.ErlangTerm (module)
If I'm reading c:Ecto.Type.embed_as/1
right, the fix might be as easy as:
@impl true
def embed_as(_), do: :self
Currently Honeydew.Queue
is private and undocumented which makes it pretty hard to implement custom queues.
I'm surveying projects in this space and am curious about how honeydew handles or plans to handle prioritization. What happens when the workers are saturated and an important job comes in that should be run before others in the queue?
Thank you for your time and thoughts.
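This thread doesn't say how Honeydew orders pending jobs beyond its queue implementations, so the following is not a description of Honeydew. One common way to let an important job jump ahead is to order pending jobs by {priority, sequence number}, so equal priorities stay FIFO. A minimal sketch; PriorityJobs is hypothetical, and lower numbers mean more important:

```elixir
defmodule PriorityJobs do
  @moduledoc """
  Hypothetical priority queue for pending jobs. Backed by :gb_sets, so push
  and pop are O(log n); the sequence number keeps FIFO order within a priority.
  """
  defstruct set: :gb_sets.empty(), seq: 0

  def new, do: %__MODULE__{}

  def push(%__MODULE__{set: set, seq: seq} = q, priority, job) when is_integer(priority) do
    %{q | set: :gb_sets.add_element({priority, seq, job}, set), seq: seq + 1}
  end

  # Returns {:ok, job, queue} for the most important job, or :empty.
  def pop(%__MODULE__{set: set} = q) do
    if :gb_sets.is_empty(set) do
      :empty
    else
      {{_priority, _seq, job}, rest} = :gb_sets.take_smallest(set)
      {:ok, job, %{q | set: rest}}
    end
  end
end
```

Ordering only decides which pending job goes to the next free worker; it does not preempt jobs already running on saturated workers.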
Hey,
I'm just starting my journey in elixir and started my first toy project last weekend.
In this project I have a typical asynchronous-task situation. There is just one tiny caveat...
The database object gets created, and a rate-limited worker supplements the object with additional data.
Only with this new data can I finally start my new, gloriously expensive task, which takes all of 100ms to finish.
Any chance I could convince you to add custom constraints to the Ecto queue, like some_field IS NOT NULL, when I'm spawning the workers?
I think the only change necessary would be in the reserve function, but I might be mistaken; I'm a total beginner after all ;)
I love the API btw 👍
Have you looked into extracting the queue into a behaviour that could possibly support multiple backends? With the scheduler / queue distinction mentioned in #1 it would be a lot easier to shrink the size and scope of the queue genserver, and define a behaviour that even a user supplied module could adhere to.
The advantage is that if someone wanted to write a GenServer that acted as a queue but also persisted the job states to redis, ets, etc they could do so without any extra effort on your part.
Thanks!
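The suggestion can be sketched as a small behaviour plus one in-memory implementation. JobQueue, its four callbacks and JobQueue.InMemory are hypothetical names chosen for illustration; they are not Honeydew's queue API. A Redis- or ETS-backed module would implement the same callbacks:

```elixir
defmodule JobQueue do
  @moduledoc "Hypothetical behaviour for a pluggable queue backend."
  @callback init(opts :: keyword()) :: {:ok, term()}
  @callback enqueue(state :: term(), job :: term()) :: term()
  @callback reserve(state :: term()) :: {:ok, term(), term()} | :empty
  @callback ack(state :: term(), job :: term()) :: term()
end

defmodule JobQueue.InMemory do
  @moduledoc "Reference backend: FIFO :queue for pending jobs, MapSet for in-flight ones."
  @behaviour JobQueue

  @impl true
  def init(_opts), do: {:ok, %{pending: :queue.new(), in_flight: MapSet.new()}}

  @impl true
  def enqueue(state, job), do: %{state | pending: :queue.in(job, state.pending)}

  # Hands out the oldest pending job and remembers it as in flight until acked.
  @impl true
  def reserve(state) do
    case :queue.out(state.pending) do
      {{:value, job}, pending} ->
        {:ok, job, %{state | pending: pending, in_flight: MapSet.put(state.in_flight, job)}}

      {:empty, _} ->
        :empty
    end
  end

  @impl true
  def ack(state, job), do: %{state | in_flight: MapSet.delete(state.in_flight, job)}
end
```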
Hi,
I want to persist data in a database not supported by Ecto 3. I suppose it will use polling too. So which behaviour/interface should I implement, Source or Queue?
Also, how do cold starts work (say, after disaster recovery, with fresh nodes and the database restored from a backup)? Will Honeydew attempt to load all the jobs at once on each node, or partition them somehow, e.g. wait for the cluster to come up (whatever that means) and have a selected node dispatch the persisted jobs?
Thanks!
I'd like to do something like this:
Is there any straightforward way to do this? (FWIW I'm using an Mnesia queue.)
I would like to know if there's a way to do a graceful shutdown of the Honeydew queues. For the moment I am calling the Honeydew.suspend() function on all queues and waiting for a certain period of time, but I am wondering if there's a way to wait until all queues are suspended.
In my case, I don't want jobs to be killed in the middle of their processing.
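Until there is a built-in way, one workaround is to poll the queue status until nothing is in progress. A minimal sketch: DrainWait is a hypothetical helper, and it assumes the status map has the shape of the Honeydew.status/1 output shown further down this page (%{queue: %{in_progress: ...}}), and that a suspended queue stops handing out new jobs while running ones finish.

```elixir
defmodule DrainWait do
  # Hypothetical helper, not part of Honeydew: polls `status_fun` until the
  # queue reports zero in-progress jobs, or gives up after `timeout_ms`.
  def wait_until_drained(status_fun, timeout_ms \\ 30_000, interval_ms \\ 100) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    do_wait(status_fun, deadline, interval_ms)
  end

  defp do_wait(status_fun, deadline, interval_ms) do
    case status_fun.() do
      %{queue: %{in_progress: 0}} ->
        :ok

      _busy ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(interval_ms)
          do_wait(status_fun, deadline, interval_ms)
        end
    end
  end
end

# Usage sketch (assumes Honeydew.suspend/1 takes a queue name):
#   queues = Honeydew.queues()
#   Enum.each(queues, &Honeydew.suspend/1)
#   Enum.each(queues, fn q ->
#     :ok = DrainWait.wait_until_drained(fn -> Honeydew.status(q) end)
#   end)
```

Passing the status function in keeps the helper independent of Honeydew, so it can be tested with a fake status source.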
Using the Mnesia queue seems to break things when building a release with Distillery:
=CRASH REPORT==== 29-Jan-2017::17:29:46 ===
crasher:
initial call: Elixir.Honeydew.Queue.Mnesia:init/1
pid: <0.580.0>
registered_name: []
exception exit: {undef,
[{mnesia,create_schema,[['[email protected]']],[]},
{'Elixir.Honeydew.Queue.Mnesia',init,2,
[{file,"lib/honeydew/queue/mnesia.ex"},
{line,19}]},
{'Elixir.Honeydew.Queue.Mnesia',init,1,
[{file,"lib/honeydew/queue/mnesia.ex"},{line,2}]},
{gen_server,init_it,6,
[{file,"gen_server.erl"},{line,328}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,247}]}]}
in function gen_server:init_it/6 (gen_server.erl, line 352)
ancestors: ['honeydew.queue_supervisor.global.my_queue',<0.578.0>,
<0.574.0>,<0.535.0>]
messages: []
links: [<0.579.0>]
dictionary: []
trap_exit: false
status: running
heap_size: 987
stack_size: 27
reductions: 301
neighbours:
** (EXIT from #PID<0.574.0>) shutdown: failed to start child: Honeydew.QueueSupervisor
** (EXIT) an exception was raised:
** (MatchError) no match of right hand side value: {:error, {:undef, [{:mnesia, :create_schema, [[:"[email protected]"]], []}, {Honeydew.Queue.Mnesia, :init, 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 19]}, {Honeydew.Queue.Mnesia, :init, 1, [file: 'lib/honeydew/queue/mnesia.ex', line: 2]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 328]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
(honeydew) lib/honeydew/queue_supervisor.ex:22: anonymous fn/2 in Honeydew.QueueSupervisor.start_link/6
(elixir) lib/enum.ex:651: anonymous fn/3 in Enum.each/2
(elixir) lib/enum.ex:1760: Enum.each/2
(honeydew) lib/honeydew/queue_supervisor.ex:21: Honeydew.QueueSupervisor.start_link/6
(stdlib) supervisor.erl:365: :supervisor.do_start_child/2
(stdlib) supervisor.erl:348: :supervisor.start_children/3
(stdlib) supervisor.erl:314: :supervisor.init_children/2
(stdlib) gen_server.erl:328: :gen_server.init_it/6
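The undef for :mnesia.create_schema/1 suggests the :mnesia application itself was not packaged into the release. A likely fix, assuming a standard mix.exs, is to list it under extra_applications so the release tooling includes it. This is a config fragment; MyApp.Application is a hypothetical module name.

```elixir
# mix.exs (fragment): ensure the OTP :mnesia application ships in the release,
# so :mnesia.create_schema/1 is defined when the Mnesia queue starts.
def application do
  [
    mod: {MyApp.Application, []},
    extra_applications: [:logger, :mnesia]
  ]
end
```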
Is there a way to cancel all workers from a queue? When writing tests, I'd like to be able to clear all the workers from a queue in an on_exit/1 callback.
While diagnosing a slow mix test run, I came across the fact that our app startup was slow because we have two Honeydew supervisors with a total of 15 or so workers.
It seems that Honeydew waits 100 ms for each worker during init, which, in our case, slows down app startup (and therefore every test run) by at least 1.5 seconds.
The offending line is here: honeydew/lib/honeydew/worker.ex, line 25 (at commit 0fe01a1).
Relevant log from #elixir-lang:
3:04 PM <aaronjensen> adding a 100ms sleep * # of workers during init seems like a bad idea, yea josevalim? :/
3:11 PM <aaronjensen> is there a good way to defer starting of those before we are able to help fix honeydew?
3:16 PM <josevalim> aaronjensen: yes, that sounds like a bad idea
3:17 PM <josevalim> that's going to block the supervisor also starting it for 100ms
3:17 PM <josevalim> it is really really bad :(
3:17 PM <josevalim> benwilson512: are you using the lib above?
3:18 PM <josevalim> aaronjensen: ping the author, they should totally not do that
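A common way to keep a GenServer's init/1 from blocking its supervisor is to return {:continue, term} and do the slow part in handle_continue/2 (available since OTP 21 / Elixir 1.7). This is a generic sketch of that pattern; SlowStartWorker and its :setup_ms option are hypothetical and not Honeydew's worker code.

```elixir
defmodule SlowStartWorker do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # Return immediately so the supervisor can start the next child;
    # the slow setup runs in handle_continue/2 before any other message.
    {:ok, %{opts: opts, ready: false}, {:continue, :finish_setup}}
  end

  @impl true
  def handle_continue(:finish_setup, state) do
    # Stand-in for a slow step (such as a 100 ms wait) that would otherwise sit in init/1.
    Process.sleep(Keyword.get(state.opts, :setup_ms, 100))
    {:noreply, %{state | ready: true}}
  end

  @impl true
  def handle_call(:ready?, _from, state), do: {:reply, state.ready, state}
end
```

Because the continue callback runs before the process handles any call, callers still never observe a half-initialized worker, but 15 workers no longer add 15 sequential sleeps to supervisor startup.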
From the current log messages I can tell that some queues start, and by matching PIDs I can see which workers are ready for which queues, but I cannot tell which queues have started. I would really like the log messages to give more information about where they are coming from.
Current Behavior:
19:21:42.576 [debug] [Honeydew] Worker #PID<0.361.0> sending ready
19:21:42.576 [debug] [Honeydew] Queue #PID<0.305.0> ready for worker #PID<0.361.0>
Desired Behavior:
19:21:42.576 [debug] [Honeydew] DoStuffWorkerModule Worker #PID<0.361.0> sending ready
19:21:42.576 [debug] [Honeydew] {:global, :do_stuff} Queue #PID<0.305.0> ready for worker #PID<0.361.0>
I think this change will improve introspection into what is happening inside the queue and worker pipeline. Thanks!
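A small sketch of how the desired lines could be built by including the worker module and queue name in each message. LogFormat and its functions are hypothetical and not Honeydew's logging code.

```elixir
defmodule LogFormat do
  require Logger

  # Hypothetical helpers that build the "desired" messages from this issue.
  def worker_ready(worker_module, worker_pid) do
    "[Honeydew] #{inspect(worker_module)} Worker #{inspect(worker_pid)} sending ready"
  end

  def queue_ready(queue_name, queue_pid, worker_pid) do
    "[Honeydew] #{inspect(queue_name)} Queue #{inspect(queue_pid)} ready for worker #{inspect(worker_pid)}"
  end

  # Passing a function to Logger.debug/1 defers building the string
  # until Logger knows the :debug level is enabled.
  def log_queue_ready(queue_name, queue_pid, worker_pid) do
    Logger.debug(fn -> queue_ready(queue_name, queue_pid, worker_pid) end)
  end
end
```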
I just had a difficult-to-troubleshoot issue after I accidentally defined init/0 instead of init/1. My worker module looked like this:
defmodule MyWorker do
  # Mistake: this defines init/0, so it is never called (workers need init/1)
  def init, do: {:ok, nil}

  def run(arg, _state) do
    # do work
  end
end
I had my queues set up properly, and the following log messages were printed when my app started up, so I assumed everything had started correctly.
10:32:36.283 [debug] [Honeydew] Worker #PID<0.268.0> sending ready
10:32:36.283 [debug] [Honeydew] Worker #PID<0.267.0> sending ready
10:32:36.283 [debug] [Honeydew] Queue #PID<0.264.0> ready for worker #PID<0.267.0>
10:32:36.283 [debug] [Honeydew] Queue #PID<0.264.0> ready for worker #PID<0.268.0>
However, when I tried to call the function using Honeydew:
{:run, ["hello"]} |> Honeydew.async(:my_queue)
I would get the following error:
10:31:28.429 [error] GenServer #PID<0.745.0> terminating
** (UndefinedFunctionError) function MyWorker.run/1 is undefined or private. Did you mean one of:
* run/2
It wasn't until I put an IO.inspect in my init/0 function that I found it wasn't being called, because it should have been init/1.
Ideally, Honeydew would either raise an exception when a worker doesn't define init/1, or document that init/1 can be skipped, with the caveat that the state argument then won't be passed as the last argument to the functions you call on the worker.
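A startup check for this mistake could use Code.ensure_loaded/1 and function_exported?/3. A minimal sketch; WorkerCheck is a hypothetical helper, and treating "no init at all" as valid follows the "document that init/1 can be skipped" option above.

```elixir
defmodule WorkerCheck do
  # Hypothetical check (not part of Honeydew): raise early if a worker module
  # defines init/0 but not init/1, the mistake described above.
  def validate!(module) do
    # function_exported?/3 only sees loaded modules, so load it first.
    Code.ensure_loaded(module)

    cond do
      function_exported?(module, :init, 1) ->
        :ok

      function_exported?(module, :init, 0) ->
        raise ArgumentError,
              "#{inspect(module)} defines init/0, but workers need init/1"

      true ->
        # No init at all: a stateless worker.
        :ok
    end
  end
end
```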
I tried this library and I can't get the examples working.
I cloned the repository, ran mix deps.get, then ran elixir -S mix run examples/progress_and_queue_status.exs, and got no output.
I changed the start function to display the status of the queues and workers:
def start do
:ok = Honeydew.start_queue(:my_queue)
:ok = Honeydew.start_workers(:my_queue, HeavyTask, num: 10)
IO.puts "App started"
IO.inspect Honeydew.queues
IO.inspect Honeydew.workers
IO.inspect Honeydew.status(:my_queue)
end
I have this output:
App started
[:my_queue]
[:my_queue]
%{queue: %{count: 0, in_progress: 0, suspended: false}, workers: %{}}
What am I doing wrong? I'm running Elixir 1.7.4.
Hi, this looks like a good fit for a project we are working on, but we currently have database triggers send NOTIFY messages that a GenServer listens to, rather than polling the database. We do this because the updates come from outside our application, in a middleware integration service. Would you consider a PR adding support for this? I could almost do it outside of Honeydew, except that I can't manually enqueue Ecto-backed jobs.
Is there a way to set the multi_time_warp VM argument when testing? If not, how can I disable the output of this warning?
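The mode the VM is running in can be checked with :erlang.system_info(:time_warp_mode), and the +C emulator flag can be passed to a test run through the ELIXIR_ERL_OPTIONS environment variable. A small sketch; TimeWarp is a hypothetical helper module.

```elixir
# To start the test VM in multi-time warp mode, pass the emulator flag via:
#   ELIXIR_ERL_OPTIONS="+C multi_time_warp" mix test
defmodule TimeWarp do
  # Returns :no_time_warp, :single_time_warp or :multi_time_warp.
  def mode, do: :erlang.system_info(:time_warp_mode)

  def multi?, do: mode() == :multi_time_warp
end
```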
If a worker crashes due to an Erlang error (like :badarg), it outputs the full Erlang error tuple and stack trace in a very unfriendly fashion. If the user is waiting for a response using Honeydew.yield, they also receive an {:error, <unfriendly erlang error>} response. It would be awesome if Honeydew normalized these errors using Exception.normalize/3 before outputting them and returning them via Honeydew.yield.
I'd be happy to make a failing test case, if you can point me where it needs to go.
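For reference, this is roughly what the proposed normalization does to a raw :badarg. A sketch only (ErrorNormalize is a hypothetical module, not Honeydew's code): Exception.normalize/3 turns the Erlang error into an %ArgumentError{}, and Exception.format/3 renders it with the stack trace.

```elixir
defmodule ErrorNormalize do
  # Turn a raw caught {kind, reason, stacktrace} into an Elixir exception
  # struct plus a readable, formatted message.
  def normalize(kind, reason, stacktrace) do
    exception = Exception.normalize(kind, reason, stacktrace)
    {exception, Exception.format(kind, exception, stacktrace)}
  end
end
```

Non-error kinds (:exit, :throw) pass through Exception.normalize/3 unchanged, so the same function can sit wherever crashes are caught.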
Hello, I'm using Honeydew to send HTTP requests to a third-party endpoint. Requests to this endpoint can fail for various reasons, and the provider recommends exponential backoff, which I get out of the box with your great library.
Alternatively, they may send me a Retry-After header with the time I need to wait before retrying.
Since I will only know that value inside the job itself, is there any way to use that time as the retry delay instead of the exponential backoff?
Basically, I would make the request and, if it fails, still parse the response to get the Retry-After header. If the header is present, I want to delay the next retry by the time they sent; if it is not present, or the job crashed for some other reason, I would fall back to exponential backoff.
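The delay decision itself can be a pure function. A minimal sketch, assuming RetryDelay is a hypothetical helper (not a Honeydew API), with an assumed 1 s base and 5 min cap; Retry-After may also be an HTTP date, which this sketch does not parse and instead falls back to backoff.

```elixir
defmodule RetryDelay do
  # Hypothetical helper: honor an integer-seconds Retry-After header if
  # present, otherwise fall back to exponential backoff.
  @base_ms 1_000
  @max_ms 300_000

  @spec delay_ms(non_neg_integer, String.t() | nil) :: non_neg_integer
  def delay_ms(attempt, retry_after \\ nil)

  def delay_ms(attempt, retry_after) when is_binary(retry_after) do
    case Integer.parse(String.trim(retry_after)) do
      {seconds, ""} when seconds >= 0 -> seconds * 1_000
      # HTTP-date or malformed values: use backoff instead.
      _ -> backoff_ms(attempt)
    end
  end

  def delay_ms(attempt, nil), do: backoff_ms(attempt)

  # base * 2^attempt, capped at @max_ms
  defp backoff_ms(attempt), do: min(@base_ms * :erlang.bsl(1, attempt), @max_ms)
end
```

How the computed delay is fed back into Honeydew's retry handling is left open here.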
Is there an estimate for when Ecto will be updated to 3.0?
Hi there, thank you so much for the great library! Having a great time so far!
I am trying to write a unit test for a module that inserts jobs into a queue with Honeydew.async, but my test asserts synchronously, before the jobs are inserted.
Do you have any pointers on how one can assert jobs have been inserted?
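One common pattern is to have the job report back to the test process and wait for that message, rather than asserting right after Honeydew.async returns. A minimal sketch with hypothetical NotifyingJob and AwaitJob modules; a plain Task stands in for the queue, and this checks that the job ran rather than inspecting the queue's storage.

```elixir
defmodule NotifyingJob do
  # Hypothetical job function: does its work, then tells `reply_to` it is done.
  # In a real test, the test process would pass self() in the job's arguments.
  def run(payload, reply_to) do
    result = String.upcase(payload)
    send(reply_to, {:job_done, payload, result})
    result
  end
end

defmodule AwaitJob do
  # Waits for the completion message instead of asserting synchronously;
  # ExUnit's assert_receive/3 does the same thing inside a test.
  def await(payload, timeout_ms \\ 1_000) do
    receive do
      {:job_done, ^payload, result} -> {:ok, result}
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end
```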
First of all, thank you for this amazing library. It's really useful to me 💯
While reading the documentation, I noticed an inconsistency. The README.md and the code both make the Mnesia queue the default, but ErlangQueue is still indicated as the default in the start_queue/2 function documentation:
Lines 346 to 347 in 3a35093