
deviceinsight / kafkactl


Command Line Tool for managing Apache Kafka

Home Page: https://deviceinsight.github.io/kafkactl/

License: Apache License 2.0

Go 89.42% Makefile 0.45% Shell 10.13%
apache-kafka avro cli fish golang kafka zsh


kafkactl's Issues

Avro support

It would be nice if this supported Avro schemas as well.

Kafkactl hangs on Deserialize error and never exits.

Problem

When I run kafkactl consume <topic-name> with schema registry enabled, kafkactl fails to parse the Avro message (I don't know why, I am still debugging that), but when I hit Ctrl+C, the process just hangs.

Breakdown

Based on what I have been able to debug, the program is getting stuck on this line https://github.com/deviceinsight/kafkactl/blob/master/operations/consumer/consumer-operation.go#L164. Because the deserializer is no longer consuming messages, that loop is blocked waiting to post a message to the buffer, so the closing channel signal here is never read. This means the function never returns, the defer is never called, and we never get past the consumer wait.

After I fixed that problem with the following code:

select {
case messages <- message: // Put message in the channel unless it is full
default:
	output.Debugf("Message Buffer is full. Discarding value.")
	pc.AsyncClose()
	break messageChannelRead
}

we get stuck at the next wait statement, which means that this defer is never called because the error channel is full. My solution there was to increase the size of the error channel here:

errChannel        = make(chan error, 100)

Once I resolved that, the program exits and I see my error in the console. I feel like what I have done is a workaround rather than a fix: just dropping messages on the floor and exiting seems like a bad route to take (see the first code snippet). Additionally, referring to https://stackoverflow.com/a/25657232, using the len(chan) function instead could lead to nasty race conditions.
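The hang boils down to a goroutine doing a bare channel send while the shutdown signal arrives on a different channel. A minimal sketch (simplified names, not kafkactl's actual code) of why selecting over both channels avoids the deadlock:

```go
package main

import "fmt"

// produce forwards msgs into out, but gives up as soon as stop is closed.
// A bare `out <- m` would block forever once the reader is gone, which is
// the hang described above; the select also watches the stop channel.
func produce(msgs []string, out chan string, stop chan struct{}) int {
	sent := 0
	for _, m := range msgs {
		select {
		case out <- m:
			sent++
		case <-stop: // the consumer is shutting down: abort instead of blocking
			return sent
		}
	}
	return sent
}

func main() {
	out := make(chan string) // unbuffered and nobody reading: a send would block
	stop := make(chan struct{})
	close(stop) // simulate Ctrl+C
	fmt.Println("sent:", produce([]string{"a", "b", "c"}, out, stop)) // sent: 0
}
```

This avoids both dropping messages and the unbounded error-channel growth, since the sender simply stops as soon as shutdown begins.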

Error when using --config-file option with kubernetes support enabled

Kubernetes support does not work with the --config-file option. All kafkactl commands fail to execute on the k8s cluster.
This is always reproducible.

An example error log is shown below:

[kafkactl] 2021/08/05 13:45:32 Using config file: /projets/test/.kube/kafka.yaml
[kafkactl] 2021/08/05 13:45:32 Assuming kafkaVersion: 2.0.0
[kafkactl] 2021/08/05 13:45:32 kubectl version: 1.20.4
[kafkactl] 2021/08/05 13:45:32 exec: kubectl run --rm -i --tty --restart=Never kafkactl-bnd9qanckp --image deviceinsight/kafkactl:v1.17.2-scratch --kubeconfig /projets/test/.kube/config_test --context test-cluster --namespace test-dfr --env BROKERS=kafka:9092 --env KAFKAVERSION=2.0.0 --command -- /kafkactl get topics --config-file=/projets/test/.kube/kafka.yaml --verbose=true
[kafkactl] 2021/08/05 11:45:37 generated default config at /config/kafkactl/config.yml
Error reading config file: /projets/test/.kube/kafka.yaml (open /projets/test/.kube/kafka.yaml: no such file or directory)
pod test-dfr-117411-build-test-backend-charts/kafkactl-bnd9qanckp terminated (Error)
command "/projets/test/tools/kubectl/kubectl" exited with non-zero status:

PATH:
  /projets/test/tools/kubectl/kubectl

ARGS:
  0: kubectl (7 bytes)
  1: run (3 bytes)
  2: --rm (4 bytes)
  3: -i (2 bytes)
  4: --tty (5 bytes)
  5: --restart=Never (15 bytes)
  6: kafkactl-bnd9qanckp (19 bytes)
  7: --image (7 bytes)
  8: deviceinsight/kafkactl:v1.17.2-scratch (38 bytes)
  9: --kubeconfig (12 bytes)
  10: /projets/test/.kube/config_test (37 bytes)
  11: --context (9 bytes)
  12: test-cluster (12 bytes)
  13: --namespace (11 bytes)
  14: test-dfr-117411-build-test-backend-charts (41 bytes)
  15: --env (5 bytes)
  16: BROKERS=kafka:9092 (18 bytes)
  17: --env (5 bytes)
  18: KAFKAVERSION=2.0.0 (18 bytes)
  19: --command (9 bytes)
  20: -- (2 bytes)
  21: /kafkactl (9 bytes)
  22: get (3 bytes)
  23: topics (6 bytes)
  24: --config-file=/projets/test/.kube/kafka.yaml (47 bytes)
  25: --verbose=true (14 bytes)

ERROR:
  exit status 1

EXIT STATUS
  1

The --config-file option is forwarded to the kafkactl executed in the pod. kafkactl can't find the file since the path provided via --config-file points to a file on the local file system. This file does not exist in the pod, as the config file there is always generated at /config/kafkactl/config.yml.

The --config-file must not be forwarded to the kafkactl executed in the pod.
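One possible shape of the fix is to strip the flag from the argument list before building the kubectl run command. A sketch with a hypothetical stripFlag helper; kafkactl's actual implementation may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// stripFlag removes a flag in both its "--flag=value" and "--flag value"
// forms from an argument list before it is forwarded to the kafkactl
// running inside the pod. Hypothetical helper, not kafkactl's real code.
func stripFlag(args []string, flag string) []string {
	out := make([]string, 0, len(args))
	for i := 0; i < len(args); i++ {
		if args[i] == flag { // "--config-file value"
			i++ // skip the value as well
			continue
		}
		if strings.HasPrefix(args[i], flag+"=") { // "--config-file=value"
			continue
		}
		out = append(out, args[i])
	}
	return out
}

func main() {
	args := []string{"get", "topics", "--config-file=/tmp/kafka.yaml", "--verbose=true"}
	fmt.Println(stripFlag(args, "--config-file")) // [get topics --verbose=true]
}
```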

JSON Pretty Print

It would be great to have an additional flag for kafkactl consume to display pretty-printed JSON.

Raw JSON:

{"type":"UserStatusChanged","id":"e886eb90-4003-4b1f-9971-8f4bdb23dc89","timestamp":1588167444,"data":{"user":{"id":331,"status":"approved"},"from_status":"created","to_status":"approved","changed_at":"2020-04-29T13:37:18+00:00"}}

Pretty printed JSON:

{
  "type": "UserStatusChanged",
  "id": "e886eb90-4003-4b1f-9971-8f4bdb23dc89",
  "timestamp": 1588167444,
  "data": {
    "user": {
      "id": 331,
      "status": "approved"
    },
    "from_status": "created",
    "to_status": "approved",
    "changed_at": "2020-04-29T13:37:18+00:00"
  }
}

It would be useful during development for debugging messages.
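With Go's standard library this is a small addition; a sketch using encoding/json.Indent (the prettyJSON helper is hypothetical, not kafkactl's API):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// prettyJSON re-indents a raw JSON value. Invalid JSON is returned
// unchanged, so non-JSON message values would still be printed as-is.
func prettyJSON(raw []byte) []byte {
	var buf bytes.Buffer
	if err := json.Indent(&buf, raw, "", "  "); err != nil {
		return raw
	}
	return buf.Bytes()
}

func main() {
	raw := []byte(`{"user":{"id":331,"status":"approved"}}`)
	fmt.Println(string(prettyJSON(raw)))
}
```

json.Indent works on the raw bytes directly, so key order and number formatting of the original message are preserved, which matters when debugging.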

Add a parameter `--tail` to consume command

For development purposes it would be useful to have an option to get the latest n messages from a topic.
This would make it easy to check the format of the messages on a topic, or to see whether a message has been produced in the last few minutes.

In order to find out which messages are the latest n, we will have to read the latest n messages from each partition and afterwards sort/filter them based on the timestamp in the header.
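The per-partition offset arithmetic and the merge step can be sketched as follows (hypothetical helpers; the real implementation would query oldest/newest offsets from the Kafka client):

```go
package main

import (
	"fmt"
	"sort"
)

// tailOffset returns the offset to start reading from so that at most n
// messages are consumed from a partition whose next offset is `newest`.
func tailOffset(oldest, newest, n int64) int64 {
	start := newest - n
	if start < oldest {
		start = oldest // partition holds fewer than n messages
	}
	return start
}

type record struct {
	Partition int
	Timestamp int64
	Value     string
}

// latestN merges the per-partition tails and keeps the newest n overall,
// sorted by the message timestamp as described above.
func latestN(records []record, n int) []record {
	sort.Slice(records, func(i, j int) bool {
		return records[i].Timestamp > records[j].Timestamp
	})
	if len(records) > n {
		records = records[:n]
	}
	return records
}

func main() {
	fmt.Println(tailOffset(0, 100, 5)) // start reading at offset 95
	recs := latestN([]record{{0, 10, "a"}, {1, 30, "b"}, {0, 20, "c"}}, 2)
	fmt.Println(recs[0].Value, recs[1].Value) // b c
}
```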

Add headers in producer

Hi,

Is it possible to add headers when producing messages with kafkactl produce topic xxx?

Thanks !

kafkactl reset should error when a group is 'stable'

Comparison between kafkactl and kafka-consumer-groups.sh when resetting consumer-group offsets of a 'stable' group:

kafkactl reset consumer-group-offset group -t topic --newest -e
# Returns success message/table with new reset offsets
# If you verify the consumer-group offsets you will see that nothing has actually happened

Doing the same operation with the default Kafka console tool shows us an error:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group group --reset-offsets --dry-run --topic topic --to-latest

Error: Assignments can only be reset if the group 'group' is inactive, but the current state is Stable.

kafkactl should do something similar. Currently it just acts as if the operation was successful.

Kafkactl doesn't work in current version with kubernetes

I tried to run kafkactl in version 1.17.1 with a Kubernetes-backed config.

Unfortunately that fails due to an image pull issue. Checking the implementation, it looks like it is trying to download 1.17.1-scratch, but on Docker Hub only v1.17.1-scratch is available.

Delete a consumer group

kafkactl doesn't seem to be able to delete a consumer group (e.g. kafka-consumer-groups.sh --delete --group my_group). Adding kafkactl delete consumer-group would be very useful, as deleting a consumer group is a common operation (e.g. when decommissioning an application).

Add global config file

Hi,

I want to install kafkactl on all Kafka cluster nodes. It would be nice to be able to install a configuration for all users, so that each user would not have to create their own config file (it would likely be identical for all users).

I will provide a PR.

Kind regards,
Stefan

Using SSL keystore and Truststore

Thanks for the awesome tool.

How would I go about passing in a configuration like this?

example.properties file

security.protocol=SSL
ssl.truststore.location=
ssl.truststore.password=
ssl.keystore.location=
ssl.keystore.password=
ssl.key.password=

This is an example properties file that I would normally pass into a kafka command like this

kafka-topics --bootstrap-server broker1  \
--command-config /path/to/example.properties \
--topic some-topic \
--describe

README.md is outdated?

$ kafkactl alter topic internal-other --replication-factor 2
Could not reassign partition replicas for topic 'internal-other': failed to reassign partitions for topic: 
kafka server: The version of API is not supported.

kafka 2.7.0
kafkactl 1.17.1

config.yml

contexts:
  kafka-test:
    brokers:
    - kafka-test-01.wz.local:9092
    - kafka-test-02.wz.local:9092
    - kafka-test-02.wz.local:9092
  kafka-prod:
    brokers:
    - kafka-01.wz.local:9092
    - kafka-02.wz.local:9092
    - kafka-03.wz.local:9092

current-context: kafka-prod

Support for Apple Silicon

As Go is now fully supported on Apple Silicon - any chance of getting a native build of kafkactl as well?

Preferably distributed through Homebrew, thanks!

Add parameter --rate to produce command

Add a parameter --rate which controls the rate at which messages are produced.
This is useful to limit the number of produced messages when inserting messages on a production environment.
--rate should be an integer that represents the number of messages to produce per second.

Snap can't load config

The Snap version is basically unusable, because it can't read its configuration file.

kafkactl attempts to read configs from:

  • /etc/kafkactl/config.yml
    but Snap doesn't allow access outside the home directory of the user

  • ~/.kafkactl/config.yml
    but Snap doesn't allow access to hidden files inside the home directory either

I'd suggest adding another searched config path (e.g. ~/kafkactl/config.yml). It needs to be in the user's home directory and it must not be a hidden file.

P.S. Why have a directory at all? Why isn't it ~/kafkactl.yml and /etc/kafkactl.yml?
P.P.S. I don't exactly get how Viper works: does it load only the first file found? In that case you should probably reverse the order so that a user-specific config takes precedence over a global one. However, if all files are loaded, then the order is correct, as later settings override earlier ones.

Custom CA cert in 1.8

I'm having the opposite issue in 1.8.0 to the one addressed by #41.

I have a config file with two contexts, one using AWS's MSK, with the CA cert omitted, and the other one using a local cluster with TLS enabled, which uses a custom CA cert:

contexts:

  emma-client-1:
    brokers:
    - b1.kafka.emma:9091
    - b2.kafka.emma:9092

    tls:
      enabled: true
      # insecure: true
      ca: /home/ateijelo/.config/kafka/emma-caroot.pem
      cert: /home/ateijelo/.config/kafka/emma-client-1-cert.pem
      certkey: /home/ateijelo/.config/kafka/emma-client-1-key.pem

    #tlsCA: /home/ateijelo/.config/kafka/emma-caroot.pem
    #tlsCert: /home/ateijelo/.config/kafka/emma-client-1-cert.pem
    #tlsCertKey: /home/ateijelo/.config/kafka/emma-client-1-key.pem

  msk-client-1:
    brokers:
    - b-1....my-msk-cluster....amazonaws.com:9094
    - b-2....my-msk-cluster....amazonaws.com:9094
    tls:
      enabled: true
      cert: /home/ateijelo/.config/kafka/msk-client-1-cert.pem
      certKey: /home/ateijelo/.config/kafka/msk-client-1-key.pem

Using 1.7.0 with the deprecated tlsCA, tlsCert, tlsCertKey works fine with both MSK and my local cluster.
Using 1.8.0 with the new tls block works fine with MSK, but not with the local cluster; I get:

failed to create cluster admin: kafka: client has run out of available brokers 
to talk to (Is your cluster reachable?)

Allow setting TLS config on the command line

It would be very handy if one could set some of the TLS parameters on the command line (such as enabled: true and insecure: false). That would allow one-off invocations of kafkactl against brokers that only speak TLS (such as Amazon MSK).

This is currently not possible, forcing one to create a config even for one-off invocations.

Proposal: extend kafkactl to take a flag indicating TLS and a flag indicating that the certificate should be ignored. Example: kafkactl -B somebroker:9094 --tls --insecure get topics --lag
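Mapping such flags onto Go's crypto/tls is straightforward; a sketch using the proposed (hypothetical) flag semantics:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// tlsConfigFromFlags maps the proposed --tls / --insecure flags onto a
// crypto/tls configuration. The flag names come from the proposal above
// and are not existing kafkactl options.
func tlsConfigFromFlags(tlsEnabled, insecure bool) *tls.Config {
	if !tlsEnabled {
		return nil // plaintext connection
	}
	return &tls.Config{
		// --insecure: skip certificate chain and hostname verification.
		// Fine for one-off debugging, dangerous as a default.
		InsecureSkipVerify: insecure,
	}
}

func main() {
	cfg := tlsConfigFromFlags(true, true)
	fmt.Println(cfg != nil, cfg.InsecureSkipVerify) // true true
}
```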

Compile released binary with CGO=0

Hi,

when I try to execute the provided binary on CentOS 7, I get

kafkactl: error while loading shared libraries: libgo.so.13: cannot open shared object file: No such file or directory

I think it would be convenient if the provided binaries were compiled using CGO=0 to get around issues like that.

Regards,
Stefan

Can't run delete commands in kubernetes context

All kafkactl commands seem to run properly when accessing Kafka through Kubernetes, except the delete commands.
For those commands the kubernetes: section of the configuration file seems to be ignored.
For example, I can create a topic, but not delete it:

$ kafkactl create topic test-delete
topic created: test-delete
$ kafkactl delete topic test-delete
failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

If I activate the -V (verbose) option, the log clearly indicates that kafkactl delete tries to connect to a local cluster instead of using kubectl to launch the command on the remote cluster.

Looking at the code, for all commands in package cmd there is a call to if !(&k8s.K8sOperation{}).TryRun. But this call is missing in:

  • delete-acl.go
  • delete-consumer-group.go
  • delete-topic.go

Dynamic Completion not working when installed via snap

When kafkactl is installed via snap, bash auto-completion is packaged inside the snap and should work out of the box.
Static auto-completion is working, but dynamic auto-completion is not.

Type e.g.:

kafkactl config use-context <TAB> <TAB>

Debugging the auto-completion shows that it should be working:

kafkactl __complete config use-context ""
default
:4
Completion ended with directive: ShellCompDirectiveNoFileComp

It probably has something to do with prefixing of the binary by snap.

Bad performance when using AVRO schema registry

I have noticed that consuming topics which are AVRO encoded (and with a schema registry configured) yields significantly worse performance than consuming "plain" topics. I initially suspected a network problem but found that kafkactl issues a lot of calls to the schema registry's /subjects endpoint (see screenshot). Looking through the code I saw that the response is already supposed to be cached, so I'm not yet sure why this is happening.

Screenshot from 2021-07-23 15-09-40

Version info:

$ kafkactl version
cmd.info{version:"v1.18.1", buildTime:"2021-07-15T06:49:01Z", gitCommit:"2a72db3", goVersion:"go1.16.5", compiler:"gc", platform:"linux/386"}
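The intended behavior (one registry call per subject, then cache hits) can be sketched with a memoizing wrapper; this is illustrative only, not kafkactl's actual cache code:

```go
package main

import (
	"fmt"
	"sync"
)

// subjectCache memoizes lookups so repeated schema-registry requests for
// the same subject hit the network only once. The fetch func stands in for
// an HTTP GET against the registry's /subjects endpoint.
type subjectCache struct {
	mu    sync.Mutex
	data  map[string]string
	fetch func(subject string) string
	calls int // how often the underlying fetch actually ran
}

func (c *subjectCache) get(subject string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.data[subject]; ok {
		return v // cache hit: no network round-trip
	}
	c.calls++
	v := c.fetch(subject)
	c.data[subject] = v
	return v
}

func main() {
	c := &subjectCache{
		data:  map[string]string{},
		fetch: func(s string) string { return "schema-for-" + s },
	}
	c.get("topic-value")
	c.get("topic-value")
	c.get("topic-value")
	fmt.Println(c.calls) // 1: the registry was only queried once
}
```

If kafkactl shows one registry call per consumed message despite such a cache, a likely class of bug is that a fresh cache instance is created per message or per partition instead of being shared.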

Insecure TLS Config?

Hi, and thank you for a great tool for helping manage kafka!

We have a setup where kafka runs with TLS setup. The certificates that the brokers use are signed by our own CA, but we have to connect by IP so the hostname validation will always fail.

I made a small modification to kafkactl to be able to skip certificate validation entirely, and a slightly larger one to be able to verify only the certificate but ignore hostname/SAN verification.

Is this something you would be interested in adding? I think it would be good to support the second variation, with only CA signature verification, but I am not sure how to express this in the configuration, whereas the first option is much simpler configuration-wise.

Note that kafka brokers also implement this second scheme, see configuration parameter ssl.endpoint.identification.algorithm from broker configuration.

What do you think?

Thanks again for a great tool!

Partial SASL Mechanism support

Hi!
This part works almost nicely:

[root@sks04ap469 kafkactl]# kafkactl -V describe topic asdf
[kafkactl] 2021/03/26 19:26:59 Using config file: /root/.config/kafkactl/config.yml
[kafkactl] 2021/03/26 19:26:59 Assuming kafkaVersion: 2.0.0
[kafkactl] 2021/03/26 19:26:59 Assuming kafkaVersion: 2.0.0
[kafkactl] 2021/03/26 19:26:59 SASL is enabled (username = admin).
[sarama  ] 2021/03/26 19:26:59 Initializing new client
[sarama  ] 2021/03/26 19:26:59 client/metadata fetching metadata for all topics from broker sks04ap462:9092
[sarama  ] 2021/03/26 19:26:59 Successful SASL handshake. Available mechanisms: [SCRAM-SHA-256]
[sarama  ] 2021/03/26 19:26:59 SASL authentication succeeded
[sarama  ] 2021/03/26 19:26:59 Connected to broker at sks04ap462:9092 (unregistered)
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #5 at 10.42.122.91:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #1 at 10.26.102.86:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #4 at 10.26.102.89:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #6 at 10.42.122.92:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #7 at 10.42.122.93:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #2 at 10.26.102.87:9092
[sarama  ] 2021/03/26 19:26:59 client/brokers registered new broker #3 at 10.26.102.88:9092
[sarama  ] 2021/03/26 19:26:59 Successfully initialized new client
topic 'asdf' does not exist

And this isn't:

[root@sks04ap469 kafkactl]# kafkactl -V get topics
[kafkactl] 2021/03/26 19:27:16 Using config file: /root/.config/kafkactl/config.yml
[kafkactl] 2021/03/26 19:27:16 Assuming kafkaVersion: 2.0.0
[kafkactl] 2021/03/26 19:27:16 Assuming kafkaVersion: 2.0.0
[kafkactl] 2021/03/26 19:27:16 using default admin request timeout: 3s
[kafkactl] 2021/03/26 19:27:16 SASL is enabled (username = admin).
[sarama  ] 2021/03/26 19:27:16 Initializing new client
[sarama  ] 2021/03/26 19:27:16 client/metadata fetching metadata for all topics from broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:16 Invalid SASL Mechanism : kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:16 Error while performing SASL handshake sks04ap462:9092
[sarama  ] 2021/03/26 19:27:16 Closed connection to broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:16 client/metadata got error from broker -1 while fetching metadata: kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:16 client/metadata no available broker to send metadata request to
[sarama  ] 2021/03/26 19:27:16 client/brokers resurrecting 1 dead seed brokers
[sarama  ] 2021/03/26 19:27:16 client/metadata retrying after 250ms... (3 attempts remaining)
[sarama  ] 2021/03/26 19:27:16 client/metadata fetching metadata for all topics from broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 Invalid SASL Mechanism : kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:17 Error while performing SASL handshake sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 Closed connection to broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 client/metadata got error from broker -1 while fetching metadata: kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:17 client/metadata no available broker to send metadata request to
[sarama  ] 2021/03/26 19:27:17 client/brokers resurrecting 1 dead seed brokers
[sarama  ] 2021/03/26 19:27:17 client/metadata retrying after 250ms... (2 attempts remaining)
[sarama  ] 2021/03/26 19:27:17 client/metadata fetching metadata for all topics from broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 Invalid SASL Mechanism : kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:17 Error while performing SASL handshake sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 Closed connection to broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:17 client/metadata got error from broker -1 while fetching metadata: kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:17 client/metadata no available broker to send metadata request to
[sarama  ] 2021/03/26 19:27:17 client/brokers resurrecting 1 dead seed brokers
[sarama  ] 2021/03/26 19:27:17 client/metadata retrying after 250ms... (1 attempts remaining)
[sarama  ] 2021/03/26 19:27:17 client/metadata fetching metadata for all topics from broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:18 Invalid SASL Mechanism : kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:18 Error while performing SASL handshake sks04ap462:9092
[sarama  ] 2021/03/26 19:27:18 Closed connection to broker sks04ap462:9092
[sarama  ] 2021/03/26 19:27:18 client/metadata got error from broker -1 while fetching metadata: kafka server: The broker does not support the requested SASL mechanism.
[sarama  ] 2021/03/26 19:27:18 client/metadata no available broker to send metadata request to
[sarama  ] 2021/03/26 19:27:18 client/brokers resurrecting 1 dead seed brokers
[sarama  ] 2021/03/26 19:27:18 Closing Client
failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Maybe the problem is here (the SASL mechanism is missing):

func CreateClusterAdmin(context *ClientContext) (sarama.ClusterAdmin, error) {

"topic % does not exist" error is given even when topic auto creation is enabled

Trying to produce a message to a non-existent topic always gives a "topic % does not exist" error (it seems that the logic responsible for this is here), while it should actually be possible to do this if auto.create.topics.enable is true in the Kafka config.

P.S. Thanks for the great CLI tool, btw. It's quite handy to use for pet projects as well as for the "real" work :)

Unable to connect securely to Amazon MSK

Using the latest version of Kafkactl, I'm not able to connect securely to an Amazon MSK cluster.

kafkactl version: cmd.info{version:"1.7.0", buildTime:"2020-03-05T11:23:06Z", gitCommit:"60a67f9", goVersion:"go1.13.4", compiler:"gc", platform:"linux/amd64"}

OS: Debian:latest docker container on Amazon Linux host

Config:

contexts:
  localhost:
    brokers:
    - b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
    - b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
    - b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
current-context: localhost

Running kafkactl get topics returns failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Running kafkactl get topics -V returns

[kafkactl] 2020/05/12 21:04:16 Using config file: /root/.config/kafkactl/config.yml
[kafkactl] 2020/05/12 21:04:16 Assuming kafkaVersion: 2.0.0
[sarama  ] 2020/05/12 21:04:16 Initializing new client
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata no available broker to send metadata request to
[sarama  ] 2020/05/12 21:04:16 client/brokers resurrecting 3 dead seed brokers
[sarama  ] 2020/05/12 21:04:16 client/metadata retrying after 250ms... (3 attempts remaining)
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata no available broker to send metadata request to
[sarama  ] 2020/05/12 21:04:16 client/brokers resurrecting 3 dead seed brokers
[sarama  ] 2020/05/12 21:04:16 client/metadata retrying after 250ms... (2 attempts remaining)
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata no available broker to send metadata request to
[sarama  ] 2020/05/12 21:04:16 client/brokers resurrecting 3 dead seed brokers
[sarama  ] 2020/05/12 21:04:16 client/metadata retrying after 250ms... (1 attempts remaining)
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata fetching metadata for all topics from broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 Connected to broker at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:04:16 client/metadata got error from broker -1 while fetching metadata: unexpected EOF
[sarama  ] 2020/05/12 21:04:16 Closed connection to broker b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:04:16 client/metadata no available broker to send metadata request to
[sarama  ] 2020/05/12 21:04:16 client/brokers resurrecting 3 dead seed brokers
[sarama  ] 2020/05/12 21:04:16 Closing Client
failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Adding tlsInsecure: true to config, and running kafkactl get topics -V yields:

[kafkactl] 2020/05/12 21:05:45 Using config file: /root/.config/kafkactl/config.yml
[kafkactl] 2020/05/12 21:05:45 Assuming kafkaVersion: 2.0.0
[sarama  ] 2020/05/12 21:05:45 Initializing new client
[sarama  ] 2020/05/12 21:05:45 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #5 at b-5.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #4 at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #1 at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #6 at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #2 at b-2.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #3 at b-3.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 Successfully initialized new client
[sarama  ] 2020/05/12 21:05:45 Initializing new client
[sarama  ] 2020/05/12 21:05:45 client/metadata fetching metadata for all topics from broker b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 Connected to broker at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #5 at b-5.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #4 at b-4.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #1 at b-1.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #6 at b-6.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #2 at b-2.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 client/brokers registered new broker #3 at b-3.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/05/12 21:05:45 Successfully initialized new client
TOPIC     PARTITIONS

Note: This is a brand new cluster, so no topics are expected.

Connecting to the cluster using openssl s_client -connect b-5.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com:9094 yields:

CONNECTED(00000003)
depth=2 C = US, O = Amazon, CN = Amazon Root CA 1
verify return:1
depth=1 C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
verify return:1
depth=0 CN = *.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com
verify return:1
---
Certificate chain
 0 s:CN = *.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com
   i:C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
 1 s:C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
   i:C = US, O = Amazon, CN = Amazon Root CA 1
 2 s:C = US, O = Amazon, CN = Amazon Root CA 1
   i:C = US, ST = Arizona, L = Scottsdale, O = "Starfield Technologies, Inc.", CN = Starfield Services Root Certificate Authority - G2
 3 s:C = US, ST = Arizona, L = Scottsdale, O = "Starfield Technologies, Inc.", CN = Starfield Services Root Certificate Authority - G2
   i:C = US, O = "Starfield Technologies, Inc.", OU = Starfield Class 2 Certification Authority
---
Server certificate
-----BEGIN CERTIFICATE-----
MIIFozCCBIugAwIBAgIQA2vMgvNCTPp44DHXuPsN9zANBgkqhkiG9w0BAQsFADBG
MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRUwEwYDVQQLEwxTZXJ2ZXIg
Q0EgMUIxDzANBgNVBAMTBkFtYXpvbjAeFw0yMDA1MTIwMDAwMDBaFw0yMTA2MTIx
MjAwMDBaMDgxNjA0BgNVBAMMLSoubXNrLm13d2FiNy5jNC5rYWZrYS5ldS13ZXN0
LTEuYW1hem9uYXdzLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
ANnjc1PPDNE0aeDS16iMhwr31qqqlrWnA8BdUudMF/iXiwSKdeUpSQbXlg/0sjja
ejTZGeXZqrt7Pg9bajNktPj/ks3wFdtt2M0+n6hbcxuAjS5en3cgR/WALCNAlQ89
y8n++0O5Qddd18RiRByK6rxhIBEbzkizmsdzaHEE2ZXbKW94Z/m8Pb2pH9oAcpVr
gyoqfTIM/708s78obcRQYWgEgPZRXNjrlnrT/SRDM4K0EPCX9oyVPq14DibjcTHU
ZFsKp/10jFEhDAga7segyYA6LCwkM/5R/TI91jT9DT4lf/goH+CIYt4PGVYizgB3
IdEBgJKk/wuZeM4sy3MLXyUCAwEAAaOCApkwggKVMB8GA1UdIwQYMBaAFFmkZgZS
oHuVkjyjlAcnlnRb+T3QMB0GA1UdDgQWBBQBS0SsGG+pmh5K7kuowHS60kyQczA4
BgNVHREEMTAvgi0qLm1zay5td3dhYjcuYzQua2Fma2EuZXUtd2VzdC0xLmFtYXpv
bmF3cy5jb20wDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggr
BgEFBQcDAjA7BgNVHR8ENDAyMDCgLqAshipodHRwOi8vY3JsLnNjYTFiLmFtYXpv
bnRydXN0LmNvbS9zY2ExYi5jcmwwIAYDVR0gBBkwFzALBglghkgBhv1sAQIwCAYG
Z4EMAQIBMHUGCCsGAQUFBwEBBGkwZzAtBggrBgEFBQcwAYYhaHR0cDovL29jc3Au
c2NhMWIuYW1hem9udHJ1c3QuY29tMDYGCCsGAQUFBzAChipodHRwOi8vY3J0LnNj
YTFiLmFtYXpvbnRydXN0LmNvbS9zY2ExYi5jcnQwDAYDVR0TAQH/BAIwADCCAQQG
CisGAQQB1nkCBAIEgfUEgfIA8AB2APZclC/RdzAiFFQYCDCUVo7jTRMZM7/fDC8g
C8xO8WTjAAABcgqSjoEAAAQDAEcwRQIgWaqBa89gkPbvaKvdb041YMzvzRA3DQ8M
/flq1Xa0sVMCIQDYHaQNfgWwbJN1w9VxRokNWYDlsMoKy0KHVWQMpWLXOgB2AFzc
Q5L+5qtFRLFemtRW5hA3+9X6R9yhc5SyXub2xw7KAAABcgqSjnwAAAQDAEcwRQIh
AL5zx8nn3sHZkdHstJqFSoDa0Tg4GRpYmmdXKXgFP/etAiAyEBa29rkkC7c0JVrn
XoGpeyumqgaGXo51ukh4QDZC+zANBgkqhkiG9w0BAQsFAAOCAQEAVOkBh/7tzE2j
Cwv9wG88EwNjPbUJcPjUKKcY5km9hfhnfcQkxXZKsaRMn6wK2ZrSvNrfPp6G9fcp
7/y5S4mGNvRsRZBFNTae/uT2363cngpnRaYWROd5iHR8Zc1bQDZzo5PddhTUrvU2
sCoHPrzQKJBAv2uGrylvyl+jHMLqKAWnEwFuWjf+ibJ+xIbJ6/hqWYknnG8nCCQ8
uynZ4pBMbFZhLh2UrukGu9e9ljcDfrQ2B5+H9T4XzfKM1ijUXwGO3ZnA9acY4xTm
iMs0ZzjBEluAoAymSQwuVAG50KClmY5tYiyePazi8ZZG7jLXwumCrNBFtcFS2ihn
5bGKn6WVMA==
-----END CERTIFICATE-----
subject=CN = *.msk.mwwab7.c4.kafka.eu-west-1.amazonaws.com

issuer=C = US, O = Amazon, OU = Server CA 1B, CN = Amazon

---
No client certificate CA names sent
Peer signing digest: SHA256
Peer signature type: RSA
Server Temp Key: ECDH, P-256, 256 bits
---
SSL handshake has read 5364 bytes and written 465 bytes
Verification: OK
---
New, TLSv1.2, Cipher is ECDHE-RSA-AES256-GCM-SHA384
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
    Protocol  : TLSv1.2
    Cipher    : ECDHE-RSA-AES256-GCM-SHA384
    Session-ID: 5EBB10161A322E16C4F8950E5395111911503394B25922B8C03DC3523101B97B
    Session-ID-ctx:
    Master-Key: 727BF84EC5BAE5BF675FB76D3980E2031AC3E3B8C849EE43FA6DB1FF64DAAF27EA51A1BF9B8B81F25A17D290F41CACD7
    PSK identity: None
    PSK identity hint: None
    SRP username: None
    Start Time: 1589317654
    Timeout   : 7200 (sec)
    Verify return code: 0 (ok)
    Extended master secret: yes
---

This indicates that the certificate used by Amazon MSK is trusted by the OS.
It is my understanding that the underlying library, sarama, uses Go's default tls.Config, which falls back to the OS's truststore if no root CAs are given.

I've tried pointing the config, via tlsCA, to a file containing the Amazon Root CA 1, but to no avail. It then tells me that I need to add a client cert and private key, which MSK does not use.

This pretty much prevents one from using kafkactl with Amazon MSK, as you have no way of verifying the hostname.

[Windows] "crypto/x509: system root pool is not available on Windows"

On Windows, the warning error reading system cert pool: crypto/x509: system root pool is not available on Windows appears every time kafkactl contacts the brokers.

Example:

MINGW64 ~/Downloads
$ ./kafkactl.exe get topics --config-file kafkactl-dev.yaml
error reading system cert pool: crypto/x509: system root pool is not available on Windows
error reading system cert pool: crypto/x509: system root pool is not available on Windows
TOPIC                                                           PARTITIONS
[...]

I saw a workaround here: golang/go#16736 (comment). I don't know if it can help.

produced messages rejected

Apologies for the r=1 issue, but I cannot find any relevant trace of a solution to my problem.

When I try to produce messages with kafkactl, I always get an error message:

root@kubectl /# kafkactl produce hello-world -v hello
Failed to produce message: kafka server: Messages are rejected since there are fewer in-sync replicas than required.

Producing into the very same topic with kafka-console-producer.sh works without any issue.

root@kubectl ~/k/bin# ./kafka-console-producer.sh --broker-list b-1.*****.c4.kafka.eu-central-1.amazonaws.com:9096,b-3.*****.c4.kafka.eu-central-1.amazonaws.com:9096,b-2.*****.c4.kafka.eu-central-1.amazonaws.com:9096 --topic hello-world --producer.config client_sasl.properties
>hello

Any ideas what this can be?

Remove topic from consumer group

When an application is refactored to replace one input topic with another, the old topic must be removed from the consumer group (we track consumer group lag using something similar to https://github.com/lightbend/kafka-lag-exporter).

kafka-consumer-groups allows this operation with a combination of --delete-offsets and --topic.

./kafka-consumer-groups.sh  --bootstrap-server 10.10.10.10:9092 --delete-offsets --group my_group --topic old_topic

kafkactl doesn't seem to support this feature. Is this a deliberate choice or a feature gap?

kafkactl get consumer-groups fails

Error message:
failed to get group member assignment: kafka: insufficient data to decode packet, more bytes expected

All other commands work just fine.

kafkactl version:
cmd.info{version:"v1.23.1", buildTime:"2021-11-23T13:08:18Z", gitCommit:"9cb9f72", goVersion:"go1.16.10", compiler:"gc", platform:"darwin/amd64"}

validate-only should return an error when attempting to alter a topic to its current state

I'm using kafkactl to add, update and delete topics, with some Puppet automation around it. I rely on the --validate-only flag returning 0 so that I can proceed with the requested change. However, if I pass in an alteration that actually matches the topic's current state, I would expect --validate-only to throw an error, because running the same command without --validate-only cannot alter the topic to its current state.

For example, if I wanted to increase the partition count from 1 to 2, I would do something like this:

# kafkactl describe topic ernst
CONFIG                     VALUE
min.insync.replicas        1
segment.bytes              1073741824
message.format.version     2.4-IV1

PARTITION     OLDEST_OFFSET     NEWEST_OFFSET     LEADER                              REPLICAS     IN_SYNC_REPLICAS
0             0                 0                 kfk001:9091     1            1

# kafkactl alter topic ernst --replication-factor 1 --partitions 2 --validate-only
PARTITION     OLDEST_OFFSET     NEWEST_OFFSET     LEADER                              REPLICAS     IN_SYNC_REPLICAS
0             0                 0                 kfk001:9091     1            1
1             0                 0                                                     2

# echo $?
0

# kafkactl alter topic ernst --replication-factor 1 --partitions 2
partitions have been created
partition replicas have been reassigned

That brings us to this point: I would like to test whether I could change to 2 partitions, and --validate-only returns 0. I'd expect the validation to say, "This is already the current state, so here's a non-zero return code", since actually running without the --validate-only flag yields an error from kafkactl.

# kafkactl alter topic ernst --replication-factor 1 --partitions 2 --validate-only
PARTITION     OLDEST_OFFSET     NEWEST_OFFSET     LEADER                              REPLICAS     IN_SYNC_REPLICAS
0             0                 0                 kfk001:9091     1            1
1             0                 0                 kfk002:9091     2            2

# echo $?
0

So, now without --validate-only, I would expect that this would run correctly, since it validated correctly (but it doesn't).

# kafkactl alter topic ernst --replication-factor 1 --partitions 2
Could not create partitions for topic 'ernst': kafka server: Number of partitions is invalid. - Topic already has 2 partitions.

# echo $?
1

The specific logic that I'm trying to use is as follows (in a Puppet ERB template, but that's beside the point -- just throwing it in here for context).

if /usr/local/bin/kafkactl alter topic <%= v1['name'] %> --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITION_COUNT} <% unless v1['additional_params'].nil? -%><%=  v1['additional_params'] %><% end -%> --validate-only ${KAFKACFG_FILE} 2>&1 > /dev/null; then
    /usr/local/bin/kafkactl  alter topic <%= v1['name'] %> --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITION_COUNT} <% unless v1['additional_params'].nil? -%><%=  v1['additional_params'] %><% end %> ${KAFKACFG_FILE}
fi

Shell completion should honor `-C other-config.yaml`

If a different config file is used with -C and shell completion is attempted:

kafkactl -C other-config.yaml consume <TAB><TAB>

the results from the completion are determined using the default config file.

I'm talking to two Kafka clusters with kafkactl in two separate terminal windows, so running use-context every time is not practical. I split the configuration into two files and pass them with -C, but the completions I get come from the default one.

Docker image does not include CA certificates

First off: great work on allowing us to configure kafkactl using environment variables! It makes the tool very usable with Amazon MSK.

The issue:
The provided Docker image does not seem to include any CA certificates. In order to connect to a TLS host (in this case an Amazon MSK cluster using certificates provided by Amazon), one needs to either set TLS_INSECURE=true or TLS_CA=/path/to/provided/ca.

Unfortunately, in my use-case, adding the CA certificate manually does not work very well, as the container is running on a remote host (and is launched ad-hoc).

Using TLS_INSECURE=true is a workaround, but not optimal, especially not for production.

Command example:

user@machine dsp % kubectl run -i --tty --rm kafkactl --image=deviceinsight/kafkactl:latest --restart=Never --env="TLS_ENABLED=true" --env="BROKERS=$(aws --profile saml kafka get-bootstrap-brokers --cluster-arn $(aws --profile saml kafka list-clusters --cluster-name-filter 'msk' --query 'ClusterInfoList[0].{arn:ClusterArn}' | jq -r .arn) | jq -r .BootstrapBrokerStringTls | sed 's/,/ /g')" -- kafkactl -V get topics
If you don't see a command prompt, try pressing enter.
[sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
[sarama  ] 2020/09/19 20:49:42 client/metadata no available broker to send metadata request to
[sarama  ] 2020/09/19 20:49:42 client/brokers resurrecting 3 dead seed brokers
[sarama  ] 2020/09/19 20:49:42 Closing Client
failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
pod "kafkactl" deleted
pod default/kafkactl terminated (Error)
user@machine dsp % kubectl run -i --tty --rm kafkactl --image=deviceinsight/kafkactl:latest --restart=Never --env="TLS_INSECURE=true" --env="TLS_ENABLED=true" --env="BROKERS=$(aws --profile saml kafka get-bootstrap-brokers --cluster-arn $(aws --profile saml kafka list-clusters --cluster-name-filter 'msk' --query 'ClusterInfoList[0].{arn:ClusterArn}' | jq -r .arn) | jq -r .BootstrapBrokerStringTls | sed 's/,/ /g')" -- kafkactl -V get topics
[kafkactl] 2020/09/19 20:50:11 generated default config at /home/kafkactl/.config/kafkactl/config.yml
[kafkactl] 2020/09/19 20:50:11 Using config file: /home/kafkactl/.config/kafkactl/config.yml
[kafkactl] 2020/09/19 20:50:11 Assuming kafkaVersion: 2.0.0
[kafkactl] 2020/09/19 20:50:11 Assuming kafkaVersion: 2.0.0
[kafkactl] 2020/09/19 20:50:11 TLS is enabled.
[sarama  ] 2020/09/19 20:50:11 Initializing new client
[sarama  ] 2020/09/19 20:50:11 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 Connected to broker at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #5 at b-5.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #4 at b-4.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #1 at b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #6 at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #2 at b-2.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #3 at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:11 Successfully initialized new client
[kafkactl] 2020/09/19 20:50:11 TLS is enabled.
[sarama  ] 2020/09/19 20:50:11 Initializing new client
[sarama  ] 2020/09/19 20:50:11 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 Connected to broker at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (registered as #3)
[sarama  ] 2020/09/19 20:50:12 Connected to broker at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #5 at b-5.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #4 at b-4.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #1 at b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #6 at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #2 at b-2.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #3 at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
[sarama  ] 2020/09/19 20:50:12 Successfully initialized new client
TOPIC                  PARTITIONS
__consumer_offsets     50
pod "kafkactl" deleted

Please consider installing CA certificates in the Ubuntu-based Docker image. It should only take around 5801 kB more space.

Support Kafka ACLs

Is there any chance this awesome tool could support creating/removing ACLs for authorization?

Support SCRAM-SHA-512

Problem

Kafkactl does not support SCRAM-SHA-512 authentication

Context

As noted in the AWS MSK developer guide here, MSK only supports SCRAM-SHA-512 when using SASL authentication. We should add an extra optional configuration parameter to kafkactl to allow setting which SCRAM mechanism you want to use.

Implementation Notes:

Digging into sarama, we see that it defaults to SASLTypePlaintext if no SASL mechanism is set in the config. If the mechanism is set, then this will get executed. This example and this example show how to implement the missing generator function very easily.
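One possible shape for the extra parameter in the kafkactl context config — the mechanism key and its values here are a suggestion for this issue, not an existing option:

```yaml
contexts:
  msk:
    brokers:
    - b-1.msk.example.amazonaws.com:9096
    sasl:
      enabled: true
      username: myuser
      password: mypassword
      mechanism: scram-sha512   # suggested values: plaintext, scram-sha256, scram-sha512
```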

Default config generation

In order to simplify getting started with kafkactl, we should generate a simple config file automatically if no config is found.
This config should work for a Kafka cluster running locally on localhost:9092, so it should look like this:

contexts:
  localhost:
    brokers:
    - localhost:9092
current-context: localhost

For the config file location we should use $HOME/.config/kafkactl/config.yml (per the XDG base directory convention).

It is not working with latest kubectl

$ kafkactl get topics
Error: unknown flag: --generator
See 'kubectl run --help' for usage.
command "/usr/local/bin/kubectl" exited with non-zero status:

PATH:
  /usr/local/bin/kubectl
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.0", GitCommit:"cb303e613a121a29364f75cc67d3d580833a7479", GitTreeState:"clean", BuildDate:"2021-04-08T21:16:14Z", GoVersion:"go1.16.3", Compiler:"gc", Platform:"darwin/amd64"}

Am I doing something wrong? It used to work and it stopped recently.
