Comments (4)
Hey @sezaru,
Can you give me a bit more information about what your use case is? I just want to make sure this isn't an XY problem. :)
I try to steer clear of `:infinity` waits in blocking calls, since that means the caller isn't able to respond to any system/debug messages, and its mailbox can fill up.
from honeydew.
Sure.
So, I created a fork of the built-in mnesia queue to do two things. One is to add a priority parameter to the key, so the queue handles some prioritized jobs before others. I also changed the mnesia calls to `sync_transaction`, since I need to guarantee that jobs are indeed persisted to disk, and also to avoid mnesia overload messages.
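As a sketch of why that composite key works (my illustration, not from the thread): an mnesia `:ordered_set` table iterates keys in Erlang term order, so with a `{priority, run_at, id}` key, `:mnesia.select/4` visits jobs with the lowest priority number first, breaking ties on enqueue time:

```elixir
# Candidate keys of shape {priority, run_at, id}. Erlang term ordering
# compares tuples element by element, which is exactly the dispatch
# order we want: priority first, then enqueue time, then id.
keys = [
  {2, 100, 1},  # lower-priority job, enqueued first
  {1, 300, 2},  # high-priority job, enqueued later
  {1, 100, 3}   # high-priority job, enqueued earlier
]

Enum.sort(keys)
# => [{1, 100, 3}, {1, 300, 2}, {2, 100, 1}]
```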
The main issue here is probably `sync_transaction`, since every write to the disk is followed by a sync call that can take some time if the hard drive is overloaded (which it is in my case). That means that, for example, the enqueue call will sometimes time out.
If `:infinity` is not OK, can we bump the timeout then?
Or maybe some way to allow the user to specify the timeout they desire; this is probably the best solution IMO, but I didn't see an easy way to do that in every part of the code that contains a `GenServer.call`, unless using a config, something like:

```elixir
config :honeydew, timeout: 10_000
```
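As a sketch of what that could look like inside the library (the `:timeout` key and `call_timeout/0` helper are hypothetical, not honeydew's actual API):

```elixir
defmodule TimeoutConfig do
  # Hypothetical helper (not part of honeydew): read a library-wide call
  # timeout from the app environment, falling back to OTP's 5_000 ms default.
  def call_timeout, do: Application.get_env(:honeydew, :timeout, 5_000)
end

# Simulate what `config :honeydew, timeout: 10_000` would set at build time.
Application.put_env(:honeydew, :timeout, 10_000)

TimeoutConfig.call_timeout()
# the queue could then pass this as the third argument to GenServer.call/3
```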
For completeness, I'm pasting my custom mnesia queue below. (Note that I didn't include the `Priorities` module since I can't disclose it, but the `Priorities.priority!` function just returns an integer with the priority of the job I want.)
```elixir
defmodule Notification.Queues.PriorityMnesia do
  alias Notification.Queues.PriorityMnesia.{Tables, State, WrappedJob}
  alias Honeydew.Queue

  require Logger

  @behaviour Queue

  @poll_interval 1_000

  @impl Queue
  defdelegate validate_args!(opts), to: Queue.Mnesia

  @impl Queue
  def init(queue_name, table_opts) do
    Tables.all(queue_name)
    |> Enum.each(fn {name, opts} ->
      opts = Keyword.merge(table_opts, opts)
      create_table!(name, opts)
    end)

    state = State.new!(queue_name)

    reset_after_crash(state)
    time_warp_mode_warning()
    poll()

    {:ok, state}
  end

  @impl Queue
  def enqueue(job, %{table: table, counter_table: counter_table} = state) do
    id = :mnesia.dirty_update_counter(counter_table, :counter, 1)
    record = WrappedJob.new_record(job, id)

    :mnesia.activity(:sync_transaction, fn -> :mnesia.write(table, record, :write) end)

    {state, WrappedJob.job(record)}
  end

  @impl Queue
  def reserve(%{table: table} = state) do
    match_spec = WrappedJob.Helper.reserve_match_spec()

    :mnesia.activity(:sync_transaction, fn -> :mnesia.select(table, match_spec, 1, :read) end)
    |> case do
      :"$end_of_table" ->
        {:empty, state}

      {[record], _} ->
        move_to_in_progress_table(record, state)
        {WrappedJob.job(record), state}
    end
  end

  @impl Queue
  def ack(%{private: id}, state) do
    %{in_progress_table: in_progress_table} = state

    pattern = WrappedJob.Helper.id_pattern(id)

    :mnesia.activity(:sync_transaction, fn ->
      [wrapped_job] = :mnesia.match_object(in_progress_table, pattern, :read)
      :mnesia.delete_object(in_progress_table, wrapped_job, :write)
    end)

    state
  end

  @impl Queue
  def nack(job, state) do
    %{private: id, failure_private: failure_private, delay_secs: delay_secs} = job

    move_to_pending_table(id, %{failure_private: failure_private, delay_secs: delay_secs}, state)

    state
  end

  @impl Queue
  def status(%{table: table, in_progress_table: in_progress_table}) do
    info = %{
      table => :mnesia.table_info(table, :all),
      in_progress_table => :mnesia.table_info(in_progress_table, :all)
    }

    %{mnesia: info, count: info[table][:size], in_progress: info[in_progress_table][:size]}
  end

  @impl Queue
  def filter(%{table: table}, map) when is_map(map) do
    :mnesia.activity(:sync_transaction, fn ->
      pattern = WrappedJob.Helper.filter_pattern(map)

      table |> :mnesia.match_object(pattern, :read) |> Enum.map(&WrappedJob.job/1)
    end)
  end

  def filter(%{table: table}, fun) when is_function(fun) do
    :mnesia.activity(:sync_transaction, fn ->
      :mnesia.foldl(
        fn record, list ->
          job = WrappedJob.job(record)

          case fun.(job) do
            true -> [job | list]
            false -> list
          end
        end,
        [],
        table
      )
    end)
  end

  @impl Queue
  def cancel(%{private: id}, state) do
    %{table: table, in_progress_table: in_progress_table} = state

    pattern = WrappedJob.Helper.id_pattern(id)

    reply =
      :mnesia.activity(:sync_transaction, fn ->
        table
        |> :mnesia.match_object(pattern, :read)
        |> case do
          [wrapped_job] ->
            :mnesia.delete_object(table, wrapped_job, :write)

          [] ->
            in_progress_table
            |> :mnesia.match_object(pattern, :read)
            |> case do
              [_] -> {:error, :in_progress}
              [] -> {:error, :not_found}
            end
        end
      end)

    {reply, state}
  end

  @impl Queue
  def handle_info(:__poll__, queue_state) do
    poll()

    {:noreply, Queue.dispatch(queue_state)}
  end

  defp create_table!(name, opts) do
    {:atomic, :ok} =
      with {:aborted, {:already_exists, ^name}} <- :mnesia.create_table(name, opts) do
        {:atomic, :ok}
      end

    :ok = :mnesia.wait_for_tables([name], 15_000)
  end

  defp poll, do: {:ok, _} = :timer.send_after(@poll_interval, :__poll__)

  defp reset_after_crash(%{in_progress_table: in_progress_table} = state) do
    :mnesia.activity(:sync_transaction, fn -> :mnesia.first(in_progress_table) end)
    |> case do
      :"$end_of_table" ->
        :ok

      key ->
        key |> WrappedJob.id_from_key() |> move_to_pending_table(%{}, state)
        reset_after_crash(state)
    end
  end

  defp move_to_in_progress_table(record, state) do
    %{table: table, in_progress_table: in_progress_table} = state

    :mnesia.activity(:sync_transaction, fn ->
      :mnesia.delete(table, WrappedJob.key(record), :write)
      :mnesia.write(in_progress_table, record, :write)
    end)
  end

  defp move_to_pending_table(id, updates, state) do
    %{table: table, in_progress_table: in_progress_table} = state

    pattern = WrappedJob.Helper.id_pattern(id)

    :mnesia.activity(:sync_transaction, fn ->
      record = in_progress_table |> :mnesia.match_object(pattern, :read) |> List.first()

      :mnesia.delete_object(in_progress_table, record, :write)

      record = record |> WrappedJob.job() |> struct(updates) |> WrappedJob.new_record(id)

      :mnesia.write(table, record, :write)
    end)
  end

  defp time_warp_mode_warning do
    if :erlang.system_info(:time_warp_mode) != :multi_time_warp do
      Logger.warn(
        "[Honeydew] It's recommended to use the Mnesia queue with the 'multi_time_warp' time correction mode to minimize monotonic clock freezes, see http://erlang.org/doc/apps/erts/time_correction.html#multi-time-warp-mode."
      )
    end
  end
end

defmodule Notification.Queues.PriorityMnesia.State do
  alias Notification.Queues.PriorityMnesia.Tables

  use TypedStruct

  typedstruct enforce: true do
    field :table, reference
    field :counter_table, reference
    field :in_progress_table, reference
  end

  @spec new!(atom) :: t
  def new!(queue_name) do
    struct!(__MODULE__,
      table: Tables.table_name(queue_name),
      counter_table: Tables.counter_table_name(queue_name),
      in_progress_table: Tables.in_progress_table_name(queue_name)
    )
  end
end

defmodule Notification.Queues.PriorityMnesia.Tables do
  @default attributes: [:key, :job], record_name: :wrapped_job

  def all(queue_name) do
    %{
      table_name(queue_name) => Keyword.merge(@default, type: :ordered_set),
      in_progress_table_name(queue_name) => Keyword.merge(@default, type: :set),
      counter_table_name(queue_name) => [attributes: [:type, :id], type: :set]
    }
  end

  def table_name(queue_name),
    do: ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom()

  def in_progress_table_name(queue_name),
    do: ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom()

  def counter_table_name(queue_name),
    do: ["honeydew", inspect(queue_name), "counter"] |> Enum.join("_") |> String.to_atom()
end

defmodule Notification.Queues.PriorityMnesia.WrappedJob do
  alias Notification.Queues.PriorityMnesia.WrappedJob.{Priorities, Helper}
  alias Honeydew.Job

  use TypedStruct

  typedstruct enforce: true do
    field :priority, integer | :_
    field :run_at, integer | :_
    field :id, integer | :_
    field :job, Job.t() | :_
  end

  def new(job, id) do
    %{delay_secs: delay_secs} = job

    run_at = Helper.now() + delay_secs
    priority = Priorities.priority!(job)

    job = %{job | private: id}

    new(priority, run_at, id, job)
  end

  def new(priority, run_at, id, job),
    do: struct!(__MODULE__, priority: priority, run_at: run_at, id: id, job: job)

  def new_record(job, id), do: new(job, id) |> to_record()
  def new_record(priority, run_at, id, job), do: new(priority, run_at, id, job) |> to_record()

  def from_record({:wrapped_job, {priority, run_at, id}, job}),
    do: new(priority, run_at, id, job)

  def to_record(%{priority: priority, run_at: run_at, id: id, job: job}),
    do: {:wrapped_job, key(priority, run_at, id), job}

  def key(priority, run_at, id), do: {priority, run_at, id}
  def key({:wrapped_job, key, _}), do: key

  def job({:wrapped_job, _, job}), do: job

  def id_from_key({_, _, id}), do: id
end

defmodule Notification.Queues.PriorityMnesia.WrappedJob.Helper do
  alias Notification.Queues.PriorityMnesia.WrappedJob
  alias Honeydew.Job

  @job_filter %Job{}
              |> Map.from_struct()
              |> Enum.map(fn {k, _} -> {k, :_} end)
              |> (&struct(Job, &1)).()

  def id_pattern(id), do: WrappedJob.new_record(:_, :_, id, :_)

  def filter_pattern(map) do
    job = struct(@job_filter, map)

    WrappedJob.new_record(:_, :_, :_, job)
  end

  def reserve_match_spec do
    pattern = WrappedJob.new_record(:_, :"$1", :_, :_)

    [{pattern, [{:"=<", :"$1", now()}], [:"$_"]}]
  end

  def now, do: :erlang.monotonic_time(:second)
end
```
PS. Totally off-topic, but I noticed that in the built-in mnesia queue you use `:erlang.unique_integer` to create a unique id for the job. Note that the uniqueness is only guaranteed during the VM run; if you close the VM and reopen it, it is possible that `:erlang.unique_integer` will return an id that is already used by some job inside the queue. This breaks some logic inside the queue when searching for a job by id, since there will be more than one job with the same id. To fix that, I instead used a mnesia table and the function `dirty_update_counter`, which persists the id counter and guarantees it is unique even if I restart the VM. Maybe this is something you'd be interested in adding to the built-in queue.
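A minimal, runnable sketch of that persistent-counter idea (the table name `:job_ids` is mine; for true restart-safety the table would need `disc_copies`, which this ram-only demo omits):

```elixir
# Start mnesia with its default (ram-only) schema and create a counter table.
# In production you'd add `disc_copies: [node()]` so the counter survives VM
# restarts; a ram table keeps this demo runnable anywhere.
:mnesia.start()

{:atomic, :ok} = :mnesia.create_table(:job_ids, attributes: [:key, :count])

# dirty_update_counter/3 atomically increments and returns the new value,
# treating a missing key as 0.
id1 = :mnesia.dirty_update_counter(:job_ids, :counter, 1)
id2 = :mnesia.dirty_update_counter(:job_ids, :counter, 1)
# id1 = 1, id2 = 2 — monotonically increasing ids
```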
Are you running a multi-node mnesia cluster? I ask because ensuring that a job is written to disk on a single-node cluster isn't a lot of assurance that you'll have continuity of service if that disk dies.
I'm guessing that you're using a single node, since honeydew will automatically use the `:sync_transaction` access context when the table is replicated to multiple nodes.
If that's the case, I believe that `:async_dirty` is the right choice. If I've properly understood mnesia's access contexts, the "dirty" part is fine because there's only one queue process accessing the table, hence no need for transaction isolation overhead. The "async" part is fine because the synchronicity it refers to is blocking until other nodes in the cluster have executed the operation; "async" in a single-node context still waits for the operation to complete locally. So `:async_dirty` will still block while it writes to a single node's disk.
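A small illustration of that local-completion behavior (mine, not from honeydew; it uses a ram table, so it demonstrates local synchronicity rather than disk persistence): on a single node, an `:async_dirty` activity completes the local operation before returning, so a read issued immediately afterwards sees the write.

```elixir
:mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:demo, attributes: [:key, :val])

# The write completes locally before activity/2 returns...
:ok = :mnesia.activity(:async_dirty, fn -> :mnesia.write({:demo, 1, "hello"}) end)

# ...so this read observes it right away.
[{:demo, 1, "hello"}] =
  :mnesia.activity(:async_dirty, fn -> :mnesia.read(:demo, 1) end)
```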
I'd really prefer not to mess with the messaging timeouts; five seconds is a really long time as it is. If you're waiting five seconds on a single job write, you're probably doing so much traffic that you're going to back the queue's mailbox up into memory. Having that backpressure is really important: it prevents overloads from becoming systemic catastrophes. How big are your job parameters? Are you enqueuing very large binaries?
At the moment, the mnesia queue is only intended to be run in a single process throughout the entire cluster, so that usage of `:unique_integer` is safe (unless I'm missing something). That being said, I'm honestly not pleased with the mnesia queue's distribution story; I'm in the middle of the slow and grueling process of replacing it with a better implementation.
I quite like your priority implementation, I'll have to steal that at some point. :)
Sorry for taking a while to reply.
> Are you running a multi-node mnesia cluster?
No, I'm running just a single node for mnesia.
> So `:async_dirty` will still block while it writes to a single node's disk.
Oh! I didn't know that! I assumed the dirty functions didn't guarantee a disk write. Nice!
> How big are your job parameters? Are you enqueuing very large binaries?
Nah, the jobs are very small; it's a job to send notifications to my app via FCM, so their payload needs to be less than 4 KB anyway.
I think the issue is that other parts of the system are loading and thrashing the disk so much that sometimes everything hangs for a while (my guess is that I fill my SSD's cache, so it needs to flush to disk, and this gives very bad disk performance for some time).
But the major issue I see is that I will receive the timeout error while the enqueue call still finishes correctly. In my case, this means the job is added to the queue, I receive a timeout, and the system retries adding the job, creating a duplicate.
Since my jobs are notifications to users of my app, they would receive duplicated notifications, which is bad. That's why, in my case, I prefer to hang there and wait rather than simply time out and not be sure whether the message went through or not.
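One common way out of that bind, sketched here with ETS rather than honeydew's API (the dedup key and `enqueue/3` helper are my invention): have the caller attach a stable key per notification, so a retry after a timeout becomes a no-op instead of a duplicate.

```elixir
defmodule DedupEnqueue do
  # Hypothetical sketch (not honeydew's API): :ets.insert_new/2 only
  # succeeds if the key is absent, so retrying the same notification
  # key cannot create a duplicate entry.
  def enqueue(table, key, job), do: :ets.insert_new(table, {key, job})
end

table = :ets.new(:jobs, [:set, :public])

true = DedupEnqueue.enqueue(table, "user42:promo1", :job)   # first attempt
false = DedupEnqueue.enqueue(table, "user42:promo1", :job)  # timed-out retry is a no-op
```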
> I quite like your priority implementation, I'll have to steal that at some point. :)
haha thanks.
One issue with it is that I cannot pass a custom argument to the job queue when adding a new job, so I did a workaround: I added a new, unused argument carrying the job's priority to the function I want to run, and I retrieve it from the job struct during enqueue. If a priority queue is on the roadmap, a way to attach a custom value to the job's async function would be great. (If it's not clear what I mean, I can open a new issue to explain it better, so this issue doesn't get bloated with off-topic discussion.)