
broadway_rabbitmq's People

Contributors

adrianomitre, alexcastano, andrewhr, basilenouvellet, bdusauso, britto, ceolinrenato, cliftonmcintosh, dmitrykk, dmorneau, fcevado, harrisonl, isaacsanders, joelepper, josevalim, kianmeng, lackac, lucacorti, mike-bhs, msaraiva, nathanl, nbw, ono, philss, rbino, vitorleal, whatyouhide, wkirschbaum, wojtekmach, xandervr

broadway_rabbitmq's Issues

Providing multiple URLs for connections

Right now, a BroadwayRabbitMQ producer can only connect to a single RabbitMQ node. Some SaaS platforms provide multiple nodes and only guarantee high uptime on a subset of the nodes, not all of them. How can we solve this problem in BroadwayRabbitMQ? For example, one solution could be to allow passing a function to :connection that returns a URL (and could just choose at random from a list). This is simple but not very flexible; it is just an example of what I mean.
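The "function that returns a URL" idea could look roughly like the sketch below. The module name MyApp.RabbitURLs, the function pick/1, and the example URLs are hypothetical, and accepting such a function in :connection is the proposal being discussed, not an existing option.

```elixir
defmodule MyApp.RabbitURLs do
  @moduledoc "Hypothetical helper: picks one RabbitMQ node URL per connection attempt."

  # Returns a random URL from a non-empty list, so that repeated connection
  # attempts eventually land on a node that is up.
  @spec pick([String.t(), ...]) :: String.t()
  def pick([_ | _] = urls), do: Enum.random(urls)
end

# Hypothetical node URLs, for illustration only.
urls = ["amqp://node-a.example.com", "amqp://node-b.example.com"]
IO.puts(MyApp.RabbitURLs.pick(urls))
```

Choosing at random keeps the function stateless; a round-robin variant would need to keep state somewhere between attempts.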

Ultimately, this could be an additional use case for having multiple producers, where each producer can connect to a different RabbitMQ node. Thoughts?

Any plan for new version release?

The new amqp 3.0.0 change is a very nice one that helps with dropping lager and with OTP 24 compatibility. Is there any plan to make a release soon with that change? It seems life-changing enough to have a release :)

Proposal: require users to specify `:on_failure`

As I updated the docs to show in #73, :reject_and_requeue can cause an infinite retry loop if the message is unprocessable. This happened to us in production, as my colleague @shamil614 figured out.

I would like to suggest a (breaking) change which could prevent this problem.

Rather than defaulting to :reject_and_requeue, the user could be required to specify one of these options. This would prompt them to consider the trade offs and whether they want to set up a dead letter exchange and potentially re-publish those messages. (@shamil614 came up with a way to do that, republishing a fixed number of times with an increasing delay using the RabbitMQ delayed message plugin to achieve a "back off" effect.)
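As a rough illustration of what "required" could mean, here is a minimal sketch of a check that refuses options without :on_failure. The module OnFailureCheck and its error messages are hypothetical, and the allowed list only contains the two values that appear in these issues; the library's real list and validation may differ.

```elixir
defmodule OnFailureCheck do
  @moduledoc "Hypothetical validation sketch; not the library's actual option handling."

  # Illustrative subset of values; the real set of strategies may be larger.
  @allowed [:reject, :reject_and_requeue]

  # Returns {:ok, strategy} only when the caller chose a strategy explicitly.
  def validate(opts) when is_list(opts) do
    case Keyword.fetch(opts, :on_failure) do
      {:ok, value} when value in @allowed ->
        {:ok, value}

      {:ok, other} ->
        {:error, "invalid :on_failure value: #{inspect(other)}"}

      :error ->
        {:error, ":on_failure is required; choose one of #{inspect(@allowed)}"}
    end
  end
end

IO.inspect(OnFailureCheck.validate(queue: "my_queue"))
```

Failing at startup, rather than silently defaulting, is what would prompt users to think about the trade-offs.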

:message_count is not honoured as metadata option.

Current documentation shows :message_count as an available option for the metadata of a message. I believe this is not possible to achieve with the current implementation. message_count is only available as metadata when doing a basic get (not recommended, by the way); the current implementation uses basic consume, which doesn't expose message_count as metadata.
Possible solutions I see:

  • just remove it from the docs.
  • use AMQP.Queue.message_count/2 to populate the message_count metadata.

I can do a quick PR to fix it; I just need to know which approach is preferred.

Passing a :client option to producers

Hello guys,

I would like to use broadway_rabbitmq because I think it's a great package, but I would also like to be able to create my queue directly from my application and set up a few things before consuming from it.

Why is it not possible to implement a client using the BroadwayRabbitMQ.RabbitmqClient behaviour and pass it to Broadway.start_link/2?

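def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producers: [
      default: [
        # :client is the option being asked about; MyModule.Client would
        # implement the BroadwayRabbitMQ.RabbitmqClient behaviour.
        module: {BroadwayRabbitMQ.Producer,
          client: MyModule.Client,
          queue: "my_queue"
        },
        stages: 2
      ]
    ],
    processors: [
      default: [
        stages: 50
      ]
    ]
  )
end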

By reading the code, I had the impression that it was possible because of this line: https://github.com/plataformatec/broadway_rabbitmq/blob/1ba40e62f079ad2277d7ef6b253872f31065fb78/lib/broadway_rabbitmq/producer.ex#L118

Can you tell me whether this is something you plan to let users do?

Multiple producers is an anti-pattern, right?

In the documentation example, multiple producers are used.

Isn't that always a bad idea for RabbitMQ (and any other queue-like situation where a Broadway producer does nothing but pull from a finite set of items)? It seems to me that you'd always want a producer concurrency: 1 in such cases.

I created this demo which runs smoothly unless the producer concurrency is increased beyond 1, in which case the performance suffers dramatically. The basic issue seems to be that every producer tries to satisfy the demand of every processor, such that a few processors get multiple messages (one from each producer), which just pile up in their mailboxes. The other processors get no messages and are idle.

Should producers automatically create exchanges?

Thanks for the great work on this producer. Quick question:

The AMQP library on which this producer is based provides the AMQP.Exchange.declare/4 function to declare exchanges. Calling this function creates an exchange if it doesn't exist or does nothing if the exchange exists.

I am trying to follow the docs to create a producer that binds an auto-generated queue to an exchange, and I want the exchange to be auto created when the producer starts if the exchange doesn't exist. I also want to create the exchange as type :topic.

Is there a way to do this? If not, is there a reason why the broadway_rabbitmq producer shouldn't declare exchanges?

Thanks.

Crash supervisor after killing Broadway process

Hi, I have an application supervisor which starts a Broadway process:

  opts = [strategy: :one_for_one, name: __MODULE__]
  Supervisor.start_link([MessagesConsumer], opts)

and my process:

use Broadway

def start_link(_) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module:
        {BroadwayRabbitMQ.Producer,
         queue: @queue,
         declare: QueueDeclareOptions.get(@queue),
         connection: connection(),
         on_failure: :reject_and_requeue,
         qos: [prefetch_count: 10],
         buffer_size: 10_000}
    ],
    processors: [default: [concurrency: 10]]
  )
end

When I kill my process with Process.exit(pid, :kill), my supervisor crashes completely:

09:37:44.222 [error] CRASH REPORT Process <0.664.0> with 0 neighbours crashed with reason: no match of right hand value {error,{already_started,<0.624.0>}} in 'Elixir.Broadway.Topology':init/1 line 37
...
09:37:44.246 [info] Application scenarios exited with reason: shutdown

As I understand it, the linked process was not killed. How can I solve this problem?

mix.exs:

  {:broadway_rabbitmq, "~> 0.6.5"},

Retrieving additional information from AMQP.Queue.declare

I have a requirement where I need to get access to the rabbitmq queue size. The AMQP client returns this information when using declare/3 which is currently being ignored by broadway_rabbitmq here.

Is this something that could be added to the message metadata or the producer state so that I can access it in handle_message/3?

I'm happy to open a PR if this is something that would be helpful to add

Update nimble options requirement

Hi,

I am looking to install this but I am running into an issue with nimble options dependency requirement. The broadway_rabbitmq package requires ~> 0.3.5 or ~> 0.4 but opentelemetry_phoenix (version 1.1.0) is requiring ~> 0.5. Could it be possible to update this dependency in here or relax the requirements a bit more?

Thanks!

Add API to setup RabbitMQ topology on producer connect

We (at work) often have the necessity of defining complex RabbitMQ topologies of exchanges and queues. RabbitMQ is pretty clear on the pattern here: declare everything you need every time you connect since declaring is idempotent. We also have a hard requirement of trying to connect to multiple RabbitMQ URLs until one connection succeeds.

The code that we currently have to do things like declare an exchange and bind it to another exchange before declaring the consumer queue and starting the producer looks like this:

with {:ok, conn} <- AMQP.Connection.open(...),
     {:ok, chan} <- AMQP.Channel.open(conn),
     :ok <- declare_exchange(chan),
     :ok <- bind_exchange(chan) do
  Broadway.start_link(...)
end

This has a huge disadvantage: it makes our Broadway pipeline synchronous when starting. That is, the pipeline won't start unless RabbitMQ is available right away. This goes against the good principle of starting the process and connecting in the background with potential backoff (see this great article).

What we would like to do instead is to be able to declare the necessary RabbitMQ topology every time the producer connects to RabbitMQ. This will free us of the synchronization point and generally makes things more coherent.

To achieve this, we can go two ways: either we provide a way to pass in a generic piece of code that takes an AMQP channel, or we come up with a schema for additional options that lets us declare arbitrary topologies.

Note that we're only interested in topologies that are strictly related to the pipeline here. This means that we don't care about being able to declare additional queues for example, since the pipeline can only consume from a single queue.

Option 1: custom code

The first option is to pass a generic piece of code that takes the AMQP channel. The API I propose is a new option :rabbitmq_setup_fun that takes an anonymous function or an MFA tuple.

rabbitmq_setup_fun: (AMQP.Channel.t() -> :ok | {:error, reason}) | {module(), atom(), [term()]}

An example of its usage:

rabbitmq_setup_fun: fn chan ->
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: "#"),
       do: :ok
end

or with MFA:

rabbitmq_setup_fun: {__MODULE__, :declare_exchange, [_routing_key = "#"]}

# In the module:
def declare_exchange(chan, routing_key) do
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: routing_key),
       do: :ok
end

Option 2: new options to declare exchanges and do exchange-to-exchange bindings

The alternative to custom code is to provide more options that let us declare arbitrary topologies. The nice thing is that in RabbitMQ you can only have bindings that look like this:

exchange1 -> exchange2 -> ... -> exchangeN -> queue

That is, you can only bind to one queue at the end of the "exchanges pipeline". This means we only need to support two things:

  1. Declaring an arbitrary number of exchanges
  2. Declaring an arbitrary number of exchange-to-exchange bindings

We don't need to support exchange-to-queue bindings since we already support that through the :bindings option.

Declaring exchanges

What I propose is to have a new :declare_exchanges option to declare exchanges:

declare_exchanges: [{name :: String.t(), type :: :direct | :topic | :headers | :fanout, options :: keyword()}]

For example:

declare_exchanges: [
  {"my-exchange", :topic, durable: true},
  {"my-other-exchange", :headers, durable: true, internal: true}
]

Binding exchanges to other exchanges

I have two different proposals here.

Option 1: add a new option :exchange_bindings (to mirror the name of the already existing :bindings) option.

exchange_bindings: [{source :: String.t(), dest :: String.t(), options :: keyword()}]

Option 2: modify the current :bindings option to support exchange-to-exchange bindings as well. Right now this option supports a list of {exchange_name :: String.t(), options :: keyword()}. What I propose is to switch to three-element tuples like the one above: {source :: String.t(), dest :: String.t(), options :: keyword()}. However, we would have a special possible value for dest which is the atom :queue which represents the queue used in the :queue option.
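To make this second option concrete, the sketch below normalizes both the existing two-element shape and the proposed three-element shape into {source, dest, options}. The module BindingsSketch and the function normalize/1 are hypothetical names used only for illustration; this is not how the library currently handles :bindings.

```elixir
defmodule BindingsSketch do
  @moduledoc "Hypothetical normalizer for the proposed :bindings shapes."

  # Existing shape: {exchange_name, options} binds the exchange to the
  # pipeline's queue, represented here by the atom :queue.
  def normalize({exchange, opts}) when is_binary(exchange) and is_list(opts),
    do: {exchange, :queue, opts}

  # Proposed shape: {source, dest, options}, where dest is either another
  # exchange name or the special atom :queue.
  def normalize({source, dest, opts})
      when is_binary(source) and (is_binary(dest) or dest == :queue) and is_list(opts),
      do: {source, dest, opts}
end

IO.inspect(BindingsSketch.normalize({"my-exchange", routing_key: "#"}))
```

Normalizing up front would let the old two-element tuples keep working while the rest of the code only deals with one shape.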

All thoughts are welcome, excited to have this discussion!

cc @josevalim @msaraiva @wojtekmach

Deal with the broker sending a basic_cancel

Today, the RabbitMQ producer here stops if it receives a basic_cancel from the broker: https://github.com/plataformatec/broadway_rabbitmq/blob/1ba40e62f079ad2277d7ef6b253872f31065fb78/lib/broadway_rabbitmq/producer.ex#L154-L157

After speaking with @josevalim, it seems that the supervisor will restart the producer and all will proceed. We have two alternatives to improve the current situation:

  1. Let the supervisor do its job: basically what we have today. We can use :shutdown as the reason. Using a supervisor as a restart mechanism is not the best IMO, as outlined in the famous post "It's all about the guarantees" (go read it if you haven't!).

  2. Handle the reconnection internally. This means using a possible backoff mechanism. This will let us control the logic behind reconnecting, while the producer process will never go down and always act as the "connection manager" instead of the connection itself.
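For the second alternative, the backoff part could be as small as the sketch below. BackoffSketch, its delay/3 function, and the 1 second base and 30 second cap are assumptions chosen for illustration, not values taken from the library.

```elixir
defmodule BackoffSketch do
  @moduledoc "Hypothetical exponential backoff helper for reconnect attempts."

  # Delay in milliseconds before reconnect attempt number `attempt` (1-based).
  # The delay doubles on every attempt, starting at `base`, and never exceeds `cap`.
  def delay(attempt, base \\ 1_000, cap \\ 30_000)
      when is_integer(attempt) and attempt >= 1 do
    min(base * Integer.pow(2, attempt - 1), cap)
  end
end

# A producer that manages its own connection could schedule the next attempt with
# Process.send_after(self(), :connect, BackoffSketch.delay(attempt)).
IO.inspect(Enum.map(1..6, &BackoffSketch.delay/1))
```

Integer.pow/2 requires Elixir 1.12 or later.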

Currently, the use case I have for the broker sending a basic_cancel to the consumer is when a queue is deleted. If the RabbitMQ consumer is consuming from a queue and that queue is deleted (for example, for recreating it with different properties), the broker sends a basic_cancel to the consumer.

Let me know how I can help.

Handle duplicated messages after connection lost

If the connection is lost after the message has been successfully processed but before it has been acknowledged, there's no way to acknowledge the message anymore, since the acknowledgement is bound to the channel that delivered the message. The documentation states that:

A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

That means messages that were processed but not acknowledged will be requeued and processed more than once. However, the documentation also explains that:

If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one). This is a hint that a consumer may have seen this message before (although that's not guaranteed, the message may have made it out of the broker but not into a consumer before the connection dropped)

That raises a couple of questions:

  1. Should broadway_rabbitmq have a built-in way to handle duplicated messages caused by a lost connection?
  2. Can we use the redelivered flag to avoid processing the message again? If so, how can we check if the redelivered message was previously successfully processed or not? The new message will have a different delivery_tag on a new channel which removes the possibility of comparing the messages. Is there another way?
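One possible direction, offered only as a sketch: treat the redelivered flag as a hint and fall back to an application-level identifier for the actual comparison. This assumes publishers set a unique message_id, that both :redelivered and :message_id are available in the message metadata, and that the application keeps its own record of processed ids. The module DedupSketch and the MapSet used as that record are hypothetical.

```elixir
defmodule DedupSketch do
  @moduledoc "Hypothetical duplicate check based on :redelivered and :message_id."

  # `seen` is a MapSet of message ids that were already processed successfully.
  # A message that was never redelivered is always processed.
  def decide(%{redelivered: false}, _seen), do: :process

  # A redelivered message is skipped only if its id is known to be done.
  def decide(%{redelivered: true, message_id: id}, seen) do
    if MapSet.member?(seen, id), do: :skip, else: :process
  end
end

seen = MapSet.new(["msg-1"])
IO.inspect(DedupSketch.decide(%{redelivered: true, message_id: "msg-1"}, seen))
```

In a real system the record of processed ids would have to live outside the pipeline process (a database or cache, for example) to survive the same failures that cause redelivery.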

Graceful shutdown

  • Implement Producer.prepare_for_draining to ensure no more messages will be received
  • Change the terminator
  • Test all messages are processed before shutdown

AMQP 2.0

Hi there. I am a co-maintainer of amqp library. I am currently preparing a major version release for the library.

Since broadway_rabbitmq seems to be the most popular library that uses amqp, I'd like to get your feedback before releasing the 2.0.0.

As you can see in the release notes, there are a couple of breaking changes.

no_wait to nowait

amqp still supports no_wait option so you don't have to make any changes for this.

Connection name

Connection name is now moved to options parameter. You need to make a little change on BroadwayRabbitMQ.AmqpClient.setup_channel/1.

Since the option was introduced at 1.5 you will also need to change the dependency requirement like this:

{:amqp, "~> 1.5 or ~> 2.0"},

If this would block you from supporting 2.0, I can consider dropping the change to Connection.open and keep supporting the connection name as a separate parameter. What do you think?

If you have any other questions and/or feedback, please let me know.

Thanks!

ACK timeout kills connection without getting restarted

versions:
broadway: 1.0.0
broadway_rabbitmq: 0.7.0
amqp: 2.1
elixir: 1.12
otp: 24.0.5

I have some long-running tasks that may sometimes exceed RabbitMQ's consumer_timeout, which produces this message:

09:38:26.044 [warn] AMQP channel went down with reason: {:shutdown, {:server_initiated_close, 406, "PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 7200000 ms. This timeout value can be configured, see consumers doc guide to learn more"}}

The expected behavior would be to establish a new connection, kill the timed-out processors, and have RabbitMQ redeliver the messages.

The current behavior is that the GenServer is killed and Broadway can no longer send messages to RabbitMQ. This is fixed only by restarting the Broadway process.

Questions about supported versions of the AMQP protocol

Hi guys, first I would like to thank you for this excellent library.

In my company we have many services that connect to a broker called ActiveMQ Artemis. Among other protocols, this broker supports version 1.0 of the AMQP protocol. What I would like to know is whether it would be possible to use this library to consume messages from that broker. I know that versions 0.9 and 1.0 of the AMQP protocol are completely different, but since RabbitMQ itself supports version 1.0 via a plugin, I was hopeful that this library could work in my use case.

I would love to replace the current services written in C# with Elixir, but I am running into this protocol barrier. I cannot change the broker, as it is also used by partners.

Note: I tried to use the MQTT Broadway library, but without success.

Any help?

Ease amqp dep

Would it be possible to relax the amqp dependency requirement from 1.1 to 1.3 or ~> 1?

Allow for multiple rabbit queues

This is a feature request.

My use case is I have multiple rabbitmq queues across multiple rabbitmq instances.

I want to transform the data coming in, and publish it to elasticsearch in batches using the bulk api; https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

The challenge is that Broadway is moving to a model where it is limited to one producer. That means I'll have 1 batcher per queue. I need to listen on many RabbitMQ queues (let's say about 100), and the architecture is forcing me to use 1 Elasticsearch "stream" / bulk insert per queue.

So this would look like

<base supervisor>(1)
  <broadway supervisor>(1)
     <broadway instances per queue>(100)

I guess a workaround would be to have handle_batch in each instance call a single process, which then batches the batches from these 100 instances and flushes them to Elasticsearch at certain intervals. But it seems like there is a strong use case for one of:

  • having a way to share a batcher between broadway instances
  • allowing multiple producers per broadway instance
  • allow multiple hosts/queues in a single broadway_rabbitmq producer

Thoughts?

AMQP 1.0 support

Background :

At work we have a need to use AMQP 1.0 to consume messages from Azure Service Bus and IBM MQ via AMQP 1.0. We wanted to use broadway, so we made an offbroadway amqp10 plugin. The plugin is targeted for AMQP 1.0 in general vs just rabbitmq or a particular service.

We plan on open sourcing this, but before we do it was suggested by @wojtekmach to open an issue for discussion.
An AMQP 1.0 client could be added to broadway vs publishing a new offbroadway plugin. The question is, does it make sense?

Differences between AMQP 1.0 and AMQP < 1.0 aside, it makes sense in that this plugin could support rabbitmq installs that have AMQP 1.0 enabled (via rmq plugin).

On the other hand :

  • We'd have to come up with a way to map AMQP 1.0 to broadway_rabbitmq options and so forth. I haven't put any time in yet to seeing how that would work. It may be there are better ways to do this besides trying to map options (i.e., ship with two producer flavors, that utilize different clients).
  • We'd have to put in the docs, you can use broadway_rabbitmq for services other than rabbitmq (azure service bus, ibm mq, active mq, etc. etc.), which is slightly confusing.
  • There is no abstraction like the Elixir amqp library right now. Since there is no abstraction, you have to expose "low level" concepts in the options; at least that's what we're doing in our offbroadway plugin at the moment, until a clear abstraction emerges. Though it would not be difficult to just make one if time is put into looking at all the possible cases.

Interested to hear thoughts on all this.

Reference:

AMQP 1.0 spec

Overview of AMQP 1.0 in Azure Service Bus

amqp10_client

producers keep failing with declare: [no_wait: true]

When I set declare: [no_wait: true], the producer keeps failing:

[error] Cannot connect to RabbitMQ broker: :ok
[error] Crashing because of unexpected error when connecting to RabbitMQ
[error] GenServer Rmq.Broadway.Producer_0 terminating
** (RuntimeError) unexpected error when connecting to RabbitMQ broker
    (broadway_rabbitmq 0.6.5) lib/broadway_rabbitmq/producer.ex:605: BroadwayRabbitMQ.Producer.handle_connection_failure/2
    (broadway_rabbitmq 0.6.5) lib/broadway_rabbitmq/producer.ex:428: BroadwayRabbitMQ.Producer.handle_info/2
    (broadway 0.6.2) lib/broadway/topology/producer_stage.ex:228: Broadway.Topology.ProducerStage.handle_info/2
    (gen_stage 1.1.0) lib/gen_stage.ex:2108: GenStage.noreply_callback/3
    (stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
    (stdlib 3.14) gen_server.erl:765: :gen_server.handle_msg/6
    (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:connect, :no_init_client}

elixir: 1.11.3
erlang 23.2.7

I believe the producer needs to wait for the declare operation to finish before trying to bind the queue to the exchange.

An unhandled case clause

Library version 0.3.0
erlang 21.3.6
elixir 1.8.1

** (CaseClauseError) no case clause matching: {:error, :unknown_host}
    (broadway_rabbitmq) lib/broadway_rabbitmq/producer.ex:278: BroadwayRabbitMQ.Producer.connect/1
    (broadway_rabbitmq) lib/broadway_rabbitmq/producer.ex:187: BroadwayRabbitMQ.Producer.handle_info/2
    (broadway) lib/broadway/producer.ex:103: Broadway.Producer.handle_info/2
    (gen_stage) lib/gen_stage.ex:2082: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

An unknown host currently causes an exception in the Producer code.

I would expect this code to bubble up the error to my application code.

Ack at the start of the pipeline

Right now, Broadway acknowledges messages at the end of the pipeline. This is good if you are using an "at least once" delivery model, where your messages are idempotent and can be redelivered to a consumer many times. However, it's problematic in an "at most once" delivery model where you want to avoid processing a message twice. In those cases, the common pattern is to ack a message as soon as it's received by a consumer so that even if processing fails the message is not requeued.

This could be supported with an option on the producer (since that's the thing that knows how to deal with the external system and knows how to ack messages):

      producers: [
        default: [
          module: {BroadwayRabbitMQ.Producer, opts},
          stages: 1,
          ack: :start_of_pipeline
        ]
      ],

I can't think of a good name for the option but you get the idea.

One thing that's not clear to me is the behaviour of requeue: :never (in the RabbitMQ producer): I get that if I manually fail the message then the :requeue option is used, but what happens if something crashes in my pipeline? The message will not be acked and the :ack option would make sense there, right?

cc @josevalim @msaraiva

Allow usage of connection/channel pool

This relates to the discussion in #60, but I'm creating a new issue to address just the points about how the producer handles the resource it uses to connect to and communicate with RabbitMQ.
First, some facts about RabbitMQ and how the AMQP library models things:

  • To consume messages properly, a producer only needs an AMQP channel, not a connection.
  • An %AMQP.Channel{} has a pid for the channel that can be monitored.
  • To be aware of disconnections, it is enough to monitor the channel pid.
  • To handle reconnection, it would only be necessary to check out a new channel once the old one has died.

With that in mind, what do you think about adding a :pool option to the producer config that accepts a 1-arity function or an MFA?
When a channel is needed, we check one out by calling the function, passing the producer pid to the pool manager.

AmqpClient should accept connection also in URI format

BroadwayRabbitMQ.AmqpClient validates connection params and accepts only a keyword list: https://github.com/plataformatec/broadway_rabbitmq/blob/3a7c7a68811dbcf723d8bc805e22a7eca27934ce/lib/broadway_rabbitmq/amqp_client.ex#L115

But AMQP.Connection.open also accepts a binary for the URL: https://hexdocs.pm/amqp/1.1.1/AMQP.Connection.html#open/1 in the form of "amqp://user:pass@server/db".
Because of this limitation, I cannot set the RabbitMQ connection param from ENV.
Can we change it to also accept a URI? We can validate the URI format if required.
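A validation that accepts both forms might look like the sketch below. ConnectionSketch and validate/1 are hypothetical names, and the check (scheme amqp or amqps plus a non-empty host) is an assumed, deliberately loose notion of a "valid" URI rather than what the library does.

```elixir
defmodule ConnectionSketch do
  @moduledoc "Hypothetical validator accepting a keyword list or an AMQP URI."

  # Keyword lists are passed through unchanged, as today.
  def validate(conn) when is_list(conn) do
    if Keyword.keyword?(conn), do: {:ok, conn}, else: {:error, "expected a keyword list"}
  end

  # Binaries must parse as an amqp:// or amqps:// URI with a host.
  def validate(conn) when is_binary(conn) do
    case URI.parse(conn) do
      %URI{scheme: scheme, host: host}
      when scheme in ["amqp", "amqps"] and is_binary(host) and host != "" ->
        {:ok, conn}

      _ ->
        {:error, "expected an amqp:// or amqps:// URI, got: #{inspect(conn)}"}
    end
  end

  def validate(other),
    do: {:error, "expected a keyword list or a URI, got: #{inspect(other)}"}
end

IO.inspect(ConnectionSketch.validate("amqp://user:pass@server/db"))
```

With something like this in place, the value could come straight from System.get_env/1 without being split into separate options first.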

`qos: [prefetch_count: 0]` doesn't work

The docs state (emphasis mine):

Unlike the RabbitMQ client that has a default :prefetch_count = 0, which disables back-pressure, BroadwayRabbitMQ overwrites the default value to 50, enabling the back-pressure mechanism. You can still define it as 0, however, if you do this, make sure the machine has enough resources to handle the number of messages coming from the broker.

However, setting prefetch_count to 0 currently doesn't work, because it causes the buffer_size to be set to 0 as well, which prevents GenStage from starting up.

I think when it is set to 0, buffer_size should be set to a reasonably high value, or better yet, allow it to be configured.
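The suggestion could be sketched as follows: an explicit :buffer_size always wins, and it becomes mandatory when :prefetch_count is 0. BufferSizeSketch and resolve/1 are hypothetical names; the multiplier of 5 is an assumption about how the default is derived, while the default prefetch of 50 comes from the docs quoted above.

```elixir
defmodule BufferSizeSketch do
  @moduledoc "Hypothetical resolution of :buffer_size from producer options."

  # Assumed factor for deriving the buffer from the prefetch count.
  @multiplier 5
  # Default prefetch count, as stated in the quoted docs.
  @default_prefetch 50

  def resolve(opts) when is_list(opts) do
    prefetch = get_in(opts, [:qos, :prefetch_count]) || @default_prefetch

    case {Keyword.fetch(opts, :buffer_size), prefetch} do
      # An explicit, positive buffer size is always accepted.
      {{:ok, size}, _} when is_integer(size) and size > 0 ->
        {:ok, size}

      # No buffer size and unlimited prefetch: nothing sensible to derive.
      {:error, 0} ->
        {:error, ":buffer_size is required when :prefetch_count is 0"}

      # Otherwise derive the buffer from the prefetch count.
      {:error, n} when is_integer(n) and n > 0 ->
        {:ok, n * @multiplier}

      _ ->
        {:error, "invalid :buffer_size or :prefetch_count"}
    end
  end
end

IO.inspect(BufferSizeSketch.resolve(qos: [prefetch_count: 0]))
```

Returning an error instead of silently picking a large number keeps the memory trade-off visible to the user.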

missing license section in the readme

License

Copyright 2019 Plataformatec

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

econnrefused when deploying elixir project with broadway

versions:
elixir 1.12
OTP 24.0
broadway_rabbitmq: 0.7
broadway: 1.0
amqp: 2.1.2

Suppose we have the following amqp configuration:

config :amqp,
  connections: [
    server: [
      url: "amqp://rabbitmq:rabbitmq@localhost"
    ]
  ],
  channels: [
    channel: [connection: :server]
  ]

With broadway producer configuration:

producer: [
        module: {
          BroadwayRabbitMQ.Producer,
          queue: "broadway_queue",
          qos: [prefetch_count: 50],
          declare: [durable: true],
          bindings: [{"outgoing-exchange", []}],
          on_failure: :reject
        },
        concurrency: 1
      ],

By default, when run with iex -S mix, everything works as expected, with no errors or side effects.

However, after creating a release and starting it, the following error keeps appearing in the log:

07:31:41.497 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:31:57.607 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:16.246 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:38.095 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:57.107 [error] Cannot connect to RabbitMQ broker: :econnrefused

I can confirm that there is an open connection by looking in the RabbitMQ management console.

When running the following in a remote console:

> AMQP.Connection.open()
> {:error, :econnrefused}

However running:

AMQP.Connection.open("amqp://rabbitmq:rabbitmq@localhost")

Opens a connection successfully.

To my understanding, this happens because broadway and amqp try to use the same connection. What bothers me is that in development it works without any problems, even when connecting to the production RabbitMQ server over TLS.

Is this the case, or is there a bug?

Improve RabbitMQ docs

We need to:

  1. Mention the official guide at the top, as the guide is a quick getting started and not a complete reference (see SQS)

  2. Have a section on Acknowledgements. Say explicitly what we ack and what we don't ack. We can move the note about requeue inside this section, but we should break it apart too. For example, it should say, as distinct paragraphs: "if requeue is always, then watch out for x and y"; "if requeue is never, then watch out for z and w"; etc. We should also talk about the redelivered metadata and have a subsection on batching that says whether batching is necessary for RabbitMQ or not (see SQS)

  3. We should have a section on configuring RabbitMQ to use metadata. I removed it from the README here (see SQS)

Support for queue bindings

I've had the experience that when we weren't setting up queue bindings before starting to consume the queue, the RabbitMQ broker would cease to route messages correctly.

We've since moved to excessively declaring every detail of the queues and exchanges, including the bindings, and the problems with routing have disappeared.

I think it should be possible to either define a list of bindings to the queue, or add a callback that can do it instead.

Examples:

[
  queue: "fancy_queue",
  declare: [durable: true],
  bindings: [{"some_exchange", routing_key: "only.for.fancy.queue"}]
]

Alternatively with a callback, it could look like this:

[
  queue: "fancy_queue",
  declare: [durable: true],
  before_consume: &setup_fancy_queue/2
]

# ...

def setup_fancy_queue(channel, queue) do
  AMQP.Queue.bind(channel, queue, "some_exchange", routing_key: "only.for.fancy.queue")
end

Broadway.prepare_messages/2 callback always receives exactly one message

The Broadway docs for the Broadway.prepare_messages/2 callback state:

The length of the list of messages received by this callback is based on the min_demand/max_demand configuration in the processor.

However, in our application using BroadwayRabbitMQ.Producer, it seems as if the length of the list of messages is always exactly one, regardless of the min_demand/max_demand configuration in the processor. Furthermore, it doesn't seem to matter whether there are only a few messages in the RabbitMQ queue being used or very many. Whenever I call IO.inspect(length(messages), label: "prepare_messages batch size") in my prepare_messages callback, it always prints prepare_messages batch size: 1.

Is this a bug in BroadwayRabbitMQ.Producer? Or have I misconfigured something? Or is there something else happening here that I'm not correctly understanding?

For reference, here is the Broadway topology for the pipeline in question:

[
  producers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Producer,
      concurrency: 1
    }
  ],
  processors: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Processor_default,
      concurrency: 2,
      processor_key: :default
    }
  ],
  batchers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.BatchProcessor_derived_stats,
      concurrency: 4,
      batcher_key: :derived_stats,
      batcher_name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Batcher_derived_stats
    }
  ]
]

And below is the (elided) configuration being passed to Broadway.start_link. Note that :qos and most other optional configs are not being overridden, and that the processor is configured with min_demand: 10, max_demand: 20, which should result in a batch of 10 messages being passed to prepare_messages.

    producer: [
        module: {BroadwayRabbitMQ.Producer,
         [
           bindings: [
             {"***elided exchange***",
              [routing_key: "***elided***"]}
           ],
           on_failure: :reject,
           metadata: [:delivery_tag, :exchange, :routing_key, :content_type,
            :content_encoding, :headers, :correlation_id, :message_id,
            :timestamp, :type, :app_id],
           declare: [durable: true],
           queue: "***elided***",
           connection: "***elided***"
         ]}
    ],
    processors: [
      default: [
        concurrency: 2,
        min_demand: 10,
        max_demand: 20
      ]
    ],
    batchers: [
      derived_stats: [
        concurrency: 4,
        batch_size: 20,
        batch_timeout: 1_000
      ]
    ]

Pass in an established connection

Hi there... Wondering if there's a way to pass in an already established connection to use for creating channels. (Please excuse my ignorance.)

When I bump up the concurrency it makes a new connection for each producer rather than creating a new channel on an existing connection. I'm getting close to maxing out my connections and wondering if there's a way to pass in an MFA to get an established connection to make a channel from.

Working dead letter example

Hi,

I'm happy to be working on an academic project involving criminal justice reform using Elixir and Broadway RabbitMQ!

This started out as an issue, but as I did more and more research, it became more of an example.

However it is sort of an issue, in that it's not totally clear how dead-letter configuration works in the docs, and I think it would be helpful to add an example.

Here is the code--I wanted to share a method so that others may benefit. Happy to create a PR if that's helpful as well.

Thanks!

This will send failed messages to the my_queue_error queue on the default exchange.

def declare_queues do
    {:ok, connection} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "my_queue_error", durable: true)
    AMQP.Queue.declare(channel, "my_queue",
      durable: true,
      arguments: [
        {"x-dead-letter-exchange", :longstr, ""},
        {"x-dead-letter-routing-key", :longstr, "my_queue_error"}
      ]
    )
end

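def handle_message(_, message, _) do
    try do
      # message is a %Broadway.Message{}; the raw payload is in message.data
      data = Jason.decode!(message.data, keys: :atoms)
      do_something_with!(data)
      message
    rescue
      # Jason.decode!/2 raises Jason.DecodeError, which is not a RuntimeError
      e in [Jason.DecodeError, RuntimeError] ->
        message
        |> Broadway.Message.failed(Exception.message(e))
    end
end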
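For the failed message to actually reach my_queue_error, the producer has to reject it without requeueing. The fragment below is a minimal sketch of the matching producer options, assuming the producer would otherwise requeue failed messages; MyPipeline is a hypothetical module name, and the concurrency values are placeholders.

```elixir
# Sketch only: MyPipeline is a hypothetical Broadway module.
Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       # Reject failed messages without requeueing, so that RabbitMQ
       # applies the queue's x-dead-letter-* arguments to them.
       on_failure: :reject},
    concurrency: 1
  ],
  processors: [default: [concurrency: 2]]
)
```

With on_failure: :reject, any message marked with Broadway.Message.failed/2 is rejected, and the broker routes it according to the dead-letter arguments declared on my_queue.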

Ability to retrieve the AMQP Channel used by the Producer

First of all, thank you for this amazing library!

What do you think about adding the ability to retrieve the AMQP Channel used by the Producer?

I'm aware that the Channel is already available in the message metadata, but I'm talking here about having a dedicated method to query the Channel (from the Producer state) outside of a message consumption.

I see several use-cases for this:

  1. Publish RabbitMQ events: Broadway would be responsible for managing the AMQP Channel (connection, re-connection in case of failure, proper disconnection in case the process crashes, etc). This would save a lot of boilerplate code, avoiding the management of a dedicated Channel just to publish events.

  2. Send RabbitMQ RPC requests: building an RPC client that sends RPC requests over the Channel managed by Broadway, using Broadway as a reply-queue consumer for handling RPC responses. This is much the same use-case as 1., but with the added complexity that we need to keep a mapping between the caller process PID and the RabbitMQ correlation ID, in order to send the response back to the caller process when the RPC response is consumed.

For now, I have circumvented this by having a dedicated process RequestHandler storing the Channel. I send the Channel to RequestHandler using the Broadway after_connect option. But this feels quite hacky to be honest.
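The workaround described above could look roughly like the sketch below. ChannelStore is a hypothetical name standing in for RequestHandler, and the sketch assumes the after_connect function receives the channel and is expected to return :ok.

```elixir
defmodule ChannelStore do
  @moduledoc """
  Hypothetical helper: keeps the most recent channel handed over by the
  producer's after_connect hook so other processes can look it up.
  """
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> nil end, name: __MODULE__)
  end

  # Meant to be used as `after_connect: &ChannelStore.put/1`.
  # Agent.update/2 returns :ok.
  def put(channel) do
    Agent.update(__MODULE__, fn _old -> channel end)
  end

  # Returns {:ok, channel}, or {:error, :not_connected} if nothing was stored yet.
  def fetch do
    case Agent.get(__MODULE__, & &1) do
      nil -> {:error, :not_connected}
      channel -> {:ok, channel}
    end
  end
end
```

Two caveats with this approach: the stored channel is stale between a disconnection and the next after_connect call, and with a producer concurrency above 1 each producer overwrites the value stored by the others.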

Please tell me if you think this would be outside the scope of Broadway, as one could argue it is supposed to be essentially a consumer of messages, not a producer (publish events & RPC commands).

Use a shared connection among Broadway producers

Currently, each Broadway producer opens and monitors its own connection. This has the advantage of keeping producers completely independent from each other, each one maintaining its own state. However, RabbitMQ's documentation states that:

Each connection uses about 100 KB of RAM (and even more, if TLS is used). Thousands of connections can be a heavy burden on a RabbitMQ server. In the worst case, the server can crash due to out-of-memory. The AMQP protocol has a mechanism called channels that “multiplexes” a single TCP connection. It is recommended that each process only creates one TCP connection, and uses multiple channels in that connection for different threads.

So ideally, I believe we should have a separate process to maintain the connection. This process would be responsible for opening, monitoring and reopening the connection when necessary, using the backoff strategy chosen by the user.
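As a rough illustration of that idea, here is a minimal sketch of such a process. It is not how BroadwayRabbitMQ works today: SharedConnection, the :connect_fun option and the fixed :backoff delay are all assumptions made for the example, and a real version would plug in the user's backoff strategy.

```elixir
defmodule SharedConnection do
  @moduledoc "Hypothetical sketch: one process owns the connection and hands out channels."
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Returns {:ok, channel}, or {:error, :not_connected} while disconnected.
  def open_channel, do: GenServer.call(__MODULE__, :open_channel)

  @impl true
  def init(opts) do
    state = %{
      url: Keyword.get(opts, :url, "amqp://guest:guest@localhost"),
      # Fixed delay in milliseconds; stands in for a real backoff strategy.
      backoff: Keyword.get(opts, :backoff, 1_000),
      # Injectable opener (an assumption of this sketch), so the process
      # can be exercised without a broker.
      connect_fun: Keyword.get(opts, :connect_fun, &AMQP.Connection.open/1),
      conn: nil
    }

    send(self(), :connect)
    {:ok, state}
  end

  @impl true
  def handle_call(:open_channel, _from, %{conn: nil} = state) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call(:open_channel, _from, %{conn: conn} = state) do
    {:reply, AMQP.Channel.open(conn), state}
  end

  @impl true
  def handle_info(:connect, state) do
    case state.connect_fun.(state.url) do
      {:ok, conn} ->
        # Watch the connection process so we notice when it goes away.
        Process.monitor(conn.pid)
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        Process.send_after(self(), :connect, state.backoff)
        {:noreply, state}
    end
  end

  # The monitored connection died: forget it and schedule a reconnect.
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{conn: %{pid: pid}} = state) do
    Process.send_after(self(), :connect, state.backoff)
    {:noreply, %{state | conn: nil}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

Each producer would then call SharedConnection.open_channel/0 instead of opening its own connection. The trade-off is that producers are no longer independent: when the shared connection drops, every channel opened on it is lost at once.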
