Giter Site home page Giter Site logo

broadway_sqs's People

Contributors

adrianomitre avatar balexand avatar bannmoore avatar cduruk avatar dokie avatar edennis avatar fabiorodrigues avatar gvaughn avatar istefo avatar ixtli avatar josevalim avatar jstewart avatar maartenvanvliet avatar msaraiva avatar nbw avatar philss avatar rogerweb avatar samullen avatar sophisticasean avatar thefirstavenger avatar tjsousa avatar vanetix avatar whatyouhide avatar wkirschbaum avatar wojtekmach avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

broadway_sqs's Issues

SQS producer stops getting new messages after a minute of application start

Hey all, I'm not sure where to start debugging this, but I have a bunch of broadway processes running, one of which consistently stops sending pulling in new messages in roughly a minute after launching the app.

I haven't gotten any error messages. Other queues keep running fine.

It feels like it's a bug, but it could very well be a configuration error on my side as well.

Here's the code i'm running, left out some of our business logic.

defp producer do
    if Application.fetch_env!(:basement, :indexing_enabled) do
      {BroadwaySQS.Producer, queue_url: @sqs_url}
    else
      # {BroadwaySQS.Producer, queue_url: @sqs_url}
      {Broadway.DummyProducer, []}
    end
  end

  def start_link(_opts) do
    Logger.info("Starting #{inspect(__MODULE__)}")

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer()
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 10, batch_timeout: 1000]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    Logger.info(
      "Basement.Indexers.Tasks.RefreshMetadataImagesTask: refreshing images for #{inspect(message.data)}"
    )

    process(message.data)
    |> case do
      :ok -> Message.put_batcher(message, :default)
      {:error, error} -> Message.failed(message, error)
    end
  end

  def handle_batch(:default, messages, _batch_info, _context) do
    messages
  end

  def handle_failed(messages, _context) do
    messages
    |> Enum.each(fn %Message{status: status} ->
      Logger.error(
        "Indexers.Ethereum.BlockIngress message failed with status: #{inspect(status)}"
      )
    end)

    messages
  end

AWS.SimpleQueueService.BatchEntryIdsNotDistinct

We are getting these errors on occasion (once or twice a week):

[error] ** (ExAws.Error) ExAws Request Error! 
 {:error, {:http_error, 400, %{code: "AWS.SimpleQueueService.BatchEntryIdsNotDistinct", detail: "", message: "Id 688a1987-8bce-4093-99ac-8acf3bebd17e repeated.", request_id: "7974caea-316e-53ce-8222-9dc58e5736ba", type: "Sender"}}} 
     (ex_aws 2.1.3) lib/ex_aws.ex:66: ExAws.request!/2 
     (elixir 1.10.2) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2 
     (elixir 1.10.2) lib/enum.ex:783: Enum.each/2 
     (elixir 1.10.2) lib/enum.ex:789: anonymous fn/3 in Enum.each/2 
     (stdlib 3.12.1) maps.erl:232: :maps.fold_1/3 
     (elixir 1.10.2) lib/enum.ex:2127: Enum.each/2 
     (broadway 0.6.0) lib/broadway/consumer.ex:64: Broadway.Consumer.handle_events/3 
     (gen_stage 1.0.0) lib/gen_stage.ex:2395: GenStage.consumer_dispatch/6 

My first hunch was a duplicate message due to at-least-once behavior of standard queues. However, the crash repeats over and over until I go in to the SQS manager and delete the single message...

I'm a bit stumped at the root cause, but would a PR that runs uniq by id on the receipts before sending up the delete batch be welcome?

Application fails to start with badarg, :ets :match

I was trying to setup one of my applications with broadway_sqs. It just does not start though. Here's the module that 'uses' Broadway:

defmodule MySQSListener do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url:
            "https://my-sqs-queue-url"
        }
      ],
      processors: [
        default: []
      ],
      batchers: [
        sqs: [stages: 2, batch_size: 10]
      ]
    )
  end
end

I have added the MySQSListener as a child in my app supervisor like so:

children = [ 
          # Few other children...
          {MySQSListener, []}
        ]

Supervisor.start_link(children, strategy: :one_for_one)

Here's what my mix.exs deps looks like:

defp deps do
    [
      {:jason, ">= 1.0.0"},
      {:tesla, "~> 1.2"},
      {:mongodb, ">= 0.4.7"},
      {:poolboy, "~> 1.5.1"},
      {:bongo, "~> 0.1.0"},
      {:distillery, "~> 2.0"},
      {:hackney, "~> 1.9"},
      {:plug_cowboy, "~> 2.0"},
      {:appsignal, "~> 1.10.2"},
      {:timex, "~> 3.5"},
      {:quantum, "~> 2.3"},
      {:broadway_sqs, "~> 0.5.0"}
    ]
  end

And here's what is output just before the app crashes when i do iex -S mix:

** (Mix) Could not start application myapp: MyApp.Application.start(:normal, []) returned an error: shutdown: failed to start child: MySQSListener
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:error, {:shutdown, {:failed_to_start_child, #Reference<0.3480402906.800849924.27293>, {:shutdown, {:failed_to_start_child, MySQSListener.Broadway.Producer_0, {:badarg, [{:ets, :match, [Broadway.TermStorage, {:"$1", %{config: [], queue_url: "https://myqueue-url"}}], []}, {Broadway.TermStorage, :find_by_term, 1, [file: 'lib/broadway/term_storage.ex', line: 64]}, {Broadway.TermStorage, :put, 1, [file: 'lib/broadway/term_storage.ex', line: 41]}, {BroadwaySQS.ExAwsClient, :init,1, [file: 'lib/broadway_sqs/ex_aws_client.ex', line: 32]}, {BroadwaySQS.Producer, :init, 1, [file: 'lib/broadway_sqs/producer.ex', line: 174]}, {Broadway.Producer, :init, 1, [file: 'lib/broadway/producer.ex', line: 71]}, {GenStage, :init, 1, [file: 'lib/gen_stage.ex', line: 1704]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}]}}}}}}
            lib/broadway/server.ex:26: Broadway.Server.init/1
            (stdlib) gen_server.erl:374: :gen_server.init_it/2
            (stdlib) gen_server.erl:342: :gen_server.init_it/6
            (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

Please let me know in case I'm going wrong somewhere?

Proposal on dynamic visibility timeout

I have a legacy application that uses GenStage for processing messages from an SQS queue.
The processing takes a variable amount of time so I have implemented some sort of registry that handles the visibility timeout of messages. When a message arrives a timer that periodically extends its visibility is started. When the processing fails the visibility timeout is set to 0 and if instead the processing is successful the message is deleted from the queue. After some configured time the timer is stopped and the visibility set to 0.

I don't know if this is a common pattern and I don't think the whole "visibility registry" logic should be included in this project.

I, however, suggest some small additions to have more flexibility in situations like this.

The first is to have the Acknowledger behavior in a separate module and let it be overridden by a config parameter. Regarding my use case, this would allow a user to perform custom actions on success/fail outcomes by implementing its own ack logic.

The second one is to have an optional callback to be executed after wrap_received_messages to let a user modify the received messages or perform actions. For instance, I will use it for starting the visibility renew timers. Maybe also the :transformer option of producers can be used for this but it is executed for every single message and not for the whole received list and I it is executed on the handle_demand callback, not when actually the messages are received.

Dependency on ex_aws_sqs

Hi 👋

Background
As you are aware, recently the maintainer of ex_aws stepped away from the project and expressed some concerns with how the library is written. I understand that another maintainer has stepped up to take over the project, which is great and appreciated.

This lead to some discussion about the possible use of aws_elixir under the aws-beam GitHub organization as an alternative, which uses aws-codegen to generate client code from the AWS Go SDK specs. Manually written code is still necessary, but the codegen lightens the maintenance burden considerably.

These events, plus the need for me to find an AWS library that supports IMDSv2 (See here and here for some movement on that) has lead me to explore which library I want to use going forward, as well as to think through ways to ensure that my code is more isolated from changes to our external dependencies. It seems ideal if my code can wrap external dependencies and ensure that they conform to a specific behaviour, enabling me to swap out dependencies in the future with limited impact.

Proposal
With all of that as background, has there been any consideration to possibly removing the dependency on ex_aws_sqs and allowing the choice of AWS SQS client to be swapped out by consumers of the library? I already have local adapters for an SQS behaviour that I use for dev, test and prod to wrap up some of my direct ExAws calls, but I need to extend this into our dependencies like Broadway SQS as well.

For reference, it looks like these are the current dependencies on ex_aws and ex_aws_sqs on master.

ExAws.SQS.send_message/2
ExAws.SQS.request/2
ExAws.SQS.request!/2
ExAws.SQS.create_queue/1
ExAws.SQS.receive_message/2
ExAws.SQS.delete_message_batch/2
ExAws.Request.HttpClient behaviour used in tests

Does this proposal sound reasonable to you? And if so, would you like help implementing the changes? Any input or guidance is greatly appreciated.

Thanks for your time and for an excellent project!

Allow BroadwaySQS to work with temporary sessionToken

Discussion started here dashbitco/broadway#68.

@gvaughn:

Will I have to write a custom Producer to make this possible? Maybe with just a custom SqsClient?

Currently, the only option would be to create a custom SQSClient. However, we believe it would be a good idea to extend the API to allow this, so here's is a first suggestion:

We can provide a new config option for the client called token_generator, which is a {M, f, a} to be called before each receive/delete request. An initial implementation of this generator could be a GenServer that periodically updates the token (maybe on an :ets table with read_concurrency: true?).

In case you see other scenarios where we might need to update other config options, we could even consider expanding this concept to a more generic one like extra_options_generator that could be used to merge any other option to the config.

Thoughts?

`queue_url` & region config

I have the following module configured:

module:
  {BroadwaySQS.Producer,
    queue_url: "https://us-east-2.queue.amazonaws.com/100000000001/some_queue"
  }

When I start the application, Broadway starts polling against a queue region based on its own config: region: value or the ex_aws: region: config value (default: us-east-1):

[debug] ExAws: Request URL: "https://sqs.us-east-1.amazonaws.com/"

If I change the module config to the following (or the ex_aws: region: value):

{BroadwaySQS.Producer,
  queue_url: "https://us-east-2.queue.amazonaws.com/100000000001/some_queue",
  config: [
    region: "us-west-1"
  ]}

It polls the following queue:

[debug] ExAws: Request URL: "https://sqs.us-west-1.amazonaws.com/"

Is it possible to poll a queue from a different region than what is configured as the region (strictly based on the queue_url)?

I'm not understanding the purpose of providing the full queue url (required) if it's just going to use the region from the region setting. Thanks!

[error] Unable to fetch events from AWS. Reason: {:option, :server_only, :honor_cipher_order}

Hi all, getting this error when running a broadway app inside a docker container on AWS fargate. All the AWS credentials are being correctly set and passed to the producer:

Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module: {
            BroadwaySQS.Producer,
            attribute_names: :all,
            message_attribute_names: :all,
            queue_name: System.get_env("QUEUE_NAME"),
            config: get_aws_credentials(),
            receive_interval: 1000
          }
        ]
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 2000
        ]
      ]
    )

