Comments (16)
@josevalim Hello! We tried the fix for a week and unfortunately it didn't help. It looks like it's stuck on Process.exit:
https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L430
** (exit) exited in: :sys.get_state(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)
** (EXIT) time out
(stdlib 4.0.1) sys.erl:338: :sys.send_system_msg/2
(stdlib 4.0.1) sys.erl:139: :sys.get_state/1
iex:4: (file)
producer stacktrace
{:current_stacktrace,
[
{BroadwayKafka.Producer, :handle_info, 2,
[file: 'lib/broadway_kafka/producer.ex', line: 430]},
{Broadway.Topology.ProducerStage, :handle_info, 2,
[file: 'lib/broadway/topology/producer_stage.ex', line: 229]},
{GenStage, :noreply_callback, 3, [file: 'lib/gen_stage.ex', line: 2117]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
]}
coordinator stacktrace
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 539]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 321]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 372
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
]}
additional info
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 2,
links: [],
dictionary: [
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4768.0>,
total_heap_size: 10961,
heap_size: 4185,
stack_size: 48,
reductions: 19899,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 7
],
suspending: []
]
[
registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
current_function: {BroadwayKafka.Producer, :handle_info, 2},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 7,
links: [#PID<0.5024.0>],
dictionary: [
{:"$initial_call", {GenStage, :init, 1}},
{:"$ancestors",
[Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]},
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{48, []},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4768.0>,
total_heap_size: 35914,
heap_size: 6772,
stack_size: 28,
reductions: 6413879,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 8
],
suspending: []
]
from broadway_kafka.
Apologies, I clearly pushed the wrong commit. It is there now.
Hi @anoskov! I recommend reaching out to ElixirForum for general help/questions, as it is more likely that someone with a similar issue will see your report there.
Thank you for the answer. I asked a question there.
I noticed that the disconnect occurs 10 minutes after the connection is established. This value matches connections.max.idle.ms, so it looks like Kafka disconnects idle connections. I know that some Kafka clients support a reconnect-on-idle feature. Do brod/broadway_kafka support it?
UPD:
The brod_client process is alive, BroadwayKafka.BrodClient.connected?/1 says it is connected, and :brod.fetch/4 also returns messages. But in the state we see dead_since:
iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
#PID<0.31736.27>,
[
{:conn, {"kafka-0", 9092},
{:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true
Looks like the brod client reconnects, but the consumer doesn't rejoin the group.
UPD2:
I checked the producer state and got a timeout on :sys.get_state, because it is stuck in the handle_info callback on BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:
iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 4,
links: [#PID<0.3880.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{:"$initial_call", {GenStage, :init, 1}},
{48, []},
{:"$ancestors",
[Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
Admin.Supervisor, #PID<0.3569.0>]},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.3568.0>,
total_heap_size: 20338,
heap_size: 2586,
stack_size: 29,
reductions: 20656517,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 65535,
minor_gcs: 2858
],
suspending: []
]
@josevalim @slashmili Hello! It still occurs on the main branch.
If the consumer doesn't receive messages for some time, Kafka disconnects the idle connection. After that the Broadway producer and the brod coordinator deadlock each other.
consumer group info
I have no name!@kafka-0:/opt/bitnami/kafka$ ./bin/kafka-consumer-groups.sh --describe --group messaging --bootstrap-server localhost:9092
Consumer group 'messaging' has no active members.
brod client state
{:state, Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client,
[{"kafka", 9092}], :undefined,
[
{:conn, {"kafka-0.cluster.local", 9092},
{:dead_since, {1675, 174014, 110349}, {:shutdown, :tcp_closed}}}
], #PID<0.12743.1>, #PID<0.12744.1>,
[connect_timeout: 10000, request_timeout: 240000],
Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client}
Trying to get the producer state:
** (exit) exited in: :sys.get_state(#PID<0.12240.1>)
** (EXIT) time out
but it is stuck on :brod_group_coordinator.stop:
[
registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_0,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 3,
links: [#PID<0.5133.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{48, []},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{:"$initial_call", {GenStage, :init, 1}},
{39, []},
{38, []},
{37, []},
{:"$ancestors",
[Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4876.0>,
total_heap_size: 32885,
heap_size: 4185,
stack_size: 29,
reductions: 3299758,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 7
],
suspending: []
]
The coordinator is stuck on the assignments_revoked call:
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 3,
links: [],
dictionary: [
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4876.0>,
total_heap_size: 17734,
heap_size: 6772,
stack_size: 48,
reductions: 19501,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 3
],
suspending: []
]
coordinator stacktrace
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 532]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 372
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
]}
producer stacktrace
{:current_stacktrace,
[
{:brod_group_coordinator, :stop, 1,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 311
]},
{BroadwayKafka.Producer, :terminate, 2,
[file: 'lib/broadway_kafka/producer.ex', line: 540]},
{:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 1158]},
{:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 1348]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
]}
@anoskov can you please try the main branch again?
Thank you! I'll check
This is very unexpected because Process.exit is, afaik, asynchronous.
@josevalim Maybe the stacktrace is not accurate and it is stuck on https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L432? Although I don't understand how that could be.
Can you consistently reproduce it now, or does it only happen from time to time?
I can't reproduce this manually, but it happens a few times a week when one of our servers is idle and Kafka closes the connection.
I tried to emulate it with two GenServers, where the second calls the first while the first is processing a message in handle_info and calls Process.exit on the second. It works fine: the first server got the message in its receive block after Process.exit.
If you need any additional information, I can collect it next time.
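The experiment described above can be sketched roughly like this (a minimal illustration with made-up names, not the original test code): a process that traps exits and is free to read its mailbox does receive the {:EXIT, from, reason} message produced by Process.exit/2.

```elixir
# Illustrative check: with trap_exit enabled, Process.exit/2 from another
# process is delivered as an {:EXIT, from, reason} message.
parent = self()

pid =
  spawn(fn ->
    Process.flag(:trap_exit, true)
    send(parent, :ready)

    receive do
      {:EXIT, from, reason} -> send(parent, {:got_exit, from, reason})
    after
      1_000 -> send(parent, :timeout)
    end
  end)

# Wait until the child has enabled trap_exit before signalling it.
receive do
  :ready -> :ok
end

Process.exit(pid, :shutdown)

result =
  receive do
    msg -> msg
  end

IO.inspect(result)
```

This only demonstrates the happy path, though: the exit message is delivered and handled because the trapping process is sitting in a plain receive, not blocked inside a synchronous call.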
@josevalim It seems I have found the problem. brod_group_coordinator sets trap_exit: true
on start: https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L320
So, for Process.exit(pid, reason):
> If pid is trapping exits, the exit signal is transformed into a message {:EXIT, from, reason} and delivered to the message queue of pid.
The coordinator should handle it here:
https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L374-L386
but it is stuck in a call to the producer, which is itself waiting for {:DOWN, _, _, ^coord, _}
from the coordinator.
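The deadlock described above can be reduced to a small sketch (names and shapes are illustrative, not the actual broadway_kafka/brod code): a "coordinator" that traps exits is blocked calling into a "producer", while the producer signals it to exit and then blocks waiting for its :DOWN.

```elixir
# Hypothetical reduction of the deadlock; not the real library code.
defmodule DeadlockSketch do
  def run do
    parent = self()

    # "Producer": on request, it never replies; instead it signals the
    # caller to exit and then blocks waiting for the caller's :DOWN.
    producer =
      spawn(fn ->
        receive do
          {:call, from} ->
            ref = Process.monitor(from)
            Process.exit(from, :shutdown)

            # Deadlock half 1: wait for the coordinator to die...
            receive do
              {:DOWN, ^ref, :process, ^from, _} -> send(parent, :producer_done)
            after
              500 -> send(parent, :producer_stuck)
            end
        end
      end)

    # "Coordinator": traps exits, so Process.exit/2 becomes a mailbox
    # message queued behind the blocking call below.
    spawn(fn ->
      Process.flag(:trap_exit, true)
      send(producer, {:call, self()})

      # Deadlock half 2: wait for a reply the producer never sends;
      # the {:EXIT, ...} message sits unread in the mailbox.
      receive do
        :reply -> :ok
      after
        1_000 -> :ok
      end
    end)

    receive do
      msg -> msg
    end
  end
end

IO.inspect(DeadlockSketch.run())
```

With the timeouts above the producer reports :producer_stuck: the exit signal cannot terminate the trapping coordinator, and the coordinator cannot read the {:EXIT, ...} message while blocked in its call, so neither side makes progress.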
@josevalim Hello. Can we just remove the receive block in the producer's handle_info, or should it wait for the exit to complete?
@anoskov I think we can remove it. I have just pushed a commit that does so, please give it a try.
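The shape of the change being discussed can be sketched as follows; the module and function names are illustrative, not the actual broadway_kafka diff.

```elixir
# Paraphrased shape of the change; not the real commit.
defmodule ShutdownSketch do
  # Before: signal the coordinator, then block until it is down. This
  # deadlocks if the coordinator is trapping exits and is itself blocked
  # in a call back into this process.
  def stop_blocking(coordinator) do
    ref = Process.monitor(coordinator)
    Process.exit(coordinator, :shutdown)

    receive do
      {:DOWN, ^ref, :process, _pid, _reason} -> :ok
    end
  end

  # After: deliver the exit signal asynchronously and return immediately,
  # so a coordinator blocked mid-call cannot wedge this process.
  def stop_async(coordinator) do
    Process.exit(coordinator, :shutdown)
    :ok
  end
end
```

The asynchronous variant relies on Process.exit/2 being a fire-and-forget signal: the caller no longer needs an acknowledgement to make progress.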
@josevalim Hm, I still see it in the main branch: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L428
Commit 6ef6f41 doesn't include this change.
Looks like the fix helped. Thank you!