kcat's Introduction

kcat

kcat is the project formerly known as kafkacat

kcat and kafkacat are Copyright (c) 2014-2021 Magnus Edenhill

https://github.com/edenhill/kcat

kcat logo by @dtrapezoid

What is kcat

kcat is a generic non-JVM producer and consumer for Apache Kafka >=0.8; think of it as a netcat for Kafka.

In producer mode kcat reads messages from stdin, delimited with a configurable delimiter (-D, defaults to newline), and produces them to the provided Kafka cluster (-b), topic (-t) and partition (-p).
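
To preview how a given delimiter would carve stdin into messages before producing, a rough sketch along these lines may help. It only approximates the splitting with awk's record separator and handles single-character delimiters; count_messages is a hypothetical helper, not part of kcat, and kcat's handling of edge cases (such as a trailing empty segment) may differ.

```shell
# Hypothetical helper: count the records stdin would be split into for a
# single-character delimiter. Defaults to newline, mirroring the -D default.
count_messages() {
    # awk -v expands the "\n" escape, so the default RS is a real newline.
    awk -v RS="${1:-\n}" 'END { print NR }'
}

# Possible usage (not executed here; payloads.txt is a placeholder):
#   count_messages ';' < payloads.txt
#   kcat -P -b mybroker -t mytopic -D ';' < payloads.txt
```

A payload that itself contains the delimiter is split into several messages, so picking a delimiter that never occurs in the data matters.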

In consumer mode kcat reads messages from a topic and partition and prints them to stdout using the configured message delimiter.

There's also support for the Kafka >=0.9 high-level balanced consumer: use the -G <group> switch and provide a list of topics to join the group.

kcat also features a Metadata list (-L) mode to display the current state of the Kafka cluster and its topics and partitions.

kcat supports Avro message deserialization using the Confluent Schema-Registry, and generic primitive deserializers (see examples below).

kcat is fast and lightweight; statically linked it is no more than 150Kb.

What happened to kafkacat?

kcat is kafkacat. The kafkacat project was renamed to kcat in August 2021 to adhere to the Apache Software Foundation's (ASF) trademark policies. Apart from the name, nothing else was changed.

Try it out with docker

# List brokers and topics in cluster
$ docker run -it --network=host edenhill/kcat:1.7.1 -b YOUR_BROKER -L

See Examples for usage options, and Running in Docker for more information on how to properly run docker-based clients with Kafka.

Install

On recent enough Debian systems:

apt-get install kafkacat

On recent openSUSE systems:

zypper addrepo https://download.opensuse.org/repositories/network:utilities/openSUSE_Factory/network:utilities.repo
zypper refresh
zypper install kafkacat

(see this page for instructions to install with openSUSE LEAP)

On Mac OS X with homebrew installed:

brew install kcat

On Fedora:

# dnf copr enable bvn13/kcat
# dnf update
# dnf install kafkacat

See this blog for how to build from sources and install kafkacat/kcat on recent Fedora systems.

Otherwise, follow the directions below.

Requirements

On Ubuntu or Debian: sudo apt-get install librdkafka-dev libyajl-dev

Build

./configure <usual-configure-options>
make
sudo make install

Build for Windows

cd win32
nuget restore
msbuild

NOTE: Requires Build Tools for Visual Studio 2017 with components Windows 8.1 SDK and VC++ 2015.3 v14.00 (v140) toolset to be installed.

Quick build

The bootstrap.sh build script downloads and builds the required dependencies, providing a quick and easy means of building kcat. The script requires Internet connectivity and wget or curl. The resulting kcat binary is linked statically to avoid runtime dependencies. NOTE: Requires curl and cmake (for yajl) to be installed.

./bootstrap.sh

Configuration

Any librdkafka configuration property can be set on the command line using -X property=value, or in a configuration file specified by -F <config-file>.

If no configuration file was specified with -F .. on the command line, kcat will try the $KCAT_CONFIG or (deprecated) $KAFKACAT_CONFIG environment variable, and then the default configuration file ~/.config/kcat.conf or the (deprecated) ~/.config/kafkacat.conf.

Configuration files are optional.
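
The lookup order described above can be sketched as a small shell function. This is only an illustration of the documented precedence, not kcat's actual implementation; resolve_kcat_config is a hypothetical name, and whether kcat checks that an environment-provided path exists is not stated here.

```shell
# Hypothetical helper: print the configuration file that would be chosen,
# following the documented order: -F, $KCAT_CONFIG, $KAFKACAT_CONFIG,
# then the default files under ~/.config.
resolve_kcat_config() {
    # $1: the value given to -F, if any
    if [ -n "${1:-}" ]; then echo "$1"; return; fi
    if [ -n "${KCAT_CONFIG:-}" ]; then echo "$KCAT_CONFIG"; return; fi
    if [ -n "${KAFKACAT_CONFIG:-}" ]; then echo "$KAFKACAT_CONFIG"; return; fi
    for f in "$HOME/.config/kcat.conf" "$HOME/.config/kafkacat.conf"; do
        [ -f "$f" ] && { echo "$f"; return; }
    done
    return 1  # no configuration file found; they are optional
}
```

Running resolve_kcat_config with no arguments in an interactive shell is a quick way to see which file a plain kcat invocation would probably pick up.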

Examples

High-level balanced KafkaConsumer: subscribe to topic1 and topic2 (requires broker >=0.9.0 and librdkafka version >=0.9.1)

$ kcat -b mybroker -G mygroup topic1 topic2

Read messages from stdin, produce to 'syslog' topic with snappy compression

$ tail -f /var/log/syslog | kcat -b mybroker -t syslog -z snappy

Read messages from Kafka 'syslog' topic, print to stdout

$ kcat -b mybroker -t syslog

Produce messages from file (one file is one message)

$ kcat -P -b mybroker -t filedrop -p 0 myfile1.bin /etc/motd thirdfile.tgz

Produce messages transactionally (one single transaction for all messages):

$ kcat -P -b mybroker -t mytopic -X transactional.id=myproducerapp

Read the last 2000 messages from 'syslog' topic, then exit

$ kcat -C -b mybroker -t syslog -p 0 -o -2000 -e

Consume from all partitions from 'syslog' topic

$ kcat -C -b mybroker -t syslog

Output consumed messages in JSON envelope:

$ kcat -b mybroker -t syslog -J

Decode Avro key (-s key=avro), value (-s value=avro) or both (-s avro) to JSON using schema from the Schema-Registry:

$ kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080

Decode Avro message value and extract Avro record's "age" field:

$ kcat -b mybroker -t ledger -s value=avro -r http://schema-registry-url:8080 | jq .payload.age

Decode key as 32-bit signed integer and value as 16-bit signed integer followed by an unsigned byte followed by string:

$ kcat -b mybroker -t mytopic -s key='i$' -s value='hB s'

Hint: see kcat -h for all available deserializer options.

Output consumed messages according to format string:

$ kcat -b mybroker -t syslog -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'
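
A format string also makes the output easy to post-process. As a minimal sketch, assuming a tab-separated format such as -f '%t\t%p\t%o\t%k\t%s\n' and keys and payloads that contain no tabs or newlines, the hypothetical helper below tallies messages per partition:

```shell
# Hypothetical helper: count messages per partition from tab-separated
# lines whose second field is the partition (%p).
count_per_partition() {
    awk -F '\t' '{ n[$2]++ } END { for (p in n) print p, n[p] }' | sort -n
}

# Possible usage (not executed here):
#   kcat -C -b mybroker -t syslog -e -f '%t\t%p\t%o\t%k\t%s\n' | count_per_partition
```

Each output line holds a partition number followed by its message count.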

Read the last 100 messages from topic 'syslog' with librdkafka configuration parameter 'broker.version.fallback' set to '0.8.2.1' :

$ kcat -C -b mybroker -X broker.version.fallback=0.8.2.1 -t syslog -p 0 -o -100 -e

Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing an empty message value which -Z interprets as NULL:

$ echo "abc:" | kcat -b mybroker -t mytopic -Z -K:
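
To delete several keys at once, the same input shape can be generated in a loop. The helper below is a hypothetical sketch; it assumes the ':' key delimiter used above and keys that do not themselves contain ':'.

```shell
# Hypothetical helper: emit one "key:" line (key, delimiter, empty value)
# for every key passed as an argument.
tombstone_lines() {
    for key in "$@"; do
        printf '%s:\n' "$key"
    done
}

# Possible usage (not executed here):
#   tombstone_lines abc def | kcat -b mybroker -t mytopic -Z -K:
```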

Produce with headers:

$ echo "hello there" | kcat -b mybroker -P -t mytopic -H "header1=header value" -H "nullheader" -H "emptyheader=" -H "header1=duplicateIsOk"

Print headers in consumer:

$ kcat -b mybroker -C -t mytopic -f 'Headers: %h: Message value: %s\n'

Enable the idempotent producer, providing exactly-once and strict-ordering producer guarantees:

$ kcat -b mybroker -X enable.idempotence=true -P -t mytopic ....

Connect to cluster using SSL and SASL PLAIN authentication:

$ kcat -b mybroker -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=myapikey -X sasl.password=myapisecret ...
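
Passing credentials with -X leaves them visible in the process list and shell history, so a configuration file read with -F may be preferable. The sketch below assumes the file takes one property=value pair per line; write_sasl_config is a hypothetical helper and the path and credentials are placeholders.

```shell
# Hypothetical helper: write the SASL properties from the example above
# to a file that only the current user can read.
write_sasl_config() {
    # $1: output path, $2: API key, $3: API secret
    (
        umask 077  # newly created files get mode 0600
        printf '%s\n' \
            "security.protocol=SASL_SSL" \
            "sasl.mechanism=PLAIN" \
            "sasl.username=$2" \
            "sasl.password=$3" > "$1"
    )
}

# Possible usage (not executed here):
#   write_sasl_config "$HOME/.config/kcat.conf" myapikey myapisecret
#   kcat -b mybroker -F "$HOME/.config/kcat.conf" -L
```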

Metadata listing:

$ kcat -L -b mybroker
Metadata for all topics (from broker 1: mybroker:9092/1):
 3 brokers:
  broker 1 at mybroker:9092
  broker 2 at mybrokertoo:9092
  broker 3 at thirdbroker:9092
 16 topics:
  topic "syslog" with 3 partitions:
    partition 0, leader 3, replicas: 1,2,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,3, isrs: 1,2,3
    partition 2, leader 1, replicas: 1,2, isrs: 1,2
  topic "rdkafkatest1_auto_49f744a4327b1b1e" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "rdkafkatest1_auto_e02f58f2c581cba" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  ....

JSON metadata listing

$ kcat -b mybroker -L -J

Pretty-printed JSON metadata listing

$ kcat -b mybroker -L -J | jq .

Query offset(s) by timestamp(s)

$ kcat -b mybroker -Q -t mytopic:3:2389238523 -t mytopic2:0:18921841

Consume messages between two timestamps

$ kcat -b mybroker -C -t mytopic -o s@1568276612443 -o e@1568276617901
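
The values in the example above look like Unix epoch timestamps in milliseconds. Assuming that is the expected unit, a hypothetical helper can build the two -o arguments from timestamps given in seconds:

```shell
# Hypothetical helper: turn start/end Unix timestamps (in seconds) into
# the "-o s@<ms> -o e@<ms>" arguments shown above.
ts_window_args() {
    # Appending "000" converts seconds to milliseconds without arithmetic.
    printf '%s\n' "-o s@${1}000 -o e@${2}000"
}

# Possible usage (not executed here); the substitution is left unquoted
# on purpose so that it splits into four separate arguments:
#   kcat -b mybroker -C -t mytopic $(ts_window_args 1568276612 1568276617)
```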

Running in Docker

The latest kcat docker image is edenhill/kcat:1.7.1. There are also Confluent's kafkacat docker images on Docker Hub.

If you are connecting to Kafka brokers also running on Docker you should specify the network name as part of the docker run command using the --network parameter. For more details of networking with Kafka and Docker see this post.

Here are two short examples of using kcat from Docker. See the Docker Hub listing and kafkacat docs for more details:

Send messages using here doc:

docker run -it --rm \
        edenhill/kcat \
                -b kafka-broker:9092 \
                -t test \
                -K: \
                -P <<EOF

1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
2:{"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
3:{"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
EOF

Consume messages:

docker run -it --rm \
        edenhill/kcat \
           -b kafka-broker:9092 \
           -C \
           -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' \
           -t test

Key (1 bytes): 1
Value (88 bytes): {"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
Partition: 0    Offset: 0
--

Key (1 bytes): 2
Value (89 bytes): {"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
Partition: 0    Offset: 1
--

Key (1 bytes): 3
Value (90 bytes): {"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
Partition: 0    Offset: 2
--
% Reached end of topic test [0] at offset 3

Run a mock Kafka cluster

With kcat you can spin up an ephemeral in-memory mock Kafka cluster that you can connect your Kafka applications to for quick testing. The mock cluster supports a reasonable subset of the Kafka protocol, such as:

  • Producer
  • Idempotent Producer
  • Transactional Producer
  • Low-level consumer
  • High-level balanced consumer groups with offset commits
  • Topic Metadata and auto creation

Spin up the cluster by running kcat in the -M (for mock) mode:

# Create mock cluster with 3 brokers
$ kcat -M 3
...
BROKERS=localhost:12345,localhost:46346,localhost:23599
...

While kcat runs, let your Kafka applications connect to the mock cluster by configuring them with the bootstrap.servers emitted in the BROKERS line above.

Let kcat run for as long as you need the cluster, then terminate it by pressing Ctrl-D.

Since the cluster runs all in memory, with no disk IO, it is quite suitable for performance testing.
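
In test scripts the bootstrap list has to be picked out of kcat's output. A minimal sketch, assuming the BROKERS= line appears exactly as in the example above and that the output has been captured to a file or pipe (the stream it is written to is not stated here); extract_brokers is a hypothetical helper:

```shell
# Hypothetical helper: print the value of the first BROKERS= line on stdin.
extract_brokers() {
    sed -n 's/^BROKERS=//p' | head -n 1
}

# Possible usage (not executed here; mock.log is a placeholder for
# wherever the output of "kcat -M 3" was captured):
#   BOOTSTRAP="$(extract_brokers < mock.log)"
#   kcat -b "$BOOTSTRAP" -L
```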

kcat's People

Contributors

62mkv, andrewegel, ankon, anshulpatel25, apple-corps, bstarling, bvn13, cephalowat, champtar, chrisvroberts, edenhill, ethack, fsaintjacques, jeeftor, jelmer, johnroesler, julien-lecomte, ludwikjaniuk, maximecaron, michael12312, redmar, rmoff, rpluem-vf, rtyler, sanjaymsh, slimhazard, solsson, vincentbernat, whissi

kcat's Issues

support for zookeeper.connect chroot path

This probably should be filed against both librdkafka and kafkacat. This is an enhancement request.

http://kafka.apache.org/08/documentation.html

The documentation describes:

zookeeper.connect

Zookeeper also allows you to add a "chroot" path which will make all kafka data for this cluster appear under a particular path. This is a way to setup multiple Kafka clusters or other applications on the same zookeeper cluster. To do this give a connection string in the form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path which would put all this cluster's data under the path /chroot/path. Note that you must create this path yourself prior to starting the broker and consumers must use the same connection string.

[Feature Request] Configure kafkacat to exit on broker connectivity errors

While using kafkacat as a consumer or as a producer, it would be good if it's possible to have it quit with a non-zero exit status if there are broker related errors. Currently, in my testing as a consumer if there are broker connectivity errors after kafkacat starts consuming, it prints logs to stderr and appears to repeatedly retry.

Is this perhaps already possible to do?

Negative message offset is out-of-range

Might be a problem more related to broker.

When I try to get the last msg. in queue using -o -1 option, I get the following:

$ kafkacat -C -b localhost -t my_topic -o -1 -c 1 -v
% Fatal error at consume_cb:345:
% ERROR: Topic my_topic [1] error: Broker: Offset out of range

Using kafkacat & librdkafka from GitHub + kafka v2.9.2-0.8.1.1

-o when consuming from multiple partitions?

I think I asked this before, but I don't remember the answer.

When consuming from multiple partitions, what does the -o <offset> flag do? Offsets are for particular partitions. Which partition's offset is -o referring to?

-o not working or I am not using it right

Magnus, great tool, really love it!!!

I am trying to read from a specific offset on the consumer side but it seems to always just display everything

kafkacat -b xyz -t abc -p 0 -o 8

also is there a way to do something like from x offset until y offset and not just from x till end?

Min wall clock time per single kafkacat invocation

Hi,

It seems that it takes at least 1 second for kafkacat producer to complete.

time sh -c "date | ~/dev/kafkacat/kafkacat -P -t _temp -p 0 -c 1 -b 192.168.86.3,192.168.86.5"

real 0m1.068s
user 0m0.006s
sys 0m0.039s

Is there any timeout setting that would allow kafkacat to exit immediately upon producing all messages?

from last changes install is not linking shared lib

this is what I get now when running kafkacat after doing ./bootstrap

kafkacat: error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

was working before recent changes

`kafkacat -C` ignores new partitions

Once kafkacat -C is started it only prints messages from partitions which existed at the time it was started. Newly added partitions are not included in kafkacat output (although there is a warning that the partition count changed).

It might be helpful if kafkacat could add new partitions to the output without restart.

kafkacat reports exit code of 0 in some error cases

  1. When a bad hostname is used
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b badhost -t RawEvents -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?

The output is

%7|1429923085.924|BROKER|rdkafka#producer-0| badhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923085.924|BRKMAIN|rdkafka#producer-0| badhost:9092/bootstrap: Enter main broker thread
%7|1429923085.924|TOPIC|rdkafka#producer-0| New local topic: RawEvents
%7|1429923085.924|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923085.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923085.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923086.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923086.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923086.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.924|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923087.929|CONNECT|rdkafka#producer-0| badhost:9092/bootstrap: broker in state INIT connecting
%3|1429923087.929|ERROR|rdkafka#producer-0| Failed to resolve 'badhost:9092': Name or service not known
%3|1429923087.929|GETADDR|rdkafka#producer-0| badhost:9092/bootstrap: Failed to resolve 'badhost:9092': Name or service not known
%7|1429923087.934|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923087.934|BROKERFAIL|rdkafka#producer-0| badhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923087.934|STATE|rdkafka#producer-0| badhost:9092/bootstrap: Broker changed state INIT -> DOWN
%7|1429923087.935|BUFQ|rdkafka#producer-0| badhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0

I was expecting to see a non-zero exit code.

  2. When the correct hostname is used but the broker is not running
#!/bin/sh
cat /tmp/test | kafkacat-CentOS-6.5-x86_64 -P -b localhost -t test-topic -p 1 -X socket.timeout.ms=1000 -X topic.message.timeout.ms=1000 -X debug=all
echo "Exit Code:" $?

The output is

%7|1429923690.023|BROKER|rdkafka#producer-0| localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1429923690.023|BRKMAIN|rdkafka#producer-0| localhost:9092/bootstrap: Enter main broker thread
%7|1429923690.023|TOPIC|rdkafka#producer-0| New local topic: test-topic
%7|1429923690.023|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state INIT connecting
%7|1429923690.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923690.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1429923690.024|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%3|1429923690.024|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at localhost.localdomain:9092: Connection refused
%7|1429923690.024|STATE|rdkafka#producer-0| localhost:9092/bootstrap: Broker changed state INIT -> DOWN
%3|1429923690.024|ERROR|rdkafka#producer-0| 1/1 brokers are down
%7|1429923690.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923691.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923691.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923691.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.023|TIMEOUT|rdkafka#producer-0| 6 message(s) from 1 toppar(s) timed out
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1429923692.024|CONNECT|rdkafka#producer-0| localhost:9092/bootstrap: couldn't connect to ipv4#127.0.0.1:9092: Connection refused
%7|1429923692.024|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Communication failure with broker: (errno: Connection refused)
%7|1429923692.024|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1429923692.026|DESTROY|rdkafka#producer-0| Terminating instance
%7|1429923692.026|BROKERFAIL|rdkafka#producer-0| localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1429923692.026|BUFQ|rdkafka#producer-0| localhost:9092/bootstrap: Purging bufq with 0 buffers
Exit Code: 0

I was expecting to see a non-zero exit code.

Topic Admin API

Hi!

Would it make sense for kafkacat to be able to create/delete topics?

Error message while using rdkafka_example_cpp

Hi, I'm new to Kafka. After installing librdkafka I ran an example program and hit some issues. When using rdkafka_example_cpp to set up a producer:

rdkafka_example_cpp -P -t my_topic -p 1 -b 127.0.0.1:9092

I got this output:

% Created producer rdkafka#producer-0
LOG-3-FAIL: 127.0.0.1:9091/bootstrap: Failed to connect to broker at new-host:9091: Connection refused

and when using rdkafka_example_cpp to set up a consumer:

rdkafka_example_cpp -C -t my_topic -p 3 -b 127.0.0.1:9093

I got this output:

% Created consumer rdkafka#consumer-0
LOG-3-FAIL: 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: Broker transport failure): 127.0.0.1:9093/bootstrap: Failed to connect to broker at new-host:9093: Connection refused
ERROR (Local: All broker connections are down): 1/1 brokers are down

After looking at the same problem reported here, I found an explanation that it is caused by the use of IPv4 versus IPv6. I've checked the IP version in use, and I don't think my problem lies there. Is there any suggestion as to what's wrong with the above commands? Thx.

Interrupted system call when -L

I am using CentOS 6.5. I compiled the binary on one system then copied it out to several others. There are Kafka brokers on hadoop-dev[02,03,04] on my cluster. I get proper output, with what looks like valid info, but also some odd errors.

When I look at metadata_list() which calls metadata_print(), I start to suspect the problem comes when you're iterating over the members of the ISR. Perhaps the error comes from me having defined my broker IDs as 2,3,4 (instead of a more-expected 0,1,2?)...

I'm honestly not sure. I'll keep digging, but if you have any insight, I'd welcome any guidance. Full output follows.

:) ./kafkacat-CentOS-6.5-x86_64 -L -b hadoop-dev03
Metadata for all topics (from broker -1: hadoop-dev03:9092/bootstrap):
3 brokers:
broker 3 at hadoop-dev03:9092
broker 2 at hadoop-dev02:9092
broker 4 at hadoop-dev04:9092
3 topics:
topic "CleanedStream" with 32 partitions:
partition 23, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,2, isrs: 3,2
partition 8, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,4, isrs: 3,4
partition 11, leader 3, replicas: 3,2, isrs: 3,2
partition 29, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,4, isrs: 3,4
partition 5, leader 3, replicas: 3,2, isrs: 3,2
partition 14, leader 3, replicas: 3,4, isrs: 3,4
partition 13, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,4, isrs: 2,4
partition 31, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,3, isrs: 2,3
partition 10, leader 2, replicas: 2,4, isrs: 2,4
partition 1, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,3, isrs: 2,3
partition 28, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,3, isrs: 4,3
partition 18, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,3, isrs: 4,3
partition 12, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,2, isrs: 4,2
partition 15, leader 4, replicas: 4,3, isrs: 4,3
partition 24, leader 4, replicas: 4,2, isrs: 4,2
partition 6, leader 4, replicas: 4,2, isrs: 4,2
partition 0, leader 4, replicas: 4,2, isrs: 4,2
topic "RawStream" with 32 partitions:
partition 23, leader 3, replicas: 3,4, isrs: 3,4
partition 8, leader 3, replicas: 3,2, isrs: 3,2
partition 17, leader 3, replicas: 3,4, isrs: 3,4
partition 26, leader 3, replicas: 3,2, isrs: 3,2
partition 11, leader 3, replicas: 3,4, isrs: 3,4
partition 29, leader 3, replicas: 3,4, isrs: 3,4
partition 2, leader 3, replicas: 3,2, isrs: 3,2
partition 20, leader 3, replicas: 3,2, isrs: 3,2
partition 5, leader 3, replicas: 3,4, isrs: 3,4
partition 14, leader 3, replicas: 3,2, isrs: 3,2
partition 13, leader 2, replicas: 2,4, isrs: 2,4
partition 4, leader 2, replicas: 2,3, isrs: 2,3
partition 31, leader 2, replicas: 2,4, isrs: 2,4
partition 22, leader 2, replicas: 2,3, isrs: 2,3
partition 16, leader 2, replicas: 2,3, isrs: 2,3
partition 7, leader 2, replicas: 2,4, isrs: 2,4
partition 25, leader 2, replicas: 2,4, isrs: 2,4
partition 10, leader 2, replicas: 2,3, isrs: 2,3
partition 1, leader 2, replicas: 2,4, isrs: 2,4
partition 28, leader 2, replicas: 2,3, isrs: 2,3
partition 19, leader 2, replicas: 2,4, isrs: 2,4
partition 9, leader 4, replicas: 4,2, isrs: 4,2
partition 27, leader 4, replicas: 4,2, isrs: 4,2
partition 18, leader 4, replicas: 4,3, isrs: 4,3
partition 21, leader 4, replicas: 4,2, isrs: 4,2
partition 3, leader 4, replicas: 4,2, isrs: 4,2
partition 12, leader 4, replicas: 4,3, isrs: 4,3
partition 30, leader 4, replicas: 4,3, isrs: 4,3
partition 15, leader 4, replicas: 4,2, isrs: 4,2
partition 24, leader 4, replicas: 4,3, isrs: 4,3
partition 6, leader 4, replicas: 4,3, isrs: 4,3
partition 0, leader 4, replicas: 4,3, isrs: 4,3
topic "FirstTest" with 8 partitions:
partition 2, leader 2, replicas: 2,4, isrs: 2,4
partition 5, leader 2, replicas: 2,3, isrs: 2,3
partition 4, leader 4, replicas: 4,2, isrs: 4,2
partition 7, leader 4, replicas: 4,3, isrs: 4,3
partition 1, leader 4, replicas: 4,3, isrs: 4,3
partition 3, leader 3, replicas: 3,4, isrs: 3,4
partition 6, leader 3, replicas: 3,2, isrs: 3,2
partition 0, leader 3, replicas: 3,2, isrs: 3,2
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev02:9092/2: Failed to connect to broker at hadoop-dev02:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev03:9092/3: Failed to connect to broker at hadoop-dev03:9092: Interrupted system call
%3|1418754255.111|FAIL|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call
%3|1418754255.111|ERROR|rdkafka#producer-0| hadoop-dev04:9092/4: Failed to connect to broker at hadoop-dev04:9092: Interrupted system call

kafkacat metadata does not display topic metadata

When I execute kafka-topics --describe --zookeeper localhost --topic mytopic, it returns PartitionCount:10 ReplicationFactor:1 Configs:cleanup.policy=compact.
It also displays other details about the leader and replicas:
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

It would be useful if kafkacat returned this kind of detail, especially the cleanup policy when log compaction is in use.

best regards,

Charles.

kafkacat stuck at certain offsets after topic delete/restore

We observe the following issue:

After a topic has been re-created, kafkacat is no longer able to move past a certain point. No error messages are displayed.

When run with -dmsg flag, the following line is seen:

[9] MessageSet size 1048576, error "Success", MaxOffset 1069260

And MaxOffset keeps increasing, since new data is being added to the topic.

If we run kafkacat past the problematic offset (however, just next offset is not enough), e.g. with -o N+1000000 it picks up working fine.

kafka.tools.DumpLogSegments does not show any errors, saying is valid for each message, even for the problematic offset.

Any ideas, how to debug, fix?

/usr/bin/ld: cannot find -lyajl

No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 14.04.2 LTS
Release: 14.04
Codename: trusty

sudo ldconfig -p | grep yajl
libyajl.so.2 (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so.2
libyajl.so (libc6,x86-64) => /usr/lib/x86_64-linux-gnu/libyajl.so

Can you put in the documentation how to send Kafka a command to delete a key (in a compacted topic)?

I use the compact retention policy on some topics.
I need to send Kafka a message with a key and no value (null payload) to create a tombstone.
I've tried this command:
kafkacat -b -t -p 0 -K# -c 1 -P

with input such as:
11#
=> does it create a tombstone?
I've seen the -Z flag, but it seems to be for a null key and not a null value.
If you can clarify this in your documentation, it will help many users of compacted topics.

best regards,

Charles.

Error consuming message with empty 'message' part

Upon consuming a message which does not have a message part (key only), kafkacat fails with error:

% ERROR: Write error for message of 0 bytes at offset 18): Undefined error: 0

On a related note, kafka-console-consumer.sh shows no error and, as expected, prints null as the value.

`-e` flag ignored in case of many active partitions

Apparently, all partitions must time out simultaneously in order for this flag to work. If a topic has many active partitions, the chance of this is low and kafkacat keeps running indefinitely.

It looks like, when this flag is set, kafkacat should record the latest offset of each partition at startup and exit once all of those offsets are reached.

Kafkacat loops and consume all bandwidth

Hello,

When kafkacat tries to fetch a message bigger than librdkafka fetch.message.max.bytes, kafkacat loops forever and consumes all bandwidth and cpu.

Instead it should throttle / print a message / return?

Protocol parse failure (incorrect broker.version)

After upgrading to kafkacat 1.3.0 and librdkafka 0.9.1 I get the following error in consumer mode:

%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/1: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/3: expected 36681250 bytes > 150 remaining bytes
%4|1467207129.009|PROTOERR|rdkafka#consumer-1| <ip>:9092/2: Protocol parse failure at rd_kafka_fetch_reply_handle:3748 (incorrect broker.version.fallback?)

Any idea what this means?

kafkacat will discard some messages when the message has the char '\n'

Hi, I have used kafkacat for a long time, but I found an error recently.
I have some example messages, see below:
0xxxxxxxxxxxx
1xxxx\nxxxxx\n
2xxxx\nxxxxx\n
3xxxx\nxxxxx\n

When I produce the messages with kafkacat and then consume them with kafkacat, I find that some messages have been discarded. What's your advice here? Thanks.

Errors installing on debian

Hello - thank you for this awesome bit of code and also for librdkafka. I've set everything up on my macbook without a single issue. However, when attempting to get this running on Linux, I have hit a couple of snags. I am currently stuck on this error when I try to run ./bootstrap for the kafkacat install:

Now type 'make' to build
gcc -MD -MP -Itmp-bootstrap/usr/local/include -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -c kafkacat.c -o kafkacat.o
kafkacat.c: In function ‘produce_file’:
kafkacat.c:198:9: error: format ‘%zd’ expects argument of type ‘signed size_t’, but argument 4 has type ‘__off_t’ [-Werror=format]
cc1: all warnings being treated as errors
make: *** [kafkacat.o] Error 1

This is a fresh install of debian on a VM. Please let me know if you have any suggestions for getting past this point.

Thanks again for the awesome software.

Native win32 build

Build with MSVC.
Requires that the oct15 branch of librdkafka is merged to master.

Should provide prebuilt binaries for Windows, possibly with an installer.

Loadbalancing messages between kafkacat consumers

Hello,

I am attempting to use Kafka in the loadbalancing fashion. As Kafka documentation says:

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

So what I did is:

  • Run Kafka 0.8.2.1 using spotify/kafka (via boot2docker)
  • One producer
  • Two consumers with same group.id

First I created the topic:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic beta

Then subscribe two consumers to it:

kafkacat -C -b $(boot2docker ip):9092 -t beta -X group.id=mygroupid

And produce a message:

date | kafkacat -P -b $(boot2docker ip):9092 -t beta

I expect only one of the consumers to receive the message, but both do.

On the other hand, if I run the consumers using kafka-console-consumer.sh, the message is only consumed once:

echo "group.id=mygroupid" > /consumer.beta.properties

$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --zookeeper localhost:2181 \
  --topic beta \
  --consumer.config /consumer.beta.properties

What's wrong?

(I have also asked this question on Stackoverflow)

Weird offset file when directory does not yet exist

(This might be a librdkafka issue, I'm not sure.)

When running

kafkacat -C -b mybroker.example.org -t mytopic -e -o stored -c 1000 -X topic.auto.commit.interval.ms=100 -X topic.offset.store.path=/tmp/kafka-offsets

for the first time, if /tmp/kafka-offsets does not yet exist, kafkacat seems to create this as a file and store a single integer in this file. I'm not sure if this is a real partition offset, but kafka brokers log plenty of OffsetOutOfRangeExceptions for the number in this file when kafkacat runs.

Perhaps librdkafka should always create topic.offset.store.path as a directory if it does not yet exist?
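A sketch of what that suggestion could look like in C. This is not librdkafka's code; `ensure_directory` is a hypothetical helper that creates the path as a directory when it is missing and reports an error when something else already occupies the name.

```c
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>

/* Make sure `path` exists and is a directory.
 * Returns 0 on success, -1 if the path exists but is not a directory
 * or if it cannot be created. */
int ensure_directory(const char *path) {
    struct stat st;

    if (stat(path, &st) == 0)
        return S_ISDIR(st.st_mode) ? 0 : -1;   /* already there */
    if (errno != ENOENT)
        return -1;                             /* unexpected stat failure */
    return mkdir(path, 0755);                  /* missing: create it */
}
```

Calling this on the offset store path before writing any offsets would avoid a plain file being created under that name.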

kafkacat topic info doesn't match that of kafka-topics

Hiya. I'm running into something where the leader/follower/ISR reported by kafkacat doesn't match that reported by kafka-topics.

Any ideas? Even more confusingly: I'm logged in to broker 1002/kafka900f5013 in two shells right now. For the past hour or so kafkacat was reporting the correct information (partition 0, leader 1002, replicas: 1002, isrs: 1002) in one of the shells, and incorrect information in the other shell (partition 0, leader 1004, replicas: 1004, isrs: 1004). Consistently and repeatedly. It was just while I was typing up this git issue that they all started reporting the same incorrect information as the rest of the nodes.

So Kafkacat thinks the partition is being served by broker 1004:

PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafkacat -L -b localhost:9092 -t hound-prod.retriever.mutation"
kafka-1bc7c4e2: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-1bc7c4e2:  4 brokers:
kafka-1bc7c4e2:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-1bc7c4e2:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-1bc7c4e2:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-1bc7c4e2:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-1bc7c4e2:  1 topics:
kafka-1bc7c4e2:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-1bc7c4e2:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-d4b7674e: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-d4b7674e:  4 brokers:
kafka-d4b7674e:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-d4b7674e:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-d4b7674e:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-d4b7674e:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-d4b7674e:  1 topics:
kafka-d4b7674e:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-d4b7674e:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-900f5013: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-900f5013:  4 brokers:
kafka-900f5013:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-900f5013:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-900f5013:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-900f5013:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-900f5013:  1 topics:
kafka-900f5013:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-900f5013:     partition 0, leader 1004, replicas: 1004, isrs: 1004
kafka-88e6bc0c: Metadata for hound-prod.retriever.mutation (from broker -1: localhost:9092/bootstrap):
kafka-88e6bc0c:  4 brokers:
kafka-88e6bc0c:   broker 1003 at ip-10-0-246-113.ec2.internal:9092
kafka-88e6bc0c:   broker 1004 at ip-10-0-190-45.ec2.internal:9092
kafka-88e6bc0c:   broker 1001 at ip-10-0-148-52.ec2.internal:9092
kafka-88e6bc0c:   broker 1002 at ip-10-0-207-186.ec2.internal:9092
kafka-88e6bc0c:  1 topics:
kafka-88e6bc0c:   topic "hound-prod.retriever.mutation" with 1 partitions:
kafka-88e6bc0c:     partition 0, leader 1004, replicas: 1004, isrs: 1004

And kafka-topics thinks the partition is being served by broker 1002.

PRODUCTION ubuntu@bastion-0:~$ dsh -g kafka-production -c -M -- "kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --topic hound-prod.retriever-mutation --describe"
kafka-d4b7674e: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-d4b7674e:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-1bc7c4e2: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-1bc7c4e2:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-88e6bc0c: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-88e6bc0c:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
kafka-900f5013: Topic:hound-prod.retriever-mutation     PartitionCount:1        ReplicationFactor:1     Configs:
kafka-900f5013:         Topic: hound-prod.retriever-mutation    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
PRODUCTION ubuntu@bastion-0:~$ 

Any ideas?

Just looking at the data sizes on disk, it's definitely writing to broker 1002, not 1004.

kafkacat -c -NUM semantics edge case

Here is the use-case. A topic with say 150 messages.

$ kafkacat -b broker -C -q -o -1000 -e -c 1000 -t topic | wc -l
0

$ kafkacat -b broker -C -q -o -100 -e -c 1000 -t topic | wc -l
100

What this shows is that if the negative offset is larger than the number of available messages, kafkacat does not return anything. Semantically, I believe it should just start from the beginning in that case.
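The proposed semantics amount to clamping the computed start offset to the oldest available one. A minimal sketch, assuming the low and high watermarks of the partition are known; `resolve_tail_offset` is a hypothetical name and this is not how kafkacat currently resolves `-o -N`:

```c
#include <stdint.h>

/* Resolve "the last `count` messages" to a start offset.
 * `low` is the oldest available offset and `high` is the offset of the
 * next message to be written. If fewer than `count` messages exist,
 * fall back to `low` instead of producing an out-of-range offset. */
int64_t resolve_tail_offset(int64_t low, int64_t high, int64_t count) {
    int64_t start = high - count;
    return start < low ? low : start;
}
```

For the 150-message topic above, `resolve_tail_offset(0, 150, 1000)` gives 0 (read from the beginning) and `resolve_tail_offset(0, 150, 100)` gives 50.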

mktemp usage not backward compatible with older Linux distributions

Trying to run configure on Ubuntu 10.04 results in errors such as:

checking for PIC (by compile)...mktemp: too few X's in template `_mkltmpXXXX.c'

On this platform (and many older Linux distributions), mktemp(1) requires the template "X" characters to be at the end of the filename.

stdout buffer causes messages not to be delivered on arrival

By default, stdout is buffered. If writing out to a terminal, it will flush after each newline. However, if the stdout is redirected, it won't flush on each newline. It can be flushed manually or the buffer can be disabled.

http://stackoverflow.com/questions/1716296/why-does-printf-not-flush-after-the-call-unless-a-newline-is-in-the-format-strin

Something like this could fix the problem:

int main (int argc, char **argv) {
        setbuf(stdout, NULL);

Or the buffer can be flushed manually after each write.

I made this an issue rather than a PR because I'm not sure of the best way to handle it.
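For comparison with the `setbuf(stdout, NULL)` idea, here is a sketch of the manual-flush variant. `write_message` is a hypothetical helper, not a kafkacat function: it writes one payload plus its delimiter and flushes right away, so a downstream pipe sees each message on arrival while each individual write still goes through stdio's buffer.

```c
#include <stdio.h>

/* Write one message followed by `delim` and flush immediately.
 * Returns 0 on success, -1 on any write or flush error. */
int write_message(FILE *out, const char *msg, size_t len, char delim) {
    if (fwrite(msg, 1, len, out) != len)
        return -1;
    if (fputc(delim, out) == EOF)
        return -1;
    return fflush(out) == 0 ? 0 : -1;
}
```

A third option, if line-oriented output is all that matters, is calling `setvbuf(stdout, NULL, _IOLBF, 0)` once at startup, which requests line buffering even when stdout is not a terminal.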

Settings to stream without buffering

I would like to use kafkacat to stream from a topic without buffering but I can't figure out the right settings to make that happen. To illustrate using kafka_2.8.0-0.8.0:

ZOOKEEPER=192.168.33.10:2181  # whatever it may be
BROKERS=192.168.33.10:6667    # whatever it may be

# create the topic
kafka-create-topic.sh --zookeeper $ZOOKEEPER --replica 1 --partition 1 --topic example

# create a stream
ruby -e "STDOUT.sync=true; 10000.times {|i| puts i; sleep 0.2}" |
kafka-console-producer.sh --broker-list $BROKERS --sync --topic example

If you consume the stream like this you'll see output tick along smoothly with no apparent delay.

kafka-console-consumer.sh --zookeeper $ZOOKEEPER --topic example

If you consume the stream with kafkacat in these variations the outputs jerk forward then pause for a second, then jerk forward again. All the output is there but it isn't streaming smoothly.

./kafkacat -t example -b $BROKERS
./kafkacat -t example -b $BROKERS -u

These variations make the outputs tick along in regular 1s intervals, but again the output isn't smooth. There appears to be some kind of fetch delay.

./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1
./kafkacat -t example -b $BROKERS -u -X fetch.wait.max.ms=1 -X fetch.error.backoff.ms=1

I've just been kinda guessing at options with no success. Is there a way to get kafkacat to smoothly stream the topic a-la kafka-console-consumer.sh?

Is -J option supported?

(version 1.2.0) when I run:
$ kafkacat -b mybroker -t syslog -J

It gives me:
kafkacat: illegal option -- J

bootstrap.sh not working on OS X El Capitan (10.11.4)

I ran the most recent bootstrap.sh. It errored out with:

Build of libyajl SUCCEEDED!
Building kafkacat
Using -L/usr/local/lib  -lpthread -lz -lcrypto -lssl -lsasl2 for rdkafka
Package yajl was not found in the pkg-config search path.
Perhaps you should add the directory containing `yajl.pc'
to the PKG_CONFIG_PATH environment variable
No package 'yajl' found
grep: tmp-bootstrap/usr/local/lib/pkgconfig/yajl.pc: No such file or directory
Using  for yajl
./bootstrap.sh: line 97: ./configure: No such file or directory

kafkacat showing different ISR list vs other tools

For one of my topics, kafkacat is showing a different list of ISRs than both the kafka-topics.sh script and kafkat (the Airbnb tool). I'm wondering if there is a bug in kafkacat.

See below, partition 1 shows isrs = 4 when using kafkacat, but isrs = 4,2,3 when using the others.

jcheng:~ jcheng$ docker run --rm -it kafkacat kafkacat -L -b 10.100.222.65 -t engrcloud.core.kafka.install-test
Metadata for engrcloud.core.kafka.install-test (from broker -1: 10.100.222.65:9092/bootstrap):
 5 brokers:
  broker 5 at 10.100.222.69:9092
  broker 1 at 10.100.222.65:9092
  broker 2 at 10.100.222.66:9092
  broker 3 at 10.100.222.67:9092
  broker 4 at 10.100.222.68:9092
 1 topics:
  topic "engrcloud.core.kafka.install-test" with 3 partitions:
    partition 2, leader 5, replicas: 5,3,4, isrs: 4,5,3
    partition 1, leader 4, replicas: 4,2,3, isrs: 4
    partition 0, leader 3, replicas: 3,1,2, isrs: 1,2,3

vs

jcheng:~ jcheng$ docker run --rm -it wurstmeister/kafka /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper=10.100.222.69:2181 --describe --topic engrcloud.core.kafka.install-test
Topic:engrcloud.core.kafka.install-test PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: engrcloud.core.kafka.install-test    Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 1,2,3
    Topic: engrcloud.core.kafka.install-test    Partition: 1    Leader: 4   Replicas: 4,2,3 Isr: 4,2,3
    Topic: engrcloud.core.kafka.install-test    Partition: 2    Leader: 5   Replicas: 5,3,4 Isr: 4,5,3

vs

jcheng:~ jcheng$ docker run -it --rm kafkat sh -c 'echo { \"zk_path\": \"10.100.222.65:2181\" } > /etc/kafkatcfg; kafkat partitions engrcloud.core.kafka.install-test'
Topic                               Partition   Leader   Replicas    ISRs
engrcloud.core.kafka.install-test   0           3        [3, 1, 2]   [1, 2, 3]
engrcloud.core.kafka.install-test   1           4        [4, 2, 3]   [4, 2, 3]
engrcloud.core.kafka.install-test   2           5        [5, 3, 4]   [4, 5, 3]

Is there any debugging that I can supply?

New release?

Kafka new consumer support would be great, but the latest released version is only 1.2.0.
Is it possible to release a beta or a release candidate?

kafkacat from brew on OSX works... but displays errors

This is probably a brew issue somehow, but when I run

kafkacat -L -b 10.223.25.68:9092

Against my 5 broker cluster, I get

Metadata for all topics (from broker -1: 10.223.25.68:9092/bootstrap):
 5 brokers:
  broker 5 at 10.223.25.69:9092
  broker 1 at 10.223.25.65:9092
  broker 2 at 10.223.25.66:9092
  broker 3 at 10.223.25.67:9092
  broker 4 at 10.223.25.68:9092
 58 topics:

... much topic metadata elided ...

%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.69:9092/5: Failed to connect to broker at 10.223.25.69:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.68:9092/4: Failed to connect to broker at 10.223.25.68:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.67:9092/3: Failed to connect to broker at 10.223.25.67:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.66:9092/2: Failed to connect to broker at 10.223.25.66:9092: Undefined error: 0
%3|1445369634.430|FAIL|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0
%3|1445369634.430|ERROR|rdkafka#producer-0| 10.223.25.65:9092/1: Failed to connect to broker at 10.223.25.65:9092: Undefined error: 0

Those 'Undefined error' lines at the end are weird. It's clearly contacting the brokers, as it's getting the metadata. When I run kafkacat on Ubuntu against the same broker, everything is printed out identically, except that the errors aren't printed at the end.

Persistent metadata cache

From librdkafka configuration guide:

The metadata is automatically refreshed on error and connect.

On each kafkacat run, librdkafka loads the Kafka cluster metadata. This has a significant inbound traffic impact in situations where kafkacat is triggered frequently on a cluster with many highly partitioned topics.

Is there a way to cache the metadata in between kafkacat runs so that each kafkacat run doesn't always load a significant amount of metadata on start?

In producer mode, it sends the messages but throws errors while starting up

Although the app writes messages to Kafka, it prints the following error messages at startup. I'm not sure what exactly the issue is, but any input would help.

%3|1418300191.571|FAIL|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| localhost:9092/bootstrap: Failed to connect to broker at [localhost]:9092: Connection refused
%3|1418300191.571|ERROR|rdkafka#producer-0| 1/1 brokers are down

On OS X, bootstrap.sh doesn't yield a statically linked binary

On OS X 10.10.2, bootstrap.sh doesn't yield a statically linked binary, because libyajl's static library does not match the expected naming conventions.

In bootstrap.sh:
export STATIC_LIB_yajl="tmp-bootstrap/usr/local/lib/libyajl.a"

However, in the filesystem:

$ ls -1 tmp-bootstrap/usr/local/lib/libyajl*
tmp-bootstrap/usr/local/lib/libyajl.2.1.1.dylib
tmp-bootstrap/usr/local/lib/libyajl.2.dylib
tmp-bootstrap/usr/local/lib/libyajl.dylib
tmp-bootstrap/usr/local/lib/libyajl_s.a

Notice that the libyajl static library is libyajl_s.a, not libyajl.a.

After compilation, you can see that librdkafka was statically linked in, but not libyajl.

$ otool -L kafkacat
kafkacat:
    /usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
    /usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
    /usr/local/lib/libyajl.2.dylib (compatibility version 2.0.0, current version 2.1.1)

I worked around it by manually running gcc, and replacing "-lyajl" with "tmp-bootstrap/usr/local/lib/libyajl_s.a"

$ gcc -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Itmp-bootstrap/usr/local/include -g -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -Ltmp-bootstrap/usr/local/lib -Ltmp-bootstrap/usr/local/lib kafkacat.o format.o json.o -o kafkacat tmp-bootstrap/usr/local/lib/librdkafka.a -lpthread -lz tmp-bootstrap/usr/local/lib/libyajl_s.a
$ otool -L kafkacat
kafkacat:
    /usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1213.0.0)
    /usr/lib/libz.1.dylib (compatibility version 1.0.0, current version 1.2.5)
$

Print partition id of the message

Would be great to have the ability to see the partition id of the messages, similar to offsets printed with -O flag.

Better yet, since offsets belong to partitions it would make sense to prepend them with partition id, e.g. 0:12345
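A small sketch of the suggested `partition:offset` prefix. `format_position` is a hypothetical helper and the `0:12345` layout is simply the one proposed above, not an existing kafkacat output format; the fixed-width types mirror how Kafka defines partitions (32-bit) and offsets (64-bit).

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Render a message position as "<partition>:<offset>", e.g. "0:12345".
 * Returns what snprintf returns. */
int format_position(char *out, size_t out_size,
                    int32_t partition, int64_t offset) {
    return snprintf(out, out_size, "%" PRId32 ":%" PRId64,
                    partition, offset);
}
```

The resulting string could be printed in front of each message in the same place where -O currently prints the bare offset.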

Offset flag ignored

It looks like the -o flag applies only at the granularity of Kafka storage blocks, or some other caching is going on, e.g.

$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98564$i -p 0 -c 3 -u | md5sum" ; done
01 d1e6071308960d3d04c9ff911ef32516  -
02 d1e6071308960d3d04c9ff911ef32516  -

$ for i in {01..05}; do echo -n "$i " ; sh -c "~/kafkacat/kafkacat -C -b localhost:9092 -t topic -K\' -o 98574$i -p 0 -c 3 -u | md5sum" ; done
01 372ec3fd9d054638ff817f73a4c50007  -
02 372ec3fd9d054638ff817f73a4c50007  -

Messages were loaded into Kafka in batches with snappy compression; the latest kafkacat was used.

Losing messages on failover test

I have 2 Kafka brokers running in my test environment, and I am running one kafkacat as producer and another as consumer. The test topic is set up with a replication factor of 2.

In my test, I killed one of the brokers while catting multiple files into the kafkacat producer. Sometimes the consumer got all the messages, but at other times it missed 5-10% of them.

Are there any producer-specific parameters that I missed? I was hoping kafkacat would at least return a non-zero exit code when messages are lost.

I think the problem is probably in librdkafka. When events go missing, I don't even see dr_msg_cb in kafkacat.c being called, so it looks like librdkafka did not notify kafkacat about the issue.