Where get_aws_credentials() is:

def get_aws_credentials do
    region = System.get_env("AWS_REGION")

    [
      secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
      access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
      region: region,
      host: "sqs.#{region}.amazonaws.com",
      scheme: "https"
    ]
  end

All those env vars are present and correct inside the container.

I do notice that the queue url appears to not be constructed correctly (i think it's missing the AWS account id):

It's https://sqs.eu-west-1.amazonaws.com/my_queue, when it should be https://sqs.eu-west-1.amazonaws.com/12345678/my_queue (where 12345678 is the account id).

For completion this account has roles to be able to access test and production environments, so that could be partially the cause. Is there a setting to explicitly pass the account id to the producer?

Cheers!

Upgrade broadway dependency to version 0.4.0

Hello! 👋 We're currently in a situation where we're using broadway_sqs 0.2.0, which is currently pulling in broadway 0.3.0. We're interested in leveraging testing features that were introduced in 0.4.0, such as Broadway.DummyProducer.

For the moment we've recreated that producer in our project, but it'd be awesome if we could use Broadway 0.4.0 alongside broadway_sqs.

EDIT: I created a PR for this!

$ mix deps.get
Resolving Hex dependencies...

Failed to use "broadway" (version 0.4.0) because
  apps/manifold/mix.exs requires ~> 0.4.0
  broadway_sqs (version 0.2.0) requires ~> 0.3.0

** (Mix) Hex dependency resolution failed, change the version requirements of your dependencies or unlock them (by using mix deps.update or mix deps.unlock). If you are unable to resolve the conflicts you can try overriding with {:dependency, "~> 1.0", override: true}

Producer would sometimes send messages into the void

I think there's a bug in a way producer handles the demand.

Consider following scenario:

  1. Producer receives demand for 10 messages
  2. Producer fetches 9 messages from the server here
  3. Producer then schedules an immediate receive here
  4. Producer sends 9 messages to a consumer via return value of handle_demand callback
  5. Scheduled on step 3 handle_receive_messages kicks in, fetches message number 10 from the queue and... returns it into nowhere, since it isn't called from within a handle_demand callback?

I am not familiar with SQS, but I'd assume this would delay processing of message number 10 for at least the value of visibility timeout

I wrote a test to reproduce the bug, please find it here
Sorry for not providing a full fix, I don't quite understand how this feature was desgined to work.

Process Single Batch of Messages in Parallel

Is there a way to send each message received in a batch to a different processor? From my local testing, it seems all messages received in a batch always go to the same processor (pid). I would expect that if I have producer concurrency = 1, and processor concurrency = 10, if I receive 10 messages each message would get processed in parallel. Instead, each message consumed in the batch is processed synchronously by the same processor. My config:

[
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: "http://example.com/myQueue",
          wait_time_seconds: 20,
          max_number_of_messages: 10,
          receive_interval: 100,
        },
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    ]

It seems like the only way to do what I am looking for is to set max_number_of_messages to 1, which will come with significant performance and cost implications.

Am I missing anything?

Thanks!

More explicit docs on SQS message acknowledgment?

Hey guys, great work on both Broadway and BSQS!

I was reading the documentation and it was not immediately obvious to me how the messages get acknowledged (or not) back to AWS in the handle_batch function.

Am i correct to assume that any messages that are returned as the return value get acknowledged and those who are not do not? If so, i think the phrasing on the docs could reflect this a bit better for newcomers. If not, do i have to somehow change the message to make sure it doesn't get acknowledged when i fail to process it?

