dashbitco / broadway_sqs
A Broadway producer for Amazon SQS
Hey all, I'm not sure where to start debugging this, but I have a bunch of Broadway processes running, one of which consistently stops pulling in new messages roughly a minute after launching the app.
I haven't gotten any error messages. Other queues keep running fine.
It feels like a bug, but it could very well be a configuration error on my side.
Here's the code I'm running; I've left out some of our business logic.
defp producer do
  if Application.fetch_env!(:basement, :indexing_enabled) do
    {BroadwaySQS.Producer, queue_url: @sqs_url}
  else
    # {BroadwaySQS.Producer, queue_url: @sqs_url}
    {Broadway.DummyProducer, []}
  end
end
def start_link(_opts) do
  Logger.info("Starting #{inspect(__MODULE__)}")

  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module: producer()
    ],
    processors: [
      default: [concurrency: 10]
    ],
    batchers: [
      default: [concurrency: 1, batch_size: 10, batch_timeout: 1000]
    ]
  )
end
def handle_message(_processor_name, message, _context) do
  Logger.info(
    "Basement.Indexers.Tasks.RefreshMetadataImagesTask: refreshing images for #{inspect(message.data)}"
  )

  case process(message.data) do
    :ok -> Message.put_batcher(message, :default)
    {:error, error} -> Message.failed(message, error)
  end
end
def handle_batch(:default, messages, _batch_info, _context) do
  messages
end

def handle_failed(messages, _context) do
  Enum.each(messages, fn %Message{status: status} ->
    Logger.error(
      "Indexers.Ethereum.BlockIngress message failed with status: #{inspect(status)}"
    )
  end)

  messages
end
We are getting these errors on occasion (once or twice a week):
[error] ** (ExAws.Error) ExAws Request Error!
{:error, {:http_error, 400, %{code: "AWS.SimpleQueueService.BatchEntryIdsNotDistinct", detail: "", message: "Id 688a1987-8bce-4093-99ac-8acf3bebd17e repeated.", request_id: "7974caea-316e-53ce-8222-9dc58e5736ba", type: "Sender"}}}
(ex_aws 2.1.3) lib/ex_aws.ex:66: ExAws.request!/2
(elixir 1.10.2) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2
(elixir 1.10.2) lib/enum.ex:783: Enum.each/2
(elixir 1.10.2) lib/enum.ex:789: anonymous fn/3 in Enum.each/2
(stdlib 3.12.1) maps.erl:232: :maps.fold_1/3
(elixir 1.10.2) lib/enum.ex:2127: Enum.each/2
(broadway 0.6.0) lib/broadway/consumer.ex:64: Broadway.Consumer.handle_events/3
(gen_stage 1.0.0) lib/gen_stage.ex:2395: GenStage.consumer_dispatch/6
My first hunch was a duplicate message due to the at-least-once delivery behavior of standard queues. However, the crash repeats over and over until I go into the SQS console and delete the single message...
I'm a bit stumped on the root cause, but would a PR that runs a uniq-by-id pass on the receipts before sending the delete batch be welcome?
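A minimal sketch of the proposed fix; the receipt shapes below are assumptions based on the error above, not the library's actual internals:

```elixir
# Hypothetical sketch: SQS rejects a DeleteMessageBatch whose entry ids repeat,
# so de-duplicate the receipts by id before building the batch.
receipts = [
  %{id: "msg-1", receipt_handle: "handle-a"},
  %{id: "msg-1", receipt_handle: "handle-a"},
  %{id: "msg-2", receipt_handle: "handle-b"}
]

unique = Enum.uniq_by(receipts, & &1.id)
# `unique` now holds one entry per id and is safe to hand to the delete batch.
```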
I was trying to set up one of my applications with broadway_sqs. It just does not start, though. Here's the module that uses Broadway:
defmodule MySQSListener do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: "https://my-sqs-queue-url"
        }
      ],
      processors: [
        default: []
      ],
      batchers: [
        sqs: [stages: 2, batch_size: 10]
      ]
    )
  end
end
I have added the MySQSListener as a child in my app supervisor like so:
children = [
  # Few other children...
  {MySQSListener, []}
]

Supervisor.start_link(children, strategy: :one_for_one)
Here's what my mix.exs deps looks like:
defp deps do
  [
    {:jason, ">= 1.0.0"},
    {:tesla, "~> 1.2"},
    {:mongodb, ">= 0.4.7"},
    {:poolboy, "~> 1.5.1"},
    {:bongo, "~> 0.1.0"},
    {:distillery, "~> 2.0"},
    {:hackney, "~> 1.9"},
    {:plug_cowboy, "~> 2.0"},
    {:appsignal, "~> 1.10.2"},
    {:timex, "~> 3.5"},
    {:quantum, "~> 2.3"},
    {:broadway_sqs, "~> 0.5.0"}
  ]
end
And here's what is output just before the app crashes when I do iex -S mix:
** (Mix) Could not start application myapp: MyApp.Application.start(:normal, []) returned an error: shutdown: failed to start child: MySQSListener
** (EXIT) an exception was raised:
** (MatchError) no match of right hand side value: {:error, {:shutdown, {:failed_to_start_child, #Reference<0.3480402906.800849924.27293>, {:shutdown, {:failed_to_start_child, MySQSListener.Broadway.Producer_0, {:badarg, [{:ets, :match, [Broadway.TermStorage, {:"$1", %{config: [], queue_url: "https://myqueue-url"}}], []}, {Broadway.TermStorage, :find_by_term, 1, [file: 'lib/broadway/term_storage.ex', line: 64]}, {Broadway.TermStorage, :put, 1, [file: 'lib/broadway/term_storage.ex', line: 41]}, {BroadwaySQS.ExAwsClient, :init,1, [file: 'lib/broadway_sqs/ex_aws_client.ex', line: 32]}, {BroadwaySQS.Producer, :init, 1, [file: 'lib/broadway_sqs/producer.ex', line: 174]}, {Broadway.Producer, :init, 1, [file: 'lib/broadway/producer.ex', line: 71]}, {GenStage, :init, 1, [file: 'lib/gen_stage.ex', line: 1704]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}]}}}}}}
lib/broadway/server.ex:26: Broadway.Server.init/1
(stdlib) gen_server.erl:374: :gen_server.init_it/2
(stdlib) gen_server.erl:342: :gen_server.init_it/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Please let me know if I'm going wrong somewhere.
I have a legacy application that uses GenStage for processing messages from an SQS queue.
The processing takes a variable amount of time, so I have implemented a sort of registry that handles the visibility timeout of messages. When a message arrives, a timer is started that periodically extends its visibility. When processing fails, the visibility timeout is set to 0; if instead processing succeeds, the message is deleted from the queue. After some configured time, the timer is stopped and the visibility set to 0.
I don't know if this is a common pattern, and I don't think the whole "visibility registry" logic should be included in this project.
However, I'd like to suggest some small additions to allow more flexibility in situations like this.
The first is to have the Acknowledger behaviour in a separate module and let it be overridden by a config parameter. For my use case, this would allow a user to perform custom actions on success/failure outcomes by implementing their own ack logic.
The second is an optional callback executed after wrap_received_messages, to let a user modify the received messages or perform actions. For instance, I would use it to start the visibility-renewal timers. Maybe the :transformer option of producers could also be used for this, but it is executed for every single message rather than for the whole received list, and it runs in the handle_demand callback, not when the messages are actually received.
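The first suggestion could look roughly like this. The module name and the registry calls are hypothetical stand-ins for the "visibility registry" described above; only the ack/3 callback shape comes from Broadway's Acknowledger behaviour:

```elixir
defmodule MyApp.VisibilityAcknowledger do
  # Hypothetical sketch of a swappable acknowledger. `MyApp.VisibilityRegistry`
  # is an assumed module standing in for the timer registry described above.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(ack_ref, successful, failed) do
    # Stop the visibility-renewal timers for everything we're done with.
    Enum.each(successful ++ failed, &MyApp.VisibilityRegistry.stop_timer/1)

    # Delete successful messages; return failed ones to the queue immediately
    # by zeroing their visibility timeout.
    MyApp.VisibilityRegistry.delete_messages(ack_ref, successful)
    MyApp.VisibilityRegistry.zero_visibility(ack_ref, failed)
  end
end
```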
Hi 👋
Background
As you are aware, recently the maintainer of ex_aws
stepped away from the project and expressed some concerns with how the library is written. I understand that another maintainer has stepped up to take over the project, which is great and appreciated.
This led to some discussion about the possible use of aws_elixir under the aws-beam GitHub organization as an alternative, which uses aws-codegen to generate client code from the AWS Go SDK specs. Manually written code is still necessary, but the codegen lightens the maintenance burden considerably.
These events, plus my need to find an AWS library that supports IMDSv2 (see here and here for some movement on that), have led me to explore which library I want to use going forward, as well as to think through ways to ensure that my code is more isolated from changes to external dependencies. It seems ideal if my code can wrap external dependencies and ensure that they conform to a specific behaviour, enabling me to swap out dependencies in the future with limited impact.
Proposal
With all of that as background, has there been any consideration of removing the dependency on ex_aws_sqs and allowing the choice of AWS SQS client to be swapped out by consumers of the library? I already have local adapters for an SQS behaviour that I use in dev, test, and prod to wrap some of my direct ExAws calls, but I need to extend this into dependencies like Broadway SQS as well.
For reference, it looks like these are the current dependencies on ex_aws and ex_aws_sqs on master:
ExAws.SQS.send_message/2
ExAws.SQS.request/2
ExAws.SQS.request!/2
ExAws.SQS.create_queue/1
ExAws.SQS.receive_message/2
ExAws.SQS.delete_message_batch/2
ExAws.Request.HttpClient behaviour used in tests
Does this proposal sound reasonable to you? And if so, would you like help implementing the changes? Any input or guidance is greatly appreciated.
Thanks for your time and for an excellent project!
Discussion started here dashbitco/broadway#68.
Will I have to write a custom Producer to make this possible? Maybe with just a custom SqsClient?
Currently, the only option would be to create a custom SQSClient. However, we believe it would be a good idea to extend the API to allow this, so here's a first suggestion:
We can provide a new config option for the client called token_generator, which is an {M, f, a} to be called before each receive/delete request. An initial implementation of this generator could be a GenServer that periodically updates the token (maybe in an :ets table with read_concurrency: true?).
In case you see other scenarios where we might need to update other config options, we could even consider expanding this into a more generic extra_options_generator that could be used to merge any other option into the config.
Thoughts?
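A rough sketch of what such a generator might look like. The module names, the refresh interval, and the fetch_token/0 helper are all assumptions for illustration:

```elixir
defmodule MyApp.TokenCache do
  # Hypothetical sketch: a GenServer that refreshes a token periodically and
  # stores it in an :ets table with read_concurrency, so callers never block
  # on the process when reading.
  use GenServer

  @table :my_app_token_cache
  @refresh_ms 60_000

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # This function is what the {M, f, a} config option would point at,
  # invoked before each receive/delete request.
  def current_token, do: :ets.lookup_element(@table, :token, 2)

  @impl true
  def init(_opts) do
    :ets.new(@table, [:named_table, :protected, read_concurrency: true])
    {:ok, refresh()}
  end

  @impl true
  def handle_info(:refresh, _state) do
    {:noreply, refresh()}
  end

  defp refresh do
    # MyApp.Auth.fetch_token/0 is an assumed helper that obtains a fresh token.
    :ets.insert(@table, {:token, MyApp.Auth.fetch_token()})
    Process.send_after(self(), :refresh, @refresh_ms)
  end
end
```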
I have the following module configured:
module:
  {BroadwaySQS.Producer,
   queue_url: "https://us-east-2.queue.amazonaws.com/100000000001/some_queue"}
When I start the application, Broadway starts polling against a queue region based on its own config: region: value or the ex_aws region config value (default: us-east-1):
[debug] ExAws: Request URL: "https://sqs.us-east-1.amazonaws.com/"
If I change the module config to the following (or the ex_aws region value):
{BroadwaySQS.Producer,
 queue_url: "https://us-east-2.queue.amazonaws.com/100000000001/some_queue",
 config: [
   region: "us-west-1"
 ]}
It polls the following queue:
[debug] ExAws: Request URL: "https://sqs.us-west-1.amazonaws.com/"
Is there a way to have it poll the correct region (strictly based on the queue_url)? I'm not understanding the purpose of providing the full queue url (required) if it's just going to use the region from the region setting. Thanks!
Hi all, getting this error when running a broadway app inside a docker container on AWS fargate. All the AWS credentials are being correctly set and passed to the producer:
Broadway.start_link(__MODULE__,
  name: __MODULE__,
  producers: [
    default: [
      module: {
        BroadwaySQS.Producer,
        attribute_names: :all,
        message_attribute_names: :all,
        queue_name: System.get_env("QUEUE_NAME"),
        config: get_aws_credentials(),
        receive_interval: 1000
      }
    ]
  ],
  processors: [
    default: []
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 2000
    ]
  ]
)
Where get_aws_credentials() is:
def get_aws_credentials do
  region = System.get_env("AWS_REGION")

  [
    secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
    access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
    region: region,
    host: "sqs.#{region}.amazonaws.com",
    scheme: "https"
  ]
end
All those env vars are present and correct inside the container.
I do notice that the queue url appears not to be constructed correctly (I think it's missing the AWS account id): it's https://sqs.eu-west-1.amazonaws.com/my_queue, when it should be https://sqs.eu-west-1.amazonaws.com/12345678/my_queue (where 12345678 is the account id).
For completeness, this account has roles that grant access to both test and production environments, so that could be part of the cause. Is there a setting to explicitly pass the account id to the producer?
Cheers!
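Assuming a library version that accepts queue_url, spelling out the full URL (which embeds the account id) may sidestep the URL construction problem. A hypothetical sketch, reusing the placeholder account id from the report:

```elixir
# Hypothetical sketch: pass the full queue URL rather than letting the
# client build it from queue_name.
{
  BroadwaySQS.Producer,
  queue_url: "https://sqs.eu-west-1.amazonaws.com/12345678/my_queue",
  attribute_names: :all,
  message_attribute_names: :all,
  config: get_aws_credentials(),
  receive_interval: 1000
}
```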
Hello! 👋 We're currently using broadway_sqs 0.2.0, which pulls in broadway 0.3.0. We're interested in leveraging testing features that were introduced in broadway 0.4.0, such as Broadway.DummyProducer.
For the moment we've recreated that producer in our project, but it'd be awesome if we could use Broadway 0.4.0 alongside broadway_sqs.
EDIT: I created a PR for this!
$ mix deps.get
Resolving Hex dependencies...
Failed to use "broadway" (version 0.4.0) because
apps/manifold/mix.exs requires ~> 0.4.0
broadway_sqs (version 0.2.0) requires ~> 0.3.0
** (Mix) Hex dependency resolution failed, change the version requirements of your dependencies or unlock them (by using mix deps.update or mix deps.unlock). If you are unable to resolve the conflicts you can try overriding with {:dependency, "~> 1.0", override: true}
I think there's a bug in the way the producer handles demand.
Consider the following scenario:
1. A consumer demands 10 messages.
2. The producer receives only 9 messages from the server (here).
3. It schedules another receive (here).
4. It sends the 9 messages to the consumer via the return value of the handle_demand callback.
5. Later, handle_receive_messages kicks in, fetches message number 10 from the queue and... returns it into nowhere, since it isn't called from within a handle_demand callback?
I am not familiar with SQS, but I'd assume this would delay processing of message number 10 for at least the value of the visibility timeout.
I wrote a test to reproduce the bug; please find it here.
Sorry for not providing a full fix; I don't quite understand how this feature was designed to work.
Is there a way to send each message received in a batch to a different processor? From my local testing, it seems all messages received in a batch always go to the same processor (pid). I would expect that with producer concurrency = 1 and processor concurrency = 10, if I receive 10 messages, each would be processed in parallel. Instead, each message consumed in the batch is processed synchronously by the same processor. My config:
[
  producer: [
    module: {
      BroadwaySQS.Producer,
      queue_url: "http://example.com/myQueue",
      wait_time_seconds: 20,
      max_number_of_messages: 10,
      receive_interval: 100
    },
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 10
    ]
  ]
]
It seems like the only way to do what I am looking for is to set max_number_of_messages to 1, which would come with significant performance and cost implications.
Am I missing anything?
Thanks!
Hey guys, great work on both Broadway and BSQS!
I was reading the documentation, and it was not immediately obvious to me how messages get acknowledged (or not) back to AWS in the handle_batch function.
Am I correct to assume that any messages returned from the callback get acknowledged and those that are not returned do not? If so, I think the phrasing in the docs could reflect this a bit better for newcomers. If not, do I have to somehow change a message to make sure it doesn't get acknowledged when I fail to process it?
Thanks in advance!
Pedro
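For the second case, Broadway does expose a way to mark a message as failed so it is not acknowledged as successful. A minimal sketch; store/1 is an assumed business function and the error reason is arbitrary:

```elixir
# Sketch: mark a message as failed inside handle_batch/4 so it is excluded
# from the successful acknowledgement path.
def handle_batch(_batcher, messages, _batch_info, _context) do
  Enum.map(messages, fn message ->
    case store(message.data) do
      :ok -> message
      {:error, reason} -> Broadway.Message.failed(message, reason)
    end
  end)
end
```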
With the upgrade of ex_aws_sqs from 2.x to 3.x, the example app is broken. Only the queue name is given, not the queue url as now required.
Hey guys,
First of all. Thanks a lot for putting so much work into this and making it available for us.
What happened is that we deployed Broadway + Broadway SQS to production and noticed that messages just weren't being "acknowledged" on the AWS side. All messages came through, but they weren't being deleted, so they stayed "in flight", then appeared again to be processed, and eventually went to the DLQ.
We spent many, many hours trying to understand what was wrong because there were no errors or exceptions of any sort in our logs... everything seemed to be just fine.
Naturally, we suspected something was wrong with our implementation, read and re-read docs and examples everywhere, checked networking, CPU resources, everything! In the end, the AWS access key used to connect to SQS just didn't have enough permissions to delete messages or delete messages in batch (I didn't create the API key myself). As always, the error was really simple, but since it wasn't obvious straight away, we spent many hours looking in the wrong places.
My goal with this issue is first to document what happened to us, but also to point out that these silent errors can be very tricky. It would be nice to cover this in the docs, or to have an option on the client like a debug mode, so we could get more verbose information about those "silent" errors during desperate times.
Thanks!
According to the AWS documentation, there is support for wildcards when specifying which message attributes to retrieve.
For testing purposes we use https://github.com/softwaremill/elasticmq, which doesn't seem to support the All parameter but does indeed support .*
I've tried specifying it as message_attribute_names and it works as expected. Would it be useful to add it to the documentation?
producer: [
  module: {BroadwaySQS.Producer,
   queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
   message_attribute_names: [".*"]}
]
Hi, I'm trying to get Broadway working with localstack and was having some trouble. I specify queue_name as http://localhost:4576/queue/events.
When requests are made the following url is used:
https://sqs.us-east-1.amazonaws.com/http:/localhost:4576/queue/events
resulting in a 404.
This PR adds the ability to specify the queue url instead, which can be used to give the exact url needed to work with localstack. This functionality is included in release 3.
My application consumes messages from multiple SQS queues. If a queue is deleted, I get an error message like this:
[error] Unable to fetch events from AWS. Reason: {:http_error, 400, %{
code: "AWS.SimpleQueueService.NonExistentQueue",
message: "The specified queue does not exist for this wsdl version.",
type: "Sender",
request_id: "5436c6c6-1c58-5e85-bf11-9b5c27e3bc93",
detail: nil
}}
However, I can't tell which queue the error refers to, because AWS doesn't include its name in the response, nor does broadway_sqs include it in the error message:
defp wrap_received_messages({:error, reason}, _) do
  Logger.error("Unable to fetch events from AWS. Reason: #{inspect(reason)}")
  []
end
Would it be possible to include the opts.queue_url in the error message?
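A sketch of what the requested change might look like, based on the clause quoted above. That the second argument can carry opts (rather than being ignored) is an assumption about the surrounding code, not the library's current signature:

```elixir
# Hypothetical sketch: thread the queue URL into the error log so applications
# consuming multiple queues can tell which one failed.
defp wrap_received_messages({:error, reason}, opts) do
  Logger.error(
    "Unable to fetch events from AWS queue #{opts.queue_url}. " <>
      "Reason: #{inspect(reason)}"
  )

  []
end
```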
It appears that in wrap_messages only the body of the message ExAws generates here is kept. I have a situation with an encrypted body where the initialization vector needed for decryption is delivered in a MessageAttribute. Is there some other way of accessing this information?
The following worker (copied and pasted from the docs) fails to connect to the queue.
defmodule MyBroadway.Worker do
  use Broadway

  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module:
            {BroadwaySQS.Producer,
             queue_name: "second",
             config: [
               access_key_id: "x",
               secret_access_key: "y",
               region: "us-east-2"
             ]}
        ]
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 2000
        ]
      ]
    )
  end

  def handle_message(_, message, _) do
    receipt = %{
      id: message.metadata.message_id,
      receipt_handle: message.metadata.receipt_handle
    }

    IO.inspect(receipt)
  end
end
It gets the following errors:
18:33:39.373 [debug] ExAws: Request URL: "https://sqs.us-east-2.amazonaws.com/" HEADERS: [{"Authorization", "AWS4-HMAC-SHA256 Credential=x/20190924/us-east-2/sqs/aws4
_request,SignedHeaders=content-encoding;content-type;host;x-amz-date,Signature=z"}, {"host", "sqs.us-east-2.amazonaws.com"
}, {"x-amz-date", "20190924T223339Z"}, {"content-type", "application/x-www-form-urlencoded"}, {"content-encoding", "identity"}] BODY: "Action=ReceiveMessage&MaxNumberOfMessages=10&Queue
Url=second" ATTEMPT: 1
18:33:39.420 [error] GenServer MyBroadway.Worker.Broadway.Producer_default_1 terminating
** (UndefinedFunctionError) function Poison.decode/1 is undefined (module Poison is not available)
Poison.decode("<?xml version=\"1.0\"?><ErrorResponse xmlns=\"http://queue.amazonaws.com/doc/2012-11-05/\"><Error><Type>Sender</Type><Code>InvalidAddress</Code><Message>The address s
econd is not valid for this endpoint.</Message><Detail/></Error><RequestId>4939ea47-c4a1-5715-934f-bc4358a24cee</RequestId></ErrorResponse>")
(ex_aws) lib/ex_aws/request.ex:108: ExAws.Request.client_error/2
(ex_aws) lib/ex_aws/request.ex:57: ExAws.Request.request_and_retry/7
(ex_aws) lib/ex_aws/operation/query.ex:41: ExAws.Operation.ExAws.Operation.Query.perform/2
(broadway_sqs) lib/broadway_sqs/ex_aws_client.ex:50: BroadwaySQS.ExAwsClient.receive_messages/2
(broadway_sqs) lib/broadway_sqs/producer.ex:196: BroadwaySQS.Producer.handle_receive_messages/1
(broadway) lib/broadway/producer.ex:66: Broadway.Producer.handle_demand/2
(gen_stage) lib/gen_stage.ex:2099: GenStage.noreply_callback/3
Last message: {:"$gen_producer", {#PID<0.316.0>, #Reference<0.1588154580.3783000066.245470>}, {:ask, 10}}
State: %{consumers: [{#PID<0.316.0>, #Reference<0.1588154580.3783000066.245470>}], module: BroadwaySQS.Producer, module_state: %{demand: 0, receive_interval: 5000, receive_timer: nil, s
qs_client: {BroadwaySQS.ExAwsClient, %{ack_ref: #Reference<0.1588154580.3783000065.247160>, config: [access_key_id: "x", secret_access_key: "y", region: "us-east-2"], queue_name: "second", receive_messages_opts: [max_number_of_messages: 10]}}}, transformer: nil}
These are my deps
defp deps do
  [
    {:broadway_sqs, "~> 0.3.0"},
    {:hackney, github: "benoitc/hackney", override: true}
  ]
end
When I downgrade to broadway_sqs 0.2.0 it forms the request url successfully:
18:41:13.192 [debug] ExAws: Request URL: "https://sqs.us-east-2.amazonaws.com/second" HEADERS: [{"Authorization", "AWS4-HMAC-SHA256 Credential=x/20190924/us-east-2/sqs/aws4_request,SignedHeaders=content-encoding;content-type;host;x-amz-date,Signature=z"}, {"host", "sqs.us-east-2.amazonaws.com"}, {"x-amz-date", "20190924T224113Z"}, {"content-type", "application/x-www-form-urlencoded"}, {"content-encoding", "identity"}] BODY: "Action=ReceiveMessage&MaxNumberOfMessages=10" ATTEMPT: 1
You can see that second gets appended to the base sqs url.
Hello,
When using partitioning on a FIFO queue on SQS, I am seeing some unexpected behavior. My broadway config is:
[
  producer: [
    module: {
      MyModule,
      queue_url: "http://my/queue/",
      on_failure: :ack,
      wait_time_seconds: 20,
      max_number_of_messages: 10,
      receive_interval: 10,
      attribute_names: :all,
      config: my_aws_config
    },
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 75,
      max_demand: 1,
      partition_by: &partition/1 # see below
    ]
  ],
  batchers: [
    default: [
      batch_size: 100,
      batch_timeout: 300,
      concurrency: 3
    ]
  ]
]
The partition function set on the processors is:
def partition(%Message{
      metadata: %{
        attributes: %{
          "message_group_id" => message_group_id
        }
      }
    }) do
  :erlang.phash2(message_group_id)
end
When running the pipeline like this, the observed behavior is:
1. N available messages are pulled down from SQS (all with the same message group id). Assume the queue is empty after this poll.
2. A message is handed to handle_message().
3. Processed messages flow through handle_batch() and are ack'd / deleted.
4. The processor demands more messages.
5. The remaining buffered messages stall for wait_time_seconds before being processed.
From the behavior, it seems like in step 5 we are sending a poll request to SQS for more messages. Since none exist on the queue, it waits for the full duration of wait_time_seconds. In my opinion, since we already have messages ready to be processed from the previous poll, we shouldn't need to send a request to SQS for more messages yet. To counteract this behavior, I have set max_demand: 10 on the processor config, and it seems to fix the problem as far as I can tell. Even though a fix exists, I think the existing behavior is confusing and most likely not what a user expects. Is there something wrong with my config that I should be changing instead?
Thanks!
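The workaround described above, as a config fragment. The value 10 simply mirrors max_number_of_messages; whether that is the right coupling is exactly the open question:

```elixir
processors: [
  default: [
    concurrency: 75,
    # Demand up to a full SQS batch at once so already-buffered messages
    # are drained before a new (possibly long-polling) receive is issued.
    max_demand: 10,
    partition_by: &partition/1
  ]
]
```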
When running mix deps.update --all on this project and then mix test, 2 new tests are failing.
1) test prepare_for_start/2 validation when the queue url is not present (BroadwaySQS.BroadwaySQS.ProducerTest)
test/broadway_sqs/producer_test.exs:89
Wrong message for ArgumentError
expected:
"invalid configuration given to SQSBroadway.prepare_for_start/2, required option :queue_url not found, received options: []"
actual:
"invalid configuration given to SQSBroadway.prepare_for_start/2, required :queue_url option not found, received options: []"
code: assert_raise(ArgumentError, message, fn ->
stacktrace:
test/broadway_sqs/producer_test.exs:95: (test)
This appears to be related to the allowed mime version in hackney.
Maybe this is not an issue, but if a project is using this package, wouldn't its tests effectively fail when the packages are updated?
I am a little paranoid, as we want to start using this library.
Hello there!
I started trying Broadway (and BroadwaySQS) today. Whenever I use SQS, I try using a local ElasticMQ instance for development, run via docker (docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native).
Is it possible to point to a local ElasticMQ instance using BroadwaySQS? I don't see anything related in the docs.
I tried setting the queue_url parameter to http://localhost:9324/<queue_name>, but I got the following traceback:
22:14:43.438 [error] GenServer ExAws.Config.AuthCache terminating
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}
You tried to access the AWS EC2 instance meta, but it could not be reached.
This happens most often when trying to access it from your local computer,
which happens when environment variables are not set correctly prompting
ExAws to fallback to the Instance Meta.
Please check your key config and make sure they're configured correctly:
For Example:
ExAws.Config.new(:s3)
ExAws.Config.new(:dynamodb)
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:29: ExAws.InstanceMeta.request/2
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:66: ExAws.InstanceMeta.instance_role_credentials/1
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:74: ExAws.InstanceMeta.security_credentials/1
(ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:83: ExAws.Config.AuthCache.refresh_config/2
(ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:44: ExAws.Config.AuthCache.handle_call/3
(stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
(stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
(stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from WorldTemp.TempProcessor.Broadway.Producer_0): {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: "sqs.us-east-1.amazonaws.com", http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "us-east-1", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}
State: ExAws.Config.AuthCache
Client WorldTemp.TempProcessor.Broadway.Producer_0 is alive
(stdlib 3.11.2) gen.erl:167: :gen.do_call/4
(elixir 1.10.2) lib/gen_server.ex:1020: GenServer.call/3
(ex_aws 2.1.7) lib/ex_aws/config.ex:84: ExAws.Config.retrieve_runtime_value/2
(elixir 1.10.2) lib/stream.ex:572: anonymous fn/4 in Stream.map/2
(elixir 1.10.2) lib/enum.ex:3686: Enumerable.List.reduce/3
(elixir 1.10.2) lib/stream.ex:1609: Enumerable.Stream.do_each/4
(elixir 1.10.2) lib/enum.ex:959: Enum.find/3
(ex_aws 2.1.7) lib/ex_aws/config.ex:71: anonymous fn/2 in ExAws.Config.retrieve_runtime_config/1
22:14:43.448 [error] GenServer WorldTemp.TempProcessor.Broadway.Producer_0 terminating
** (stop) exited in: GenServer.call(ExAws.Config.AuthCache, {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: "sqs.us-east-1.amazonaws.com", http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "us-east-1", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}, 30000)
** (EXIT) an exception was raised:
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}
I noticed it is assuming the region us-east-1, so I tried adding the region parameter as part of the producer config (config: [region: "elasticmq"]):
22:17:36.260 [error] GenServer ExAws.Config.AuthCache terminating
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}
You tried to access the AWS EC2 instance meta, but it could not be reached.
This happens most often when trying to access it from your local computer,
which happens when environment variables are not set correctly prompting
ExAws to fallback to the Instance Meta.
Please check your key config and make sure they're configured correctly:
For Example:
ExAws.Config.new(:s3)
ExAws.Config.new(:dynamodb)
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:29: ExAws.InstanceMeta.request/2
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:66: ExAws.InstanceMeta.instance_role_credentials/1
(ex_aws 2.1.7) lib/ex_aws/instance_meta.ex:74: ExAws.InstanceMeta.security_credentials/1
(ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:83: ExAws.Config.AuthCache.refresh_config/2
(ex_aws 2.1.7) lib/ex_aws/config/auth_cache.ex:44: ExAws.Config.AuthCache.handle_call/3
(stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
(stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
(stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from WorldTemp.TempProcessor.Broadway.Producer_0): {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: nil, http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "elasticmq", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}
State: ExAws.Config.AuthCache
Client WorldTemp.TempProcessor.Broadway.Producer_0 is alive
(stdlib 3.11.2) gen.erl:167: :gen.do_call/4
(elixir 1.10.2) lib/gen_server.ex:1020: GenServer.call/3
(ex_aws 2.1.7) lib/ex_aws/config.ex:84: ExAws.Config.retrieve_runtime_value/2
(elixir 1.10.2) lib/stream.ex:572: anonymous fn/4 in Stream.map/2
(elixir 1.10.2) lib/enum.ex:3686: Enumerable.List.reduce/3
(elixir 1.10.2) lib/stream.ex:1609: Enumerable.Stream.do_each/4
(elixir 1.10.2) lib/enum.ex:959: Enum.find/3
(ex_aws 2.1.7) lib/ex_aws/config.ex:71: anonymous fn/2 in ExAws.Config.retrieve_runtime_config/1
22:17:36.272 [error] GenServer WorldTemp.TempProcessor.Broadway.Producer_0 terminating
** (stop) exited in: GenServer.call(ExAws.Config.AuthCache, {:refresh_config, %{access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role], host: nil, http_client: ExAws.Request.Hackney, json_codec: Jason, normalize_path: true, port: 443, region: "elasticmq", retries: [max_attempts: 10, base_backoff_in_ms: 10, max_backoff_in_ms: 10000], scheme: "https://", secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]}}, 30000)
** (EXIT) an exception was raised:
** (RuntimeError) Instance Meta Error: {:error, %{reason: :timeout}}
I see host is now nil, yet I could not get much further. I tried setting scheme to http:// instead of https://, but it had no effect.
Then I tried following the source code, which led me to https://github.com/dashbitco/broadway_sqs/blob/master/lib/broadway_sqs/ex_aws_client.ex#L28
@impl true
def receive_messages(demand, opts) do
  receive_messages_opts = build_receive_messages_opts(opts, demand)

  opts.queue_url
  |> ExAws.SQS.receive_message(receive_messages_opts)
  |> ExAws.request(opts.config)
  |> wrap_received_messages(opts.ack_ref)
end
Then I checked ExAws.SQS.receive_message/2, which relies on ExAws.SQS.request/3. This might not be only a BroadwaySQS issue, because I don't see how ExAws would make the AWS host configurable.
Just to wrap it up with a question: is it possible to point BroadwaySQS to a local ElasticMQ instance?
P.S.: I haven't touched any Elixir code in many months, so I am probably missing some important detail here.
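For what it's worth, ex_aws generally lets the endpoint be overridden through its config keyword list, so a sketch like the following might work. The static credentials and the port are assumptions matching the ElasticMQ defaults above:

```elixir
# Hypothetical sketch: point the producer at a local ElasticMQ instance by
# overriding host/port/scheme in the ex_aws config, with dummy credentials
# so ExAws never falls back to the EC2 instance metadata endpoint.
{BroadwaySQS.Producer,
 queue_url: "http://localhost:9324/queue/events",
 config: [
   scheme: "http://",
   host: "localhost",
   port: 9324,
   region: "elasticmq",
   access_key_id: "x",
   secret_access_key: "x"
 ]}
```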
I am using a standard SQS queue, and the message is not deleted after it is read from the queue. How can I delete a message manually from the queue in broadway_sqs?
I already have an AWSMock which I use in several places in my codebase. When testing BroadwaySQS, BroadwaySQS.ExAwsClient is used for the calls, and through it the app uses ExAws.request. Is there a way to override this so I can use my mock in this setup?
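One option, if I read the producer options correctly, is the `:sqs_client` option, which swaps out the default BroadwaySQS.ExAwsClient for any module implementing the same behaviour. A sketch (MySQSClientMock is a hypothetical module of yours that implements `receive_messages/2` and the acknowledger callbacks):

```elixir
# Sketch: inject a test double in place of the default ExAws-backed client.
producer: [
  module:
    {BroadwaySQS.Producer,
     queue_url: queue_url,
     sqs_client: MySQSClientMock}
]
```

In tests you can also bypass the producer entirely with Broadway.DummyProducer and `Broadway.test_message/2`, which avoids touching SQS at all.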
Hey,
Thank you so much for the work and effort you've put into this package. It's really a breeze. I think I discovered an issue with the current :broadway_sqs ~> "0.5.0"
. According to the main documentation in broadway:
This callback must return the same messages given to it, possibly updated. For example, you could update the message data or use Broadway.Message.configure_ack/2 in a centralized place to configure how to ack the message based on the failure reason.
According to the code inside broadway_sqs/lib/broadway_sqs/ex_aws_client.ex
:
@impl true
def ack(ack_ref, successful, _failed) do
successful
|> Enum.chunk_every(@max_num_messages_allowed_by_aws)
|> Enum.each(fn messages -> delete_messages(messages, ack_ref) end)
end
_failed
is never used, so failed messages just remain in the queue. In the end I wrote this, just to clean up:
defp get_ack_ref({_, ack_ref, _}) do
ack_ref
end
def handle_failed(messages, _context) do
message = List.first(messages)
message.acknowledger
|> get_ack_ref()
|> BroadwaySQS.ExAwsClient.ack(messages, [])
messages
end
I've seen you're discussing it and perhaps it hasn't been implemented yet, but I'm posting this just in case anyone else comes across it, or in case there is something I'm missing.
Thanks again!
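A note for anyone hitting this later: if I read the changelog correctly, newer broadway_sqs releases (0.6+) make the acknowledgement behaviour configurable, so the workaround above should no longer be needed. A sketch:

```elixir
# Sketch: with configurable acks, failed messages can also be deleted
# instead of lingering in the queue until their visibility timeout expires.
producer: [
  module:
    {BroadwaySQS.Producer,
     queue_url: queue_url,
     on_success: :ack,   # delete successful messages (the default)
     on_failure: :ack}   # also delete failed messages
]
```

If you only want this for some failures, `Broadway.Message.configure_ack(message, on_failure: :ack)` can set it per message, e.g. from handle_failed/2.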
As we did for Cloud Pub/Sub (dashbitco/broadway_cloud_pub_sub#37), should we look to implement pipeline-specific connection pools for BroadwaySQS?
I was about to start on a PR for this when I realized it might be a little more involved than the Pub/Sub change, as we don't have a hard requirement on :hackney
here. Should we add one?
Hi guys,
first of all, thanks for this project. I'm looking forward to testing it in an environment that consumes around 1.5 million SQS messages per day.
I have been playing with it, but I didn't find a way to authenticate my app using an AWS profile (like `~/.aws/config`, see [1]). If it's not possible, but you think it would be nice to have in the project, I'm willing to send a PR.
[1] search for Setting an Alternate Credentials Profile
in https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
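This may already be covered by ExAws (which broadway_sqs uses underneath) rather than by broadway_sqs itself: ExAws can read credentials from an AWS CLI profile via `{:awscli, profile, timeout}` entries in its credential lists, provided the optional :configparser_ex dependency is present. A sketch, with "my-profile" as a placeholder profile name:

```elixir
# config/config.exs — sketch: fall back from environment variables to an
# AWS CLI profile read from ~/.aws/credentials / ~/.aws/config.
config :ex_aws,
  access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, {:awscli, "my-profile", 30}],
  secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, {:awscli, "my-profile", 30}]
```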
When using non-FIFO queues, it happens quite often that I receive the same message more than once. That's OK during processing, but when Broadway wants to acknowledge the messages (which end up in the same batch), it throws an error and moves the messages to the DLQ.
Could we just filter out duplicates during acknowledging?
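As a stop-gap on the user side, one idea (a sketch, assuming the message metadata exposes `:message_id` and that the acknowledger supports `configure_ack/2` as in recent versions) is to keep duplicates out of the delete batch yourself:

```elixir
# Sketch: keep one copy of each message_id in the batch and skip the delete
# call for the duplicates, so DeleteMessageBatch never repeats an entry id.
def handle_batch(:default, messages, _batch_info, _context) do
  unique = Enum.uniq_by(messages, & &1.metadata.message_id)
  dupes = messages -- unique

  Enum.map(dupes, &Broadway.Message.configure_ack(&1, on_success: :noop)) ++ unique
end
```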
I've been using broadway_sqs
to consume AWS SQS FIFO queues, and I noticed some unexpected behaviours when processing the messages: sometimes they were processed out of order or more than once.
Initially I didn't have the Broadway partition_by configured, and once I did, things seemed to improve, but I can still see some double and out-of-order processing occurring. For example, looking at the logs below – organized by process identifier for readability – we can see that:
16:39:34.610 [info] [#PID<0.382.0>] Handling message: "B" / "B1"
16:39:37.270 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B1"
16:39:37.300 [info] [#PID<0.382.0>] Handling message: "B" / "B2"
16:39:39.633 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B2"
16:39:34.610 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Message processing failed : "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] timeout: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Handling message: "C" / "C2"
16:39:42.194 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C2"
16:39:45.814 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:47.528 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C1"
16:39:34.610 [info] [#PID<0.419.0>] Handling message: "A" / "A1"
16:39:37.504 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A1"
16:39:37.521 [info] [#PID<0.419.0>] Handling message: "A" / "A2"
16:39:40.242 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A2"
16:39:40.263 [info] [#PID<0.419.0>] Handling message: "A" / "A3"
16:39:42.404 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A3"
16:39:42.422 [info] [#PID<0.419.0>] Handling message: "A" / "A4"
16:39:44.933 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A4"
16:39:44.954 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:47.037 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
16:39:47.056 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:50.001 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
Before setting up the partition_by the behaviour was even more awkward with different consumers handling messages from the same message_group_id:
16:25:33.076 [info] [#PID<0.423.0>] Handling message: "A" / "A1"
16:25:36.304 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A1"
16:25:36.326 [info] [#PID<0.423.0>] Handling message: "A" / "A2"
16:25:38.859 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A2"
16:25:38.869 [info] [#PID<0.423.0>] Handling message: "A" / "A3"
16:25:41.709 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A3"
16:25:41.728 [info] [#PID<0.423.0>] Handling message: "A" / "A4"
16:25:44.206 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A4"
16:25:44.230 [info] [#PID<0.423.0>] Handling message: "A" / "A5"
16:25:46.592 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A5"
16:25:46.613 [info] [#PID<0.423.0>] Handling message: "C" / "C1"
16:25:48.219 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C1"
16:25:48.233 [info] [#PID<0.423.0>] Handling message: "C" / "C2"
16:25:50.134 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C2"
16:25:50.152 [info] [#PID<0.423.0>] Handling message: "B" / "B1"
16:25:52.073 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B1"
16:25:52.087 [info] [#PID<0.423.0>] Handling message: "B" / "B2"
16:25:53.323 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B2"
16:25:44.286 [info] [#PID<0.424.0>] Handling message: "C" / "C1"
16:25:46.866 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C1"
16:25:46.877 [info] [#PID<0.424.0>] Handling message: "C" / "C2"
16:25:48.653 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C2"
16:25:48.661 [info] [#PID<0.424.0>] Handling message: "A" / "A5"
16:25:50.938 [info] [#PID<0.424.0>] Message acknowledge: "A" / "A5"
16:25:50.958 [info] [#PID<0.424.0>] Handling message: "B" / "B1"
16:25:52.517 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B1"
16:25:52.536 [info] [#PID<0.424.0>] Handling message: "B" / "B2"
16:25:54.045 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B2"
My understanding is that AWS SQS FIFO queues, using the message_group_id, should guarantee message order within the same message group identifier and that once a message has been received, during its visibility timeout, no other consumer can receive the same message.
I'll leave here the code for my SQS producer:
defmodule ElixirBroadwayPlayground.SQSProducer do
use Broadway
require Logger
alias Broadway.Message
def start_link(config) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {
BroadwaySQS.Producer,
queue_url: Keyword.get(config, :queue_url),
receive_interval: 1000,
on_success: :ack,
on_failure: :noop,
visibility_timeout: 10,
attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
}
],
processors: [
default: [concurrency: Keyword.get(config, :num_workers, 1)]
],
partition_by: &partition_by/1
)
end
@impl true
def handle_message(
_,
%Message{
data: data,
metadata: %{attributes: %{"message_group_id" => message_group_id}}
} = message,
_
) do
log_event("Handling message", data, message_group_id)
HTTPoison.get("https://swapi.dev/api/people/1", [{"Content-Type", "application/json"}],
ssl: [verify: :verify_none]
)
|> handle_response(message)
end
defp handle_response(
{:ok, _},
%Message{
data: data,
metadata: %{attributes: %{"message_group_id" => message_group_id}}
} = message
) do
log_event("Message acknowledge", data, message_group_id)
Message.ack_immediately(message)
end
defp handle_response(
{:error, %HTTPoison.Error{reason: reason}},
%Message{
data: data,
metadata: %{attributes: %{"message_group_id" => message_group_id}}
} = message
) do
log_event("Message processing failed ", data, message_group_id)
log_event(reason, data, message_group_id)
Message.failed(message, reason)
end
defp log_event(text, data, msg_group_id) do
message_id = Jason.decode!(data)["id"]
Logger.info("[#{inspect(self())}] #{text}: #{inspect(msg_group_id)} / #{inspect(message_id)}")
end
defp partition_by(%Message{
metadata: %{attributes: %{"message_group_id" => message_group_id}}
}) do
:erlang.phash2(message_group_id)
end
end
Am I misinterpreting the behaviour that should be expected? Has anyone experienced the same behaviour?
As I mentioned in #9, I would like to extend the SQS visibility timeout dynamically at runtime, which is possible through the underlying ExAws.SQS
module. There are many ways it could be done, but I wanted to know whether you would consider it an acceptable proposal to extend the BroadwaySQS.SQSClient
behaviour in such a manner as:
@type message_visibility :: {receipt_handle :: String.t(), visibility_timeout :: pos_integer()}
@type message_visibility_batch_entry :: {unique_id :: String.t(), visibility :: message_visibility()}
@type message_visibility_batch :: [message_visibility_batch_entry()]
@callback change_message_visibility(queue_name :: String.t(), visibility :: message_visibility()) :: {:ok, response :: map()} | {:error, details :: map()}
@callback change_message_visibility_batch(queue_name :: String.t(), batch :: message_visibility_batch()) :: {:ok, response :: map()} | {:error, details :: map()}
Or do you consider this to be too specific? Ideas welcomed of course.
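Until something like this lands, the extension can be done by calling ExAws.SQS directly with the receipt handle that broadway_sqs places in the message metadata. A sketch (`queue_url` is assumed to be available, e.g. from module config, and 120 is an arbitrary new timeout in seconds):

```elixir
# Sketch: extend the visibility timeout of an in-flight message before
# starting long-running work, using ExAws.SQS.change_message_visibility/3.
def handle_message(_processor, message, _context) do
  queue_url
  |> ExAws.SQS.change_message_visibility(message.metadata.receipt_handle, 120)
  |> ExAws.request()

  # ... long-running processing that may exceed the original timeout ...
  message
end
```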