
kcat's Issues

kafkacat will discard some messages when the message has the char '\n'

Hi, I have been using kafkacat for a long time, but I found an error recently.
I have some example messages, see below:
0xxxxxxxxxxxx
1xxxx\nxxxxx\n
2xxxx\nxxxxx\n
3xxxx\nxxxxx\n

When I produce the messages with kafkacat and then consume them with kafkacat, I find that some messages have been discarded. What's your advice here? Thanks.
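The splitting can be reproduced without a broker: kafkacat's producer reads stdin newline-delimited by default, so the three sample payloads above (each with an embedded '\n') arrive as six records rather than three. A minimal sketch:

```shell
# Three payloads containing embedded '\n' characters become six
# newline-delimited records, matching the "discarded" messages seen.
printf '1xxxx\nxxxxx\n2xxxx\nxxxxx\n3xxxx\nxxxxx\n' | wc -l
```

A producer invocation such as `kafkacat -P -b <broker> -t <topic> -D '\0'` (broker and topic are placeholders) keeps multi-line payloads intact by delimiting on NUL instead of newline.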

mktemp usage not backward compatible with older Linux distributions

Trying to run configure on Ubuntu 10.04 results in errors such as:

checking for PIC (by compile)...mktemp: too few X's in template `_mkltmpXXXX.c'

On this platform (and many older Linux distributions), mktemp(1) requires the template "X" characters to be at the end of the filename.
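A portable sketch of the workaround, assuming nothing beyond old mktemp(1) semantics: keep the X run at the end of the template and rename afterwards to add the extension.

```shell
# Old mktemp(1) (e.g. Ubuntu 10.04) requires the X's to end the
# template, so create the file with trailing X's, then rename it
# to carry the .c suffix the configure probe needs.
tmp=$(mktemp _mkltmpXXXXXX)
mv "$tmp" "$tmp.c"
echo "created $tmp.c"
rm -f "$tmp.c"
```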

kafkacat topic info doesn't match that of kafka-topics

Hiya. I'm running into something where the leader/follower/ISR reported by kafkacat doesn't match that reported by kafka-topics.

Any ideas? Even more confusingly: I'm logged in to broker 1002/kafka900f5013 in two shells right now. For the past hour or so kafkacat was reporting the correct information (partition 0, leader 1002, replicas: 1002, isrs: 1002) in one of the shells, and incorrect information in the other shell (partition 0, leader 1004, replicas: 1004, isrs: 1004). Consistently and repeatedly. It was just while I was typing up this GitHub issue that they all started reporting the same incorrect information as the rest of the nodes.

So Kafkacat thinks the partition is being served by broker 1004:

PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafkacat -L -b localhost:9092 -t hound-prod.retriever.mutation"
kafka-1bc7c4e2: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-1bc7c4e2:  4 brokers:
kafka-1bc7c4e2:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-1bc7c4e2:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-1bc7c4e2:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-1bc7c4e2:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-1bc7c4e2:  1 topics:
kafka-1bc7c4e2:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-1bc7c4e2:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-d4b7674e: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-d4b7674e:  4 brokers:
kafka-d4b7674e:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-d4b7674e:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-d4b7674e:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-d4b7674e:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-d4b7674e:  1 topics:
kafka-d4b7674e:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-d4b7674e:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-900f5013: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-900f5013:  4 brokers:
kafka-900f5013:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-900f5013:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-900f5013:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-900f5013:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-900f5013:  1 topics:
kafka-900f5013:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-900f5013:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-88e6bc0c: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-88e6bc0c:  4 brokers:
kafka-88e6bc0c:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-88e6bc0c:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-88e6bc0c:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-88e6bc0c:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-88e6bc0c:  1 topics:
kafka-88e6bc0c:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-88e6bc0c:     partition 0, leader 1004, replicas: 1004, isrs: 1004

And kafka-topics thinks the partition is being served by broker 1002:

```
PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --topic hound-prod.retriever-mutation --describe"
kafka-d4b7674e: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-d4b7674e:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-1bc7c4e2: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-1bc7c4e2:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-88e6bc0c: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-88e6bc0c:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-900f5013: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-900f5013:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
PRODUCTION ubuntu@bastion-0:~$
```

Any ideas?

Just looking at the data sizes on disk, it's definitely writing to broker 1002, not 1004.

Persistent metadata cache

From librdkafka configuration guide:

The metadata is automatically refreshed on error and connect.

On each kafkacat run, librdkafka loads the Kafka cluster metadata. This has a significant inbound traffic impact in situations where kafkacat is triggered frequently on a cluster with many highly partitioned topics.

Is there a way to cache the metadata in between kafkacat runs so that each kafkacat run doesn't always load a significant amount of metadata on start?
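There is no built-in way to persist metadata between runs, but within a run the refresh behaviour is tunable. A sketch using standard librdkafka properties (names assumed valid for the librdkafka version in use), passed to kafkacat with -X; combining this with `-t <topic>` on `-L` queries also limits each request to a single topic's metadata.

```
# Passed as: kafkacat -X topic.metadata.refresh.interval.ms=300000 ...
topic.metadata.refresh.interval.ms=300000
topic.metadata.refresh.sparse=true
```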

Native win32 build

Build with MSVC.
Requires that the oct15 branch of librdkafka is merged to master.

Should provide prebuilt binaries for windows, possibly with installer.

In producer mode, it sends the messages but throws errors while starting up

Though the app writes messages to Kafka, it throws the following error messages at startup. I'm not sure what exactly the issue is, but any input would help.

%3|1418300191.571|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| 1/1 brokers are down

Settings to stream without buffering

I would like to use kafkacat to stream from a topic without buffering but I can't figure out the right settings to make that happen. To illustrate using kafka_2.8.0-0.8.0:

ZOOKEEPER=192.168.33.10:2181  # whatever it may be
BROKERS=192.168.33.10:6667    # whatever it may be

# create the topic
kafka-create-topic.sh --zookeeper $ZOOKEEPER --replica 1 --partition 1 --topic example

# create a stream
ruby -e "STDOUT.sync=true; 10000.times {|i| puts i; sleep 0.2}" |
kafka-console-producer.sh --broker-list $BROKERS --sync --topic example

If you consume the stream like this, you'll see the output tick along smoothly with no apparent delay:

kafka-console-consumer.sh --zookeeper $ZOOKEEPER --topic example

If you consume the stream with kafkacat in these variations, the output jerks forward, pauses for a second, then jerks forward again. All the output is there, but it isn't streaming smoothly:

./kafkacat -t example -b $BROKERS
./kafkacat -t example -b $BROKERS -u

These variations make the output tick along at regular 1 s intervals, but again the output isn't smooth. There appears to be some kind of fetch delay:

./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1
./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1 -X fetch.error.backoff.ms=1

I've just been kinda guessing at options with no success. Is there a way to get kafkacat to smoothly stream the topic à la kafka-console-consumer.sh?

from last changes install is not linking shared lib

This is what I get now when running kafkacat after doing ./bootstrap:

kafkacat: error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

It was working before the recent changes.

kafkacat metadata does not display topic metadata

When I execute kafka-topics --describe --zookeeper localhost --topic mytopic, it returns PartitionCount:10 ReplicationFactor:1 Configs:cleanup.policy=compact.
It also displays other details about the leader and replicas:
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

It would be useful if kafkacat returned this kind of detail, especially the cleanup policy when log compaction is in use.

best regards,

Charles.

Protocol parse failure (incorrect broker.version)

After upgrading to kafkacat 1.3.0 and librdkafka 0.9.1 I get the following error in consumer mode:

%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/1: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: expected 36681250 bytes > 150 remaining bytes
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/2: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)

Any idea what this means?

Losing messages on failover test

I have 2 Kafka brokers running in my test environment, and I am running one kafkacat as a producer and another as a consumer. The test topic is set up with a replication factor of 2.

In my test, I killed one of the brokers while cat-ing multiple files into the kafkacat producer. Sometimes the consumer got all the messages, but sometimes it missed 5-10% of them.

Are there any producer-specific parameters that I missed? I am hoping kafkacat at least returns a non-zero exit code when messages are lost.

I think the problem is probably in librdkafka. When events go missing, I don't even see dr_msg_cb in kafkacat.c being called, so it feels like librdkafka did not notify kafkacat about the issue.

`-e` flag ignored in case of many active partitions

Apparently, all partitions must time out simultaneously for this flag to work. If a topic has many active partitions the chance of this is low, and kafkacat continues running indefinitely.

It looks like, if this flag is set, kafkacat should pin down the latest offset of each partition at startup and exit once all of those offsets are reached.

Offset flag ignored

It looks like the -o flag applies only at the granularity of Kafka storage blocks, or some other caching is going on, e.g.:

$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98564$i -p 0 -c 3 -u | md5sum" ; done
01 d1e6071308960d3d04c9ff911ef32516  -
02 d1e6071308960d3d04c9ff911ef32516  -

$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98574$i -p 0 -c 3 -u | md5sum" ; done
01 372ec3fd9d054638ff817f73a4c50007  -
02 372ec3fd9d054638ff817f73a4c50007  -

Messages were loaded into Kafka in batches with snappy compression; the latest kafkacat was used.

New release ?

Support for the new Kafka consumer would be great, but the latest released version is only 1.2.0.
Is it possible to release a beta or a release candidate?

stdout buffer causes messages not to be delivered on arrival

By default, stdout is buffered. When writing to a terminal, it flushes after each newline; however, if stdout is redirected, it won't flush on each newline. It can be flushed manually, or buffering can be disabled.

http://stackoverflow.com/questions/1716296/why-does-printf-not-flush-after-the-call-unless-a-newline-is-in-the-format-strin

Something like this could fix the problem:

int main (int argc, char **argv) {
        setbuf(stdout, NULL);   /* disable stdout buffering entirely */
        ...

Or stdout can be flushed manually (fflush(stdout)) after each write.

I made this an issue rather than a PR because I'm not sure of the best way to handle it.
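As a code-free alternative, GNU coreutils' stdbuf can force line buffering on an existing binary's stdout. A sketch with cat standing in for kafkacat, since the effect is the same for any program writing into a pipe:

```shell
# stdbuf -oL line-buffers stdout even when it is redirected to a pipe,
# so each record is delivered as soon as its newline is written.
printf 'one\ntwo\n' | stdbuf -oL cat | wc -l
```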

kafkacat showing different ISR list vs other tools

For one of my topics, kafkacat is showing a different list of isrs than both the kafka-topics.sh script and kafkat (the AirBnb tool). I'm wondering if there is a bug in kafkacat.

See below, partition 1 shows isrs = 4 when using kafkacat, but isrs = 4,2,3 when using the others.

jcheng:~ jcheng$ docker run --rm -it kafkacat kafkacat -L -b 10.100.222.65 -t engrcloud.core.kafka.install-test
Metadata for engrcloud.core.kafka.install-test (from broker -1: 10.100.222.65:9092/bootstrap):
 5 brokers:
  broker 5 at 10.100.222.69:9092
  broker 1 at 10.100.222.65:9092
  broker 2 at 10.100.222.66:9092
  broker 3 at 10.100.222.67:9092
  broker 4 at 10.100.222.68:9092
 1 topics:
  topic "engrcloud.core.kafka.install-test" with 3 partitions:
    partition 2, leader 5, replicas: 5,3,4, isrs: 4,5,3
    partition 1, leader 4, replicas: 4,2,3, isrs: 4
    partition 0, leader 3, replicas: 3,1,2, isrs: 1,2,3

vs

jcheng:~ jcheng$ docker run --rm -it wurstmeister/kafka /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper=10.100.222.69:2181 --describe --topic engrcloud.core.kafka.install-test
Topic:engrcloud.core.kafka.install-test PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: engrcloud.core.kafka.install-test    Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 1,2,3
    Topic: engrcloud.core.kafka.install-test    Partition: 1    Leader: 4   Replicas: 4,2,3 Isr: 4,2,3
    Topic: engrcloud.core.kafka.install-test    Partition: 2    Leader: 5   Replicas: 5,3,4 Isr: 4,5,3

vs

jcheng:~ jcheng$ docker run -it --rm kafkat sh -c 'echo { \"zk_path\": \"10.100.222.65:2181\" } > /etc/kafkatcfg; kafkat partitions engrcloud.core.kafka.install-test'
Topic                               Partition   Leader   Replicas    ISRs
engrcloud.core.kafka.install-test   0           3        [3, 1, 2]   [1, 2, 3]
engrcloud.core.kafka.install-test   1           4        [4, 2, 3]   [4, 2, 3]
engrcloud.core.kafka.install-test   2           5        [5, 3, 4]   [4, 5, 3]

Is there any debugging that I can supply?

Kafkacat loops and consumes all bandwidth

Hello,

When kafkacat tries to fetch a message bigger than librdkafka's fetch.message.max.bytes, it loops forever and consumes all available bandwidth and CPU.

Instead, it should throttle, print a message, or return.

kafkacat -c -NUM semantics edge case

Here is the use case: a topic with, say, 150 messages.

$ kafkacat -b broker -C -q -o -1000 -e -c 1000 -t topic | wc -l
0

$ kafkacat -b broker -C -q -o -100 -e -c 1000 -t topic | wc -l
100

What this shows is that if the negative offset is larger than the number of available messages, nothing is returned. Semantically, I believe this should just return from the beginning.
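The suggested semantics amount to clamping the computed start offset to the beginning of the partition. A sketch in shell arithmetic with this example's numbers (150 messages available, 1000 requested from the tail):

```shell
# Clamp "end - requested" to the partition's first offset instead of
# returning nothing when the tail window is larger than the topic.
end=150; begin=0; requested=1000
start=$((end - requested))
if [ "$start" -lt "$begin" ]; then start=$begin; fi
echo "$start"
```

With these numbers the printed start offset is 0, i.e. consumption would begin at the start of the topic.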

On OS X, bootstrap.sh doesn't yield a statically linked binary

On OS X 10.10.2, bootstrap.sh doesn't yield a statically linked binary, because libyajl's static library does not match the expected naming conventions.

In bootstrap.sh:
export STATIC_LIB_yajl="tmp-bootstrap/usr/local/lib/libyajl.a"

However, in the filesystem:

$ ls -1 tmp-bootstrap/usr/local/lib/libyajl*
tmp-bootstrap/usr/local/lib/libyajl.2.1.1.dylib
tmp-bootstrap/usr/local/lib/libyajl.2.dylib
tmp-bootstrap/usr/local/lib/libyajl.dylib
tmp-bootstrap/usr/local/lib/libyajl_s.a

Notice that the libyajl static library is libyajl_s.a, not libyajl.a.

After compilation, you can see that librdkafka was statically linked in, but not libyajl.

$ otool -L kafkacat
kafkacat:
    /usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
    /usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
    /usr/local/lib/libyajl.2.dylib (compatibility version 2.0.0, current version 2.1.1)

I worked around it by manually running gcc, and replacing "-lyajl" with "tmp-bootstrap/usr/local/lib/libyajl_s.a"

$ gcc -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Ltmp-bootstrap/usr/local/lib -Ltmp-bootstrap/usr/local/lib kafkacat.o format.o json.o -o kafkacat tmp-bootstrap/usr/local/lib/librdkafka.a -lpthread -lz tmp-bootstrap/usr/local/lib/libyajl_s.a
$ otool -L kafkacat
kafkacat:
    /usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
    /usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
$

Is -J option supported?

(version 1.2.0) When I run:
$ kafkacat -b mybroker -t syslog -J

It gives me:
kafkacat: illegal option -- J

-o not working or I am not using it right

Magnus, great tool, really love it!!!

I am trying to read from a specific offset on the consumer side, but it seems to always just display everything:

kafkacat -b xyz -t abc -p 0 -o 8

Also, is there a way to read from offset x until offset y, rather than just from x to the end?
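There is no built-in until-offset option, but a from-x-to-y read can be approximated by pairing -o with -c, since the count is just y − x. A sketch with illustrative offsets:

```shell
# To read offsets X..Y-1: start at X and request Y - X messages, e.g.
#   kafkacat -C -b xyz -t abc -p 0 -o "$X" -c "$COUNT" -e
X=8; Y=20
COUNT=$((Y - X))
echo "$COUNT"
```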

kafkacat reports exit code of 0 in some error cases

  1. When a bad hostname is used
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b badhost -t RawEvents -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?

The output is

%7|1429923085.924|BROKER|rdkafka#producer-0| badhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923085.924|BRKMAIN|rdkafka#producer-0| badhost:9092/bootstrap: Enter main broker thread
%7|1429923085.924|TOPIC|rdkafka#producer-0| New local topic: RawEvents
%7|1429923085.924|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923085.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923085.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923086.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923086.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923086.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.924|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923087.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923087.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923087.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.934|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923087.934|BROKERFAIL|rdkafka#producer-0| badhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923087.934|STATE|rdkafka#producer-0| badhost:9092/bootstrap: Broker changed state INIT -> DOWN
%7|1429923087.935|BUFQ|rdkafka#producer-0| badhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0

I was expecting to see a non-zero exit code.

  2. When the correct hostname is used but the broker is not running
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b localhost -t test-topic -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?

The output is

%7|1429923690.023|BROKER|rdkafka#producer-0| localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923690.023|BRKMAIN|rdkafka#producer-0| localhost:9092/bootstrap: Enter main broker thread
%7|1429923690.023|TOPIC|rdkafka#producer-0| New local topic: test-topic
%7|1429923690.023|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state INIT connecting
%7|1429923690.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923690.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1429923690.024|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%3|1429923690.024|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%7|1429923690.024|STATE|rdkafka#producer-0| localhost:9092/bootstrap: Broker changed state INIT -> DOWN
%3|1429923690.024|ERROR|rdkafka#producer-0| 1/1 brokers are down
%7|1429923690.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923691.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923691.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.023|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923692.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923692.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.026|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923692.026|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923692.026|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0

I was expecting to see a non-zero exit code.

-o when consuming from multiple partitions?

I think I asked this before, but I don't remember the answer.

When consuming from multiple partitions, what does the -o <offset> flag do? Offsets are for particular partitions. Which partition's offset is -o referring to?

Error message while using rdkafka_example_cpp

Hi, I am new to Kafka. After installing librdkafka, I ran an example file and had some issues using it. When using rdkafka_example_cpp to set up a producer:

rdkafka_example_cpp -P -t my_topic -p 1 -b 127.0.0.1:9092

I got this output:

% Created producer rdkafka#producer-0
LOG-3-FAIL: 127.0.0.1:9091/bootstrap: Failed to connect to broker at new-host:9091: Connection refused

and when using rdkafka_example_cpp to set up a consumer:

rdkafka_example_cpp -C -t my_topic -p 3 -b 127.0.0.1:9093

I got this output:

% Created consumer rdkafka#consumer-0
LOG-3-FAIL: 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: Broker transport failure): 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: All broker connections are down): 1/1 brokers are down

After looking at the same problem here, I found an explanation that the problem is caused by the use of IPv4 vs. IPv6. I've checked the IP version used, and I don't think my problem lies there. Is there any suggestion as to what's wrong with the above commands? Thx.

Interrupted system call when -L

I am using CentOS 6.5. I compiled the binary on one system, then copied it out to several others. There are Kafka brokers on hadoop-dev[02,03,04] in my cluster. I get proper output with what looks like valid info, but also some odd errors.

When I look at metadata_list(), which calls metadata_print(), I start to suspect the problem arises when iterating over the members of the ISR. Perhaps the error comes from my having defined broker IDs as 2,3,4 (instead of the more expected 0,1,2)?

I'm honestly not sure. I'll keep digging, but if you have any insight, I'd welcome any guidance. Full output follows.

:) ./kafkacat-CentOS-6.5-x86_64 -L -b hadoop-dev03
Metadata for all topics (from broker -1: hadoop-dev03:9092/bootstrap):
3 brokers:
broker 3 at hadoop-dev03:9092
broker 2 at hadoop-dev02:9092
broker 4 at hadoop-dev04:9092
3 topics:
topic "CleanedStream" with 32 partitions:
partition 23, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,2, isrs: 3,2
partition 8, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,4, isrs: 3,4
partition 11, leader 3, replicas: 3,2, isrs: 3,2
partition 29, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,4, isrs: 3,4
partition 5, leader 3, replicas: 3,2, isrs: 3,2
partition 14, leader 3, replicas: 3,4, isrs: 3,4
partition 13, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,4, isrs: 2,4
partition 31, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,3, isrs: 2,3
partition 10, leader 2, replicas: 2,4, isrs: 2,4
partition 1, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,3, isrs: 2,3
partition 28, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,3, isrs: 4,3
partition 18, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,3, isrs: 4,3
partition 12, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,2, isrs: 4,2
partition 15, leader 4, replicas: 4,3, isrs: 4,3
partition 24, leader 4, replicas: 4,2, isrs: 4,2
partition 6, leader 4, replicas: 4,2, isrs: 4,2
partition 0, leader 4, replicas: 4,2, isrs: 4,2
topic "RawStream" with 32 partitions:
partition 23, leader 3, replicas: 3,4, isrs: 3,4
partition 8, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,2, isrs: 3,2
partition 11, leader 3, replicas: 3,4, isrs: 3,4
partition 29, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,2, isrs: 3,2
partition 5, leader 3, replicas: 3,4, isrs: 3,4
partition 14, leader 3, replicas: 3,2, isrs: 3,2
partition 13, leader 2, replicas: 2,4, isrs: 2,4
partition 4, leader 2, replicas: 2,3, isrs: 2,3
partition 31, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,4, isrs: 2,4
partition 10, leader 2, replicas: 2,3, isrs: 2,3
partition 1, leader 2, replicas: 2,4, isrs: 2,4
partition 28, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,2, isrs: 4,2
partition 18, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,2, isrs: 4,2
partition 12, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,3, isrs: 4,3
partition 15, leader 4, replicas: 4,2, isrs: 4,2
partition 24, leader 4, replicas: 4,3, isrs: 4,3
partition 6, leader 4, replicas: 4,3, isrs: 4,3
partition 0, leader 4, replicas: 4,3, isrs: 4,3
topic "FirstTest" with 8 partitions:
partition 2, leader 2, replicas: 2,4, isrs: 2,4
partition 5, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 4, replicas: 4,2, isrs: 4,2
partition 7, leader 4, replicas: 4,3, isrs: 4,3
partition 1, leader 4, replicas: 4,3, isrs: 4,3
partition 3, leader 3, replicas: 3,4, isrs: 3,4
partition 6, leader 3, replicas: 3,2, isrs: 3,2
partition 0, leader 3, replicas: 3,2, isrs: 3,2
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call

Weird offset file when directory does not yet exist

(This might be a librdkafka issue, I'm not sure.)

When running

kafkacat -C -b mybroker.example.org -t mytopic -e -o stored -c 1000 -X topic.auto.commit.interval.ms=100 -X topic.offset.store.path=/tmp/kafka-offsets

for the first time, if /tmp/kafka-offsets does not yet exist, kafkacat seems to create it as a regular file and store a single integer in it. I'm not sure whether this is a real partition offset, but the Kafka brokers log plenty of OffsetOutOfRangeExceptions for the number in this file when kafkacat runs.

Perhaps librdkafka should always create topic.offset.store.path as a directory if it does not yet exist?
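Until that changes, a sketch of the workaround is simply to create the store path as a directory before the first run:

```shell
# Pre-create the offset store path so librdkafka finds a directory
# rather than creating a stray file holding a bogus offset.
mkdir -p /tmp/kafka-offsets
test -d /tmp/kafka-offsets && echo "offset store directory ready"
```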

`kafkacat -C` ignores new partitions

Once kafkacat -C is started, it only prints messages from partitions that existed at the time it was started. Newly added partitions are not included in the output (although there is a warning that the partition count changed).

It would be helpful if kafkacat could add new partitions to the output without a restart.

Negative message offset is out-of-range

This might be a problem more related to the broker.

When I try to get the last message in the queue using the -o -1 option, I get the following:

$ kafkacat -C -b localhost -t my_topic -o -1 -c 1 -v
% Fatal error at consume_cb:345:
% ERROR: Topic my_topic [1] error: Broker: Offset out of range

Using kafkacat & librdkafka from GitHub + kafka v2.9.2-0.8.1.1

Min wall clock time per single kafkacat invocation

Hi,

It seems that it takes at least 1 second for kafkacat producer to complete.

time sh -c "date | ~/dev/kafkacat/kafkacat -P -t _temp -p 0 -c 1 -b 192.168.86.3,192.168.86.5"

real 0m1.068s
user 0m0.006s
sys 0m0.039s

Is there any timeout setting that would allow kafkacat to exit immediately upon producing all messages?
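Much of that second is likely producer buffering/poll defaults rather than a hard timeout. A sketch of librdkafka properties to pass with -X (property names assumed valid for the librdkafka version in use; a shorter linger trades batching efficiency for latency):

```
# Passed as -X prop=val on the kafkacat command line.
queue.buffering.max.ms=1
socket.blocking.max.ms=100
```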

Error consuming message with empty 'message' part

Upon consuming a message that does not have a message part (key only), kafkacat fails with an error:

% ERROR: Write error for message of 0 bytes at offset 18): Undefined error: 0

On a related note, kafka-console-consumer.sh shows no error, expectedly printing null as the value.

kafkacat stuck at certain offsets after topic delete/restore

We observe the following issue:

After the topic has been re-created, kafkacat is no longer able to move past a certain point. No error messages are displayed.

When run with the -d msg flag, the following line is seen:

[9] MessageSet size 1048576, error "Success", MaxOffset 1069260

And MaxOffset keeps increasing, since new data is being added to the topic.

If we start kafkacat past the problematic offset (however, just the next offset is not enough), e.g. with -o N+1000000, it picks up and works fine.

kafka.tools.DumpLogSegments does not show any errors, reporting each message as valid, even at the problematic offset.

Any ideas on how to debug or fix this?

Print partition id of the message

It would be great to have the ability to see the partition id of a message, similar to the offsets printed with the -O flag.

Better yet, since offsets belong to partitions, it would make sense to prepend them with the partition id, e.g. 0:12345.
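The proposed line shape, demonstrated with printf; partition 0 and offset 12345 are this example's values. (Later kafkacat releases grew a -f format string, e.g. '%p:%o %s\n', which would emit exactly this, assuming a version that has it.)

```shell
# "partition:offset payload" as suggested above.
printf '%d:%d %s\n' 0 12345 'some payload'
```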

[Feature Request] Configure kafkacat to exit on broker connectivity errors

While using kafkacat as a consumer or as a producer, it would be good to be able to have it exit with a non-zero status on broker-related errors. Currently, in my testing as a consumer, if broker connectivity errors occur after kafkacat starts consuming, it prints logs to stderr and appears to retry indefinitely.

Is this perhaps already possible to do?
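As a workaround until such a flag exists, one option is to probe the cluster first and only start the long-running consumer if metadata can be fetched. A sketch (broker address and topic are placeholders):

```shell
# Sketch: fail fast if the broker is unreachable, then consume.
kafkacat -L -b localhost:9092 >/dev/null 2>&1 || {
    echo "broker unreachable" >&2
    exit 1
}
kafkacat -C -b localhost:9092 -t <topic>
```

This only catches errors at startup, not connectivity lost mid-consumption, so it is a partial substitute for the requested feature.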

Topic Admin API

Hi!

Would it make sense for kafkacat to be able to create/delete topics?

kafkacat from brew on OSX works... but displays errors

This is probably a brew issue somehow, but when I run

kafkacat -L -b 10.223.25.68:9092

Against my 5 broker cluster, I get

Metadata for all topics (from broker -1: 10.223.25.68:9092/bootstrap):
 5 brokers:
  broker 5 at 10.223.25.69:9092
  broker 1 at 10.223.25.65:9092
  broker 2 at 10.223.25.66:9092
  broker 3 at 10.223.25.67:9092
  broker 4 at 10.223.25.68:9092
 58 topics:

... much topic metadata elided ...

%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0

Those 'Undefined error' lines at the end are weird. kafkacat is clearly contacting the brokers, since it's getting the metadata. When I run kafkacat on Ubuntu against the same brokers, everything is printed identically, except the errors aren't appended at the end.

Errors installing on debian

Hello - thank you for this awesome bit of code and also for librdkafka. I've set everything up on my MacBook without a single issue. However, when attempting to get this running on Linux, I have hit a couple of snags. I am currently stuck on this error when I try to run ./bootstrap.sh for the kafkacat install:

Now type 'make' to build
gcc -MD -MP -Itmp-bootstrap/usr/local/include -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -c kafkacat.c -o kafkacat.o
kafkacat.c: In function ‘produce_file’:
kafkacat.c:198:9: error: format ‘%zd’ expects argument of type ‘signed size_t’, but argument 4 has type ‘__off_t’ [-Werror=format]
cc1: all warnings being treated as errors
make: *** [kafkacat.o] Error 1

This is a fresh install of debian on a VM. Please let me know if you have any suggestions for getting past this point.

Thanks again for the awesome software.

/usr/bin/ld: cannot find -lyajl

No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 14.04.2 LTS
Release: 14.04
Codename: trusty

sudo ldconfig -p | grep yajl
libyajl.so.2 (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so.2
libyajl.so (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so

Loadbalancing messages between kafkacat consumers

Hello,

I am attempting to use Kafka in a load-balancing fashion. As the Kafka documentation says:

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

So what I did is:

  • Ran Kafka 0.8.2.1 using spotify/kafka (via boot2docker)
  • One producer
  • Two consumers with the same group.id

First I created the topic:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic beta

Then subscribed two consumers to it:

kafkacat -C -b $(boot2docker ip):9092 -t beta -X group.id=mygroupid

And produce a message:

date | kafkacat -P -b $(boot2docker ip):9092 -t beta

I expect only one of the consumers to receive the message, but both do.

On the other hand, if I run the consumers using kafka-console-consumer.sh, the message is only consumed once:

echo "group.id=mygroupid" > /consumer.beta.properties

$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --zookeeper localhost:2181 \
  --topic beta \
  --consumer.config /consumer.beta.properties

What's wrong?

(I have also asked this question on Stackoverflow)
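For context: passing -X group.id to kafkacat's plain consumer (-C) sets the configuration property but does not enable Kafka's balanced-consumer protocol, so each instance still reads all partitions independently. Later kafkacat versions add a high-level balanced consumer mode (-G); assuming such a version, the balanced equivalent would look like:

```shell
# Sketch: balanced consumer group mode (-G), which coordinates partition
# assignment between instances sharing the same group id.
kafkacat -b $(boot2docker ip):9092 -G mygroupid beta
```

With two such instances running, each partition of the topic is assigned to only one of them, matching the kafka-console-consumer.sh behavior.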

bootstrap.sh not working on OS X El Capitan (10.11.4)

I ran the most recent bootstrap.sh. It errored out with:

Build of libyajl SUCCEEDED!
Building kafkacat
Using -L/usr/local/lib  -lpthread -lz -lcrypto -lssl -lsasl2 for rdkafka
Package yajl was not found in the pkg-config search path.
Perhaps you should add the directory containing `yajl.pc'
to the PKG_CONFIG_PATH environment variable
No package 'yajl' found
grep: tmp-bootstrap/usr/local/lib/pkgconfig/yajl.pc: No such file or directory
Using  for yajl
./bootstrap.sh: line 97: ./configure: No such file or directory

Can you document how to send Kafka a command to delete a key (in a compacted topic)?

I use the compact retention policy on some topics.
I need to send Kafka a message with a key and no value (null payload) to create a tombstone.
I've tried this command:
kafkacat -b -t -p 0 -K# -c 1 -P

with input, for example:
11#
=> does this create a tombstone?
I've seen the -Z flag, but it seems to be for a null key and not a null value.
If you can clarify this in your documentation, it will help many compacted-topic users.

best regards,

Charles.
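For reference: in kafkacat versions that support it, combining -Z with -K lets an empty value after the key delimiter be produced as NULL rather than a zero-length string, and a NULL value is what a compacted topic treats as a tombstone. A sketch (broker and topic are placeholders, since they were elided in the command above):

```shell
# Sketch: produce a tombstone (key "11", NULL value) to a compacted topic.
# -Z sends the empty value as NULL instead of an empty string.
echo "11#" | kafkacat -P -b <broker> -t <topic> -p 0 -K# -Z
```

Whether your current build behaves this way is worth verifying against your version's -Z help text.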

support for zookeeper.connect chroot path

This probably should be filed against both librdkafka and kafkacat. This is an enhancement request.

http://kafka.apache.org/08/documentation.html

The documentation describes:

zookeeper.connect

Zookeeper also allows you to add a "chroot" path which will make all kafka data for this cluster appear under a particular path. This is a way to setup multiple Kafka clusters or other applications on the same zookeeper cluster. To do this give a connection string in the form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path which would put all this cluster's data under the path /chroot/path. Note that you must create this path yourself prior to starting the broker and consumers must use the same connection string.
