edenhill / kcat
Generic command line non-JVM Apache Kafka producer and consumer
License: Other
Hi, I have used kafkacat for a long time, but I found an error recently.
I have some example messages, see below:
0xxxxxxxxxxxx
1xxxx\nxxxxx\n
2xxxx\nxxxxx\n
3xxxx\nxxxxx\n
When I produce the messages with kafkacat and then consume them with kafkacat, I find some messages have been discarded. What's your advice here? Thanks.
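A likely explanation, assuming the messages are piped to the producer on stdin: kafkacat splits its input into messages on a delimiter that defaults to newline, so each embedded \n ends a message instead of staying in the payload. This can be checked with standard tools (the -D option in the comment is kafkacat's producer delimiter flag; broker/topic names are placeholders):

```shell
# With a newline delimiter, this single logical record becomes two messages:
printf '1xxxx\nxxxxx\n' | wc -l
# prints 2 (two messages)

# A possible workaround: pick a delimiter that never occurs in the payload, e.g.
#   kafkacat -P -b mybroker -t mytopic -D ';'
```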
Trying to run configure on Ubuntu 10.04 results in errors such as:
checking for PIC (by compile)...mktemp: too few X's in template `_mkltmpXXXX.c'
On this platform (and many older Linux distributions), mktemp(1) requires the template "X" characters to be at the end of the filename.
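A sketch of the portable pattern, with the X run trailing (the filenames here are illustrative, not what configure actually generates):

```shell
# Older mktemp rejects templates like _mkltmpXXXX.c (the X's must be trailing);
# create with trailing X's, then rename to get the .c suffix:
t=$(mktemp /tmp/_mkltmpXXXXXX) || exit 1
mv "$t" "$t.c"
ls "$t.c"
rm -f "$t.c"
```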
Hiya. I'm running into something where the leader/follower/ISR reported by kafkacat doesn't match that reported by kafka-topics.
Any ideas? Even more confusingly: I'm logged in to broker 1002/kafka900f5013 in two shells right now. For the past hour or so kafkacat was reporting the correct information (partition 0, leader 1002, replicas: 1002, isrs: 1002) in one of the shells, and incorrect information in the other shell (partition 0, leader 1004, replicas: 1004, isrs: 1004). Consistently and repeatedly. It was just while I was typing up this git issue that they all started reporting the same incorrect information as the rest of the nodes.
So kafkacat thinks the partition is being served by broker 1004:
PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafkacat -L -b localhost:9092 -t hound-prod.retriever.mutation"
kafka-1bc7c4e2: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-1bc7c4e2: 4 brokers:
kafka-1bc7c4e2: broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-1bc7c4e2: broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-1bc7c4e2: broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-1bc7c4e2: broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-1bc7c4e2: 1 topics:
kafka-1bc7c4e2: topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-1bc7c4e2: partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-d4b7674e: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-d4b7674e: 4 brokers:
kafka-d4b7674e: broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-d4b7674e: broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-d4b7674e: broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-d4b7674e: broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-d4b7674e: 1 topics:
kafka-d4b7674e: topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-d4b7674e: partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-900f5013: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-900f5013: 4 brokers:
kafka-900f5013: broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-900f5013: broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-900f5013: broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-900f5013: broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-900f5013: 1 topics:
kafka-900f5013: topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-900f5013: partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-88e6bc0c: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-88e6bc0c: 4 brokers:
kafka-88e6bc0c: broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-88e6bc0c: broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-88e6bc0c: broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-88e6bc0c: broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-88e6bc0c: 1 topics:
kafka-88e6bc0c: topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-88e6bc0c: partition 0, leader 1004, replicas: 1004, isrs: 1004
And kafka-topics thinks the partition is being served by broker 1002.
PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --topic hound-prod.retriever-mutation --describe"
kafka-d4b7674e: Topic:hound-prod.retriever-mutation PartitionCount:1 ReplicationFactor:1 Configs:
kafka-d4b7674e: Topic: hound-prod.retriever-mutation Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
kafka-1bc7c4e2: Topic:hound-prod.retriever-mutation PartitionCount:1 ReplicationFactor:1 Configs:
kafka-1bc7c4e2: Topic: hound-prod.retriever-mutation Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
kafka-88e6bc0c: Topic:hound-prod.retriever-mutation PartitionCount:1 ReplicationFactor:1 Configs:
kafka-88e6bc0c: Topic: hound-prod.retriever-mutation Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
kafka-900f5013: Topic:hound-prod.retriever-mutation PartitionCount:1 ReplicationFactor:1 Configs:
kafka-900f5013: Topic: hound-prod.retriever-mutation Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
PRODUCTION ubuntu@bastion-0:~$
Any ideas?
Just looking at the data sizes on disk, it's definitely writing to broker 1002, not 1004.
From librdkafka configuration guide:
The metadata is automatically refreshed on error and connect.
On each kafkacat run, librdkafka loads the Kafka cluster metadata. This has a significant inbound traffic impact in situations where kafkacat is triggered frequently on a cluster with many highly partitioned topics.
Is there a way to cache the metadata in between kafkacat runs so that each kafkacat run doesn't always load a significant amount of metadata on start?
-S (status) could be used to query the cluster's status, e.g. whether all brokers are up.
Build with MSVC.
Requires that the oct15 branch of librdkafka is merged to master.
Should provide prebuilt binaries for Windows, possibly with an installer.
Though the app writes messages to Kafka, it throws the following error messages at startup. I am not sure what exactly the issue is, but any input will help.
%3|1418300191.571|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| 1/1 brokers are down
I would like to use kafkacat to stream from a topic without buffering, but I can't figure out the right settings to make that happen. To illustrate, using kafka_2.8.0-0.8.0:
ZOOKEEPER=192.168.33.10:2181 # whatever it may be
BROKERS=192.168.33.10:6667 # whatever it may be
# create the topic
kafka-create-topic.sh --zookeeper $ZOOKEEPER --replica 1 --partition 1 --topic example
# create a stream
ruby -e "STDOUT.sync=true; 10000.times {|i| puts i; sleep 0.2}" |
kafka-console-producer.sh --broker-list $BROKERS --sync --topic example
If you consume the stream like this you'll see output tick along smoothly with no apparent delay.
kafka-console-consumer.sh --zookeeper $ZOOKEEPER --topic example
If you consume the stream with kafkacat in these variations the outputs jerk forward then pause for a second, then jerk forward again. All the output is there but it isn't streaming smoothly.
./kafkacat -t example -b $BROKERS
./kafkacat -t example -b $BROKERS -u
These variations make the outputs tick along in regular 1s intervals, but again the output isn't smooth. There appears to be some kind of fetch delay.
./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1
./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1 -X fetch.error.backoff.ms=1
I've just been guessing at options with no success. Is there a way to get kafkacat to smoothly stream the topic à la kafka-console-consumer.sh?
- removed high-level-consumer as a mode
- downgraded -G to -g as a switch in -C and -L modes
- added metadata-list mode for consumer groups, with retrieval of watermarks and stored offsets
This is what I get now when running kafkacat after doing ./bootstrap:
kafkacat: error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
It was working before the recent changes.
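A likely cause, assuming bootstrap.sh installed librdkafka under /usr/local/lib: the dynamic linker is not searching that directory. Two common remedies (the path is an assumption; check where librdkafka.so.1 actually landed):

```shell
# Refresh the linker cache (run as root) so /usr/local/lib is picked up:
#   sudo ldconfig
# Or point the loader at it for the current session:
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
echo "$LD_LIBRARY_PATH"
```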
When I execute kafka-topics --describe --zookeeper localhost --topic mytopic, it returns PartitionCount:10 ReplicationFactor:1 Configs:cleanup.policy=compact.
It also displays other details about the leader and replicas:
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
It would be useful if kafkacat returned this kind of detail, especially the cleanup policy when log compaction is in use.
Best regards,
Charles.
The broker port number may not always be 9092.
The Apache Kafka community is trying to consolidate Apache Kafka client discussions, questions, issues, etc. into a single list: https://groups.google.com/forum/m/#!forum/kafka-clients. It would be great if you would join in too and drive discussions for Kafka clients there, and for librdkafka as well.
After upgrading to kafkacat 1.3.0 and librdkafka 0.9.1 I get the following error in consumer mode:
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/1: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: expected 36681250 bytes > 150 remaining bytes
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/2: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
Any idea what this means?
I have 2 Kafka brokers running in my test environment, and I am running one kafkacat as a producer and another as a consumer. The test topic is set up with a replication factor of 2.
In my test, I killed one of the brokers while cat-ing multiple files into the kafkacat producer. Sometimes the consumer got all the messages, but sometimes it missed 5-10% of them.
Is there any producer-specific parameter that I missed? I am hoping kafkacat at least returns a non-zero exit code when messages were lost.
I think the problem is probably in librdkafka. When there are missing events, I don't even see dr_msg_cb in kafkacat.c being called, so it feels like librdkafka did not notify kafkacat about the issue.
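For what it's worth, a hedged producer-side sketch (these are standard librdkafka properties, not a confirmed fix for the lost messages; broker and topic names are placeholders): require acks from all in-sync replicas and allow retries:

```shell
cat /tmp/files* | kafkacat -P -b broker1,broker2 -t mytopic \
  -X topic.request.required.acks=-1 \
  -X message.send.max.retries=5
```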
Apparently, all partitions must time out simultaneously for this flag to work. If the topic has many active partitions, the chance of this is low and kafkacat continues to run indefinitely.
It looks like, if this flag is set, kafkacat should record the latest offset of each partition on start and exit once all of those offsets are reached.
It looks like the -o flag applies only up to Kafka storage blocks, or some other caching is going on, e.g.
$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98564$i -p 0 -c 3 -u | md5sum" ; done
01 d1e6071308960d3d04c9ff911ef32516 -
02 d1e6071308960d3d04c9ff911ef32516 -
$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98574$i -p 0 -c 3 -u | md5sum" ; done
01 372ec3fd9d054638ff817f73a4c50007 -
02 372ec3fd9d054638ff817f73a4c50007 -
Messages were loaded into Kafka in batches with snappy compression; the latest kafkacat was used.
Kafka new consumer support would be great, but the released version is only 1.2.0.
Is it possible to release a beta or a release candidate?
By default, stdout is buffered. If writing out to a terminal, it will flush after each newline. However, if the stdout is redirected, it won't flush on each newline. It can be flushed manually or the buffer can be disabled.
Something like this could fix the problem:

#include <stdio.h>

int main (int argc, char **argv) {
    setbuf(stdout, NULL);   /* disable stdout buffering */
    /* ... rest of main ... */
}

Or the buffer can be flushed manually after each write.
I made this an issue rather than a PR because I'm not sure of the best way to handle it.
For one of my topics, kafkacat is showing a different list of isrs than both the kafka-topics.sh script and kafkat (the AirBnb tool). I'm wondering if there is a bug in kafkacat.
See below, partition 1 shows isrs = 4 when using kafkacat, but isrs = 4,2,3 when using the others.
jcheng:~ jcheng$ docker run --rm -it kafkacat kafkacat -L -b 10.100.222.65 -t engrcloud.core.kafka.install-test
Metadata for engrcloud.core.kafka.install-test (from broker -1: 10.100.222.65:9092/bootstrap):
5 brokers:
broker 5 at 10.100.222.69:9092
broker 1 at 10.100.222.65:9092
broker 2 at 10.100.222.66:9092
broker 3 at 10.100.222.67:9092
broker 4 at 10.100.222.68:9092
1 topics:
topic "engrcloud.core.kafka.install-test" with 3 partitions:
partition 2, leader 5, replicas: 5,3,4, isrs: 4,5,3
partition 1, leader 4, replicas: 4,2,3, isrs: 4
partition 0, leader 3, replicas: 3,1,2, isrs: 1,2,3
vs
jcheng:~ jcheng$ docker run --rm -it wurstmeister/kafka /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper=10.100.222.69:2181 --describe --topic engrcloud.core.kafka.install-test
Topic:engrcloud.core.kafka.install-test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: engrcloud.core.kafka.install-test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: engrcloud.core.kafka.install-test Partition: 1 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
Topic: engrcloud.core.kafka.install-test Partition: 2 Leader: 5 Replicas: 5,3,4 Isr: 4,5,3
vs
jcheng:~ jcheng$ docker run -it --rm kafkat sh -c 'echo { \"zk_path\": \"10.100.222.65:2181\" } > /etc/kafkatcfg; kafkat partitions engrcloud.core.kafka.install-test'
Topic Partition Leader Replicas ISRs
engrcloud.core.kafka.install-test0 3 [3, 1, 2] [1, 2, 3]
engrcloud.core.kafka.install-test1 4 [4, 2, 3] [4, 2, 3]
engrcloud.core.kafka.install-test2 5 [5, 3, 4] [4, 5, 3]
Is there any debugging that I can supply?
Hello,
When kafkacat tries to fetch a message bigger than librdkafka's fetch.message.max.bytes, kafkacat loops forever and consumes all bandwidth and CPU.
Should it instead throttle, print a message, or return?
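Until that is fixed, a possible workaround is to raise the consumer's fetch limit above the largest message in the topic (property name from the librdkafka configuration guide; the value here is illustrative, 10 MB):

```shell
kafkacat -C -b mybroker -t mytopic -X fetch.message.max.bytes=10485760
```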
Hi!
Currently, if we do kafkacat -C | someprogram, even when the target program handles unbuffered input, we don't get the last few received messages, because libc buffers fwrite() output when it is not a terminal. Adding an fflush(fp) solves this issue.
Here is the use-case. A topic with say 150 messages.
$kafkacat -b broker -C -q -o -1000 -e -c 1000 -t topic | wc -l
0
$kafkacat -b broker -C -q -o -100 -e -c 1000 -t topic | wc -l
100
What this shows is that if the negative offset is larger than what is available, it doesn't return anything. Semantically, I believe it should just return from the beginning in that case.
On OS X 10.10.2, bootstrap.sh doesn't yield a statically linked binary, because libyajl's static library does not match the expected naming conventions.
In bootstrap.sh:
export STATIC_LIB_yajl="tmp-bootstrap/usr/local/lib/libyajl.a"
However, in the filesystem:
$ ls -1 tmp-bootstrap/usr/local/lib/libyajl*
tmp-bootstrap/usr/local/lib/libyajl.2.1.1.dylib
tmp-bootstrap/usr/local/lib/libyajl.2.dylib
tmp-bootstrap/usr/local/lib/libyajl.dylib
tmp-bootstrap/usr/local/lib/libyajl_s.a
Notice that the libyajl static library is libyajl_s.a, not libyajl.a.
After compilation, you can see that librdkafka was statically linked in, but not libyajl.
$ otool -L kafkacat
kafkacat:
/usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
/usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
/usr/local/lib/libyajl.2.dylib (compatibility version 2.0.0, current version 2.1.1)
I worked around it by manually running gcc, and replacing "-lyajl" with "tmp-bootstrap/usr/local/lib/libyajl_s.a"
$ gcc -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Ltmp-bootstrap/usr/local/lib -Ltmp-bootstrap/usr/local/lib kafkacat.o format.o json.o -o kafkacat tmp-bootstrap/usr/local/lib/librdkafka.a -lpthread -lz tmp-bootstrap/usr/local/lib/libyajl_s.a
$ otool -L kafkacat
kafkacat:
/usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
/usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
$
With version 1.2.0, when I run:
$ kafkacat -b mybroker -t syslog -J
It gives me:
kafkacat: illegal option -- J
Magnus, great tool, really love it!
I am trying to read from a specific offset on the consumer side, but it seems to always display everything:
kafkacat -b xyz -t abc -p 0 -o 8
Also, is there a way to read from offset x until offset y, and not just from x till the end?
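There is no until-offset flag as far as I know, but -o combined with a message count and exit-at-end gets close (-c and -e as listed in kafkacat's help; the numbers are illustrative):

```shell
# Read offsets 8 through 19 inclusive (12 messages) from partition 0, then exit
kafkacat -C -b xyz -t abc -p 0 -o 8 -c 12 -e
```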
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b badhost -t RawEvents -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?
The output is
%7|1429923085.924|BROKER|rdkafka#producer-0| badhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923085.924|BRKMAIN|rdkafka#producer-0| badhost:9092/bootstrap: Enter main broker thread
%7|1429923085.924|TOPIC|rdkafka#producer-0| New local topic: RawEvents
%7|1429923085.924|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923085.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923085.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923086.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923086.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923086.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.924|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923087.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923087.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923087.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.934|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923087.934|BROKERFAIL|rdkafka#producer-0| badhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923087.934|STATE|rdkafka#producer-0| badhost:9092/bootstrap: Broker changed state INIT -> DOWN
%7|1429923087.935|BUFQ|rdkafka#producer-0| badhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0
I was expecting to see a non-zero exit code.
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b localhost -t test-topic -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?
The output is
%7|1429923690.023|BROKER|rdkafka#producer-0| localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923690.023|BRKMAIN|rdkafka#producer-0| localhost:9092/bootstrap: Enter main broker thread
%7|1429923690.023|TOPIC|rdkafka#producer-0| New local topic: test-topic
%7|1429923690.023|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state INIT connecting
%7|1429923690.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923690.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1429923690.024|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%3|1429923690.024|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%7|1429923690.024|STATE|rdkafka#producer-0| localhost:9092/bootstrap: Broker changed state INIT -> DOWN
%3|1429923690.024|ERROR|rdkafka#producer-0| 1/1 brokers are down
%7|1429923690.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923691.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923691.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.023|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923692.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923692.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.026|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923692.026|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923692.026|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0
I was expecting to see a non-zero exit code.
I think I asked this before, but I don't remember the answer.
When consuming from multiple partitions, what does the -o <offset> flag do? Offsets are per partition, so which partition's offset is -o referring to?
Hi.
Is it possible to build kafkacat for Kafka 0.7 with reduced functionality?
Hi, I am new to Kafka. After installing librdkafka and running an example file, I hit some issues. When using rdkafka_example_cpp to set up a producer:
rdkafka_example_cpp -P -t my_topic -p 1 -b 127.0.0.1:9092
I got this output:
% Created producer rdkafka#producer-0
LOG-3-FAIL: 127.0.0.1:9091/bootstrap: Failed to connect to broker at new-host:9091: Connection refused
And when using rdkafka_example_cpp to set up a consumer:
rdkafka_example_cpp -C -t my_topic -p 3 -b 127.0.0.1:9093
I got this output:
% Created consumer rdkafka#consumer-0
LOG-3-FAIL: 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: Broker transport failure): 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: All broker connections are down): 1/1 brokers are down
After looking at the same problem here, I got an explanation that the problem is caused by the use of IPv4 or IPv6. I've checked the IP version used, and I don't think my problem lies there. Is there any suggestion as to what's wrong with the above command? Thx.
I am using CentOS 6.5. I compiled the binary on one system then copied it out to several others. There are Kafka brokers on hadoop-dev[02,03,04] on my cluster. I get proper output, with what looks like valid info, but also some odd errors.
When I look at metadata_list(), which calls metadata_print(), I start to suspect the problem comes when you're iterating over the members of the ISR. Perhaps the error comes from my having defined the broker IDs as 2,3,4 (instead of the more expected 0,1,2)?
I'm honestly not sure. I'll keep digging, but if you have any insight, I'd welcome any guidance. Full output follows.
:) ./kafkacat-CentOS-6.5-x86_64 -L -b hadoop-dev03
Metadata for all topics (from broker -1: hadoop-dev03:9092/bootstrap):
3 brokers:
broker 3 at hadoop-dev03:9092
broker 2 at hadoop-dev02:9092
broker 4 at hadoop-dev04:9092
3 topics:
topic "CleanedStream" with 32 partitions:
partition 23, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,2, isrs: 3,2
partition 8, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,4, isrs: 3,4
partition 11, leader 3, replicas: 3,2, isrs: 3,2
partition 29, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,4, isrs: 3,4
partition 5, leader 3, replicas: 3,2, isrs: 3,2
partition 14, leader 3, replicas: 3,4, isrs: 3,4
partition 13, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,4, isrs: 2,4
partition 31, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,3, isrs: 2,3
partition 10, leader 2, replicas: 2,4, isrs: 2,4
partition 1, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,3, isrs: 2,3
partition 28, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,3, isrs: 4,3
partition 18, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,3, isrs: 4,3
partition 12, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,2, isrs: 4,2
partition 15, leader 4, replicas: 4,3, isrs: 4,3
partition 24, leader 4, replicas: 4,2, isrs: 4,2
partition 6, leader 4, replicas: 4,2, isrs: 4,2
partition 0, leader 4, replicas: 4,2, isrs: 4,2
topic "RawStream" with 32 partitions:
partition 23, leader 3, replicas: 3,4, isrs: 3,4
partition 8, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,2, isrs: 3,2
partition 11, leader 3, replicas: 3,4, isrs: 3,4
partition 29, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,2, isrs: 3,2
partition 5, leader 3, replicas: 3,4, isrs: 3,4
partition 14, leader 3, replicas: 3,2, isrs: 3,2
partition 13, leader 2, replicas: 2,4, isrs: 2,4
partition 4, leader 2, replicas: 2,3, isrs: 2,3
partition 31, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,4, isrs: 2,4
partition 10, leader 2, replicas: 2,3, isrs: 2,3
partition 1, leader 2, replicas: 2,4, isrs: 2,4
partition 28, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,2, isrs: 4,2
partition 18, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,2, isrs: 4,2
partition 12, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,3, isrs: 4,3
partition 15, leader 4, replicas: 4,2, isrs: 4,2
partition 24, leader 4, replicas: 4,3, isrs: 4,3
partition 6, leader 4, replicas: 4,3, isrs: 4,3
partition 0, leader 4, replicas: 4,3, isrs: 4,3
topic "FirstTest" with 8 partitions:
partition 2, leader 2, replicas: 2,4, isrs: 2,4
partition 5, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 4, replicas: 4,2, isrs: 4,2
partition 7, leader 4, replicas: 4,3, isrs: 4,3
partition 1, leader 4, replicas: 4,3, isrs: 4,3
partition 3, leader 3, replicas: 3,4, isrs: 3,4
partition 6, leader 3, replicas: 3,2, isrs: 3,2
partition 0, leader 3, replicas: 3,2, isrs: 3,2
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call
(This might be a librdkafka issue, I'm not sure.)
When running
kafkacat -C -b mybroker.example.org -t mytopic -e -o stored -c 1000 -X topic.auto.commit.interval.ms=100 -X topic.offset.store.path=/tmp/kafka-offsets
for the first time, if /tmp/kafka-offsets
does not yet exist, kafkacat seems to create this as a file and store a single integer in this file. I'm not sure if this is a real partition offset, but kafka brokers log plenty of OffsetOutOfRangeExceptions for the number in this file when kafkacat runs.
Perhaps librdkafka should always create topic.offset.store.path
as a directory if it does not yet exist?
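Until then, a possible workaround is to create the directory before the first run, so the path is treated as a directory of per-partition offset files rather than a single file:

```shell
# Pre-create the offset store directory (path matches the -X option above)
mkdir -p /tmp/kafka-offsets
test -d /tmp/kafka-offsets && echo "offset store ready"
```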
kafkacat needs a regression test framework and some tests.
Once kafkacat -C is started, it only prints messages from partitions that existed at the time it was started. Newly added partitions are not included in kafkacat's output (although there is a warning that the partition count changed).
It might be helpful if kafkacat could add new partitions to the output without a restart.
This might be a problem more related to the broker.
When I try to get the last message in the queue using the -o -1 option, I get the following:
$ kafkacat -C -b localhost -t my_topic -o -1 -c 1 -v
% Fatal error at consume_cb:345:
% ERROR: Topic my_topic [1] error: Broker: Offset out of range
Using kafkacat & librdkafka from GitHub + kafka v2.9.2-0.8.1.1
Hi,
It seems that it takes at least 1 second for the kafkacat producer to complete.
time sh -c "date | ~/dev/kafkacat/kafkacat -P -t _temp -p 0 -c 1 -b 192.168.86.3,192.168.86.5"
real 0m1.068s
user 0m0.006s
sys 0m0.039s
Is there any timeout setting that would allow kafkacat to exit immediately upon producing all messages?
Upon consuming a message which does not have a value part (key only), kafkacat fails with this error:
% ERROR: Write error for message of 0 bytes at offset 18): Undefined error: 0
On a related note, kafka-console-consumer.sh shows no error, expectedly printing null as the value.
We observe the following issue:
After the topic has been re-created, kafkacat is no longer able to move past a certain point. No error messages are displayed.
When run with the -dmsg flag, the following line is seen:
[9] MessageSet size 1048576, error "Success", MaxOffset 1069260
And MaxOffset keeps increasing, since new data is being added to the topic.
If we run kafkacat past the problematic offset (just the next offset is not enough), e.g. with -o N+1000000, it picks up and works fine.
kafka.tools.DumpLogSegments does not show any errors, saying "is valid" for each message, even for the problematic offset.
Any ideas on how to debug or fix this?
Requires rdkafka 0.8.4 (which hasn't been released yet).
It would be great to have the ability to see the partition id of messages, similar to the offsets printed with the -O flag.
Better yet, since offsets belong to partitions, it would make sense to prepend them with the partition id, e.g. 0:12345.
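For what it's worth, later kafkacat versions can express exactly this via the -f format string, where %p is the partition, %o the offset and %s the payload (check kafkacat -h for availability in your build; broker/topic names are placeholders):

```shell
kafkacat -C -b mybroker -t mytopic -f '%p:%o\t%s\n'
```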
Just use
kafkacat -b localhost -t test -C
and CPU usage goes to 100%. How do I debug this?
While using kafkacat as a consumer or as a producer, it would be good if it's possible to have it quit with a non-zero exit status if there are broker related errors. Currently, in my testing as a consumer if there are broker connectivity errors after kafkacat starts consuming, it prints logs to stderr and appears to repeatedly retry.
Is this perhaps already possible to do?
Hi!
Would it make sense for kafkacat to be able to create/delete topics?
This is probably a brew issue somehow, but when I run
kafkacat -L -b 10.223.25.68:9092
against my 5-broker cluster, I get:
Metadata for all topics (from broker -1: 10.223.25.68:9092/bootstrap):
 5 brokers:
  broker 5 at 10.223.25.69:9092
  broker 1 at 10.223.25.65:9092
  broker 2 at 10.223.25.66:9092
  broker 3 at 10.223.25.67:9092
  broker 4 at 10.223.25.68:9092
 58 topics:
... much topic metadata elided ...
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0
Those "Undefined errors" at the end are weird. It's clearly contacting the brokers, since it's getting the metadata. When I run kafkacat on Ubuntu against the same broker, everything is printed out identically, except that the errors aren't printed at the end.
Hello - thank you for this awesome bit of code and also for librdkafka. I've set everything up on my MacBook without a single issue. However, when attempting to get this running on Linux, I have hit a couple of snags. I am currently stuck on this error when I run ./bootstrap for the kafkacat install:
Now type 'make' to build
gcc -MD -MP -Itmp-bootstrap/usr/local/include -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -c kafkacat.c -o kafkacat.o
kafkacat.c: In function ‘produce_file’:
kafkacat.c:198:9: error: format ‘%zd’ expects argument of type ‘signed size_t’, but argument 4 has type ‘__off_t’ [-Werror=format]
cc1: all warnings being treated as errors
make: *** [kafkacat.o] Error 1
This is a fresh install of debian on a VM. Please let me know if you have any suggestions for getting past this point.
Thanks again for the awesome software.
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 14.04.2 LTS
Release: 14.04
Codename: trusty
sudo ldconfig -p | grep yajl
libyajl.so.2 (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so.2
libyajl.so (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so
Hello,
I am attempting to use Kafka in the load-balancing fashion. As the Kafka documentation says:
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
So what I did was give all consumer instances the same group.id.
First I created the topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic beta
Then subscribed two consumers to it:
kafkacat -C -b $(boot2docker ip):9092 -t beta -X group.id=mygroupid
And produced a message:
date | kafkacat -P -b $(boot2docker ip):9092 -t beta
I expected only one of the consumers to receive the message, but both do.
On the other hand, if I run the consumers using kafka-console-consumer.sh, the message is only consumed once:
echo "group.id=mygroupid" > /consumer.beta.properties
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 \
--topic beta \
--consumer.config /consumer.beta.properties
What's wrong?
(I have also asked this question on Stackoverflow)
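For what it's worth, kafkacat's plain -C mode is a low-level consumer: each instance independently reads every partition, so setting group.id via -X does not give balanced consumption. A sketch of the alternative, assuming a newer kafkacat build that has the high-level -G mode (backed by librdkafka's balanced KafkaConsumer):

```shell
# High-level balanced consumer mode (assumption: a kafkacat build that
# supports -G). Run this in two shells with the same group name; the two
# partitions of "beta" are split between the group members, so each
# message is delivered to only one of them.
kafkacat -G mygroupid -b $(boot2docker ip):9092 beta
```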
I ran the most recent bootstrap.sh. It errored out with:
Build of libyajl SUCCEEDED!
Building kafkacat
Using -L/usr/local/lib -lpthread -lz -lcrypto -lssl -lsasl2 for rdkafka
Package yajl was not found in the pkg-config search path.
Perhaps you should add the directory containing `yajl.pc'
to the PKG_CONFIG_PATH environment variable
No package 'yajl' found
grep: tmp-bootstrap/usr/local/lib/pkgconfig/yajl.pc: No such file or directory
Using for yajl
./bootstrap.sh: line 97: ./configure: No such file or directory
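The grep line shows that tmp-bootstrap/usr/local/lib/pkgconfig/yajl.pc was never installed, so pkg-config cannot find yajl. A hypothetical workaround, assuming yajl's .pc file exists elsewhere on the system (e.g. from a distro libyajl-dev package or a manual install): point pkg-config at that directory before re-running the script.

```shell
# Add the directory that actually contains yajl.pc (the path below is
# only an example) to pkg-config's search path, verify it is visible,
# then retry the build.
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
pkg-config --exists yajl && echo "yajl found"
./bootstrap.sh
```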
I use the compact retention policy on some topics.
I need to send Kafka a message with a key and no payload (a null value), to create a tombstone.
I've tried this command:
kafkacat -b <broker> -t <topic> -p 0 -K# -c 1 -P
with input such as:
11#
Does this create a tombstone?
I've seen the -Z flag, but it seems to apply to a null key rather than a null value.
If you could clarify this in the documentation, it would help many users of compacted topics.
best regards,
Charles.
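A sketch of how the tombstone might be produced, assuming a kafkacat build where -Z in producer mode sends empty input as NULL (check your version's help output): with -K# alone, "11#" yields key "11" with an empty-string value, which compaction does not treat as a tombstone; adding -Z turns that empty value into NULL.

```shell
# <broker> and <topic> are placeholders. -K# splits key and value on
# '#', and -Z (assumption: producer-side "send empty messages as NULL")
# makes the empty value after '#' a NULL payload, i.e. a tombstone for
# key "11" on a compacted topic.
echo "11#" | kafkacat -P -b <broker> -t <topic> -p 0 -K# -Z
```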
This probably should be filed against both librdkafka and kafkacat. This is an enhancement request.
The documentation (http://kafka.apache.org/08/documentation.html) describes zookeeper.connect:
Zookeeper also allows you to add a "chroot" path which will make all kafka data for this cluster appear under a particular path. This is a way to setup multiple Kafka clusters or other applications on the same zookeeper cluster. To do this give a connection string in the form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path which would put all this cluster's data under the path /chroot/path. Note that you must create this path yourself prior to starting the broker and consumers must use the same connection string.
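A minimal server.properties fragment for the connection string described above (host names and chroot path are placeholders):

```
# All of this cluster's znodes live under /chroot/path in ZooKeeper.
# The path must be created before the brokers start, and consumers
# must use the same connection string.
zookeeper.connect=hostname1:2181,hostname2:2181,hostname3:2181/chroot/path
```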