Giter Site home page Giter Site logo

honeydew's People

Contributors

aaronrenner avatar amokan avatar bokner avatar evadne avatar fanzeyi avatar feymartynov avatar fishtreesugar avatar garthk avatar giodamelio avatar hauleth avatar janpieper avatar koudelka avatar nathanl avatar pragtob avatar rodrigues avatar vihu avatar x4lldux avatar zillou avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

honeydew's Issues

longer timeout possible?

hey
i often get

** (stop) exited in: GenServer.call(#PID<0.459.0>, {:enqueue, %Honeydew.Job{by: nil, completed_at: nil, enqueued_at: 1548523091425, failure_private: nil, from: nil, job_monitor: nil, private: nil, queue: :vnc_screenshot, result: nil, started_at: nil, task: {:screenshot, [{"x", xxxx}]}}}, 5000) ** (EXIT) time out (elixir) lib/gen_server.ex:989: GenServer.call/3 (honeydew) lib/honeydew.ex:71: Honeydew.async/3 (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2 (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
anyway to increase it to 50 seconds`?

Add a documented way to select finished ecto queue jobs

I want to select all items that have been successfully processed by honeydew, I guessed by looking at the source code that I can use where([x], is_nil(x.honeydew_whatever_lock)) to do this, but this couples my code tightly against honeydew implementation details, I'd like a more "official" or "public" way to achieve this.

# 2. Indicates the status of the job, it can be either:
# - "ready", between zero and SQL.ready()
# - "delayed", between SQL.ready() and the beginning of the stale window
# - "in progress", from now until now + stale_timeout
# - "stale", within a year ago ago from now
# - "abandoned", -1
# - "finished", nil

A documented way to select jobs in a different state would be nice too, but this is not something I'm struggling with currently.

Ecto.Adapters.MyXQL, isn't currently supported

Hi, I found Honeydew while looking for a solution for background processing in a Phoenix app with use of Ecto. I followed the example and ran into the following issue:

 your repo's ecto adapter, Ecto.Adapters.MyXQL, isn't currently supported, but it's probably not hard to implement, open an issue and we'll chat! 

I had a look at the PG.Adapter, seems possible to implement it for MySQL. Any pointers on what I should look out for, did anybody already try and fail?

Delayed Jobs support?

@koudelka what do you think about adding Delayed Jobs support to honeydew?

The primary use-case I envision is delayed retries: when a job fail b/o temporary unavailability of some 3-rd party API it makes no sense to retry immediately - that would exhaust all retry attempts too fast.

Would this be a nice feature for honeydew? Maybe it's on the roadmap already?

Postgres Support

edit: All below was laboring under the misapprehension that this project already supported Postgres, which it doesn't. I'm working on a PR to add that side-by-side with Cockroach, and make it easier to add support and tests for other databases with an adapter model. Ideally the postgres adapter would just be ANSI SQL but I'm not sure reserve can be implemented with the same semantics without using CTEs, which is how I presently have it working.

honeydew_fields seems to be generating invalid SQL. In the example project I get a postgres syntax error when I run ecto.migrate. I'm using postgres 10.3 but none of this has changed recently so I'm a bit mystified how you didn't have the same result.

The reason for the failure is because the syntax of extract is wrong, it should be EXTRACT('milisecond' from NOW())). This is generated fragment:

(EXTRACT('millisecond', (CAST('1994-03-26 04:20:00' AS TIMESTAMP))) + CAST((CAST('1994-03-26 04:20:00' AS TIMESTAMP)) AS INT) * 1000)

I fixed that in the macro but then it complains that datetime cannot be cast to an int - the second part. I think maybe what you are after here with the cast is implemented in the epoch function.

[] ~/repos/elixir/honeydew/examples/ecto_poll_queue <master> ✗ mix ecto.create
Compiling 6 files (.ex)
Generated ecto_poll_queue_example app
The database for EctoPollQueueExample.Repo has been created
[] ~/repos/elixir/honeydew/examples/ecto_poll_queue <master> ✗ mix ecto.migrate

17:53:01.570 [info]  == Running EctoPollQueueExample.Repo.Migrations.CreatePhotosAndUsers.change/0 forward

17:53:01.572 [info]  create table photos
** (Postgrex.Error) ERROR 42601 (syntax_error): syntax error at or near ","
    (ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
    (ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
    (elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
    (ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
    (ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
    (elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
    (ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
    (stdlib) timer.erl:181: :timer.tc/2

Cancel job on call timeout

I'd love to be able to kill a job if the cast that creates it times out. I've hacked away at work_queue.ex and kind of got something working but I thought I'd check in here and see if there's an easier non-hacky way.

I'm quite new to Elixir, but I feel like there's some way I can achieve this via process linking maybe?

Thanks for your help!

Mention min OTP version is now 19.1

The changes to add success mode support added some calls to :erlang.system_time(:milliseconds) which is only supported in OTP 19.1+. Before OTP 19.1, the call was :erlang.system_time(:milli_seconds), which is now deprecated.

If you do want to maintain compatibility with earlier OTP versions, you could call the Elixir version: System.system_time(:milliseconds). If not, it would be worth mentioning in the README the min OTP version is 19.1.

Composite primary keys

Honeydew expects just a single primary key, and it's not possible to use composite primary keys, like this:

defmodule HelloPhoenix.Player do
  use HelloPhoenix.Web, :model

  @primary_key false
  schema "players" do
    field :first_name, :string, primary_key: true
    field :last_name, :string, primary_key: true
    field :position, :string
    field :number, :integer
  . . .

I was going to take a stab at a PR, but the ecto integration test is also the example so not sure where to fit a test for composite primary keys.

It also looks like honeydew expects the primary key to be called :id rather than fetching the primary key column name here:

%^schema{id: id} = repo.load(schema, %{id: id})

Job stuck in a queue

I have a job that was retried multiple times, but it got stuck in the queue.

Here is the job:

iex(smsc@smsc03)1> Honeydew.filter("my_queue", fn a-> true end)
[
  %Honeydew.Job{
    by: nil,
    completed_at: nil,
    delay_secs: 255,
    enqueued_at: 1581708566962,
    failure_private: 9,
    from: nil,
    job_monitor: nil,
    private: -576460752303404335,
    queue: "my_queue",
    result: nil,
    started_at: nil,
    task: {:send_msg,
     [
       %Smsc.Mt.Payload{
         delivery_method: "HTTP",
         delivery_url: "https://tata.com/sms.php",
         dest: "15142875555",
         login: "my_queue",
         msg: "Bla bla",
         msg_id: 8176286,
         orig: "15148095555",
         smsc_id: "e-tx"
       }
     ]}
  }
]

My workers seem to continue processing other jobs on that queue.

Here is my queue status:

iex(smsc@smsc03)1> Honeydew.status("my_queue")
%{
  queue: %{
    count: 1,
    in_progress: 0,
    mnesia: %{
      "honeydew_\"my_queue\"": [
        access_mode: :read_write,
        active_replicas: [:smsc@smsc03],
        all_nodes: [:smsc@smsc03],
        arity: 3,
        attributes: [:key, :job],
        checkpoints: [],
        commit_work: [],
        cookie: {{1581628211666457440, -576460752303418191, 1}, :smsc@smsc03},
        cstruct: {:cstruct, :"honeydew_\"my_queue\"", :ordered_set, [],
         [:smsc@smsc03], [], [], 0, :read_write, false, [], [], false,
         :wrapped_job, [:key, :job], [], [], [],
         {{1581628211666457440, -576460752303418191, 1}, :smsc@smsc03},
         {{2, 0}, []}},
        disc_copies: [:smsc@smsc03],
        disc_only_copies: [],
        external_copies: [],
        frag_properties: [],
        index: [],
        index_info: {:index, :ordered_set, []},
        load_by_force: false,
        load_node: :smsc@smsc03,
        load_order: 0,
        load_reason: :local_only,
        local_content: false,
        majority: false,
        master_nodes: [],
        memory: 242,
        ram_copies: [],
        record_name: :wrapped_job,
        record_validation: {:wrapped_job, 3, :ordered_set},
        size: 1,
        snmp: [],
        storage_properties: [],
        storage_type: :disc_copies,
        subscribers: [],
        type: :ordered_set,
        user_properties: [],
        version: {{2, 0}, []},
        where_to_commit: [smsc@smsc03: :disc_copies],
        where_to_read: :smsc@smsc03,
        where_to_wlock: {[:smsc@smsc03], false},
        where_to_write: [:smsc@smsc03],
        wild_pattern: {:wrapped_job, :_, :_}
      ],
      "honeydew_\"my_queue\"_in_progress": [
        access_mode: :read_write,
        active_replicas: [:smsc@smsc03],
        all_nodes: [:smsc@smsc03],
        arity: 3,
        attributes: [:key, :job],
        checkpoints: [],
        commit_work: [],
        cookie: {{1581628211669843292, -576460752303418143, 1}, :smsc@smsc03},
        cstruct: {:cstruct, :"honeydew_\"my_queue\"_in_progress", :set, [],
         [:smsc@smsc03], [], [], 0, :read_write, false, [], [], false,
         :wrapped_job, [:key, :job], [], [], [],
         {{1581628211669843292, -576460752303418143, 1}, :smsc@smsc03},
         {{2, 0}, []}},
        disc_copies: [:smsc@smsc03],
        disc_only_copies: [],
        external_copies: [],
        frag_properties: [],
        index: [],
        index_info: {:index, :set, []},
        load_by_force: false,
        load_node: :smsc@smsc03,
        load_order: 0,
        load_reason: :local_only,
        local_content: false,
        majority: false,
        master_nodes: [],
        memory: 305,
        ram_copies: [],
        record_name: :wrapped_job,
        record_validation: {:wrapped_job, 3, :set},
        size: 0,
        snmp: [],
        storage_properties: [],
        storage_type: :disc_copies,
        subscribers: [],
        type: :set,
        user_properties: [],
        version: {{2, 0}, []},
        where_to_commit: [smsc@smsc03: :disc_copies],
        where_to_read: :smsc@smsc03,
        where_to_wlock: {[:smsc@smsc03], false},
        where_to_write: [:smsc@smsc03],
        wild_pattern: {:wrapped_job, :_, :_}
      ]
    },
    suspended: false
  },
  workers: %{#PID<0.4271.0> => nil}
}

Example doesn't work

Hi,
it's really strange, but at home the example works, but at work it doesn't work.
I'm using honeydew 1.4.1, my example is below, if i call start and then work,
i receive a list of jobs, but the jobs aren't really called.

defmodule MyWorker do
def do_a_thing(test) do
IO.puts("doing a thing#{test}!")
end

def start do
:ok = Honeydew.start_queue(:my_queue, queue: {Honeydew.Queue.ErlangQueue, []})
:ok = Honeydew.start_workers(:my_queue, MyWorker)
end

def test do
{:do_a_thing, ["Test2"]} |> Honeydew.async(:my_queue)
end

def work do
for i <- 1..10000 do
{:do_a_thing, [Integer.to_string(i)]} |> Honeydew.async(:my_queue)
:ok
end
end
end

The jobs are like this:

%Honeydew.Job{
by: nil,
completed_at: nil,
delay_secs: 0,
enqueued_at: 1559028347918,
failure_private: nil,
from: nil,
job_monitor: nil,
private: -576460752303423068,
queue: :my_queue,
result: nil,
started_at: nil,
task: {:do_a_thing, ["1"]}
},

I don't know why it works at home and not at work.
At work, I'm using Windows with this version: Elixir 1.8.0 and OTP 20.2
At home, I'm using Ubuntu and Elixir 1.8.2 and OTP 22.0.1.
It's really strange, it's a fresh elixir project without other dependencies,
only {:honeydew, "~> 1.4.1"}.

Thanks,
Werner.

No way to programatically restart a job?

So I am using Honeydew + Ecto, and I can't find a way to restart / re-run a job.

In my case, I have a job that I want to be run every time a record in my database has been created, and this works great.

However, I would like to also schedule a job once the record has been updated.

If I understand correctly there is currently no way to do it short of manually setting record's honeydew fields to default values, as they were when record been created.

But this puts me at risk at race condition if the record is being updated while the initlal job is running, or when multiple updates happen in short periods of time.

What is the correct way of handling the situation of running a job on record updates?

My current thinking is we create associated record that represents the record update, and corresponding job. So, if I have Photo, and I want to run ClassifyPhoto job after each update, I insert for each photo update a record, say PhotoUpdated, with a timestamp and honeydew fields, and hook up ClasifyPhoto job to this PhotoUpdated - and not Photo schema.

Is my thinking correct or am I missing some smart funcionality Honeydew provides to do it?

Custom queue makes Honeydew crashs by timeout

Hello, I created a custom mnesia queue that do some specific things to my use case.

The issue is that my queue is slower than the built-in mnesia queue, so sometimes it takes more than 5 seconds to queue a new job, making GenServer.call(queue_process, {:enqueue, job}) timeout and crash.

The obvious solution to that is to bump the timeout value, but there is no way to configure that inside Honeydew.

I tried creating a fork and seeing if there is an easy way to pass the timeout value to all Genserver calls, but it doesn't seem to be an easy way to do that since these functions can be called in so many places.

For me, an easy fix would be to simply change the default of all the calls to use :infinity as timeout since it is pretty safe and only creates a problem if you have a deadlock, but I'm not sure if that is ok for you.

Do you have any suggestion?

Can't get it to start in a Phoenix project

From the examples it's not clear to me how to start Honeydew in a phoenix project, in any point where I try to add the start up code I get this error:

no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

I tried placing it in the existing Application.start/2 method, in a GenServer module inside of the supervision tree of the existing Application, I always get that error, could you point me in the right direction please?

documentation

  • more examples
    • local mnesia queue
    • dead letter queue
    • failure mode chains (retry x times then move to dead letter, etc)
  • user-facing api (the tl;dr section at the top of the README)
  • mnesia + distillery caveat (need to stop mnesia before create_schema, but releases start all apps)
  • move README to a wiki instead

Clear an entire queue

I found this example for clearing out a job when I know the exact arguments.

Is there any way to just whack every job in the entire queue? I've tried matching on just an empty %{} but it gives me an ArgumentError. In fact, the below example gives me that as well, even though it won't match my own arguments. I assume it would just tell me it doesn't match anything or return an empty list instead of the error.

I'm using an ErlangQueue.

# find the job that would run with the argument `10`, as it wont have started yet
# and cancel it
:ok =
  Honeydew.filter(:my_queue, %{task: {:run, [10]}})
  |> List.first
  |> Honeydew.cancel

Dynamic Workers and Queues for scaling?

Problem:

I'd like to be able to scale workers & global queues as new nodes join a cluster. For example, say I'm getting a unusually large amount of jobs, I'd like to be able to add another node and be able to start more workers on that node.

From what I can tell from the documentation and reading the code, there is no way currently to start more workers for a global queue which already has workers available. If I have started the queue and some workers and try to start more workers on a new node, I get "error, already started".

I'm currently using global queues backed by Mnesia.

I don't know if there is a way currently to do this. I'd like to be able, when a new node joins the cluster, to start more workers for a queue on that node and possibly make the new node also be a "disc copy" of the queue as well. But I'd take just being able to start more workers.

Looks like worker dies and no more jobs being picked up

The only error we have is this one:

Job failed due unexpected exit. %Honeydew.Job{by: :nonode@nohost, completed_at: nil, delay_secs: 0, enqueued_at: 1580554352191, failure_private: nil, from: nil, job_monitor: #PID<0.9131.53>, private: -576460752298270142, queue: :new_block, result: {:error, %Honeydew.Crash{reason: {:timeout, {Task, :await, [%Task{owner: #PID<0.9133.53>, pid: #PID<0.4273.53>, ref: #Reference<0.3764113133.549453826.23758>}, 50000]}}, stacktrace: [], type: :exit}}, started_at: nil, task: {:new, ["000000000000000000063f666d006acc3c83013f38be1662986639a7bfc19d79"]}}",
** (exit) exited in: Task.await(%Task{owner: #PID<0.9133.53>, pid: #PID<0.4273.53>, ref: #Reference<0.3764113133.549453826.23758>}, 50000)
    ** (EXIT) time out

After this error no more jobs is being processed.

We run honeydew with:

:ok = Honeydew.start_queue(:new, failure_mode: {Honeydew.FailureMode.Retry, times: 10})
:ok = Honeydew.start_workers(:new, App.BlockWorker, num: 1)

Infinite timeouts with job_please call

Hey!

I've been looking through the various job queue + worker libraries and I really like what I see here. My primary concern at the moment has to do with the infinite timeout you're using at https://github.com/koudelka/honeydew/blob/master/lib/honeydew/worker.ex#L45 for when a worker is waiting on a job.

While I understand why you're going that route, it's in general best to avoid these. This is particularly true when it blocks a GenServer. The worker process will be unable to respond to any debug or system messages it receives during that time, and its process inbox could fill up indefinitely.

As an alternative, have you considered the general approach taken by the proposed GenRouter architecture? Essentially, each worker would notify either your queue or perhaps an additional Scheduler process that it is available. Then, when a task comes in, the queue would just grab that a pid from its list of available worker pids and send the task off.

I do think there's value to doing it in another process honestly. In that scenario, your queue would also notify the scheduler when a new item arrives, and the scheduler would maintain a count of those items. Then if the count > 0 and a worker says its free, it can match up the job with the worker immediately. In this way you avoid blocking any genservers for an extended period of time.

Thoughts?

Honeydew mnesia error on app boot

I'm having problems booting the app. I've manually created the mnesia schema, but when I try to start I get this:

eggl_1                    | 16:20:51.643 [info]  Starting with Honeydew nodes: [:"eggl_and_rested@${HOSTNAME}"]
eggl_1                    |
eggl_1                    | =CRASH REPORT==== 8-Jan-2018::16:20:51 ===
eggl_1                    |   crasher:
eggl_1                    |     initial call: Elixir.Honeydew.Queue:init/1
eggl_1                    |     pid: <0.2499.0>
eggl_1                    |     registered_name: []
eggl_1                    |     exception error: no case clause matching
eggl_1                    |                      {aborted,
eggl_1                    |                          {bad_type,'honeydew_:eggl_default_queue',
eggl_1                    |                              disc_copies,'eggl_and_rested@${HOSTNAME}'}}
eggl_1                    |       in function  'Elixir.Honeydew.Queue.Mnesia':init/2 (lib/honeydew/queue/mnesia.ex, line 49)
eggl_1                    |       in call from 'Elixir.Honeydew.Queue':init/1 (lib/honeydew/queue.ex, line 71)
eggl_1                    |       in call from gen_server:init_it/2 (gen_server.erl, line 365)
eggl_1                    |       in call from gen_server:init_it/6 (gen_server.erl, line 333)
eggl_1                    |     ancestors: ['honeydew.queue_supervisor.eggl_default_queue',
eggl_1                    |                   'Elixir.Eggl.Workers.Supervisor','Elixir.Eggl.Supervisor',
eggl_1                    |                   <0.2381.0>]
eggl_1                    |     message_queue_len: 0
eggl_1                    |     messages: []
eggl_1                    |     links: [<0.2498.0>]
eggl_1                    |     dictionary: []
eggl_1                    |     trap_exit: false
eggl_1                    |     status: running
eggl_1                    |     heap_size: 2586
eggl_1                    |     stack_size: 27
eggl_1                    |     reductions: 649
eggl_1                    |   neighbours:

Any ideas?

Final failure callback on worker?

Hi,

I'd like to know if there is a way to be notified and take an action upon a job failing after all retries, for example I'd like to retry the job many times and if it fails every time at the end send an email notifying about the failure, I don't seem to find an example that accomplishes this, I'd have to perhaps write my own FailureMode implementation?

Thanks.

Node LRU Dispatcher stops dispatching tasks to connected nodes

Hi, not sure if this is a bug or I'm doing something wrong. In my case I create a global queue, connect two nodes with 10 workers on each and then see the following picture: node 2 receives only 10 tasks and after that only node 1 is executing tasks. Scheduler who sends task for execution to the queue sits on node 1.

I prepared a test project to demonstrate the behaviour: https://github.com/dkataskin/honeydew_nodelru

How to run it: start two nodes and connect them, node name should follow format "node{num}@127.0.0.1" as only node with name "[email protected]" starts the scheduler. You will see that second node will execute only 10 tasks and then stop processing further jobs. Job id is monotonically increased int.

More instructions in the readme of the repository, here are logs from console, logs were edited for the sake of clarity:

Node 1:

11:52:05.962 [info]  app starting...
11:52:05.971 [info]  test scheduler is starting...
11:52:05.971 [info]  app started...

11:52:05.973 [debug] [email protected]: processed job id=1
11:52:06.972 [debug] [email protected]: processed job id=2
11:52:07.973 [debug] [email protected]: processed job id=3

...

11:52:36.001 [debug] [email protected]: processed job id=31

11:52:36.621 [info]  [Honeydew] Connection to [email protected] established, looking for workers...

11:52:37.002 [debug] [email protected]: processed job id=32
11:52:39.004 [debug] [email protected]: processed job id=34
11:52:41.006 [debug] [email protected]: processed job id=36
11:52:43.008 [debug] [email protected]: processed job id=38
11:52:45.010 [debug] [email protected]: processed job id=40
11:52:47.012 [debug] [email protected]: processed job id=42
11:52:49.014 [debug] [email protected]: processed job id=44
11:52:51.016 [debug] [email protected]: processed job id=46
11:52:53.018 [debug] [email protected]: processed job id=48
11:52:55.020 [debug] [email protected]: processed job id=50
11:52:57.022 [debug] [email protected]: processed job id=52

11:52:58.023 [debug] [email protected]: processed job id=53
11:52:59.024 [debug] [email protected]: processed job id=54
11:53:00.025 [debug] [email protected]: processed job id=55
...

notice that 10 jobs from interval 32-52 have been processed on just joined Node 2 and all further tasks have been processed on Node 1.

Node 2:

11:52:27.193 [info]  app starting...
11:52:27.203 [info]  app started...

iex([email protected])1> Node.connect(:'[email protected]')

11:52:36.621 [info]  [Honeydew] Connection to [email protected] established, looking for workers...

11:52:38.004 [debug] [email protected]: processed job id=33
11:52:40.006 [debug] [email protected]: processed job id=35
11:52:42.008 [debug] [email protected]: processed job id=37
11:52:44.009 [debug] [email protected]: processed job id=39
11:52:46.012 [debug] [email protected]: processed job id=41
11:52:48.013 [debug] [email protected]: processed job id=43
11:52:50.015 [debug] [email protected]: processed job id=45
11:52:52.018 [debug] [email protected]: processed job id=47
11:52:54.019 [debug] [email protected]: processed job id=49
11:52:56.022 [debug] [email protected]: processed job id=51

...

Honeydew.status({:global, :test_queue})
%{queue: %{count: 0, in_progress: 0, suspended: false},
  workers: %{#PID<18569.165.0> => nil, #PID<0.165.0> => nil,
    #PID<18569.166.0> => nil, #PID<0.166.0> => nil, #PID<18569.167.0> => nil,
    #PID<0.167.0> => nil, #PID<18569.168.0> => nil, #PID<0.168.0> => nil,
    #PID<18569.169.0> => nil, #PID<0.169.0> => nil, #PID<18569.170.0> => nil,
    #PID<0.170.0> => nil, #PID<18569.171.0> => nil, #PID<0.171.0> => nil,
    #PID<18569.172.0> => nil, #PID<0.172.0> => nil, #PID<18569.173.0> => nil,
    #PID<0.173.0> => nil, #PID<18569.174.0> => nil, #PID<0.174.0> => nil}}

Issues with mnesia on production

When I run my app on my local computer I don't have any issues, but when I run the release version on the prod server I get the following error message:

{:error,
 {{{:case_clause,
    {:aborted,
     {:bad_type, :"honeydew_\"swiftvox\"", :disc_copies, :smsc@smsc01}}},
   [
     {Honeydew.Queue.Mnesia, :"-init/2-fun-0-", 2,
      [file: 'lib/honeydew/queue/mnesia.ex', line: 79]},
     {Enum, :"-each/2-fun-0-", 3, [file: 'lib/enum.ex', line: 789]},
     {:maps, :fold_1, 3, [file: 'maps.erl', line: 232]},
     {Enum, :each, 2, [file: 'lib/enum.ex', line: 2127]},
     {Honeydew.Queue.Mnesia, :init, 2,
      [file: 'lib/honeydew/queue/mnesia.ex', line: 76]},
     {Honeydew.Queue, :init, 1, [file: 'lib/honeydew/queue.ex', line: 97]},
     {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]},
     {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]}
   ]},
  {:child, :undefined, "swiftvox",
   {Honeydew.Queue, :start_link,
    [
      [
        "swiftvox",
        Honeydew.Queue.Mnesia,
        [disc_copies: [:smsc@smsc01]],
        {Honeydew.Dispatcher.LRU, []},
        {Honeydew.FailureMode.Abandon, []},
        nil,
        false
      ]
    ]}, :transient, 5000, :worker, [Honeydew.Queue]}}}

I am wondering what could be the issue.

Here is the sample code (based on the mnesia example):

defmodule Worker do
  @behaviour Honeydew.Worker

  def hello(thing) do
    IO.puts "Hello #{thing}!"
    :timer.sleep(5000)
  end
end

defmodule App do 
  def start do
    nodes = [node()]
    :ok = Honeydew.start_queue(:swiftvox, queue: {Honeydew.Queue.Mnesia, [disc_copies: nodes]})
    :ok = Honeydew.start_workers(:swiftvox, Worker, num: 1)
  end
end

App.start

Honeydew.filter returning an error

Hey,
I have a problem where I cant filter a job. I tried the example of queue + cancel and tried passing an fn that returns true to get all jobs from .filter but I always get an ArgumentError.

    #enqueue a job
    {:receive_work, [stages]} |> Honeydew.async({:global, :my_queue})

    # Status
    Honeydew.status({:global, :my_queue})
    |> Map.get(:queue)
    |> IO.inspect

      # find the job and cancel it
      # :ok =
      Honeydew.filter({:global, :my_queue}, %{task: {:receive_work, [stages]}})
      |> List.first
      |> Honeydew.cancel

The error:

 ** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
        (honeydew) lib/honeydew.ex:191: Honeydew.filter/2
        (queue) lib/server/router.ex:133: anonymous fn/2 in App.Router.do_match/4
        (queue) lib/server/router.ex:2: App.Router.plug_builder_call/2
        (queue) lib/plug/error_handler.ex:64: App.Router."call (overridable 3)"/2
        (queue) lib/plug/debugger.ex:122: App.Router.call/2
        (plug_cowboy) lib/plug/cowboy/handler.ex:18: Plug.Adapters.Cowboy.Handler.upgrade/4
        (cowboy) src/cowboy_protocol.erl:442: :cowboy_protocol.execute/4

Thanks in advance!

Jobs are not restarted on node "crash"

Not sure if this is something the library doesn't support, or if I have missed something.

I am trying to have the node automatically retry jobs that were running on a node that died. I'm using mnesia storage.

I boot a node and start a bunch of long running jobs, then ctrl-c the iex session.

I boot a new node with the same name. I can see all the jobs I just started using Honeydew.status(:my_queue) but they just show as in_progress forever.

Is there a way to have those automatically retried by the new node?

Mention need for :mnesia to be added to :extra_applications

First of all - many thanks for a great library.

I am pretty new to Elixir, so this took me quite some to me to debug. But application would not boot when released with mix release with

11:55:15.110 [info] Application workplace2slack exited: exited in: Workplace2slack.Application.start(:normal, [])
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:error, {{:undef, [{:mnesia, :create_table, [:"honeydew_:slack", [ram_copies: [:workplace2slack@bobek], attributes: [:key, :job], record_name: :wrapped_job, type: :ordered_set]], []}, {Honeydew.Queue.Mnesia, :"-init/2-fun-0-", 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 79]}, {Enum, :"-each/2-fun-0-", 3, [file: 'lib/enum.ex', line: 789]}, {:maps, :fold_1, 3, [file: 'maps.erl', line: 232]}, {Enum, :each, 2, [file: 'lib/enum.ex', line: 1964]}, {Honeydew.Queue.Mnesia, :init, 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 76]}, {Honeydew.Queue, :init, 1, [file: 'lib/honeydew/queue.ex', line: 97]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}]}, {:child, :undefined, :slack, {Honeydew.Queue, :start_link, [[:slack, Honeydew.Queue.Mnesia, [ram_copies: [:workplace2slack@bobek]], {Honeydew.Dispatcher.LRU, []}, {Honeydew.FailureMode.Abandon, []}, nil, false]]}, :transient, 5000, :worker, [Honeydew.Queue]}}}
            (workplace2slack) lib/workplace2slack/application.ex:22: Workplace2slack.Application.start/2
            (kernel) application_master.erl:277: :application_master.start_it_old/4
{"Kernel pid terminated",application_controller,"{application_start_failure,workplace2slack,{bad_return,{{'Elixir.Workplace2slack.Application',start,[normal,[]]},{'EXIT',{{badmatch,{error,{{undef,[{mnesia,create_table,['honeydew_:slack',[{ram_copies,[workplace2slack@bobek]},{attributes,[key,job]},{record_name,wrapped_job},{type,ordered_set}]],[]},{'Elixir.Honeydew.Queue.Mnesia','-init/2-fun-0-',2,[{file,\"lib/honeydew/queue/mnesia.ex\"},{line,79}]},{'Elixir.Enum','-each/2-fun-0-',3,[{file,\"lib/enum.ex\"},{line,789}]},{maps,fold_1,3,[{file,\"maps.erl\"},{line,232}]},{'Elixir.Enum',each,2,[{file,\"lib/enum.ex\"},{line,1964}]},{'Elixir.Honeydew.Queue.Mnesia',init,2,[{file,\"lib/honeydew/queue/mnesia.ex\"},{line,76}]},{'Elixir.Honeydew.Queue',init,1,[{file,\"lib/honeydew/queue.ex\"},{line,97}]},{gen_server,init_it,2,[{file,\"gen_server.erl\"},{line,374}]}]},{child,undefined,slack,{'Elixir.Honeydew.Queue',start_link,[[slack,'Elixir.Honeydew.Queue.Mnesia',[{ram_copies,[workplace2slack@bobek]}],{'Elixir.Honeydew.Dispatcher.LRU',[]},{'Elixir.Honeydew.FailureMode.Abandon',[]},nil,false]]},transient,5000,worker,['Elixir.Honeydew.Queue']}}}},[{'Elixir.Workplace2slack.Application',start,2,[{file,\"lib/workplace2slack/application.ex\"},{line,22}]},{application_master,start_it_old,4,[{file,\"application_master.erl\"},{line,277}]}]}}}}}"}
Kernel pid terminated (application_controller) ({application_start_failure,workplace2slack,{bad_return,{{'Elixir.Workplace2slack.Application',start,[normal,[]]},{'EXIT',{{badmatch,{error,{{undef,[{mne

Crash dump is being written to: erl_crash.dump...done

I have finally figured out, that I need to add mnesia to :extra_applications in mix.exs:

  def application do
    [
      extra_applications: [:logger, :mnesia],
      mod: {MyApp.Application, []}
    ]
  end

Support Ecto.Adapters.MyXQL for MySQL

Hello.
I was playing with great feature Ecto Poll Queue. At some point, I notice that you are not supporting the MySQL database.
Are there any plans for that?
Thanks

warning: function equal?/2 required by behaviour Ecto.Type

We're on Ecto 3.4.5, which demands more of you than the 3.0.7 in your mix.lock does:

warning: function embed_as/1 required by behaviour Ecto.Type is not implemented (in module Honeydew.EctoSource.ErlangTerm)
  lib/honeydew/sources/ecto/erlang_term.ex:2: Honeydew.EctoSource.ErlangTerm (module)

If I'm reading c:Ecto.Type.embed_as/1 right, the fix might be as easy as:

@impl true
def embed_as(_), do: :self

Document queue callbacks

Currently Honeydew.Queue is private and undocumented which makes it pretty hard to implement custom queues.

Priorities?

I'm surveying projects in this space and am curious about how honeydew handles or plans to handle prioritization. What happens when the workers are saturated and an important job comes in that should be run before others in the queue?

thank you for your time/thoughts.

[Feature] Ecto Queue - custom "delayed" job

Hey,

I'm just starting my journey in elixir and started my first toy project last weekend.

In this project, I got a typical situation of an asynchronous task. There is just one tiny caveat...
The database object gets created and a rate limited worker supplements the object with additional data.
Now, with this new data I can finally start my new gloriously expensive task which takes all of 100ms to finish.

any chance i could convince you to add custom constraints to the ecto queue like saying ? some_field IS NOT NULL when i'm spawning the workers?

I think the only change necessary would be in the reserve function, but I might be mistaken - I'm a total beginner after all ;)

I love the API btw 👍

Persistence

Have you looked into extracting the queue into a behaviour that could possibly support multiple backends? With the scheduler / queue distinction mentioned in #1 it would be a lot easier to shrink the size and scope of the queue genserver, and define a behaviour that even a user supplied module could adhere to.

The advantage is that if someone wanted to write a GenServer that acted as a queue but also persisted the job states to redis, ets, etc they could do so without any extra effort on your part.

Thanks!

Want to add new persistence adaptor, where to start?

Hi,

I want to persist data in a database not supported by Ecto3. I suppose it will use polling too. So what behaviour/interface I should implement? Source or Queue?

Also, how cold starts happen (say after disaster recover - fresh nodes, database is restored from backup)? Will honeydew attempt to load all the jobs at once on each node or partition somehow, like wait cluster up (whatever it means) and a selected node will dispatch persistent jobs?

Thanks!

Is there a way to have a batch of jobs?

I'd like to do something like this:

  • Start a job and include some kind of batch id
  • At the end of its work, the job does a check: based on its results, do any other jobs need to be added to the batch?
    • If so, add them
    • If not, see if there are any jobs still queued for this batch
      • If there are still jobs queued for the batch, do nothing
      • If there are no more jobs queued for the batch, fire off a message saying "the batch is done"

Is there any straightforward way to do this? (FWIW I'm using an Mnesia queue.)

Gracefull shutdown documentation

I would like to know if there's a way to do a graceful shutdown of the Honeydew queues. For the moment I am calling the Honeydew.suspend() function on all queues and wait for a certain period of time. But I am wondering if there's a way to wait until all queues are suspended.

In my case I don't want jobs to be killed in the middle of their processing.

Won't build a release

Using the mnesia queue seems too break it when trying to build a release with distillery

=CRASH REPORT==== 29-Jan-2017::17:29:46 ===
  crasher:
    initial call: Elixir.Honeydew.Queue.Mnesia:init/1
    pid: <0.580.0>
    registered_name: []
    exception exit: {undef,
                        [{mnesia,create_schema,[['[email protected]']],[]},
                         {'Elixir.Honeydew.Queue.Mnesia',init,2,
                             [{file,"lib/honeydew/queue/mnesia.ex"},
                              {line,19}]},
                         {'Elixir.Honeydew.Queue.Mnesia',init,1,
                             [{file,"lib/honeydew/queue/mnesia.ex"},{line,2}]},
                         {gen_server,init_it,6,
                             [{file,"gen_server.erl"},{line,328}]},
                         {proc_lib,init_p_do_apply,3,
                             [{file,"proc_lib.erl"},{line,247}]}]}
      in function  gen_server:init_it/6 (gen_server.erl, line 352)
    ancestors: ['honeydew.queue_supervisor.global.my_queue',<0.578.0>,
                  <0.574.0>,<0.535.0>]
    messages: []
    links: [<0.579.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 987
    stack_size: 27
    reductions: 301
  neighbours:
** (EXIT from #PID<0.574.0>) shutdown: failed to start child: Honeydew.QueueSupervisor
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:error, {:undef, [{:mnesia, :create_schema, [[:"[email protected]"]], []}, {Honeydew.Queue.Mnesia, :init, 2, [file: 'lib/honeydew/queue/mnesia.ex', line: 19]}, {Honeydew.Queue.Mnesia, :init, 1, [file: 'lib/honeydew/queue/mnesia.ex', line: 2]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 328]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
            (honeydew) lib/honeydew/queue_supervisor.ex:22: anonymous fn/2 in Honeydew.QueueSupervisor.start_link/6
            (elixir) lib/enum.ex:651: anonymous fn/3 in Enum.each/2
            (elixir) lib/enum.ex:1760: Enum.each/2
            (honeydew) lib/honeydew/queue_supervisor.ex:21: Honeydew.QueueSupervisor.start_link/6
            (stdlib) supervisor.erl:365: :supervisor.do_start_child/2
            (stdlib) supervisor.erl:348: :supervisor.start_children/3
            (stdlib) supervisor.erl:314: :supervisor.init_children/2
            (stdlib) gen_server.erl:328: :gen_server.init_it/6

Cancel all workers from a queue

Is there a way to cancel all workers from a queue? When writing tests I'd like to be able to clear all the workers from a queue in an on_exit/1 callback.

Honeydew hangs app startup 100ms per worker

While diagnosing a slow mix test run, I came across the fact that our app startup was slow because we have two honeydew supervisors with a total of 15 or so workers.

It seems that honeydew waits for 100ms for each worker during init which, in our case, slows down app startup time (and therefore every test run) by at least 1.5 seconds.

The offending line is here:

100 -> init_result

Relevant log from #elixir-lang:

3:04 PM A<aaronjensen> Aaron Jensen 
adding a 100ms sleep * # of workers during init seems like a bad idea, yea josevalim? :/
3:11 PM A<aaronjensen> Aaron Jensen 
is there a good way to defer starting of those before we are able to help fix honeydew?
3:16 PM J<josevalim> José Valim 
aaronjensen: yes, that sounds like a bad idea
3:17 PM J<josevalim> José Valim 
that's going to block the supervisor also starting it for 100ms
3:17 PM it is really really bad :(
3:17 PM benwilson512: are you using the lib above?
3:18 PM J<josevalim> José Valim 
aaronjensen:  ping the author, they should totally not do that

Include a worker's module and a queue's name in log message

Given the current log messages I can tell that some queues start and that there are workers that are ready for those queues by matching the pids, but I cannot tell which queues are started. I would really like for the log messages to give more information about where they are coming from.

Current Behavior:

19:21:42.576 [debug] [Honeydew] Worker #PID<0.361.0> sending ready

19:21:42.576 [debug] [Honeydew] Queue #PID<0.305.0> ready for worker #PID<0.361.0>

Desired Behavior:

19:21:42.576 [debug] [Honeydew] DoStuffWorkerModule Worker #PID<0.361.0>  sending ready

19:21:42.576 [debug] [Honeydew] {:global, :do_stuff} Queue #PID<0.305.0> ready for worker #PID<0.361.0>

I think this change will improve introspection into what is happening inside the queue and worker pipeline. Thanks!

Unexpected behavior when forgetting to define init/1

I just had a difficult to troubleshoot issue when I accidentally defined init/0 instead of init/1. My worker module looked like this.

defmodule MyWorker do
  def init do: {:ok, nil}

  def run(arg, _state) do
   # do work
  end
end

I had my queues set up properly and the following log messages were printed when my app started up so I assumed everything started correctly.

10:32:36.283 [debug] [Honeydew] Worker #PID<0.268.0> sending ready

10:32:36.283 [debug] [Honeydew] Worker #PID<0.267.0> sending ready

10:32:36.283 [debug] [Honeydew] Queue #PID<0.264.0> ready for worker #PID<0.267.0>

10:32:36.283 [debug] [Honeydew] Queue #PID<0.264.0> ready for worker #PID<0.268.0>

However when I tried to call the function using honeydew

{:run, ["hello"]} |> Honeydew.async(:my_queue)

I would get the following error:

10:31:28.429 [error] GenServer #PID<0.745.0> terminating
** (UndefinedFunctionError) function MyWorker.run/1 is undefined or private. Did you mean one of:

      * run/2

It wasn't until I put an IO.inspect in my init/0 function that I found it wasn't being called because it should have been init/1.

Ideally, it would be nice to either raise an exception if an init/1 function wasn't defined on a worker, or document that you could skip defining an init/1 function but then it won't pass the last state argument into the functions you call on the worker.

Examples not working

I tried this lib and I can't get examples working.
I cloned the repository and then did a mix deps.get, elixir -S mix run examples/progress_and_queue_status.exs and I got no output.

I changed the start function to display the status of queues and workers to :

  def start do
    :ok = Honeydew.start_queue(:my_queue)
    :ok = Honeydew.start_workers(:my_queue, HeavyTask, num: 10)
    IO.puts "App started"
    IO.inspect Honeydew.queues
    IO.inspect Honeydew.workers
    IO.inspect Honeydew.status(:my_queue)
  end

I have this output:

App started
[:my_queue]
[:my_queue]
%{queue: %{count: 0, in_progress: 0, suspended: false}, workers: %{}}

what am I doing wrong? I run elixir 1.7.4

Postgres Notify for Ecto-backed Queue?

Hi this looks like a good fit for a project we are working on presently, but we currently have triggers send NOTIFY that a GenServer listens to rather than polling the database. This is because the updates come from outside our application, in a middleware integration service. Would you consider a PR adding support for this? I could almost do it outside of honeydew except that I can't enqueue manually for Ecto-backed jobs.

Normalize errors before returning results

If a worker crashes due to an erlang error (like :badarg), it outputs the full erlang error tuple and stack trace in a very unfriendly fashion. If the user is waiting for a response usingHoneydew.yield, they also receive a {:error, <unfriendly erlang error>} response. It would be awesome if Honeydew normalized these errors using Exception.normalize/3 before outputting them and returning them via Honeydew.yield.

I'd be happy to make a failing test case, if you can point me where it needs to go.

Control retry time within job

Hello, I'm using honeydew to send HTTP requests to a third-party endpoint. This endpoint request can fail for some reasons and their recommendation is to do an Exponential Backoff, which I get out-of-box with your great library.

Another possibility is that they can send me a Retry-After header with the time I need to wait before retrying.

Since I will only know that value inside the job itself, is there any wait to, if I know the time I need to wait, use that time as the retry delay instead of the Exponential Backoff?

Basically this means that I would do the request, if fail, I would parse it anyway and get the Retry-After header, if it is present, I want to delay the next retry with the time they sent me, if it is not present or the job crashed for some other reason, I would fallback to Exponential Backoff

Testing strategies

Hi there, thank you so much for the great library! Having a great time so far!

I am trying to write a unit test for a module which inserts jobs in a queue with Honeydew.asyc but my test case is asserting synchronously before the jobs are inserted.

Do you have any pointers on how one can assert jobs have been inserted?

Mnesia as default queue not mentioned in start_queue/2 documentation

First of all, thank you for this amazing library. It's really useful to me 💯

While reading the documentation i've noticed a inconsistency. In the README.md and in the code the Mnesia queue is the default.
But ErlangQueue is still indicated as the default in the start_queue/2 function documentation :

honeydew/lib/honeydew.ex

Lines 346 to 347 in 3a35093

- `queue`: is the module that queue will use. Defaults to
`Honeydew.Queue.ErlangQueue`. You may also provide args to the queue's

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.