Thanks in advance!
Pedro

Example app broken

With the upgrade to ex_aws_sqs from 2.x to 3.x, the example app is broken. Only the queue name is given, not the queue url as required now.

Error reporting/handling? (Acknowledgement silent errors!)

Hey guys,

First of all. Thanks a lot for putting so much work into this and making it available for us.

What happened with us is that we deployed to production Broadway + Broadway SQS and we noticed that the messages just wasn't being "acknowledged" on the AWS side so all messages were coming through but they weren't being deleted so they just stayed as "in flight" and then appeared again to be processed and eventually went to DLQ.

We spent many many hours trying to understand what was wrong because since there was no errors, exceptions of any sort on our logs... everything seemed to be just fine.

Naturally, we suspected that something was wrong with our implementation, read and re-read docs and examples everywhere, checked networking, CPU resources, everything! But in the end the AWS ACCESS KEY used to connect to SQS just didn't have enough permissions to "delete" messages or delete messages in batch (I didn't created the API key myself). As always the error was really stupid but since it wasn't obvious straight away we spent many hours looking at the wrong places.

My goal with this issue is at first to document what happened to us but also let you know that these silent errors can also be very tricky. Would be nice to have on the docs or to have a option on the client like a debug mode so we could have more verbose information regarding those "silent" errors during desperate times.

Thanks!

Document wildcard support for `message_attribute_names`

According to AWS documentation there is support for wildcards when specifying which message attributes to retrieve.

For testing purposes we use https://github.com/softwaremill/elasticmq which doesn't seem to support the All parameter but does indeed support .* I've tried specifying it as message_attribute_names and it works as expected. Would it be useful to add it to the documentation?

producer: [
  module: {BroadwaySQS.Producer,
    queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
    message_attribute_names: [".*"],
  }
]

Upgrade ex_aws_sqs dependancy to version 3 for localstack

Hi, I'm trying to get broadway working with localstack and was having some trouble. I specify queue_name as http:/localhost:4576/queue/events.

When requests are made the following url is used:

https://sqs.us-east-1.amazonaws.com/http:/localhost:4576/queue/events

resulting in a 404.

This PR adds the ability to specify the queue url instead which could be used to specify the exact url to work with localstack. This functionality is included in release 3.

Add opts.queue_url to the error message trying to receive messages

My application consumes messages from multiple SQS queues. If a queue is deleted, I get an error message like this:

[error] Unable to fetch events from AWS. Reason: {:http_error, 400, %{
  code: "AWS.SimpleQueueService.NonExistentQueue",
  message: "The specified queue does not exist for this wsdl version.",
  type: "Sender",
  request_id: "5436c6c6-1c58-5e85-bf11-9b5c27e3bc93",
  detail: nil
}}

However, I can't tell which queue the error refers to because AWS doesn't include its name in the response neither broadway_sqs includes it in the error message:

  defp wrap_received_messages({:error, reason}, _) do
    Logger.error("Unable to fetch events from AWS. Reason: #{inspect(reason)}")
    []
  end

Would it be possible to include the opts.queue_url in the error message?

ex_aws_sqs no longer constructs request correctly

The following worker (c&p from the docs) fails to connect to the queue.

defmodule MyBroadway.Worker do
  use Broadway

  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module:
            {BroadwaySQS.Producer,
             queue_name: "second",
             config: [
               access_key_id: "x",
               secret_access_key: "y",
               region: "us-east-2"
             ]}
        ]
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 2000
        ]
      ]
    )
  end

  def handle_message(_, message, _) do
    receipt = %{
      id: message.metadata.message_id,
      receipt_handle: message.metadata.receipt_handle
    }

    IO.inspect(receipt)
  end
end

It gets the following errors:

18:33:39.373 [debug] ExAws: Request URL: "https://sqs.us-east-2.amazonaws.com/" HEADERS: [{"Authorization", "AWS4-HMAC-SHA256 Credential=x/20190924/us-east-2/sqs/aws4
_request,SignedHeaders=content-encoding;content-type;host;x-amz-date,Signature=z"}, {"host", "sqs.us-east-2.amazonaws.com"
}, {"x-amz-date", "20190924T223339Z"}, {"content-type", "application/x-www-form-urlencoded"}, {"content-encoding", "identity"}] BODY: "Action=ReceiveMessage&MaxNumberOfMessages=10&Queue
Url=second" ATTEMPT: 1

