broadway_kafka's People

Contributors

alexandrexaviersm, amacciola, danmarcab, escobera, eulixir, gabrielgiordan, goose97, jamespeterschinner, josevalim, juanperi, junaid1460, jvzeller, kianmeng, matreyes, msaraiva, nandofarias, oliveigah, philss, polvalente, qhwa, rewritten, robertkeizer, slashmili, tschmidleithner, v0idpwn, victorolinasc, vitortrin, vovayartsev, wojtekmach, yordis

broadway_kafka's Issues

Shutdown Pipeline Gracefully when Topic is removed

Hello,

I was wondering what the best way is to handle shutting down one of my Broadway pipelines if the topic it's ingesting from is deleted while it's still running.

I know that when this happens, it currently emits a console error like:

iex([email protected])1> 13:49:33.954 [error] GenServer :"ED-de2b90fa-c8b5-5cf7-834d-c62441245829-cab4c94c-ccb4-11ec-8fda-ee15ec8cad74Pipeline.Broadway.Producer_1" terminating
** (RuntimeError) cannot fetch records from Kafka (topic=residents partition=1 offset=2). Reason: :unknown_topic_or_partition
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:537: BroadwayKafka.Producer.fetch_messages_from_kafka/3
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:303: BroadwayKafka.Producer.handle_info/2
    (broadway 1.0.3) lib/broadway/topology/producer_stage.ex:229: Broadway.Topology.ProducerStage.handle_info/2
    (gen_stage 1.1.2) lib/gen_stage.ex:2117: GenStage.noreply_callback/3
    (stdlib 3.17) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:poll, {1, "residents", 1}}

But then it keeps trying to restart a new pipeline and rebalance, yet there still won't be any topic, so it just fails over and over again.

I tried placing a Process.monitor/1 on the PID of the Broadway pipeline, but I never see any "terminated" or "killed" messages sent to it.

Where should I be looking to handle this?

Custom offset producer

From the documentation, the only thing I could find about offsets is:

:offset_reset_policy - Optional. Defines the offset to be used when there's no initial offset in Kafka or if the current offset has expired. Possible values are :earliest or :latest. Default is :latest.

Is there a way to specify a custom offset from which the producer can start?
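For reference, a minimal sketch of where that documented option is passed (the broker, group, and topic names are placeholders):

producer: [
  module:
    {BroadwayKafka.Producer,
     [
       hosts: [localhost: 9092],
       group_id: "my_group",
       topics: ["my_topic"],
       # the only offset-related option documented today
       offset_reset_policy: :earliest
     ]},
  concurrency: 1
]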

Drain After Revoke Error

When stopping and restarting pipelines, I periodically get these errors.

My pipeline also gets stuck in a rebalancing loop for a while before it recovers. Any insight into why I am seeing these errors after stopping a pipeline? Thanks.

19:37:20.225 [error] GenServer #PID<0.17856.0> terminating
** (stop) exited in: GenServer.call(#PID<0.17844.0>, :drain_after_revoke, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (broadway_kafka) lib/producer.ex:415: BroadwayKafka.Producer.assignments_revoked/1
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:477: :brod_group_coordinator.stabilize/3
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:391: :brod_group_coordinator.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

offset_reset_policy earliest resets offset to earliest on consumer rebalance

It appears the setting offset_reset_policy: :earliest is not working correctly (or I have misunderstood how it should work). Consumer offsets are always reset to earliest on app restart or when brod rebalances partition assignments.

In BroadwayKafka.BrodClient.resolve_offset/4 there is no condition where the consumer offset will be used when :earliest is used.

I expect offset_reset_policy: :earliest to reset to the earliest offset only when no committed offset is found, not whenever consumer_offset > earliest_offset.
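To illustrate the expected decision, a hypothetical sketch (the function and argument names are made up for illustration and are not the library's actual code):

# Hypothetical sketch: fall back to the reset policy only when no committed offset exists.
defp choose_start_offset(committed_offset, policy_offset) do
  case committed_offset do
    # no committed offset for this group/partition: use the :earliest/:latest policy offset
    :undefined -> policy_offset
    # otherwise resume from where the consumer group left off
    offset when is_integer(offset) -> offset
  end
end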

Stop consuming messages after failure

How can I configure broadway_kafka to stop once an error occurs?
When an error occurs, I would like to stop consuming messages and keep the same offset; the error may be due to the publisher sending wrong information or to something that needs to be changed in my application pipeline.

Right now I'm using handle_failed to publish to a dead-letter topic, but it's not the ideal behavior for my use case.

Is it possible to change the offset of the Broadway consumer group to skip some messages or even replay them?
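For context, a hedged sketch of the dead-letter approach mentioned above (the :dlq_producer brod client and the topic name are placeholders, assumed to be started elsewhere):

@impl true
def handle_failed(messages, _context) do
  for message <- messages do
    # republish the failed payload to a dead-letter topic
    :ok = :brod.produce_sync(:dlq_producer, "my_topic_dlq", :hash, message.metadata.key, message.data)
  end

  messages
end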

Support for pipelines using Registry / via tuple

Hello!

I'm busy playing with Broadway for a use case with many pipelines that are created and destroyed dynamically. The changes introduced in dashbitco/broadway#239 seem perfect for this, allowing pipelines to be named using a via tuple / Registry.

The broadway_kafka producer still assumes pipelines are named using an atom, and tries to use Module.concat/2 to build its own child process names.

Is there a sane way to get the Kafka producer to also support naming its children using the pipeline's process_name/2 function?
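For illustration, the kind of naming that dashbitco/broadway#239 enables looks roughly like this (a sketch; MyBroadway, MyApp.PipelineRegistry, topic_id, and opts are placeholders, and broadway_kafka does not support this yet, which is what this issue asks about):

# a Registry started somewhere in the supervision tree
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.PipelineRegistry)

# name the pipeline with a via tuple instead of an atom
Broadway.start_link(MyBroadway,
  name: {:via, Registry, {MyApp.PipelineRegistry, {:pipeline, topic_id}}},
  producer: [module: {BroadwayKafka.Producer, opts}, concurrency: 1],
  processors: [default: [concurrency: 10]]
)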

Major Assignments Issue In Master Branch

Hello,

I posted issue #43 last week, and a PR was submitted to master to fix that crash. However, I believe there has been an unexpected side effect from that fix.

The scenario where I see the issue is when a pipeline is started with a consumer_group_id, stopped, and then started again using the same consumer_group_id. When the pipeline starts, it does not seem to register with or fetch messages from all of the topic's partitions. In the screenshot below, the top output is from after I started and shortly afterwards stopped the pipeline; the bottom output is from when I started it again and let all the messages ingest.

Screen Shot 2021-01-20 at 10 18 00 AM

When I reverted from the master branch back to the latest release tag, the issue went away. I will show my logs from both cases here.

LOGS WHEN USING MASTER BRANCH W/ ISSUE:

13:05:12.258 [info] Servers.PubSub.IngestPubSub: Channel: "ingest_channel", Received message: START PIPELINE

05:12.274 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23121.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_0.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_0.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.275 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23125.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_1.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_1.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.276 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23129.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_2.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_2.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.276 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23133.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_3.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_3.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.277 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23137.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_4.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_4.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.279 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23144.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_5.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_5.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.279 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23149.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_6.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_6.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.280 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23153.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_7.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_7.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.280 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23157.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_8.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_8.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:12.280 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.23161.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_9.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157Pipeline.Broadway.Producer_9.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

13:05:14.020 [info] Elixir.CogyntWorkstationIngest.Servers.PubSub.IngestPubSub: Channel: "ingest_channel", Received message: STOP PIPELINE

13:05:18.284 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23142.0>,cb=#PID<0.23136.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23123.0>,cb=#PID<0.23120.0>,generation=1):
elected=true

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23155.0>,cb=#PID<0.23152.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23159.0>,cb=#PID<0.23156.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23135.0>,cb=#PID<0.23132.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23127.0>,cb=#PID<0.23124.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23147.0>,cb=#PID<0.23143.0>,generation=1):
elected=false

13:05:18.285 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23165.0>,cb=#PID<0.23160.0>,generation=1):
elected=false

13:05:18.287 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23151.0>,cb=#PID<0.23148.0>,generation=1):
elected=false

13:05:18.288 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23131.0>,cb=#PID<0.23128.0>,generation=1):
elected=false

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23155.0>,cb=#PID<0.23152.0>,generation=1):
assignments received:[]

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23165.0>,cb=#PID<0.23160.0>,generation=1):
assignments received:[]

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23159.0>,cb=#PID<0.23156.0>,generation=1):
assignments received:[]

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23147.0>,cb=#PID<0.23143.0>,generation=1):
assignments received:[]

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23135.0>,cb=#PID<0.23132.0>,generation=1):
assignments received:[]

13:05:18.294 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23127.0>,cb=#PID<0.23124.0>,generation=1):
assignments received:[]

13:05:18.295 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23131.0>,cb=#PID<0.23128.0>,generation=1):
assignments received:[]

13:05:18.295 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23151.0>,cb=#PID<0.23148.0>,generation=1):
assignments received:[]

13:05:18.295 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23123.0>,cb=#PID<0.23120.0>,generation=1):
assignments received:
  atm_accounts_entities:
    partition=0 begin_offset=undefined
13:05:18.296 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-0a422e20-5b4a-11eb-a49d-a6c0fcfc4157,coor=#PID<0.23142.0>,cb=#PID<0.23136.0>,generation=1):
assignments received:
  atm_accounts_entities:
    partition=1 begin_offset=undefined

Then, when I restart the pipeline, the partition and message-inconsistency issues start to occur.

LOGS WHEN USING LATEST RELEASE TAG WITH NO ISSUE:

14:19:58.590 [info] Elixir.CogyntWorkstationIngest.Servers.PubSub.IngestPubSub: Channel: "ingest_channel", Received message: START PIPELINE

14:19:58.603 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4211.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_0.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_0.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.604 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4215.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_1.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_1.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.604 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4219.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_2.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_2.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.605 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4223.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_3.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_3.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.605 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4227.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_4.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_4.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.607 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4231.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_5.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_5.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.607 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4235.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_6.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_6.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.608 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4242.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_7.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_7.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.609 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4246.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_8.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_8.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:19:58.609 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.4250.0>, id: :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_9.Client", mfargs: {:brod_client, :start_link, [[{"kafka", 9072}], :"Elixir.EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621acPipeline.Broadway.Producer_9.Client", [connect_timeout: 30000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]

14:20:00.499 [info] Elixir.CogyntWorkstationIngest.Servers.PubSub.IngestPubSub: Channel: "ingest_channel", Received message: STOP PIPELINE

14:20:04.619 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4217.0>,cb=#PID<0.4214.0>,generation=1):
elected=false

14:20:04.619 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4244.0>,cb=#PID<0.4241.0>,generation=1):
elected=false

14:20:04.619 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4225.0>,cb=#PID<0.4222.0>,generation=1):
elected=false

14:20:04.619 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4213.0>,cb=#PID<0.4210.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4240.0>,cb=#PID<0.4234.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4229.0>,cb=#PID<0.4226.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4259.0>,cb=#PID<0.4249.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4221.0>,cb=#PID<0.4218.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4233.0>,cb=#PID<0.4230.0>,generation=1):
elected=false

14:20:04.621 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4248.0>,cb=#PID<0.4245.0>,generation=1):
elected=false

14:20:04.630 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4213.0>,cb=#PID<0.4210.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4233.0>,cb=#PID<0.4230.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4240.0>,cb=#PID<0.4234.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4229.0>,cb=#PID<0.4226.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4248.0>,cb=#PID<0.4245.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4221.0>,cb=#PID<0.4218.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4259.0>,cb=#PID<0.4249.0>,generation=1):
assignments received:[]

14:20:04.631 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4217.0>,cb=#PID<0.4214.0>,generation=1):
assignments received:
  atm_accounts_entities:
    partition=2 begin_offset=undefined
14:20:04.632 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4225.0>,cb=#PID<0.4222.0>,generation=1):
assignments received:
  atm_accounts_entities:
    partition=7 begin_offset=undefined
14:20:04.637 [info] Group member (EventDefinition-6a044d07-3625-4043-b6c5-69629fa28dc8-7c515af4-5b54-11eb-b0a7-921d006621ac,coor=#PID<0.4244.0>,cb=#PID<0.4241.0>,generation=1):
assignments received:
  atm_accounts_entities:
    partition=5 begin_offset=undefined

Then, when I restart the pipeline, I still get the GenServer crash error that we were trying to fix with the last PR:

14:21:44.660 [error] GenServer #PID<0.4056.0> terminating
** (stop) exited in: GenServer.call(#PID<0.4053.0>, :drain_after_revoke, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (broadway_kafka) lib/producer.ex:415: BroadwayKafka.Producer.assignments_revoked/1
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:477: :brod_group_coordinator.stabilize/3
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:391: :brod_group_coordinator.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:msg, #PID<0.4119.0>, {:kpro_rsp, #Reference<0.2515379551.3856400387.31330>, :heartbeat, 0, %{error_code: :rebalance_in_progress}}}

However, even with the crash, the correct data is still ingested and the partition offsets are fetched correctly, whereas the "fix" appears to have broken that.

I don't know how to fix the GenServer crash without also breaking the partition offset counts or the draining, but that change on master should not be released until a solution has been found.

Only one processor receives messages

Hi, I have a case where I set concurrency in my processors config, but only one of the processors gets the work all the time.

My setup

  1. I'm running the Kafka broker locally but I've seen the same issue in production as well.
  2. I have a topic with 20 partitions
  3. The Broadway module is like:
defmodule MyApp.Broadway do
  use Broadway

  alias Broadway.Message
  require Logger

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module(),
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    )
  end

  defp producer_module do
    {BroadwayKafka.Producer,
     [
       hosts: "localhost:9092",
       group_id: "foo_group_id2",
       topics: ["event.foo"]
     ]}
  end

  @impl true
  def handle_message(_, message, _) do
    Logger.info("#{inspect(self())}: producing data #{inspect(message)}")
    Process.sleep(2000)
    message
  end
end

Issue

When I start the iex terminal I see that the connection is established

[info] Group member (foo_group_id2,coor=#PID<0.579.0>,cb=#PID<0.574.0>,generation=29):
assignments received:
  event.foo:
    partition=0 begin_offset=3183
    partition=1 begin_offset=undefined
    partition=2 begin_offset=2399
    partition=3 begin_offset=undefined
    partition=4 begin_offset=2506
    partition=5 begin_offset=2766
    partition=6 begin_offset=2215
    partition=7 begin_offset=2527
    partition=8 begin_offset=2018
    partition=9 begin_offset=12503
    partition=10 begin_offset=2085
    partition=11 begin_offset=2011
    partition=12 begin_offset=1959
    partition=13 begin_offset=1945
    partition=14 begin_offset=2055
    partition=15 begin_offset=1928
    partition=16 begin_offset=1986
    partition=17 begin_offset=2043
    partition=18 begin_offset=2028
    partition=19 begin_offset=undefined
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3183}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3839", metadata: %{headers: [], key: "1076B8A25E736A2A4E37BB25303876D6", offset: 3183, partition: 0, topic: "event.foo", ts: 1651870249942}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3184}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3846", metadata: %{headers: [], key: "F9F1DC1EC6CDE5846300B152CDC01033", offset: 3184, partition: 0, topic: "event.foo", ts: 1651870249948}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3185}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3847", metadata: %{headers: [], key: "C34C91322862A23470A4001FC4A7CB38", offset: 3185, partition: 0, topic: "event.foo", ts: 1651870249949}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3186}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3855", metadata: %{headers: [], key: "DBE92A4125651EE89BFA0CAA47ECDFAD", offset: 3186, partition: 0, topic: "event.foo", ts: 1651870249955}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3187}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3898", metadata: %{headers: [], key: "9783F5B0A4BA741051D5CA2915D97ABA", offset: 3187, partition: 0, topic: "event.foo", ts: 1651870249989}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3188}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3899", metadata: %{headers: [], key: "65F16D7346D86C4DE978E569D2FDF410", offset: 3188, partition: 0, topic: "event.foo", ts: 1651870249990}, status: :ok}
....

I've left the terminal open for 30 minutes and the messages are always delivered to the same process, PID<0.582.0>. In addition, it always pulls from partition 0.

I also double-checked using LiveDashboard: proc_0 is always busy and the other processors never receive any tasks.

Screenshot 2022-05-06 at 23 26 52

Just in case it helps, these are my consumer group details for this topic:

Screenshot 2022-05-06 at 23 28 06

As you can see, our consumer lags behind the end offset, which is sometimes the case in our production environment. To reproduce that locally, you can do:

key = fn -> Base.encode16(:crypto.strong_rand_bytes(16)) end
Enum.each(1..100000, fn i ->   :brod.produce_sync(:foo_producer, "event.foo", :hash, key.(), "body #{i}") end)

Expect to see

I'd expect to see two things:

  1. I'd expect that the producer distributes the messages to all processors.
  2. I'd expect that the producer pulls data from all partitions (not sure how it would do it, randomly or round-robin!)

Could you please kindly help me to understand if:
a. This is an expected behaviour.
b. This is not an expected behaviour and is a bug.
c. There is a problem in my Broadway process config.

exactly once delivery (EOD) in stream processing

Hi!

For stream processing, Kafka has a transaction API that exposes a "transaction coordinator", which commits the offset on the source topic and commits the messages sent to the target topic, allowing exactly-once semantics (an explanation here).

Neither brod nor KafkaEx exposes the transactional API, but their protocol libraries (kafka_protocol and kayrock) do.

I don't know if you would like Broadway to implement this end-to-end integration. If we are doing something transactional, maybe we should use just one process per partition, each with a transactional_id for committing the transaction (we wouldn't need back-pressure), and that's it! But it would be nice to have a single Broadway API to handle all types of streaming use cases.

What do you think? How would you handle exactly-once delivery?

BroadwayKafka 0.3.1 is skipping the current offset

Hi folks 👋

BroadwayKafka version 0.3.1 (specifically PR #72) introduced an undesirable behavior where the consumer doesn't go back to consuming from the offset where it left off; it now always takes the latest offset. I think this is a major issue because you can end up losing a lot of messages if your consumer is restarted.

For example, suppose you have a constant flow of messages: a topic receives about 10 messages per second, and you decide to stop the server to make a deployment. The desired behavior is that, when it becomes active again, the consumer starts consuming from the last committed offset, so the flow continues normally without losing any messages. But with the behavior introduced in 0.3.1, the current_offset (last committed offset) is ignored and instead we only read the new messages that arrive after the consumer is active again.
In this example, if the consumer takes 10 seconds to come back up after the deployment, you will have missed 100 messages.

Maybe there was a misinterpretation of the configuration offset_reset_policy and now we're using it in cases we shouldn't.

:offset_reset_policy - Optional. Defines the offset to be used when there's no initial offset in Kafka or if the current offset has expired.
Possible values are :earliest or :latest. Default is :latest.

As shown in the docs, I believe we should use this policy only when the offset is :undefined (new consumers) or when the current_offset has already expired. If your application already knows the offset it should use and it is still active, then I think it's wrong to use the :latest or :earliest offset option. I think this undesirable behavior is also related to issue #74.

I created PR #75, which I think fixes the problem we were trying to solve in issue #71 without introducing the side effects that I described here or the one described in issue #74.

How to reproduce:

After initializing Kafka, create a topic

kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --topic test --replication-factor 1

Starting a new project

mix new kafka_consumer --sup
  defp deps do
    [
      {:broadway, "~> 1.0"},
      {:broadway_kafka, "~> 0.3"}
    ]
  end

Define a basic pipeline configuration

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [localhost: 9092],
             group_id: "group_1",
             topics: ["test"]
           ]},
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _) do
    message
    |> Message.update_data(fn data ->
      IO.inspect(data, label: "Got messsage")

      {data, String.to_integer(data) * 2}
    end)
  end
end

Add it as a child in a supervision tree

    children = [MyBroadway]

    Supervisor.start_link(children, strategy: :one_for_one)

You can now test the pipeline by entering an iex session:

iex -S mix

Open another terminal window and send messages to Kafka

kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3

You should see this output

iex> Got messsage: "1"
iex> Got messsage: "2"
iex> Got messsage: "3"

Now hit Ctrl-C twice to stop the Broadway consumer and send more messages to Kafka:

kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>4
>5
>6

Start your Elixir application again:

iex -S mix

You can wait for a while, but new messages that were sent while the consumer was offline will not be consumed.

Try to send a new message:

kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

>7

You should see this output

iex> Got messsage: "7"

This means that offsets 3, 4, and 5 were skipped

The desired behavior for a Kafka consumer is that it doesn't skip any available messages: if the last ack was at offset 2, it should continue from offset 3 when you start it again, and it should consume the messages received while it was offline.

Cut release 0.3.6?

We currently have a workaround for #104; it would be nice to be able to specify a specific released version.

I'll happily create a PR but it seemed a bit presumptuous without at least creating an issue and confirming whether that would be wanted.

When a new node joins, the two consumers never reach a balanced state

Issue

I've faced a problem in production: when we add a new node (Node B) while the existing consumers (on Node A) are busy, Node B joining the consumer group causes Node A to go into rebalancing mode, but by the time Node A is ready to join, the Kafka coordinator times out and takes Node A out of the group. When Node A tries to join again, it causes Node B to go into rebalancing mode, and this sometimes continues for 40 minutes until they settle!

This is a screen recording showing the case: https://github.com/slashmili/talks/blob/tmp-screen-recording/rec/kafka-nodes.mov

When watching the video, pay attention to the app1 console, which says:

re-joining group, reason::rebalance_in_progress

and after a while the Kafka log says:

[2022-05-13 09:07:43,401] INFO [GroupCoordinator 1001]: Group foo_group_id2 remove dynamic members who haven't joined: Set('app1@Milads-MBP'/<0.474.0>-d94ec1de-a5b9-4e37-955d-d5d2c8a71e14) (kafka.coordinator.group.GroupCoordinator)

Why?

So you might ask why this happens. What takes so long for Node A to join again?

I've debugged brod, and it all comes down to this part:

https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L499-L502

brod wants to start rebalancing but it's waiting for line 502 which takes a long time to finish.

This calls BroadwayKafka, which makes a call to the process and waits until all the ack messages are received.

https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L485-L488

How to fix this

Solution

The fix that seems to work consistently is to add the rebalance_timeout_seconds option. In the local environment, setting it to 200 did the trick. I suppose you won't mind if I create a PR adding that?
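For illustration, assuming the option is forwarded to brod's group coordinator the same way as the existing group options, it might be passed like this (a hedged sketch, not confirmed library behaviour):

{BroadwayKafka.Producer,
 [
   hosts: [localhost: 9092],
   group_id: "foo_group_id2",
   topics: ["event.foo"],
   # proposed option, assumed to be forwarded via :group_config
   group_config: [rebalance_timeout_seconds: 200]
 ]}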

Extra

Since I spent almost a week chasing this unexpected behaviour, I'd like to propose something to make it easier for future users of this library.
What I'd like to do: after the process receives the :drain_after_revoke message, store the time it received the message. Then, in another part of the code, if state.revoke_caller exists and more than rebalance_timeout_seconds seconds have passed, emit a warning hinting to users that they should look into the rebalance_timeout_seconds option.


Another case but related

So I started looking at this issue in a different environment. We have 100 partitions for a topic and we started seeing this round of rebalancing. I think adding rebalance_timeout_seconds solves part of the problem.

Another case: imagine we have 2 nodes running and consuming 100 partitions, each polling from 50 partitions. When a new rebalance happens (a new node comes online or a broker undergoes maintenance), brod triggers assignments_revoked, which triggers this call.

We saw that this also takes a long time and causes rebalancing. Looking at the process's mailbox shows ~50 scheduled poll messages, and then at the end of the mailbox there is the :drain_after_revoke message.

This means all the poll messages are going to be executed and will hand events over to the GenStage consumers without knowing that the coordinator has asked to drain due to the revoked assignments. It adds additional waiting time, because by the time :drain_after_revoke is processed we have to wait for the acks of all the other new messages we handed over to the consumers.

I thought of a solution that keeps the revoked-assignment state in another process, but I'm not really happy with it and would like to get your feedback on how to handle this case.

Bump version to include bugfix

Hey 👋 – we bumped our version from 0.3.0 to 0.3.3 to solve the problem mentioned in #76, but when trying to deploy we realized we were hitting another bug (the one mentioned in #78).

We are currently pointing to 61bf8b7 to benefit from the bug fixes that were so nicely done:

{:broadway_kafka, github: "dashbitco/broadway_kafka", ref: "61bf8b7"},

I was wondering if it's possible to bump the version officially to 0.3.4 and include that change.

Sorry if it's not appropriate to ask this type of question here 😅

Offset reset policy isn't applied for offsets out of range.

When BroadwayKafka.Producer receives new assignments from the :brod_group_coordinator, the offset reset policy isn't applied for offsets which are out of range, but rather only for offsets that are :undefined. See: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/brod_client.ex#L130.

I expected the :offset_reset_policy to behave as per the :brod_consumer documentation

It seems like the :offset_reset_policy should be handled in the fetch function (catch the :offset_out_of_range error and try again?).
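A rough, hypothetical sketch of that idea (not the library's code; retry_with_reset_policy is an imaginary helper):

case client.fetch(client_id, topic, partition, offset, config[:fetch_config], config) do
  {:ok, result} ->
    result

  {:error, :offset_out_of_range} ->
    # re-resolve the offset according to config[:offset_reset_policy]
    # (:earliest or :latest) and retry the fetch from there
    retry_with_reset_policy(client, client_id, topic, partition, config)
end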

fetch_messages_from_kafka/3 Error: :unknown_server_error

In one of my production clusters, I just switched our auditing application over to using BroadwayKafka for its ingest pipeline.

However, I am seeing errors from this call:

case client.fetch(client_id, topic, partition, offset, config[:fetch_config], config) do

03:25:25.668 [error] GenServer :"AuditLogPipeline.Broadway.Producer_2" terminating
** (RuntimeError) cannot fetch records from Kafka (topic=_cogynt_audit_log partition=6 offset=0). Reason: :unknown_server_error
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:537: BroadwayKafka.Producer.fetch_messages_from_kafka/3
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:303: BroadwayKafka.Producer.handle_info/2
    (broadway 1.0.3) lib/broadway/topology/producer_stage.ex:229: Broadway.Topology.ProducerStage.handle_info/2
    (gen_stage 1.1.2) lib/gen_stage.ex:2117: GenStage.noreply_callback/3
    (stdlib 3.17) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:poll, {17108, "_cogynt_audit_log", 6}}
State: %{consumers: [{#PID<0.2859.0>, #Reference<0.1885855812.438304784.200151>}, {#PID<0.2858.0>, #Reference<0.1885855812.438304784.200140>}, {#PID<0.2857.0>, #Reference<0.1885855812.438304784.200126>}, {#PID<0.2856.0>, #Reference<0.1885855812.438304784.200107>}, {#PID<0.2855.0>, #Reference<0.1885855812.438304784.200096>}, {#PID<0.2854.0>, #Reference<0.1885855812.438304784.200081>}, {#PID<0.2853.0>, #Reference<0.1885855812.438304784.200066>}, {#PID<0.2852.0>, #Reference<0.1885855812.438304776.200786>}, {#PID<0.2851.0>, #Reference<0.1885855812.438304784.200049>}, {#PID<0.2850.0>, #Reference<0.1885855812.438304769.200498>}], module: BroadwayKafka.Producer, module_state: %{acks: %{{17108, "_cogynt_audit_log", 6} => {[], 0, []}}, allocator_names: {2, [AuditLogPipeline.Allocator_processor_default], [AuditLogPipeline.Allocator_batcher_consumer_default]}, buffer: {[], []}, client: BroadwayKafka.BrodClient, client_id: AuditLogPipeline.Broadway.Producer_2.Client, config: %{client_config: [connect_timeout: 30000], fetch_config: %{}, group_config: [offset_commit_policy: :commit_to_kafka_v2, session_timeout_seconds: 30], group_id: "AuditLog-df7078f6-7693-4725-8b88-2a138fc847ce", hosts: [{"kafka", 9071}], offset_commit_on_ack: true, offset_reset_policy: :earliest, receive_interval: 2000, reconnect_timeout: 1000, topics: ["_cogynt_audit_log"]}, demand: 100, group_coordinator: #PID<0.2811.0>, receive_interval: 2000, receive_timer: #Reference<0.1885855812.438304783.200132>, reconnect_timeout: 1000, revoke_caller: nil, shutting_down?: false}, rate_limiting: nil, transformer: {CogyntAudit.Broadway.AuditLogPipeline, :transform, []}}
03:25:25.668 [error] GenServer :"AuditLogPipeline.Broadway.Producer_4" terminating
** (RuntimeError) cannot fetch records from Kafka (topic=_cogynt_audit_log partition=5 offset=0). Reason: :unknown_server_error
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:537: BroadwayKafka.Producer.fetch_messages_from_kafka/3
    (broadway_kafka 0.3.4) lib/broadway_kafka/producer.ex:303: BroadwayKafka.Producer.handle_info/2
    (broadway 1.0.3) lib/broadway/topology/producer_stage.ex:229: Broadway.Topology.ProducerStage.handle_info/2
    (gen_stage 1.1.2) lib/gen_stage.ex:2117: GenStage.noreply_callback/3
    (stdlib 3.17) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:poll, {17108, "_cogynt_audit_log", 5}}

This is causing my producers to restart over and over again; they are at a very high restart count now.

Group member (AuditLog-df7078f6-7693-4725-8b88-2a138fc847ce,coor=#PID<0.2843.0>,cb=#PID<0.2839.0>,generation=17115):

Any help with this error (which looks like an error returned from the brod client) would be appreciated. Thanks!

Publish documentation

It would be great to get the docs for broadway_kafka published. I know it is in a pre-release state, but it would be helpful for those who are trying to evaluate the library (such as my team).

Thank you for the great work on this, and to Hugo for letting me know about the progress here! 💯

Broadway suspend pipeline

Is there a way to suspend a Broadway Kafka pipeline from ingesting data without having to fully kill the GenServer?

Connecting via SSL

Certain Kafka providers (I'm looking at you, Heroku) require connecting over SSL. I didn't see any explicit options for this when poking around, so I'm wondering if there are any plans to eventually support SSL.

Also, thanks for working on a Kafka Broadway implementation – super excited to see this come together ❤️
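A hedged sketch of how brod's SSL/SASL client options might be passed, assuming they are forwarded through :client_config (the broker and credentials are placeholders):

{BroadwayKafka.Producer,
 [
   hosts: [{"broker.example.com", 9093}],
   group_id: "my_group",
   topics: ["my_topic"],
   client_config: [
     ssl: true,
     sasl: {:plain, "username", "password"}
   ]
 ]}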

How to stop a Broadway Kafka pipeline?

Team, I tried Broadway.stop(__MODULE__) in a broadway_kafka pipeline, but the pipeline gets restarted after a minute by the brod supervisor :brod_sup. The restart strategy is fixed to :permanent in the brod library (see the reference below); could that be the cause? Is there a way I can start my Broadway Kafka producer with the restart argument set to temporary/transient?

:supervisor: {:local, :brod_sup}
    :started: [
  pid: #PID<0.1456.0>,
  id: TestBroadway.Broadway.Producer_0.Client,
  mfargs: {:brod_client, :start_link,
   [[localhost: 9092], TestBroadway.Broadway.Producer_0.Client, []]},
  restart_type: {:permanent, 10},
  shutdown: 5000,
  child_type: :worker
]

Reference: https://github.com/kafka4beam/brod/blob/master/src/brod_sup.erl#L164

Avro Schema Support

Just opening the conversation on whether any thought has gone into supporting Avro schemas for this.

Kafka hosts standard notation

Hi @msaraiva !

First of all, thanks a lot for your work here!

I'd like to propose also accepting a String.t() for hosts. I am not an expert on Kafka tools, but it seems some tools accept this, as it is easier to pass a dynamic number of values through an environment variable. The official libraries seem to support this too.

I believe this is the most common way of initializing this in the ecosystem. I could open a PR if that is desired/wanted. It would be a very small code change for a nice developer-happiness win, in my opinion. It would let us avoid doing something like what we currently do:

# Something like
opts
|> Keyword.get(:brokers, "")
|> String.split(",", trim: true)
|> Enum.map(&String.split(&1, ":", trim: true))
|> Keyword.new(fn [key, val] -> {String.to_atom(key), String.to_integer(val)} end)
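For comparison, the proposed string notation might look like this (a sketch of the proposal, not something the library currently accepts):

# proposed: pass the comma-separated host list straight through
hosts: "kafka-1.example.com:9092,kafka-2.example.com:9092"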

Once again, thanks for your work!

Allow passing :connect_timeout to brod client

Hello,

I need to set up a longer timeout than the default of 5 seconds; brod allows this via the connect_timeout client option.

Is it possible to allow this property to be set through broadway_kafka?
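Assuming it would be forwarded like other brod client options, a hedged sketch of what that could look like (host, group, and topic are placeholders):

{BroadwayKafka.Producer,
 [
   hosts: [localhost: 9092],
   group_id: "my_group",
   topics: ["my_topic"],
   # forwarded to the brod client as a client option
   client_config: [connect_timeout: 30_000]
 ]}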

Producer terminate

Hello, I am new to Broadway. I am working with the Broadway Kafka producer, and when I stopped the pipeline I realized that the brod processes remain running.
When I looked at Broadway.Topology.ProducerStage, I saw that it is not set to trap exits (unlike Broadway.Topology.ProcessorStage), so the GenServer terminate callback is never invoked, which means the terminate in BroadwayKafka.Producer never gets called either and no cleanup is performed.
Please let me know if I am getting something wrong or whether this is indeed a bug.

Drop support of topic/partition for `topics` option

Given the following exception when trying to connect to Kafka with a keyword list of topic/partition pairs ([test: n]), I think we should maybe support only a list of topics:

09:48:54.434 [info]  [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.241.0>, id: Consumer.Broadway.Producer_0.Client, mfargs: {:brod_client, :start_link, [["##############.aws.confluent.cloud": 9092], Consumer.Broadway.Producer_0.Client, [sasl: {:plain, "#################", #Function<11.113019958/0 in :brod_utils.init_sasl_opt/1>}, ssl: [cacertfile: "./priv/ssl/server.crt", keyfile: "./priv/ssl/server.key", certfile: "./priv/ssl/server.crt"]]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
Interactive Elixir (1.9.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 
09:48:56.857 [info]  Group member (test,coor=#PID<0.245.0>,cb=#PID<0.240.0>,generation=0):
Leaving group, reason: {{:function_clause, [:topics, :protocol_metadata, :group_protocols, {:join_group, 0}], [group_id: "test", session_timeout: 30000, member_id: "", protocol_type: "consumer", group_protocols: [[protocol_name: :roundrobin_v2, protocol_metadata: [version: 0, topics: [test: 0], user_data: ""]]]]}, [{:kpro_lib, :encode, [:string, {:test, 0}], []}, {:kpro_req_lib, :enc_struct_field, 3, [file: 'src/kpro_req_lib.erl', line: 398]}, {:kpro_req_lib, :"-enc_struct_field/3-lc$^0/1-0-", 3, [file: 'src/kpro_req_lib.erl', line: 389]}, {:kpro_req_lib, :enc_struct_field, 3, [file: 'src/kpro_req_lib.erl', line: 389]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 379]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 380]}, {:kpro_req_lib, :translate, 2, [file: 'src/kpro_req_lib.erl', line: 410]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 378]}]}

 
09:48:56.863 [error] GenServer #PID<0.245.0> terminating
** (stop) {:function_clause, [:topics, :protocol_metadata, :group_protocols, {:join_group, 0}], [group_id: "test", session_timeout: 30000, member_id: "", protocol_type: "consumer", group_protocols: [[protocol_name: :roundrobin_v2, protocol_metadata: [version: 0, topics: [test: 0], user_data: ""]]]]}
    (kafka_protocol) :kpro_lib.encode(:string, {:test, 0})
    (kafka_protocol) src/kpro_req_lib.erl:398: :kpro_req_lib.enc_struct_field/3
    (kafka_protocol) src/kpro_req_lib.erl:389: :kpro_req_lib."-enc_struct_field/3-lc$^0/1-0-"/3
    (kafka_protocol) src/kpro_req_lib.erl:389: :kpro_req_lib.enc_struct_field/3
    (kafka_protocol) src/kpro_req_lib.erl:379: :kpro_req_lib.enc_struct/3
    (kafka_protocol) src/kpro_req_lib.erl:380: :kpro_req_lib.enc_struct/3
    (kafka_protocol) src/kpro_req_lib.erl:410: :kpro_req_lib.translate/2
    (kafka_protocol) src/kpro_req_lib.erl:378: :kpro_req_lib.enc_struct/3
Last message: {:lo_cmd_stabilize, 0, :undefined}
State: {:state, Consumer.Broadway.Producer_0.Client, "test", "", :undefined, 0, [test: 0], :undefined, :undefined, [], false, #PID<0.240.0>, BroadwayKafka.Producer, [], :undefined, :roundrobin_v2, 30, 5, 5, 2, :undefined, :commit_to_kafka_v2, 1, :roundrobin_v2} 

-type topic() :: kpro:topic().

https://github.com/klarna/brod/blob/master/src/brod.erl

-type topic() :: binary().

https://github.com/klarna/kafka_protocol/blob/master/src/kpro.erl

:lists.foldl/3 error in Producer from :brod_utils.drop_aborted/2

I'm getting this error and have no idea how to debug it.

Error stacktrace is:

12:10:29.038 [error] GenServer Ingestor.Broadway.Producer_0 terminating
** (FunctionClauseError) no function clause matching in :lists.foldl/3
    (stdlib 3.12.1) lists.erl:1262: :lists.foldl(#Function<18.113019958/2 in :brod_utils.drop_aborted/2>, [{%{is_control: false, is_transaction: false, last_offset: 2684, max_ts: 1595567970645, producer_id: 0}, [{:kafka_message, 2284, "", "{\"id\":\"1234\",  ...{more JSON}...  (truncated)

Broadway module

defmodule Ingestor do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: opts[:producer],
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 1
        ]
      ],
      batchers: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_, %Message{} = message, _) do
    Logger.debug(inspect(message))

    message
    |> Message.update_data(&process_data/1)
  end

  @impl true
  def handle_batch(_default, messages, _batch_info, _context) do
    messages
  end

  def process_data(data) do
    data
    |> Jason.decode!()
  end
end

opts passed in

config :ingestor, Ingestor,
  producer: {
    BroadwayKafka.Producer,
    hosts: [{"localhost", 9093}],
    group_id: "foo",
    topics: ["bar"],
    offset_reset_policy: :earliest,
    client_config: [
      ssl: true,
      sasl: {:plain, "username", "password"}
    ],
    on_success: :noop
  }

I am at a loss as to why lists.foldl is failing here.

Support :query_api_versions brod option

Hi. With some Kafka setups, brod fails to request the API versions because, apparently, the connection is not upgraded to TLS.

Creating a raw brod client with

[
  query_api_versions: false,
  ssl: [...]
]

does not raise any error, but that option is not supported by BroadwayKafka.BrodClient.

Would you consider adding it? Meanwhile I will copy over the module and whitelist the option locally.
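If it were whitelisted, presumably it would sit alongside the other brod client options (a hedged sketch; the certificate path is a placeholder):

client_config: [
  # assumed: skip the ApiVersions request that fails on these setups
  query_api_versions: false,
  ssl: [cacertfile: "/path/to/ca.crt"]
]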

Move INFO level logging to DEBUG level

There are a lot of logs right now when the Broadway pipeline is starting and connecting to the Kafka brokers, and they are all INFO-level logs. Since I would not always want to see these in a production environment, it would be nice to only see them when debugging.

Just a large sample for example:
10:17:00.406 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.900.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_0.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_0.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.410 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.904.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_1.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_1.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.411 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.908.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_2.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_2.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.411 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.912.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_3.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_3.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.412 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.916.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_4.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_4.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.412 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.920.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_5.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_5.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.413 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.924.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_6.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_6.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.413 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.928.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_7.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_7.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.413 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.932.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_8.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_8.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.414 [info] [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.936.0>, id: :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_9.Client", mfargs: {:brod_client, :start_link, [[{"127.0.0.1", 9092}], :"Elixir.Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692Pipeline.Broadway.Producer_9.Client", [connect_timeout: 15000]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
10:17:00.446 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.914.0>,cb=#PID<0.911.0>,generation=1): elected=false
10:17:00.446 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.930.0>,cb=#PID<0.927.0>,generation=1): elected=false
10:17:00.446 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.918.0>,cb=#PID<0.915.0>,generation=1): elected=false
10:17:00.447 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.910.0>,cb=#PID<0.907.0>,generation=1): elected=false
10:17:00.447 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.934.0>,cb=#PID<0.931.0>,generation=2): elected=true
10:17:00.447 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.902.0>,cb=#PID<0.896.0>,generation=1): elected=false
10:17:00.447 [info] Group member (Drilldown-119465517-b31791c8-e888-11ea-9675-3af9d3cfb692,coor=#PID<0.914.0>,cb=#PID<0.911.0>,generation=1):

Error after upgrading to v0.3.2

Hey there, I seem to be facing an issue that I think got introduced in 595d7a2

Here, :brod.fetch is expected to return {:ok, _} or {:error, :offset_out_of_range}.
But the spec of :brod.fetch says the error tuple is {:error, any()}, and I seem to be hitting exactly that, as what I get is:

{
  :error,
  [
    {
      {:"host-1.com", port1},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224752>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    },
    {
      {:"host-1.com", port2},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224757>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    }
  ]
}

I'm not familiar with the inner workings of this library, but should the error tuple be relaxed to any error tuple?
My setup works again if I downgrade to v0.3.1
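
For what it's worth, here is a hypothetical sketch of the kind of relaxed match I have in mind (not the library's actual code; client, topic, partition and offset are placeholders, and I only special-case :offset_out_of_range):

defmodule FetchSketch do
  # Hypothetical wrapper: surface :offset_out_of_range for the existing reset
  # handling, and turn any other error tuple into a generic fetch failure
  # instead of crashing on an unexpected match.
  def fetch_messages(client, topic, partition, offset) do
    case :brod.fetch(client, topic, partition, offset) do
      {:ok, {_high_watermark, kafka_messages}} ->
        {:ok, kafka_messages}

      {:error, :offset_out_of_range} = error ->
        error

      {:error, reason} ->
        # connection/broker errors such as the :closed tuples above land here
        {:error, {:fetch_failed, reason}}
    end
  end
end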

Thank you!!

Question: About a pipeline with a Batcher

Hey, I have a simple question, I think. I am using a Broadway Kafka pipeline with

  • producer -> 1
  • processors -> 10
  • batcher -> 10

The setup is shown in the attached image.

My question is just whether my assumption about how this is working is correct:

  • 10 processors running
    each one's job is to grab 1 message from Kafka, execute whatever logic is in the handle_message callback and send it to the batcher, as long as I call Message.put_batcher(:name_of_batcher), which I am

  • 1 batcher running
    its job is just to aggregate the messages sent to it and, when it reaches the batch_size threshold, send the batch off to its downstream batch processors

  • 10 batch processors running
    each one's job is just to take each batch of messages it gets and execute the logic in handle_batch

Screen Shot 2022-06-02 at 4 05 09 PM
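
For reference, a minimal sketch of the topology I'm describing (the module and batcher names are just placeholders of mine):

defmodule MyKafkaPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [hosts: [{"localhost", 9092}], group_id: "my_group", topics: ["my_topic"]]},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]],
      batchers: [name_of_batcher: [concurrency: 10, batch_size: 100]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # per-message work happens here, then the message is routed to the batcher
    Message.put_batcher(message, :name_of_batcher)
  end

  @impl true
  def handle_batch(:name_of_batcher, messages, _batch_info, _context) do
    # batch-level work happens here, once batch_size (or batch_timeout) is reached
    messages
  end
end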

Proposal: Use per partition internal buffer

This is a fix of what I initially described at #92

I'd like to propose changing how we keep buffers internally in the producer.

This proposal:

  • Increases concurrency.
  • Speeds up rebalancing time.

How it works now

sequenceDiagram
    autonumber
    participant ProducerBuffer
    actor Producer
    actor GenStagePartitionDispatcher
    actor Consumer1
    actor Consumer2
    actor Consumer3

    Producer->>+Producer: Fetch Partition 0 data
    Producer->>-ProducerBuffer: insert 20 messages into internal queue
    Producer->>+Producer: Fetch Partition 1 data
    Producer->>-ProducerBuffer: insert 27 messages into internal queue
    Producer->>+Producer: Fetch Partition 2 data
    Producer->>-ProducerBuffer: insert 1 messages into internal queue

    Consumer1->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>ProducerBuffer: give me 5 messages
    ProducerBuffer->>Producer: Pop 5 messages
    Producer->>-GenStagePartitionDispatcher: hand 5 msgs over
    GenStagePartitionDispatcher->>+GenStagePartitionDispatcher: choose Consumer1 as a partition to hand partition 0 data
    GenStagePartitionDispatcher->>-Consumer1: give 5 messages 
    Consumer2->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>ProducerBuffer: give me 5 messages
    ProducerBuffer->>Producer: Pop 5 messages
    Producer->>-GenStagePartitionDispatcher: hand 5 msgs over
    GenStagePartitionDispatcher->>GenStagePartitionDispatcher: all partition 0 data, save it in internal buffer for Consumer1
    Consumer3->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>ProducerBuffer: give me 5 messages
    ProducerBuffer->>Producer: Pop 5 messages
    Producer->>-GenStagePartitionDispatcher: hand 5 msgs over
    GenStagePartitionDispatcher->>GenStagePartitionDispatcher: all partition 0 data, save it in internal buffer for Consumer1

It has impact on:

  • concurrency: Consumer2 and Consumer3 are ready to accept data but stay idle until GenStagePartitionDispatcher gets some data it can hand over to them
  • rebalancing time: imagine an assignment-revoked event is issued at this moment (caused by rebalancing); we have to wait for Consumer1 to consume 15 messages before we can hand control back to brod to join the new generation.

How the new setup will work:

I already have a "working" version: slashmili@4a4a69d

It still needs to be tested, and I need to hash out some parts that were not clear to me, but it's good enough to show you a demo!

Basically we have a map of buffers; each time we fetch data from Kafka, we push it to its own queue

sequenceDiagram
    autonumber
    participant ProducerBuffer
    actor Producer
    actor GenStagePartitionDispatcher
    actor Consumer1
    actor Consumer2
    actor Consumer3

    Producer->>+Producer: Fetch Partition 0 data
    Producer->>-ProducerBuffer: insert 20 messages into Partition0 queue
    Producer->>+Producer: Fetch Partition 1 data
    Producer->>-ProducerBuffer: insert 27 messages into Partition1 queue
    Producer->>+Producer: Fetch Partition 2 data
    Producer->>-ProducerBuffer: insert 1 messages into Partition2 queue

    Consumer1->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>-GenStagePartitionDispatcher: Pop 5 messages and hand them over (from which queue?)
    Consumer2->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>-GenStagePartitionDispatcher: Pop 5 messages and hand them over (from which queue?)
    Consumer3->>GenStagePartitionDispatcher: ask for 5
    GenStagePartitionDispatcher->>+Producer: handle_demand 5
    Producer->>-GenStagePartitionDispatcher: Pop 5 messages and hand them over (from which queue?)

But of course it's not easy! If you look at steps 9, 12 and 15, how does the Producer know which queue to pop from?

In my change I came up with logic that works OK:
slashmili@4a4a69d#diff-972ac1f8e8719ed6755418a0e80ab45d206f244c2ace9a0454eaca9897690dc1R702-R712

It finds the first queue that has fewer than max_demand pending acks and is not empty. There is also extra logic to rotate which queue it looks at on each turn; you may ignore that for now.
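
To make that concrete, here is a simplified, hypothetical sketch of the selection (not the exact code from the commit above), assuming buffers is a map of {topic, partition} => :queue and pending_acks is a map of in-flight message counts per partition:

defmodule PartitionBuffers do
  @moduledoc false

  # Picks the first partition whose queue is non-empty and whose number of
  # pending (un-acked) messages is below max_demand.
  def pick_queue(buffers, pending_acks, max_demand) do
    Enum.find(buffers, fn {key, queue} ->
      not :queue.is_empty(queue) and Map.get(pending_acks, key, 0) < max_demand
    end)
  end

  # Pops up to `demand` messages from the chosen queue and returns them along
  # with the updated buffer map.
  def pop_messages({key, queue}, buffers, demand) do
    {taken, rest} = :queue.split(min(demand, :queue.len(queue)), queue)
    {key, :queue.to_list(taken), Map.put(buffers, key, rest)}
  end
end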

It has impact on:

  • concurrency: all the processors get work to do all the time. The estimate of who needs work seems to perform well; even when it doesn't, as long as there is demand we hand over tasks within the next few seconds.
  • rebalancing time: since the PartitionDispatcher doesn't buffer extra events, when rebalancing happens the processors only need to finish the job at hand. The Producer doesn't give any new events to the PartitionDispatcher, hence the rebalancing is done in a timely manner.

Demo time!

Concurrency

This screen recording shows that in the left panel, using the main branch, only events for partition 0 are being handled (by process 0.269.0). On the right, you'll see that events for all partitions are handled.

Rebalancing

This screen recording shows that in the right panel (using the proposed approach), after rebalance_in_progress happens, no more events are processed, because the PartitionDispatcher's buffer is not overloaded with events from a single partition. The old node joins the new node in 10 seconds.

On the left it takes longer than expected (almost 1 minute). By the time it's ready, the timeout occurs and it starts a new generation. This triggers a rebalance on the new node, and they keep kicking each other out of the group.

Question about :receive_interval and topology.

I mainly use this library to get an easy setup instead of using brod directly, as I don't really understand the brod documentation.

My first question is: if I have concurrency set to 1 for the producer and to 10 for the processors (and I have 10 partitions on all topics), is each Kafka message routed directly from a brod consumer to the processor? Or are all messages bottlenecked through the producer first?

Now, I saw that with the default receive_interval, a message sent to Kafka is not delivered immediately (I guess the delay is up to 2000ms because of the default value). What if I set this option to 100 or 1? It looks like it works great with low traffic. Would it flood the consumers with constant messages? I've set min_demand to 1 because I want messages delivered as soon as they are available in Kafka, but I am not even sure it has an impact.
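
To make the question concrete, this is roughly the configuration I mean (the numbers are just examples, not recommendations):

defmodule LowLatencyConsumer do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [{"localhost", 9092}],
             group_id: "my_group",
             topics: ["my_topic"],
             # poll Kafka every 100ms instead of the default 2000ms
             receive_interval: 100
           ]},
        concurrency: 1
      ],
      processors: [
        # min_demand/max_demand control how many messages each processor asks for
        default: [concurrency: 10, min_demand: 1, max_demand: 2]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _), do: message
end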

I know it sounds like Kafka is not really the right tool here, but I did not really have a choice.

Thank you.

Erlkaf as an optional kafka client

Hello there,

Would it be possible (and desirable) to offer an option to use erlkaf instead of brod as the underlying Kafka client? We had to move from brod to erlkaf after some issues in our infrastructure, and had to let go of Broadway because of that 😢

I can try to come up with a PR for that if there's no problem.

I'm sorry if this is the wrong place to ask such questions, you can close the issue if that's the case.

[Docs] Documentation about handling failure

Hey there, Broadway's documentation says the following: https://hexdocs.pm/broadway/Broadway.html#module-acknowledgements-and-failures

If there are no batchers, the acknowledgement will be done by processors.

As well as

Note however, that Broadway does not provide any sort of retries out of the box. This is left completely as a responsibility of the producer

So I am wondering how this producer deals with failure.

Is there an opportunity to document the topic?
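
For readers landing here, the generic Broadway hooks look roughly like this (my own sketch; do_work/1 is a placeholder), but how this producer treats the offsets of failed messages is exactly the part I'd love to see documented:

defmodule FailureAwarePipeline do
  use Broadway

  require Logger

  alias Broadway.Message

  # Producer/processor configuration omitted for brevity.

  @impl true
  def handle_message(_, message, _context) do
    case do_work(message.data) do
      :ok -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  # Failed messages (marked via Message.failed/2 or raised exceptions) end up
  # here; Broadway itself does not retry them.
  @impl true
  def handle_failed(messages, _context) do
    Enum.each(messages, &Logger.error("Message failed: #{inspect(&1.status)}"))
    messages
  end

  defp do_work(_data), do: :ok
end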

How can I apply after fetch transformations to message list? (before sending it to batchers)

Hi there!

First of all I just want to make clear that I've jumped into this project as part of a learning-Elixir exercise I've been working on. So, even though I have a backend background and a bit of Kafka knowledge, I'm quite new to Elixir and it's my first time going through the Broadway library.

A bit of context

I've been writing a Kafka consumer that is supposed to consume messages from a topic configured with compaction (it's there to share the state of a specific entity across many services).

The consumer I intended to write just takes messages from the topic and upserts or deletes records in a database, assuming the message key is the primary key of the entity.
I've written the consumer so that it sends deletions (tombstones) and upserts (messages with data) to separate batchers.

Now the problem

I wanted to apply a logical "compaction" to the given message list just after fetching it, so that I can take only the most recent message of each entity and discard the others that come in the same fetched batch.

To put an example:
If I receive A1 | A2 | B1 | C1 | A3 in a fetch, I only want to process B1 | C1 | A3. So I would aim for having a callback to apply messages |> reverse |> uniq_by |> reverse as a previous step, before sending the different kinds of messages to their corresponding batchers.
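
In case it clarifies what I'm after, this is the kind of dedup I mean, expressed as a fragment of my consumer module inside handle_batch (keeping all messages so they are still acknowledged and only acting on the last one per key; it assumes the entity id is the Kafka message key exposed in the message metadata, and upsert_record/1 is my existing upsert helper):

def handle_batch(:upserts, messages, _batch_info, _context) do
  messages
  |> Enum.reverse()
  |> Enum.uniq_by(& &1.metadata.key)
  |> Enum.reverse()
  |> Enum.each(&upsert_record/1)

  # return every message so all offsets are still acknowledged
  messages
end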

I've been playing around with the Broadway callbacks and digging into the documentation, but I couldn't find the proper way to make it work. So I ended up forking this project and adding a configurable after_fetch callback, which I use this way:
https://github.com/aviscasillas/elixir_kafka_consumer/blob/master/lib/elixir_kafka_consumer/handling/handler.ex#L24

As I'm totally new to this, I'm pretty sure there's something I missed about how to achieve what I want with the current implementation of this library, and I would be very thankful if someone could shed some light on it 😅.

Thanks!

[Question] Obtain metadata in handle_message

Hi!
Thank you for a great library!

I have a use case where we index incoming data by topic, partition, offset, and timestamp, for which I think Broadway would be a good fit.
But I can't seem to find a way to get at that metadata at the message level?
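
To be explicit, this is what I'm after, as a fragment of a handle_message callback (my understanding is that broadway_kafka puts topic/partition/offset/timestamp into message.metadata, but the exact field names may differ by version, and index_document/2 is just my own indexing function):

@impl true
def handle_message(_processor, message, _context) do
  # Assumed metadata fields; names may vary by broadway_kafka version.
  %{topic: topic, partition: partition, offset: offset, ts: ts} = message.metadata

  index_document(message.data, topic: topic, partition: partition, offset: offset, timestamp: ts)

  message
end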

A way to access brod client

Hello there,
this is more a question than an issue. Is there a way to access the brod client that's started by Broadway?
I need to produce messages back to the same Kafka topic that I'm reading with Broadway, so starting another brod client process seems a bit overkill.
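
For completeness, the separate-client route I'd rather avoid looks roughly like this (host, client name, topic and partition are placeholders):

# A second brod client and producer maintained just for writing back to the topic.
:ok = :brod.start_client([{"localhost", 9092}], :my_producer_client)
:ok = :brod.start_producer(:my_producer_client, "the-topic", [])
:ok = :brod.produce_sync(:my_producer_client, "the-topic", 0, "key", "value")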

Latest Release (0.3.1) seems to always start from earliest offset

After upgrading from 0.3.0 to 0.3.1, I started to notice that when I restart my application, my Broadway Kafka ingest pipelines come back up as well (using the same name and group_id as before the restart). However, on the new release, when a pipeline comes back up it starts ingesting from the earliest offset every single time, which is a major issue.

Did something change with how I need to set the group_id or the pipeline name? Or is this just a regression?

Multiple processors when having a single partition/producer

I'm in the situation where there is one kafka topic with a single partition that I consume. The definition of this topic is outside of my control, and it's based on a strong requirement of order of messages, for all messages across multiple systems.

But in my consumer, in my particular system, I could support concurrency. I have retry logic, waits, a dead letter queue, etc., that take care of all the possible problems of not consuming everything in order.
So I tried setting up the processors with a concurrency of 10, even though there is a single producer:

      producer: [
        module: producer_module,
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ],

but this doesn't work; I'm still consuming messages one by one. My understanding from the docs is that, because of the single partition, all the messages are being routed to the same processor.
Is there any way of sidestepping this and forcing the single partition to be processed by multiple processors?

Support for brod_gssapi and other custom brod sasl authenticators

As stated in the documentation, the BroadwayKafka.Producer module passes any configuration in the client_config option down to :brod.start_client/3.

Right now the code contains a validation that restricts the sasl configuration options to the values:

  • {:sasl, {:plain, user, password}}
  • {:sasl, {:scram_sha_256, user, password}}
  • {:sasl, {:scram_sha_512, user, password}}

But :brod also supports using authentication plugins as:

  • {:sasl, {:callback, :module, opts}}

For example you can use GSSAPI auth with kerberos by using the brod_gssapi plugin and the following client configuration:

  • {:sasl, {:callback, :brod_gssapi, {:gssapi, keytab, principal}}}
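
For clarity, if the validation were relaxed, I would expect to pass it through the existing client_config option, something like the producer section below (host, keytab and principal are placeholders):

producer: [
  module:
    {BroadwayKafka.Producer,
     [
       hosts: [{"kafka.example.com", 9092}],
       group_id: "my_group",
       topics: ["my_topic"],
       client_config: [
         # currently rejected by the option validation
         sasl: {:callback, :brod_gssapi, {:gssapi, "/etc/security/my.keytab", "me@EXAMPLE.COM"}}
       ]
     ]},
  concurrency: 1
]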

Are you considering allowing this functionality?

Specific client_id for the :brod client

I'd like to contribute a PR that would allow us to set the client_id of the :brod client. This is because, when we have many instances of a service (say, Kubernetes pods) and we graph our consumer usage, I'd like to pass the instance name (pod name) as a prefix to the client_id to better identify possible discrepancies.

Would a PR here be welcome? I haven't looked deep into the code yet, but I believe it would be an extra client config option.

Integration tests instructions and CI

From PR #60 I've noticed we are not actually running the integration tests on CI. Also, the instructions recommend installing Kafka locally, which I'd say is not common practice IMHO.

From what I've seen in Ecto, an Earthfile is used for integration testing. I think this, or some other approach, could be applied here too. In my experience, adding a docker-compose file would make the integration tests much easier, and a similar approach would make the CI configuration easier too.

Would a PR with these changes be accepted? If yes, which approach would be preferred here?

Proper way to stop/pause a Pipeline && start again using same consumer_group_id

The scenario I am in is that I am starting a Broadway pipeline under a DynamicSupervisor and passing all the BroadwayKafka configs to start_link.

My application gives the option to pause ingestion of a specific topic, so the way I am currently handling that is by calling
DynamicSupervisor.terminate_child(__MODULE__, child_pid), where child_pid is the pid of the Broadway pipeline.
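
Roughly, the start/stop flow looks like this (BroadwayPipeline is my pipeline module and the variables are placeholders; it assumes the module's child_spec accepts these options):

# Start the pipeline under the DynamicSupervisor
{:ok, pid} =
  DynamicSupervisor.start_child(
    __MODULE__,
    {BroadwayPipeline, name: pipeline_name, group_id: consumer_group_id, topics: [topic]}
  )

# "Pause" ingestion by terminating that child
:ok = DynamicSupervisor.terminate_child(__MODULE__, pid)

# Later, "resume" by starting it again with the same consumer_group_id
{:ok, _new_pid} =
  DynamicSupervisor.start_child(
    __MODULE__,
    {BroadwayPipeline, name: pipeline_name, group_id: consumer_group_id, topics: [topic]}
  )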

But I am running into an issue when starting the Broadway pipeline again and passing the exact same consumer_group_id.

It logs info messages like:

12:17:49.807 [info] Group member (EventDefinition-70cf5723-1ef5-4d9b-adaa-61093f88b404-e97865e0-f787-11ea-8197-acde48001122,coor=#PID<0.2772.0>,cb=#PID<0.2769.0>,generation=3):
re-joining group, reason::rebalance_in_progress
12:17:49.808 [info] Group member (EventDefinition-70cf5723-1ef5-4d9b-adaa-61093f88b404-e97865e0-f787-11ea-8197-acde48001122,coor=#PID<0.2772.0>,cb=#PID<0.2769.0>,generation=3):
Leaving group, reason: {:noproc, {GenServer, :call, [#PID<0.2769.0>, :drain_after_revoke, :infinity]}}

12:17:49.808 [info] Group member (EventDefinition-70cf5723-1ef5-4d9b-adaa-61093f88b404-e97865e0-f787-11ea-8197-acde48001122,coor=#PID<0.2776.0>,cb=#PID<0.2773.0>,generation=3):
re-joining group, reason::rebalance_in_progress
12:17:49.809 [info] Group member (EventDefinition-70cf5723-1ef5-4d9b-adaa-61093f88b404-e97865e0-f787-11ea-8197-acde48001122,coor=#PID<0.2776.0>,cb=#PID<0.2773.0>,generation=3):
Leaving group, reason: {:noproc, {GenServer, :call, [#PID<0.2773.0>, :drain_after_revoke, :infinity]}}

Then it logs errors like:

12:17:49.821 [error] GenServer #PID<0.2772.0> terminating
** (stop) exited in: GenServer.call(#PID<0.2769.0>, :drain_after_revoke, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir 1.10.3) lib/gen_server.ex:1023: GenServer.call/3
    (broadway_kafka 0.1.4) lib/producer.ex:415: BroadwayKafka.Producer.assignments_revoked/1
    (brod 3.14.0) /Users/amacciola/Desktop/CogilityDev/cogynt-workstation-ingest/deps/brod/src/brod_group_coordinator.erl:477: :brod_group_coordinator.stabilize/3
    (brod 3.14.0) /Users/amacciola/Desktop/CogilityDev/cogynt-workstation-ingest/deps/brod/src/brod_group_coordinator.erl:391: :brod_group_coordinator.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

Before starting the pipeline successfully. The problem is that it starts the pipeline with partition=2 begin_offset=undefined, so it starts re-ingesting all the Kafka data again instead of starting from the last committed offset.

Any help would be appreciated !!

Pipelines rebalancing errors

Hello,

I see a lot of rebalancing errors happening over and over again in my application. Eventually it logs that it has hit :max_rejoin_attempts and the GenServer crashes. I am not sure that I see anything actually affecting the data ingestion for these pipelines, but I wanted to bring this up to see whether this is an issue or not.

assignments received:[]
10:57:24.193 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10875.1>,cb=#PID<0.3029.0>,generation=106):
assignments received:[]
10:57:24.193 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10872.1>,cb=#PID<0.2981.0>,generation=106):
assignments received:[]
10:57:24.194 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10867.1>,cb=#PID<0.2977.0>,generation=106):
assignments received:[]
10:57:24.195 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.21458.0>,cb=#PID<0.2973.0>,generation=106):
assignments received:
  insider_threat:
    partition=3 begin_offset=357
10:57:24.195 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10863.1>,cb=#PID<0.3005.0>,generation=106):
assignments received:
  insider_threat:
    partition=6 begin_offset=23566
10:57:24.196 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.22050.0>,cb=#PID<0.2985.0>,generation=106):
assignments received:
  insider_threat:
    partition=5 begin_offset=418
10:57:24.274 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10877.1>,cb=#PID<0.7057.1>,generation=106):
re-joining group, reason::unknown_member_id
10:57:27.149 [info] client :"Elixir.Drilldown-35673568-f092dd84-585c-11eb-948c-62ea5af83cf6Pipeline.Broadway.Producer_1.Client": payload connection down kafka.local:9072
reason:{:shutdown, :tcp_closed}
10:57:27.346 [info] client :"Elixir.Drilldown-35673568-f092dd84-585c-11eb-948c-62ea5af83cf6Pipeline.Broadway.Producer_2.Client": payload connection down kafka.local:9072
reason:{:shutdown, :tcp_closed}
10:57:29.194 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10875.1>,cb=#PID<0.3029.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.195 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10872.1>,cb=#PID<0.2981.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.195 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10867.1>,cb=#PID<0.2977.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.196 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.21458.0>,cb=#PID<0.2973.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.197 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.22050.0>,cb=#PID<0.2985.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.197 [info] Group member (EventDefinition-98fae331-542c-4756-973d-2802223fb70b-d70a81a6-58d8-11eb-b1e0-1a845abff810,coor=#PID<0.10863.1>,cb=#PID<0.3005.0>,generation=106):
re-joining group, reason::rebalance_in_progress
10:57:29.266 [info] Group member (Drilldown-35673568-f092dd84-585c-11eb-948c-62ea5af83cf6,coor=#PID<0.9369.1>,cb=#PID<0.3522.0>,generation=100):
Leaving group, reason: :max_rejoin_attempts

10:57:29.266 [error] GenServer #PID<0.9369.1> terminating
** (stop) :max_rejoin_attempts
Last message: {:lo_cmd_stabilize, 5, :rebalance_in_progress}
State: {:state, :"Elixir.Drilldown-35673568-f092dd84-585c-11eb-948c-62ea5af83cf6Pipeline.Broadway.Producer_7.Client", "Drilldown-35673568-f092dd84-585c-11eb-948c-62ea5af83cf6", "'[email protected]'/<0.9369.1>-14121921-36ff-4422-8384-0bf334bfac20", "'[email protected]'/<0.10298.1>-b9fe9ac8-408d-4643-af63-2a0dc1b15e95", 100, ["template_solutions", "template_solution_events"], #PID<0.9371.1>, :undefined, [], false, #PID<0.3522.0>, BroadwayKafka.Producer, [], #Reference<0.958412003.957087746.224270>, :roundrobin_v2, 30, 5, 5, 10, :undefined, :commit_to_kafka_v2, 5, :roundrobin_v2}

Some of these hit generation=300 in attempts. I feel like something is not right here.

Offsets accumulating in the producer ack state

Hi all! 👋

First of all, thanks for the great libraries!

We are running into a strange issue where every now and then (roughly once a day) we start seeing offsets accumulating in the ack state of a producer (for one or more partitions).

From our debugging we understand the ack state values are of shape:

{pending_list, last, seen_list}

Looking into the affected producers we can see the pending and seen lists keep growing indefinitely until the VM is OOM killed.

The producer and processors seem to keep fetching and processing messages, as evidenced by the seen list growth. The issue seems to be that a small number of messages were never acked, so they remain at the front of the pending list.

We've been digging into the source code of broadway/broadway_kafka and cannot find any point where messages/acks could get lost without a trace (we are not seeing any error logs or crash reports).

As a very hacky workaround, we are considering periodically checking for offset lag and then manually sending an ack message to the producer for the missing offset_ids at the front of the pending list if the lag is too big:

for producer <- Broadway.producer_names(MyBroadway) do
  acks = :sys.get_state(producer).state.module_state.acks
  Enum.each(acks, fn {k, {pending, _, seen}} -> 
    missing_ack = MapSet.difference(MapSet.new(pending), MapSet.new(seen))
    
    missing_from_front = Enum.take_while(pending, &MapSet.member?(missing_ack, &1))
    missing_count = Enum.count(missing_from_front)
    
    if Enum.count(seen) > 5000 do
      IO.inspect("#{missing_count} missing ack in the front for partition #{elem(k,2)}")
      send(producer, {:ack, k, missing_from_front})
    end      
  end)
end

This 'could' work for us, since we mostly care about being up to date with the topic and we can tolerate missing a few messages, but it's far from ideal.

In case it's useful, our broadway pipeline is very simple, being roughly:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [{"hostname", 9092}],
             group_id: "group_1",
             topics: ["topic"]
           ]},
        concurrency: 8
      ],
      processors: [
        default: [
          concurrency: 32
        ]
      ],
      batchers: [
        publish: [concurrency: 32, batch_size: 50],
        notify_errors: [concurrency: 10, batch_size: 50],
        ignore: [concurrency: 10, batch_size: 500]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _) do
    if interested_in?(Map.new(message.metadata.headers)) do
      case parse_data(message.data) do
        {:ok, parsed_data} ->
          message
          |> Broadway.Message.put_data(parsed_data)
          |> Broadway.Message.put_batcher(:publish)

        {:error, reason} ->
          message
          |> Broadway.Message.put_data(reason)
          |> Broadway.Message.put_batcher(:notify_errors)
      end
    else
      Broadway.Message.put_batcher(message, :ignore)
    end
  end

  @impl true
  def handle_batch(:publish, messages, _batch_info, _context) do
    Enum.each(messages, fn data ->
      publish(data)
    end)

    messages
  end

  def handle_batch(:ignore, messages, _batch_info, _context) do
    messages
  end

  def handle_batch(:notify_errors, messages, _batch_info, _context) do
    Enum.each(messages, fn message ->
      Logger.error("Error processing message", reason: message.data)
    end)

    messages
  end

The error happens even when the interested_in? function returns false, and therefore no processing is done at all, just forwarding to the ignore batcher which does nothing.

Is there anything obvious we are missing?
