Giter Site home page Giter Site logo

dashbitco / broadway Goto Github PK

View Code? Open in Web Editor NEW
2.3K 48.0 150.0 762 KB

Concurrent and multi-stage data ingestion and data processing with Elixir

Home Page: https://elixir-broadway.org

License: Apache License 2.0

Elixir 100.00%
elixir genstage data-ingestion data-processing concurrent broadway

broadway's Introduction

Broadway

CI

Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir. Broadway allows developers to consume data efficiently from different sources, known as producers, such as Amazon SQS, Apache Kafka, Google Cloud PubSub, RabbitMQ, and others. Broadway pipelines are long-lived, concurrent, and robust, thanks to the Erlang VM and its actors.

Broadway takes its name from the famous Broadway street in New York City, renowned for its stages, actors, and producers.

To learn more and get started, check out our official website and our guides and docs.

Broadway Logo

Built-in features

Broadway takes the burden of defining concurrent GenStage topologies and provide a simple configuration API that automatically defines concurrent producers, concurrent processing, batch handling, and more, leading to both time and cost efficient ingestion and processing of data. It features:

  • Back-pressure
  • Automatic acknowledgements at the end of the pipeline
  • Batching
  • Fault tolerance
  • Graceful shutdown
  • Built-in testing
  • Custom failure handling
  • Ordering and partitioning
  • Rate-limiting
  • Metrics

Producers

There are several producers that you can use to integrate with existing services and technologies. See the docs for detailed how-tos and supported producers.

Installation

Add :broadway to the list of dependencies in mix.exs:

def deps do
  [
    {:broadway, "~> 1.0"}
  ]
end

A quick example: SQS integration

Assuming you have added broadway_sqs as a dependency and configured your SQS credentials accordingly, you can consume Amazon SQS events in only 20 LOCs:

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwaySQS.Producer, queue_url: "https://us-east-2.queue.amazonaws.com/100000000001/my_queue"}
      ],
      processors: [
        default: [concurrency: 50]
      ],
      batchers: [
        s3: [concurrency: 5, batch_size: 10, batch_timeout: 1000]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:s3)
  end

  def handle_batch(:s3, messages, _batch_info, _context) do
    # Send batch of messages to S3
  end

  defp process_data(data) do
    # Do some calculations, generate a JSON representation, process images.
  end
end

Once your Broadway module is defined, you just need to add it as a child of your application supervision tree as {MyBroadway, []}.

Comparison to Flow

You may also be interested in Flow by Dashbit. Both Broadway and Flow are built on top of GenStage. Flow is a more general abstraction than Broadway that focuses on data as a whole, providing features like aggregation, joins, windows, etc. Broadway focuses on events and on operational features, such as metrics, automatic acknowledgements, failure handling, and so on. Broadway is recommended for continuous, long-running pipelines. Flow works with short- and long-lived data processing.

License

Copyright 2019 Plataformatec
Copyright 2020 Dashbit

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

broadway's People

Contributors

acco avatar adrianomitre avatar akoutmos avatar feliperenan avatar hajto avatar jdenen avatar jesseshieh avatar josevalim avatar kenspirit avatar kianmeng avatar ktec avatar lackac avatar laszloesl avatar linjunpop avatar markthequark avatar mathiasose avatar mcrumm avatar mertonium avatar msaraiva avatar nathanl avatar objectuser avatar oskarkook avatar philss avatar slashmili avatar smaximov avatar spk avatar thefirstavenger avatar v0idpwn avatar whatyouhide avatar wojtekmach avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

broadway's Issues

Use list as data source

This is not an issue

Hello @josevalim

i am creating one cli based application which can download twitch videos, which can download videos in multiple chunks. retry download and resume download etc features will be there.
(This idea got when you were streaming advent of code 2018. i have looked into software to download your videos, than i thought i should make twitch video downloader in elixir it self)

Here i want to give an option to the user, about how many chunks they want to download concurrent. so i have few question.

  1. Broadway is good for this cli app? (i want to learn and in future i will make tutorial for the same)
  2. how can we use list of objects as data source, instead of redis, rabbitmq.
  3. how can come to know that list is empty or when all the chunks has been downloaded data source will become empty. once its completed i want to close cli process. can Broadway tell me that data source has no work to do?

Allow manual control of message acknowledge

Not sure if this issue should be here, but when I mark a message as failed for RabbitMQ, I wanna decide if I'll requeue or not based on the failure reason. Ex: JSON parse error should not be requeued but for other temporary errors I'll requeue to reprocess. I did a ad-hoc implementation trusting an internal contract and it's not ideal, but the impl looks like this:

def handle_message(_, %Message{data: payload, acknowledger: ack} = message, _) do
    {_, _, ack_data} = ack
    ack_data = Map.put(ack_data, :requeue, true)
    ack = 
      ack
      |> Tuple.delete_at(2) 
      |> Tuple.insert_at(2, ack_data)}
      
    message = %{message | acknowledger: ack}
    Message.failed(message, "failed")
end

Not sure the best option to have this through the API but I'll try to think about and propose something.

Stopping a pipeline on error

Error handling is described as follows in the Broadway documentation:

In case of failures, Broadway does its best to keep the failures contained and avoid losing messages. The failed message or batch is acknowledged as failed immediately. For every failure, a log report is also emitted.

Assume there's a requirement that we'd like to manually intervene whenever an error is encountered, instead of allowing processing to continue (due to processing being dependent upon strict message order). Would it be possible to stop a pipeline when encountering an error? Could I use the Broadway.Acknowledger behaviour to signal the producer to stop sending any further messages, or is there a better approach?

Additionally, if the messages were being partitioned it would be preferable to only stop messages within the affected partition, and allow other partitions to continue processing.

I'm interested in using Broadway for event handling (in event sourced applications) and currently support three error strategies: retry, skip, and stop. Retrying messages and skipping (nack) can be handled using the existing Broadway consumer API, but I can't see how to stop processing.

Terminator reporting error with intentional shutdown

I have a custom producer that consumes from an api so my producer achieve a point where there is no events to produce anymore and when all events are successfully acknowledged my broadway tree can go down, when this happens i send a message to stop the Broadway GenServer that is the beginning of the tree. i do that using GenServert.stop/2. My producer implements GenStage, Acknowledger and Producer behaviours. It handles ack/3, prepare_for_draining/1 and terminate/2 callbacks. When I send the message to the Broadway GenServer it logs this error message:

[error] Supervisor '<MyBroadway>.Supervisor' had child '<MyBroadway>.Terminator' started with 'Elixir.Broadway.Terminator':start_link([{producers,['<MyBroadway>.Producer_default_1']},{first,['<MyBroadway>.Processor_default_1',...]},...], [{name,'<MyBroadway>.Terminator'}]) at <0.532.0> exit with reason killed in context shutdown_error

Should i cover something else to avoid the error message? There is any config that i can pass to Broadway.start_link that gonna make the tree temporary? or this is a expected behavior?

Broadway.test_messages/2 doesn't send ack when using batchers

Here's a test that reproduces it:

defmodule EmptyProducer do
  use GenStage
  @behaviour Broadway.Producer

  @impl true
  def init(_args) do
    {:producer, []}
  end

  @impl true
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end

defmodule MyBroadway do
  use Broadway

  @impl true
  def handle_message(:default, message, _context) do
    message
  end
end

defmodule BugTest do
  use ExUnit.Case

  test "works", c do
    {:ok, pid} =
      Broadway.start_link(MyBroadway,
        name: c.test,
        producers: [
          default: [
            module: {EmptyProducer, []}
          ]
        ],
        processors: [
          default: [
            stages: 1
          ]
        ]
      )

    ref = Broadway.test_messages(pid, [1])
    assert_receive {:ack, ^ref, [%{data: 1}], []}
  end

  test "works with batchers", c do
    {:ok, pid} =
      Broadway.start_link(MyBroadway,
        name: c.test,
        producers: [
          default: [
            module: {EmptyProducer, []}
          ]
        ],
        batchers: [
          default: [
            stages: 1
          ]
        ],
        processors: [
          default: [
            stages: 1
          ]
        ]
      )

    ref = Broadway.test_messages(pid, [1])
    assert_receive {:ack, ^ref, [%{data: 1}], []}
  end
end

The 2nd test fails with:

  1) test works with batchers (BugTest)
     test/bug_test.exs:48
     No message matching {:ack, ^ref, [%{data: 1}], []} after 100ms.
     The following variables were pinned:
       ref = #Reference<0.1852397191.3371696133.32665>
     The process mailbox is empty.
     code: assert_receive {:ack, ^ref, [%{data: 1}], []}
     stacktrace:
       test/bug_test.exs:70: (test)

Hot Upgrade Woes

Yeah, I know they may not be the most popular feature, but my company loves hot upgrades and probably 75% of our deploys are done that way. A distillery upgrade release causes the whole node to crash and revert with this stack:

11:33:52.447 [error] GenServer MyBroadway.Supervisor terminating
** (FunctionClauseError) no function clause matching in Broadway.Server.handle_call/3
    (broadway) lib/broadway/server.ex:37: Broadway.Server.handle_call(:which_children, {#PID<0.3780.0>, #Reference<0.2960573015.1770520577.240437>}, %{name: MyBroadway, producers_names: [MyBroadway.Producer_default_1], supervisor_pid: #PID<0.3620.0>, terminator: MyBroadway.Terminator})
    (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.3619.0>, {:function_clause, [{Broadway.Server, :handle_call, [:which_children, {#PID<0.3780.0>, #Reference<0.2960573015.1770520577.240437>}, %{name: MyBroadway, producers_names: [MyBroadway.Producer_default_1], supervisor_pid: #PID<0.3620.0>, terminator: MyBroadway.Terminator}], [file: 'lib/broadway/server.ex', line: 37]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 661]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 690]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}
11:33:52.448 [error] GenServer MyBroadway terminating

Proposal: Publishers with handled back pressure from business logic

As it was discussed at λ-days, I am to describe our use case for Broadway.

We have a Rabbit queue of incoming messages at the rate ~10K/sec in peaks.
We apply validation rules onto them since some of the messages are malformed.
These validation rules might be time-consuming, the result of applying them
would be another Rabbit queue (valid messages.)

Then we apply some processing again and deliver them to another queue(s).
That said, we currently use Rabbit as a communication channel between
different processes, microservices, and the main application. That simplifies
the architecture because there are very few if any ties between code
in different microservices.

It could be drawn this way:

——————————   ——————————   ——————————   ——————————   ——————————   —————————— 
| Rabbit | → |  Flow  | → | Rabbit | → |  ....  | → | Rabbit | → |  Flow  | 
——————————   ——————————   ——————————   ——————————   ——————————   —————————— 

Flows over GenStages are used for heavy computation. To avoid code repetitions,
we built a library that takes one or more processors in a form of {Mod, :fun} tuples
alongside with sources and destinations. It was created two years ago and
basically works over connection pools and compiled configs, that’s why we were so
excited to hear about Broadway. Unfortunately in it’s current documentation
Broadway is meant as a “dead end,” meaning that it has a great infrastructure
to handle back pressure and to process incoming data, but one cannot just plug-and-play
the existing Flow implementation into the pipeline as shown above.

It would be great to have [optional] publishers or how do you name it as well
(top half of the picture was taken from announce by Plataformatec):

                         [producer_1]
                             / \
                            /   \
                           /     \
                          /       \
                 [processor_1] [processor_2] ... [processor_50]  <- process each message
                          /\     /\
                         /  \   /  \
                        /    \ /    \
                       /      x      \ 
                      /      / \      \
                     /      /   \      \
                    /      /     \      \
              [batcher_s3_odd]  [batcher_s3_even]
                    /\                  \
                   /  \                  \
                  /    \                  \
                 /      \                  \
 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch

 ————————————————————————————————————————————————————————————————————————————————————————

 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch
                \      /             \      /                   
                 \    /               \    /                   
                  \  /                 \  /                   
                   \/                   \/                   
            [batcher_s3_odd]     [batcher_s3_even]
                         / \     /\ 
                        /   \   /  \
                       /     \ /    \
                      /       x      \
                     /       / \      \
                    /       /   \      \
                   /       /     \      \
                  /       /       \      \
                [publisher_1]   [publisher_2] ... [publisher_50]  <- publish each message

That way the consumer part (optionally including batchers) might be easily
extracted into a separate codebase, making the business code fully isolated from
any broker/connection handling.

We currently use kinda this architecture with supported source backends RabbitMQ / HTTP and
destinations backends SQL / RabbitMQ / HTTP / Slack. We are able to simply add another publisher
and change nothing in the business logic to export data to the database or to another
say client who requires HTTP webhooks.

As I said, our implementation of connectors is just a pool hence we need to reimplement
back pressure support in our business logic units. It is not as much of boilerplate,
but I am pretty sure it might bring a huge added value to Broadway if it was supported
through a configuration like:

def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producers: [...],
    processors: [...],
    batchers: [...],

    pipeline: {MyBusinessUnit, :process},

    batchers: [
      s3_positive: [stages: 2, batch_size: 10],
      s3_negative: [stages: 1, batch_size: 10]
    ],
    publishers: [
      rabbit: [
        module: {BroadwayRabbitMQ.Publisher, exchange_name: "my_exchange"}
      ]
    ]
  )
end

Support partitioning in the batcher

For example, imagine that you want to send e-mails on the batcher or you have some sort of group identifier. That's the batcher_key/partition and we need to support those cases.

Add example for Erlang support

During the announcement someone asked about Erlang support. I think this should be a good starting point, but i don't do that much Erlang..

-module(broaderl).
-define(broadway, 'Elixir.Broadway').
-define(message, 'Elixir.Broadway.Message').
-behaviour(?broadway).

-export([start_link/0]).
-export([handle_message/3, handle_batch/4]).

start_link() ->
  Opts = [],
  ?broadway:start_link(broaderl, Opts).

handle_message(_, #{} = Message, _) ->
  Fun = fun(_Data) ->
    % Do some calculations
    ok
  end,

  Updated = ?message:update_data(Message, Fun),
  ?message:put_batcher(Updated, sqs).

handle_batch(sqs, _Messages, _BatchInfo, _Context) ->
  % Send batch of messages to S3
  ok.

Revise configuration API

The configuration API will be:

producers: [
  name: ...
],
processors: [
  default: ...
],
batchers: [
  one_or_many: [
    batch_size: ...,
  ]
]

For now processors can only have one entry (like producers). handle_message will receive the name of the processor. handle_batch will continue to receive the name of the batcher. We will replace "publishers" by "batchers".

Improve guides to include publishing messages

We should add an extra step that shows how to publish a message. It can be via command line but, if so, we should link to the Elixir API for them too (ExAWS for SQS, AMPQ for RabbitMQ, etc).

Generally speaking, the guides should be thin on details and include references to the docs for any in depth topic. For example, on the "Create queue" section for RabbitMQ, we can include the CLI example and link to the module docs that show how to do it programatically.

Pass topology options to the producer

Today all Broadway producers projects are in need of receiving more options about the topology. SQS and GCPubSub would benefit from knowing the topology to configure a connection pool. Kafka needs it for reasons I can't remember. And RabbitMQ would need it for the index: dashbitco/broadway_rabbitmq#36

One important thing to mention is also that Broadway does not respect the child_spec contract. We will always call the init function (and never child_spec and start_link) since the child specification is configured externally by the topology. Also remember that Broadway already has an optional callback, called prepare_for_draining.

In any case, we need to introduce a new callback. My suggestion is to call it init_for_broadway. It will receive init_for_broadway(options, {name, index}, topology). This callback is optional, if it is not defined, we will call the regular init callback. The name is the name given in the topology. And the index is index >= 0 with the position of the process in the supervision tree.

/cc @msaraiva @mcrumm @whatyouhide

Can Broadway work with temporary AWS sessionToken credentials?

I'm beginning an integration with a 3rd party which gives us a temporary/refreshable SQS sessionToken along with access_key_id and secret_access_key. I'd like to use Broadway, but right now I have more questions than answers. I apologize if a github issue is not the best way to ask my questions.

I see that ExAws has support for this authentication method, so I'm confident I can get things working at that level. But my concern is how to get Broadway to give me some sort of a "hook" point when credentials expire so I can refresh them. Will I have to write a custom Producer to make this possible? Maybe with just a custom SqsClient?

For an initial test, I've created a Broadway based module but configured it with bad credentials. I end up with

18:41:37.837 [error] Unable to fetch events from AWS. Reason: {:http_error, 403, %{code: "InvalidClientTokenId", detail: "", message: "The security token included in the request is invalid.", request_id: "dc265821-44a2-5c6c-84fd-0d01dcd34b38", type: "Sender"}}
18:41:37.838 [error] GenServer SeatScouts.TicketmasterSapiBroadway.Producer_default_1 terminating
** (ArgumentError) argument error
    :erlang.length(:ok)
    (broadway_sqs) lib/broadway_sqs/producer.ex:92: BroadwaySQS.Producer.handle_receive_messages/1
    (broadway) lib/broadway/producer.ex:66: Broadway.Producer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2170: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_producer", {#PID<0.3797.0>, #Reference<0.3890175515.1423966212.213191>}, {:ask, 10}}
State: %{consumers: [{#PID<0.3797.0>, #Reference<0.3890175515.1423966212.213191>}], module: BroadwaySQS.Producer, module_state: %{demand: 0, receive_interval: 5000, receive_timer: nil, sqs_client: {BroadwaySQS.ExAwsClient, %{ack_ref: #Reference<0.3890175515.1423966209.215279>, config: [access_key_id: "ACCESS_KEY_ID", secret_access_key: "SECRET_ACCESS_KEY", security_token: "sessionToken"], queue_name: "my_queue", receive_messages_opts: [max_number_of_messages: 10]}}}, transformer: nil}

The SQS.Producer failing in handle_receive_messages is what makes me wonder if I'm going to have to write a Producer to handle my situation.

Any guidance would be appreciated. Thank you.

Distribution of messages to processor with multiple producers

Continuation from the topic on the forum here: https://elixirforum.com/t/broadway-message-distribution-strategy/24169/13

Given an extremely loaded set of processors and multiple producers some processors will get loaded up with a high number of messages (dozens or more) in their mailbox. Using the SQS producer, this leads to frequent dropped messages in the DLQ due to them hitting the timeout from waiting in the mailbox of a single worker for so long.

The only currently known workaround to prevent this mailbox buildup in the processors is to use a single stage for the producers.

Acknowledgement API

We need to define a different way to acknowledge messages. The current approach only acknowledge successful messages returned by handle_batch/4 as {:ack, successful: messages, failed: []}. Some ideas:

  • Create function ack_now(messages :: [Message.t]) - The user can immediately acknowledge messages as successful or failed.
  • Create function Message.failed(message :: Message.t) - Mark the message as failed. This forces the message to not be forwarded anymore.

Proposal: chained pipelines

Batcher would be able to forward messages to another pipeline. acks would be performed by the very last batcher.

The case is to have smaller pipelines that can be put together without the need to have an intermediate step of some external queue/topic.

Split processor and consumer?

Hello @josevalim,

This is my current GenStage based data ingestion flow:

[Uploader] -> [MetadataExtractor] -> [Persister]

An Uploader is a process hooked up to a Phoenix route, which takes in a binary upload, not with the Plug.Upload temporary file mechanism, but with a setup where every chunk enters the GenStage pipeline. The files being uploaded are software package files (DEB, RPM, ...).

The MetadataExtractor is in GenStage a ProducerConsumer. For every package type listed above, I have a specific implementation, e.g. DebMetadataExtractor, RpmMetadataExtractor, ...

The Persister is how the package file eventually is stored in backing storage: S3Persister, AzurePersister, ...

By reading the architecture documentation of Broadway, I assume my MetadataExtractor is the Broadway processor, where my Persister is the Broadway consumer. In my GenStage setup, I can have every combination of a processor and a consumer. They are orthogonal to each other.

Looking at how to implement a Broadway pipeline, the handle_message/3 (processor) and handle_batch/4 (consumer) functions need to be together in the same module.

Am I interpreting things wrong? If not, could Broadway be adapted such that the processor and consumer roles are split?

Initial Feedback

Hi guys,

I spent a few hours playing around with broadway and wanted to share my feedback. If it is too early for this, please feel free to close this issue. I think most of these things will be addressed when the documentation is published and finalized, but in the mean time I couldn't find anywhere else to discuss this stuff (I didn't see a #broadway channel on the Elixir Slack).

  1. Is the Message struct supposed to be opaque? In my producer I had to construct one manually for my handle_demand/2 callback. It was also initially unclear that handle_demand/2 was supposed to return a list of Message structs (I thought maybe they would get wrapped automatically).

I ended up doing something like this:

def handle_demand(demand, state) do
  messages =
    Enum.map(0..demand, fn _ ->
      data = make_random_data()
      %Broadway.Message{data: data}
    end)
  {:noreply, messages, state}
end
  1. Some of the example code was incomplete. For example I thought that I would get these functions via use Broadway, but it turns out they are in the Message module and I needed to prefix them as such.

  2. Similar to 1., it wasn't clear how I was supposed to implement my handle_batch/4 callback such that I would provide an :acknowledger value on the message. I also couldn't figure out which module should implement the Broadway.Acknowledger (or why it would make any difference). I ended up doing this:

defmodule MyBroadway do
  ...
  def handle_batch(:my_publisher, messages, _batch_info, _context) do
    successful =
      messages
      |> Enum.map(fn m -> Map.put(m, :acknowledger, {MyBroadway, nil}) end)
    {:ack, successful: successful, failed: []}
  end
end
  1. Is it possible to send messages from one processor to another? From the example, it was not clear how you would make more complex graphs. For example, if I want to have multiple processing steps, do they all get bundled together in the handle_message/2 callback? Or is there something similar to the existing Message.put_publisher/2, for example: Message.put_next_processor/2.

Let me know if you want more details! Looking forward to the Lambda Days presentation!

Exception when calling Broadway.start_link() w/ BroadwaySQS.Producer

I am basically implementing the module from the guide https://hexdocs.pm/broadway/amazon-sqs.html

However, I am getting an error at the point of calling Broadway.start_link.

MyBroadway.start_link([])                                       
** (EXIT from #PID<0.932.0>) shell process exited with reason: an exception was raised:
    ** (MatchError) no match of right hand side value: {:error, {:shutdown, {:failed_to_start_child, #Reference<0.2717259105.895746049.233120>, {:shutdown, {:failed_to_start_child, MyBroadway.Producer_sqs_1, {:badarg, [{:ets, :match, [Broadway.TermStorage, {:"$1", %{config: [], queue_name: "my-test-queue"}}], []}, {Broadway.TermStorage, :find_by_term, 1, [file: 'lib/broadway/term_storage.ex', line: 64]}, {Broadway.TermStorage, :put, 1, [file: 'lib/broadway/term_storage.ex', line: 41]}, {BroadwaySQS.ExAwsClient, :init, 1, [file: 'lib/broadway_sqs/ex_aws_client.ex', line: 22]}, {BroadwaySQS.Producer, :init, 1, [file: 'lib/broadway_sqs/producer.ex', line: 60]}, {Broadway.Producer, :init, 1, [file: 'lib/broadway/producer.ex', line: 40]}, {GenStage, :init, 1, [file: 'lib/gen_stage.ex', line: 1700]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}}}}}
        lib/broadway/server.ex:16: Broadway.Server.init/1
        (stdlib) gen_server.erl:374: :gen_server.init_it/2
        (stdlib) gen_server.erl:342: :gen_server.init_it/6
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

It seems like the error may be from an uncaught exception in BroadwaySQS.ExAwsClient on this line:
https://github.com/plataformatec/broadway_sqs/blob/b54183218df12e80f5f007dc4c8b5042df939d6d/lib/broadway_sqs/ex_aws_client.ex#L32

It looks like Broadway.TermStorage is not getting started?

I've gone through the docs multiple times and have confirmed:

  • my dependencies are up to date
  • my AWS access keys are set in the environment

Here is my module's start_link function

Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producers: [
            sqs: [
              module: {BroadwaySQS.Producer, queue_name: "my-test-queue"}
            ]
          ],
          processors: [
            default: []
          ],
          batchers: [
            default: [
              batch_size: 10,
              batch_timeout: 2000
            ]
          ]
        )

Provide tutorials/guides

  • Clean up README and docs
  • Guide: BroadwaySQS
  • Guide: How to integrate custom producers
  • Guide: Architecture

Add API to ack at will

@josevalim and I discussed a new API to manually ack a message instead of having to wait for the Broadway pipeline to be done so that the message is acked automatically.

This API would call the acknowledger in the message directly and then set it to a no-op acknowledger.

Broadway.Message.ack(message)

We are not passing options to this functions because the acknowledger can be configured individually for each message through the API proposed in #104.

Add Broadway.push_messages/2

It is a wrapper around Broadway.Producer.push_messages. We first need to task the Broadway.Server for a name of a producer (it can be randomic) which we return to the client and then the client invokes it.

We also need to validate that the messages are properly formatted (i.e. it has the acknowledger field and what not).

Distinguish failed messages and errors

We need a way to distinguish manually-failed messages from messages where there's an error in the Broadway pipeline. We'll probably need a way to add information alongside the failure/error so that we can pass around the failure/error reason.

Provide synced wait

If the source has nothing to provide, we should be able to wait across all producers, otherwise if all of them poll at a given frequency, it can be too much.

Rate-limit producer demand

This allows the producer wrapper to not call handle_demand under a certain rate limiting configuration. We would love to hear use cases on this before implementing it.

Error reporting

Since those are stateless, we can rescue those errors and provide back-off currently. We also need to fail messages.

Messages between LiveView and a Broadway application/worker

Hi everyone, feel free to close this is it's too dumb of a question, but i was wondering if it would be possible (maybe using Phoenix's PubSub) to send events from a running Broadway worker to (somehow) a LiveView connected web browser, so a user could see in realtime the work Broadway is doing at the moment.

Thanks in advance,
Pedro

Add an API to configure the acknowledger

@josevalim and I discussed an API to configure the acknowledger for a specific message.

This API would let users change the way each single message is acknowledged:

Broadway.Message.configure_ack(message, nack: :reject)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.