18:33:39.420 [error] GenServer MyBroadway.Worker.Broadway.Producer_default_1 terminating
** (UndefinedFunctionError) function Poison.decode/1 is undefined (module Poison is not available)
    Poison.decode("<?xml version=\"1.0\"?><ErrorResponse xmlns=\"http://queue.amazonaws.com/doc/2012-11-05/\"><Error><Type>Sender</Type><Code>InvalidAddress</Code><Message>The address s
econd is not valid for this endpoint.</Message><Detail/></Error><RequestId>4939ea47-c4a1-5715-934f-bc4358a24cee</RequestId></ErrorResponse>")
    (ex_aws) lib/ex_aws/request.ex:108: ExAws.Request.client_error/2
    (ex_aws) lib/ex_aws/request.ex:57: ExAws.Request.request_and_retry/7
    (ex_aws) lib/ex_aws/operation/query.ex:41: ExAws.Operation.ExAws.Operation.Query.perform/2
    (broadway_sqs) lib/broadway_sqs/ex_aws_client.ex:50: BroadwaySQS.ExAwsClient.receive_messages/2
    (broadway_sqs) lib/broadway_sqs/producer.ex:196: BroadwaySQS.Producer.handle_receive_messages/1
    (broadway) lib/broadway/producer.ex:66: Broadway.Producer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2099: GenStage.noreply_callback/3
Last message: {:"$gen_producer", {#PID<0.316.0>, #Reference<0.1588154580.3783000066.245470>}, {:ask, 10}}
State: %{consumers: [{#PID<0.316.0>, #Reference<0.1588154580.3783000066.245470>}], module: BroadwaySQS.Producer, module_state: %{demand: 0, receive_interval: 5000, receive_timer: nil, s
qs_client: {BroadwaySQS.ExAwsClient, %{ack_ref: #Reference<0.1588154580.3783000065.247160>, config: [access_key_id: "x", secret_access_key: "y", region: "us-east-2"], queue_name: "second", receive_messages_opts: [max_number_of_messages: 10]}}}, transformer: nil}

These are my deps

  defp deps do
    [
      {:broadway_sqs, "~> 0.3.0"},
      {:hackney, github: "benoitc/hackney", override: true}
    ]
  end

When i downgrade to broadway_sqs 0.2.0 it forms the request url successfully:

18:41:13.192 [debug] ExAws: Request URL: "https://sqs.us-east-2.amazonaws.com/second" HEADERS: [{"Authorization", "AWS4-HMAC-SHA256 Credential=x/20190924/us-east-2/sqs/aws4_request,SignedHeaders=content-encoding;content-type;host;x-amz-date,Signature=z"}, {"host", "sqs.us-east-2.amazonaws.com"}, {"x-amz-date", "20190924T224113Z"}, {"content-type", "application/x-www-form-urlencoded"}, {"content-encoding", "identity"}] BODY: "Action=ReceiveMessage&MaxNumberOfMessages=10" ATTEMPT: 1

You can see that second gets appended to the base sqs url.

SQS Partitioning Unexpected Behavior

Hello,
When using partitioning on a FIFO queue on SQS, I am seeing some unexpected behavior. My broadway config is:

[
      producer: [
        module: {
          MyModule,
          queue_url: "http://my/queue/,
          on_failure: :ack,
          wait_time_seconds: 20,
          max_number_of_messages: 10,
          receive_interval: 10,
          attribute_names: :all,
          config: my_aws_config
        },
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 75,
          max_demand: 1
          partition_by: &partition/1 #see below
        ]
      ],
      batchers: [
        default: [
          batch_size: 100, 
          batch_timeout: 300,
          concurrency: 3
        ]
      ]
    ]

The partition function set on the processors is:

def partition(%Message{
        metadata: %{
          attributes: %{
            "message_group_id" => message_group_id
          }
        }
      }) do
    :erlang.phash2(message_group_id)
  end

When running the pipeline like this, the observed behavior is:

  1. N available messages are pulled down from SQS (all with the same message group id). Assume the queue is empty after this poll
  2. The partition function is run immediately on all of the messages
  3. The first message is sent to handle_message()
  4. The first message is sent to handle_batch() and ack'd / deleted
  5. We wait wait_time_seconds
  6. The second message is processed starting at step 3 and the loop continues

From the behavior, it seems like in step 5 we are sending a poll request to SQS for more messages. Since none exist on the queue, it is waiting for the full duration of wait_time_seconds. In my opinion, since we already have messages ready to be processed from the previous poll, we shouldn't need to send a request to SQS for more messages yet. To counteract this behavior, I have set max_demand: 10 on the processor config and it seems to fix the problem as far as I can tell. Even though a fix exists, I think the existing behavior is confusing and most likely not what a user expects to happen. Is there something wrong with my config that I should be changing instead?

Thanks!

Tests failing on updated nimble_options version

When running mix deps.update --all on this project and then mix test 2 new tests are failing.

  1) test prepare_for_start/2 validation when the queue url is not present (BroadwaySQS.BroadwaySQS.ProducerTest)
     test/broadway_sqs/producer_test.exs:89
     Wrong message for ArgumentError
     expected:
       "invalid configuration given to SQSBroadway.prepare_for_start/2, required option :queue_url not found, received options: []"
     actual:
       "invalid configuration given to SQSBroadway.prepare_for_start/2, required :queue_url option not found, received options: []"
     code: assert_raise(ArgumentError, message, fn ->
     stacktrace:
       test/broadway_sqs/producer_test.exs:95: (test)

This appears to be related to the allowed mime version in hackney.

Maybe this is not an issue, but if a project is using this package then the tests would effectively fail when the packages are updated?

I am a little paranoid as we want to start using this library.

Use a local SQS instance

Hello there!

I started trying Broadway [and BroadwaySqs] Today. Whenever I use SQS, I try using a local instance via ElasticMQ for development purposes via docker (docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native).

Is it possible to point to a local ElasticMQ instance using BroadwaySqs? I don't see anything related in the docs.

I tried setting the queue_url parameter to http://localhost:9324/<queue_name>, but I got the following traceback:

22:14:43.438 [error] GenServer ExAws.Config.AuthCache terminating
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}

You tried to access the AWS EC2 instance meta, but it could not be reached.
This happens most often when trying to access it from your local computer,
which happens when environment variables are not set correctly prompting
ExAws to fallback to the Instance Meta.

Please check your key config and make sure they're configured correctly:

For Example:
ExAws.Config.new(:s3)
ExAws.Config.new(:dynamodb)

    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:29: ExAws.InstanceMeta.request/2
    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:66: ExAws.InstanceMeta.instance_role_credentials/1
    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:74: ExAws.InstanceMeta.security_credentials/1
    (ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:83: ExAws.Config.AuthCache.refresh_config/2
    (ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:44: ExAws.Config.AuthCache.handle_call/3
    (stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from WorldTemp.TempProcessor.Broadway.Producer_0): {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: "sqs.us-east-1.amazonaws.com", http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "us-east-1", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}
State: ExAws.Config.AuthCache
Client WorldTemp.TempProcessor.Broadway.Producer_0 is alive

    (stdlib 3.11.2) gen.erl:167: :gen.do_call/4
    (elixir 1.10.2) lib/gen_server.ex:1020: GenServer.call/3
    (ex_aws 2.1.7) lib/ex_aws/config.ex:84: ExAws.Config.retrieve_runtime_value/2
    (elixir 1.10.2) lib/stream.ex:572: anonymous fn/4 in Stream.map/2
    (elixir 1.10.2) lib/enum.ex:3686: Enumerable.List.reduce/3
    (elixir 1.10.2) lib/stream.ex:1609: Enumerable.Stream.do_each/4
    (elixir 1.10.2) lib/enum.ex:959: Enum.find/3
    (ex_aws 2.1.7) lib/ex_aws/config.ex:71: anonymous fn/2 in ExAws.Config.retrieve_runtime_config/1

22:14:43.448 [error] GenServer WorldTemp.TempProcessor.Broadway.Producer_0 terminating
** (stop) exited in: GenServer.call(ExAws.Config.AuthCache, {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: "sqs.us-east-1.amazonaws.com", http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "us-east-1", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}, 30000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}

I noticed it is assuming the region us-east-1, so I tried adding the region parameter as part of the producer config (config: [region: "elasticmq"]):

22:17:36.260 [error] GenServer ExAws.Config.AuthCache terminating
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}

You tried to access the AWS EC2 instance meta, but it could not be reached.
This happens most often when trying to access it from your local computer,
which happens when environment variables are not set correctly prompting
ExAws to fallback to the Instance Meta.

Please check your key config and make sure they're configured correctly:

For Example:
ExAws.Config.new(:s3)
ExAws.Config.new(:dynamodb)

    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:29: ExAws.InstanceMeta.request/2
    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:66: ExAws.InstanceMeta.instance_role_credentials/1
    (ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:74: ExAws.InstanceMeta.security_credentials/1
    (ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:83: ExAws.Config.AuthCache.refresh_config/2
    (ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:44: ExAws.Config.AuthCache.handle_call/3
    (stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from WorldTemp.TempProcessor.Broadway.Producer_0): {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: nil, http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "elasticmq", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}
State: ExAws.Config.AuthCache
Client WorldTemp.TempProcessor.Broadway.Producer_0 is alive

    (stdlib 3.11.2) gen.erl:167: :gen.do_call/4
    (elixir 1.10.2) lib/gen_server.ex:1020: GenServer.call/3
    (ex_aws 2.1.7) lib/ex_aws/config.ex:84: ExAws.Config.retrieve_runtime_value/2
    (elixir 1.10.2) lib/stream.ex:572: anonymous fn/4 in Stream.map/2
    (elixir 1.10.2) lib/enum.ex:3686: Enumerable.List.reduce/3
    (elixir 1.10.2) lib/stream.ex:1609: Enumerable.Stream.do_each/4
    (elixir 1.10.2) lib/enum.ex:959: Enum.find/3
    (ex_aws 2.1.7) lib/ex_aws/config.ex:71: anonymous fn/2 in ExAws.Config.retrieve_runtime_config/1

22:17:36.272 [error] GenServer WorldTemp.TempProcessor.Broadway.Producer_0 terminating
** (stop) exited in: GenServer.call(ExAws.Config.AuthCache, {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: nil, http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "elasticmq", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}, 30000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}

I see host is now nil, yet I could not go much further. I tried setting scheme to http:// instead of https:// but it had no effect.

Then I tried following the source code, which led me to https://github.com/dashbitco/broadway_sqs/blob/master/lib/broadway_sqs/ex_aws_client.ex#L28

  @impl true
  def receive_messages(demand, opts) do
    receive_messages_opts = build_receive_messages_opts(opts, demand)

    opts.queue_url
    |> ExAws.SQS.receive_message(receive_messages_opts)
    |> ExAws.request(opts.config)
    |> wrap_received_messages(opts.ack_ref)
  end

Then I checked ExAws.SQS.receive_message/2, which relies on ExAws.SQS.request/3. This might not be only a BroadwaySQS issue because I don't see how would ExAws make the AWS host configurable.

Just so I wrap it up with a question: Is it possible to point BroadwaySQS to a local ElasticMQ instance?

P.S.: I haven't touched any Elixir code in many months, so I am probably missing some important detail here

Option to not hit live AWS when testing

I already have an AWSMock which I use in several places in my codebase. When testing BroadwaySQS, BroadwaySQS.ExAwsClient is used for calls and by that the app uses ExAws.request, is there a way to override this so I can use my mock in this setup?

handle_failed does not ack messages

Hey,

Thank you so much for the work and effort you've put into this package. It's really a breeze. I think i discovered an issue with the current :broadway_sqs ~> "0.5.0". According to the main documentation in broadway:

This callback must return the same messages given to it, possibly updated. For example, you could update the message data or use Broadway.Message.configure_ack/2 in a centralized place to configure how to ack the message based on the failure reason.

According to the code inside broadway_sqs/lib/broadway_sqs/ex_aws_client.ex:

  @impl true
  def ack(ack_ref, successful, _failed) do
    successful
    |> Enum.chunk_every(@max_num_messages_allowed_by_aws)
    |> Enum.each(fn messages -> delete_messages(messages, ack_ref) end)
  end

_failed is never used and the messages just remain in the queue. In the end i wrote this, just to clean up:

  defp get_ack_ref({_, ack_ref, _}) do
    ack_ref
  end

  def handle_failed(messages, context) do
    message = List.first(messages)
    message.acknowledger
    |> get_ack_ref()
    |> BroadwaySQS.ExAwsClient.ack(messages, [])

    messages
  end

I've seen you're discussing it and perhaps it wasn't implemented yet, but just incase anyone else comes across this or there is something i'm missing.

Thanks again!

Is it possible to use profile as authentication method?

Hi guys,

first of all, thanks for this project. I'm looking forward to test in an environment to consume around 1.5 million SQS messages per day.

I have been playing with it, but I didn't find a way to authenticate my app using AWS profile (like `~/.aws/config, [1]). If it's not possible, but you think that it would be nice to have it in the project, I'm wiling to send a PR.

[1] search for Setting an Alternate Credentials Profile in https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

AWS.SimpleQueueService.BatchEntryIdsNotDistinct

When using non FIFO queues, it happens quite often that I receive the same message more than once. That's ok during processing, but when broadway wants to acknowledge the messages (which are in the same batch) if throws an error and move the messages to the DLQ.

Could we just filter out duplicates during acknowledging?

AWS SQS FIFO messages being processed out of order or in duplicate

I've been using broadway_sqs to consume AWS SQS FIFO queues and I noticed some unexpected behaviours when processing the messages since sometimes those were processed out of order or more than one time.

Initially I didn't had the Broadway partition_by configured and once I did that, things seemed to improve but I can still see some double processing and out of order processing occurring. For example, looking at the below logs – organized by process identifier to help readability – we can see that:

  • PID 383 consumed the second message from group C, without waiting for the first message of the same group to be acknowledge and thus removed from queue. Visibility timeout is of 10 seconds and even that wasn't guaranteed.
  • PID 419 processed the same fifth message two times.
16:39:34.610 [info] [#PID<0.382.0>] Handling message: "B" / "B1"
16:39:37.270 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B1"
16:39:37.300 [info] [#PID<0.382.0>] Handling message: "B" / "B2"
16:39:39.633 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B2"

16:39:34.610 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Message processing failed : "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] timeout: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Handling message: "C" / "C2"
16:39:42.194 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C2"
16:39:45.814 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:47.528 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C1"

16:39:34.610 [info] [#PID<0.419.0>] Handling message: "A" / "A1"
16:39:37.504 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A1"
16:39:37.521 [info] [#PID<0.419.0>] Handling message: "A" / "A2"
16:39:40.242 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A2"
16:39:40.263 [info] [#PID<0.419.0>] Handling message: "A" / "A3"
16:39:42.404 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A3"
16:39:42.422 [info] [#PID<0.419.0>] Handling message: "A" / "A4"
16:39:44.933 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A4"
16:39:44.954 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:47.037 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
16:39:47.056 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:50.001 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"

Before setting up the partition_by the behaviour was even more awkward with different consumers handling messages from the same message_group_id:

16:25:33.076 [info] [#PID<0.423.0>] Handling message: "A" / "A1"
16:25:36.304 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A1"
16:25:36.326 [info] [#PID<0.423.0>] Handling message: "A" / "A2"
16:25:38.859 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A2"
16:25:38.869 [info] [#PID<0.423.0>] Handling message: "A" / "A3"
16:25:41.709 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A3"
16:25:41.728 [info] [#PID<0.423.0>] Handling message: "A" / "A4"
16:25:44.206 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A4"
16:25:44.230 [info] [#PID<0.423.0>] Handling message: "A" / "A5"
16:25:46.592 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A5"
16:25:46.613 [info] [#PID<0.423.0>] Handling message: "C" / "C1"
16:25:48.219 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C1"
16:25:48.233 [info] [#PID<0.423.0>] Handling message: "C" / "C2"
16:25:50.134 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C2"
16:25:50.152 [info] [#PID<0.423.0>] Handling message: "B" / "B1"
16:25:52.073 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B1"
16:25:52.087 [info] [#PID<0.423.0>] Handling message: "B" / "B2"
16:25:53.323 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B2"

16:25:44.286 [info] [#PID<0.424.0>] Handling message: "C" / "C1"
16:25:46.866 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C1"
16:25:46.877 [info] [#PID<0.424.0>] Handling message: "C" / "C2"
16:25:48.653 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C2"
16:25:48.661 [info] [#PID<0.424.0>] Handling message: "A" / "A5"
16:25:50.938 [info] [#PID<0.424.0>] Message acknowledge: "A" / "A5"
16:25:50.958 [info] [#PID<0.424.0>] Handling message: "B" / "B1"
16:25:52.517 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B1"
16:25:52.536 [info] [#PID<0.424.0>] Handling message: "B" / "B2"
16:25:54.045 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B2"

My understanding is that AWS SQS FIFO queues, using the message_group_id, should guarantee message order within the same message group identifier and that once a message has been received, during its visibility timeout, no other consumer can receive the same message.

I'll leave here the code for my SQS producer:

defmodule ElixirBroadwayPlayground.SQSProducer do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(config) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Keyword.get(config, :queue_url),
          receive_interval: 1000,
          on_success: :ack,
          on_failure: :noop,
          visibility_timeout: 10,
          attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
        }
      ],
      processors: [
        default: [concurrency: Keyword.get(config, :num_workers, 1)]
      ],
      partition_by: &partition_by/1
    )
  end

  @impl true
  def handle_message(
        _,
        %Message{
          data: data,
          metadata: %{attributes: %{"message_group_id" => message_group_id}}
        } = message,
        _
      ) do
    log_event("Handling message", data, message_group_id)

    HTTPoison.get("https://swapi.dev/api/people/1", [{"Content-Type", "application/json"}],
      ssl: [verify: :verify_none]
    )
    |> handle_response(message)
  end

  defp handle_response(
         {:ok, _},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message acknowledge", data, message_group_id)
    Message.ack_immediately(message)
  end

  defp handle_response(
         {:error, %HTTPoison.Error{reason: reason}},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message processing failed ", data, message_group_id)
    log_event(reason, data, message_group_id)
    Message.failed(message, reason)
  end

  defp log_event(text, data, msg_group_id) do
    message_id = Jason.decode!(data)["id"]
    Logger.info("[#{inspect(self())}] #{text}: #{inspect(msg_group_id)} / #{inspect(message_id)}")
  end

  defp partition_by(%Message{
         metadata: %{attributes: %{"message_group_id" => message_group_id}}
       }) do
    :erlang.phash2(message_group_id)
  end
end

I'm I misinterpreting the behaviour that should be expected? Anyone has experienced the same behaviour?

Should we consider extending the `BroadwaySQS.SQSClient` behaviour?

As I mentioned in #9, I would like to extend the SQS visibility timeout dynamically at runtime which is possible through the underlying ExAws.SQS module. There are many ways it could be done but I wanted to know if you thought it would be an acceptable proposal to extend the Broadway.SQSClient behaviour in such a manner as;

@type message_visibility :: {receipt_handle :: String.t(), visibility_timeout :: pos_integer()}
@type message_visibility_batch_entry :: {unique_id :: String.t(), visibility :: message_visibility()}
@type message_visibility_batch :: [message_visibility_batch_entry()]

@callback change_message_visibility(queue_name :: String.t(), visibility :: message_visibility()) :: {:ok, response :: map()} | {:error, details :: map()}
@callback change_message_visibility_batch(queue_name :: String.t(), batch :: message_visibility_batch()) :: {:ok, response :: map()} | {:error, details :: map()}

Or do you consider this to be too specific? Ideas welcomed of course.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.