No rejoin after "payload connection down :shutdown, :tcp_closed}": deadlock on race between assignments_revoked call and handle DOWN message (broadway_kafka, CLOSED)

anoskov commented on June 2, 2024
No rejoin after "payload connection down :shutdown, :tcp_closed}": deadlock on race between assignments_revoked call and handle DOWN message

from broadway_kafka.

Comments (16)

anoskov commented on June 2, 2024

@josevalim Hello! We ran the fix for a week and unfortunately it didn't help. It looks like it is stuck on Process.exit: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L430

** (exit) exited in: :sys.get_state(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)
    ** (EXIT) time out
    (stdlib 4.0.1) sys.erl:338: :sys.send_system_msg/2
    (stdlib 4.0.1) sys.erl:139: :sys.get_state/1
    iex:4: (file)

producer stacktrace

{:current_stacktrace,
 [
   {BroadwayKafka.Producer, :handle_info, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 430]},
   {Broadway.Topology.ProducerStage, :handle_info, 2,
    [file: 'lib/broadway/topology/producer_stage.ex', line: 229]},
   {GenStage, :noreply_callback, 3, [file: 'lib/gen_stage.ex', line: 2117]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 539]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 321]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

additional info

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 10961,
  heap_size: 4185,
  stack_size: 48,
  reductions: 19899,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]
[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
  current_function: {BroadwayKafka.Producer, :handle_info, 2},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 7,
  links: [#PID<0.5024.0>],
  dictionary: [
    {:"$initial_call", {GenStage, :init, 1}},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]},
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 35914,
  heap_size: 6772,
  stack_size: 28,
  reductions: 6413879,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 8
  ],
  suspending: []
]

josevalim commented on June 2, 2024

Apologies, I clearly pushed the wrong commit. It is there now.

josevalim commented on June 2, 2024

Hi @anoskov! I recommend reaching out to ElixirForum for general help/questions, as it is more likely someone with a similar issue as yours can see your report.

anoskov commented on June 2, 2024

Thank you for the answer. I asked the question there.

I noticed that the disconnect occurs 10 minutes after connecting. This value is similar to connections.max.idle.ms, so it looks like Kafka disconnects idle connections. I know that some Kafka clients support reconnecting on idle. Does brod/broadway_kafka support it?

UPD:
The brod_client process is alive, BroadwayKafka.BrodClient.connected?/1 reports that it is connected, and :brod.fetch/4 also returns messages. But in its state we see dead_since:

iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
 #PID<0.31736.27>,
 [
   {:conn, {"kafka-0", 9092},
    {:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
 ], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
 Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true

It looks like the brod client reconnects but the consumer doesn't rejoin.

UPD2:

I checked the Producer state and got a timeout on :sys.get_state because the process is stuck in its handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop

iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
  registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 4,
  links: [#PID<0.3880.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {48, []},
    {:"$ancestors",
     [Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
      Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
      Admin.Supervisor, #PID<0.3569.0>]},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.3568.0>,
  total_heap_size: 20338,
  heap_size: 2586,
  stack_size: 29,
  reductions: 20656517,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 2858
  ],
  suspending: []
]

anoskov commented on June 2, 2024

@josevalim @slashmili Hello! It still occurs on the 'main' branch.
If the consumer doesn't receive messages for some time, Kafka disconnects the idle connection. After this, the Broadway producer and the brod coordinator deadlock each other.

consumer group info

I have no name!@kafka-0:/opt/bitnami/kafka$ ./bin/kafka-consumer-groups.sh --describe --group messaging --bootstrap-server localhost:9092
Consumer group 'messaging' has no active members.

brod client state

{:state, Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client,
 [{"kafka", 9092}], :undefined,
 [
   {:conn, {"kafka-0.cluster.local", 9092},
    {:dead_since, {1675, 174014, 110349}, {:shutdown, :tcp_closed}}}
 ], #PID<0.12743.1>, #PID<0.12744.1>,
 [connect_timeout: 10000, request_timeout: 240000],
 Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client}

trying to get producer state

** (exit) exited in: :sys.get_state(#PID<0.12240.1>)
    ** (EXIT) time out

but it is stuck in :brod_group_coordinator.stop

[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_0,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [#PID<0.5133.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {39, []},
    {38, []},
    {37, []},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 32885,
  heap_size: 4185,
  stack_size: 29,
  reductions: 3299758,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]

coordinator is stuck in the assignments_revoked call

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 17734,
  heap_size: 6772,
  stack_size: 48,
  reductions: 19501,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 3
  ],
  suspending: []
]

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 532]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

producer stacktrace

{:current_stacktrace,
 [
   {:brod_group_coordinator, :stop, 1,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 311
    ]},
   {BroadwayKafka.Producer, :terminate, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 540]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 1158]},
   {:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 1348]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}

josevalim commented on June 2, 2024

@anoskov can you please try the main branch again?

anoskov commented on June 2, 2024

> @anoskov can you please try the main branch again?

Thank you! I'll check

josevalim commented on June 2, 2024

This is very unexpected because Process.exit is, afaik, asynchronous.

anoskov commented on June 2, 2024

@josevalim Maybe the stacktrace is not accurate and it is actually stuck on https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L432 ? Although I don't understand how that could be.

josevalim commented on June 2, 2024

Can you consistently reproduce it now, or does it only happen from time to time?

anoskov commented on June 2, 2024

I can't reproduce this manually, but it happens a few times a week when some of our servers are idle and Kafka closes the connection.

I tried to emulate it with two GenServers, where the second calls the first while the first is processing a message in handle_info and calls Process.exit on the second. That worked fine: the first server got the message in its receive block after Process.exit.

If you need any additional information, I can collect it next time.

anoskov commented on June 2, 2024

@josevalim I seem to have found the problem. brod_group_coordinator sets trap_exit to true on start: https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L320

So, per the docs:

Process.exit(pid, reason)
If pid is trapping exits, the exit signal is transformed into a message {:EXIT, from, reason} and delivered to the message queue of pid

The coordinator should handle it here
https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L374-L386
but it is stuck in a call to the Producer, which is itself waiting for {:DOWN, _, _, ^coord, _} from the coordinator.
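
A minimal sketch of the cycle described above, using plain processes instead of the real brod and Broadway modules. The module and function names (TrapExitDeadlockSketch, start_coordinator/1, run/1) are hypothetical, and the timeout exists only so the sketch returns; the real producer waited without one.

```elixir
defmodule TrapExitDeadlockSketch do
  # Hypothetical stand-in for the coordinator: it traps exits and then
  # blocks in a selective receive, as a process inside GenServer.call does.
  def start_coordinator(producer) do
    spawn(fn ->
      Process.flag(:trap_exit, true)
      ref = make_ref()
      send(producer, {:call, self(), ref})

      receive do
        # Only the reply matches here; a trapped {:EXIT, from, reason}
        # message simply stays in the mailbox.
        {:reply, ^ref} -> :ok
      end
    end)
  end

  # Plays the producer's role: ask the coordinator to exit, then wait for
  # its :DOWN message without ever replying to the pending call.
  def run(timeout \\ 200) do
    coord = start_coordinator(self())
    mon = Process.monitor(coord)

    # Wait until the coordinator has issued its "call"; by then it is
    # already trapping exits.
    receive do
      {:call, ^coord, _ref} -> :ok
    end

    Process.exit(coord, :shutdown)

    receive do
      {:DOWN, ^mon, :process, ^coord, reason} -> {:down, reason}
    after
      timeout ->
        # Clean up the sketch: :kill cannot be trapped.
        Process.demonitor(mon, [:flush])
        Process.exit(coord, :kill)
        :stuck
    end
  end
end
```

Calling TrapExitDeadlockSketch.run() returns :stuck: the :shutdown signal becomes a mailbox message that the blocked coordinator never reads, so no :DOWN message is ever sent.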

anoskov commented on June 2, 2024

@josevalim Hello. Can we just remove the receive block in the producer's handle_info, or should it wait for the exit to complete?

josevalim commented on June 2, 2024

@anoskov I think we can remove it. I have just pushed a commit that does so, please give it a try.
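
As a rough illustration of the idea (this is not the actual commit), the sketch below shows a GenServer that sends the exit signal and returns from handle_info right away, leaving the :DOWN message to a separate clause. Everything here is hypothetical: the NonBlockingStopSketch module, the :connection_down and :drain messages, and the coordinator stand-in.

```elixir
defmodule NonBlockingStopSketch do
  use GenServer

  # Hypothetical stand-in for the coordinator: traps exits, makes one
  # blocking call to the producer, then handles the trapped exit.
  def start_coordinator(producer) do
    spawn(fn ->
      Process.flag(:trap_exit, true)
      :ok = GenServer.call(producer, :drain)

      receive do
        {:EXIT, _from, reason} -> exit(reason)
      end
    end)
  end

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    coord = start_coordinator(self())
    {:ok, %{coord: coord, ref: Process.monitor(coord), down_reason: nil}}
  end

  @impl true
  def handle_call(:drain, _from, state), do: {:reply, :ok, state}
  def handle_call(:down_reason, _from, state), do: {:reply, state.down_reason, state}

  @impl true
  def handle_info(:connection_down, state) do
    # Ask the coordinator to exit and return immediately, so a pending
    # :drain call can still be answered by this process.
    Process.exit(state.coord, :shutdown)
    {:noreply, state}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    # The coordinator's exit is observed asynchronously.
    {:noreply, %{state | coord: nil, ref: nil, down_reason: reason}}
  end
end
```

After start_link/1 and send(pid, :connection_down), GenServer.call(pid, :down_reason) eventually returns :shutdown, because the producer stays free to answer the coordinator's call and the coordinator can then act on the exit message.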

anoskov commented on June 2, 2024

@josevalim Hm, I still see it in the main branch: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L428
The commit 6ef6f41 doesn't include this change.

anoskov commented on June 2, 2024

Looks like the fix helped. Thank you.
