
twmb / kcl


Your one-stop shop to do anything with Kafka: producing, consuming, transacting, administrating; 0.8.0 through 3.2+

License: BSD 3-Clause "New" or "Revised" License

Go 99.82% Shell 0.16% Makefile 0.02%
Topics: kafka, go, golang, client, kafka-client

kcl's People

Contributors

celrenheit, dbudworth, dependabot[bot], momingkotoba, robsonpeixoto, steadbytes, twmb, zach-johnson


kcl's Issues

feature: list topics command

It's often useful to be able to list all available topics. kcl could support this with a kcl topic list command:

kcl topic list
Topic   Partitions  Internal?
topic1  3           false
topic2  3           false
topic3  3           false

It could also include optional internal topic filtering. What do you think?

There's some potential complication with handling Kafka versions < 0.10.0, since kmsg.MetadataRequest.Topics takes nil or an empty slice to indicate all topics, but the gist of the command is something like:

func topicListCommand(cl *client.Client) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "list",
		Short: "List all topics (Kafka 0.10.0.0+)",
		Run: func(_ *cobra.Command, _ []string) {
			// Topics is left nil, which requests all topics (the <0.10.0 caveat above notwithstanding).
			req := kmsg.NewMetadataRequest()
			kresp, err := cl.Client().Request(context.Background(), &req)
			out.MaybeDie(err, "unable to list topics: %v", err)
			resp := kresp.(*kmsg.MetadataResponse)
			if cl.AsJSON() {
				out.ExitJSON(resp.Topics)
			}
			tw := out.BeginTabWrite()
			defer tw.Flush()
			fmt.Fprintln(tw, "Topic\tPartitions\tInternal?")
			for _, topicResp := range resp.Topics {
				fmt.Fprintf(tw, "%s\t%d\t%t\n", topicResp.Topic, len(topicResp.Partitions), topicResp.IsInternal)
			}
		},
	}
	return cmd
}

Happy to submit a PR for implementation if desired 😄

User SCRAM was created successfully but cannot authenticate to the cluster

Command:

/kcl --config-path config.toml admin user-scram alter --set user=user1,mechanism=scram-sha-512,iterations=8192,password=123456

Error:

org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

kcl 0.11.0 doesn't work on Amazon Linux 2 anymore (glibc)

kcl 0.11.0 doesn't work on Amazon Linux 2 anymore due to incompatibility with the version of glibc on that system.

kcl 0.10.0 and earlier are still working on that distribution.

$ wget https://github.com/twmb/kcl/releases/download/v0.11.0/kcl_linux_amd64.gz
--2023-03-08 16:47:42--  https://github.com/twmb/kcl/releases/download/v0.11.0/kcl_linux_amd64.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
[...]
Saving to: ‘kcl_linux_amd64.gz’

2023-03-08 16:47:43 (111 MB/s) - ‘kcl_linux_amd64.gz’ saved [12819405/12819405]

$ gunzip kcl_linux_amd64.gz 
$ chmod +x kcl_linux_amd64 
$ ./kcl_linux_amd64 
./kcl_linux_amd64: /lib64/libc.so.6: version `GLIBC_2.32' not found (required by ./kcl_linux_amd64)
./kcl_linux_amd64: /lib64/libc.so.6: version `GLIBC_2.34' not found (required by ./kcl_linux_amd64)
$ rpm -q glibc
glibc-2.26-62.amzn2.x86_64

Tag 0.4.0

Hey!

Can you please tag current master with the next version?

We are using the master branch now to work with AWS IAM.

Unable to fix LSO on stuck partition

Hi @twmb -

I believe I'm hitting the issue described in https://issues.apache.org/jira/browse/KAFKA-12671 and I'm attempting to resolve it without entirely deleting the topic. Just for clarity, the situation I'm in is:

  • A topic with 15 partitions
  • One partition (7) is apparently stuck: I can produce data transactionally to it (using franz-go; I've also tried kafkacat, with the same results) without any errors. I can see the data on the topic by specifying read_uncommitted, but nothing with read_committed.
  • All other partitions are seemingly fine

I came across your notes in that Kafka issue and I'm hitting an error when attempting to resolve it with kcl admin txn unstick-lso:

kcl admin txn unstick-lso <topic>
Issuing Metadata request for the topic to determine its number of partitions...
Metadata indicates that there are 15 partitions in the topic. Issuing ListOffsets request to determine which partitions may be stuck (i.e., have records)...
Looking for the last stable offset on a partition in <topic>...
Found partition 7 stuck at offset 144, looking for producer that caused this...
Found causing producer ID and epoch 2000/0, looking in __transaction_state to find its transactional id...

Found topic <topic> stuck with transactional id <txn-id>.
At this point we can attempt to unhang the transaction.
Issue AddPartitionsToTxn followed by EndTxn? [Y/n] Y
Proceeding to unstick...

Issuing AddPartitionsToTxn request to fake a new transaction...
partition in AddPartitionsToTxn had error: INVALID_PRODUCER_ID_MAPPING: The producer attempted to use a producer id which is not currently assigned to its transactional id.

I'm unclear how to proceed from here and was hoping you might have some ideas. I've disabled the producer that was using this transaction ID and waited for longer than transactional.id.expiration.ms and retried it with the same result.

Support force using ListOffsetsRequest to fetch offset

Why have this proposal

Some cloud service providers (such as Alibaba Cloud) haven't implemented OffsetsForLeaderEpoch.

What happened

franz-go uses OffsetsForLeaderEpoch to fetch the leader epoch and end offset and to compare the consumer offset against the leader. After a restart, this triggers offset reset behaviour.

Log

[root@xxxx dig]# ./kcl_linux_amd64 misc list-offsets TARGET-TOPIC
BROKER  TOPIC         PARTITION  START  END  ERROR
101     TARGET-TOPIC  0          170    270
102     TARGET-TOPIC  1          100    100
103     TARGET-TOPIC  2          100    100
101     TARGET-TOPIC  3          100    100
102     TARGET-TOPIC  4          36     100
103     TARGET-TOPIC  5          0      85
[root@xxxx dig]# ./kcl_linux_amd64 misc offset-for-leader-epoch TARGET-TOPIC
BROKER  TOPIC         PARTITION  LEADER EPOCH  END OFFSET  ERROR
101     TARGET-TOPIC  0          0             0
101     TARGET-TOPIC  3          0             0
102     TARGET-TOPIC  1          0             0
102     TARGET-TOPIC  4          0             0
103     TARGET-TOPIC  2          0             0
103     TARGET-TOPIC  5          0             0
[root@xxxx dig]# ./kcl_linux_amd64 group describe -v xxxx
GROUP        xxxx
COORDINATOR  103
STATE        Stable
BALANCER     cooperative-sticky
MEMBERS      3
TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  MEMBER-ID  CLIENT-ID  HOST
TARGET-TOPIC  0          270             270             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  1          100             100             0    kgo-xxxx   kgo        /xxxxx
TARGET-TOPIC  2          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  3          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  4          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  5          85              85              0    kgo-xxxx   kgo        /xxxxx

list offsets: print unissued partitions

If some partitions cannot be requested due to metadata load failures, we currently print a log line saying some partitions failed, and then continue with what was successful. Instead, we could include the failures in the printed table at the end, with the failure in the error column.
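
Roughly the idea, sketched with the standard library's text/tabwriter (the values and error name are made up; in practice this would go through kcl's own output helpers):

package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	tw := tabwriter.NewWriter(os.Stdout, 6, 4, 2, ' ', 0)
	defer tw.Flush()
	fmt.Fprintln(tw, "BROKER\tTOPIC\tPARTITION\tSTART\tEND\tERROR")
	// A partition we could list successfully: empty error column.
	fmt.Fprintf(tw, "%d\t%s\t%d\t%d\t%d\t\n", 101, "foo", 0, 170, 270)
	// A partition whose metadata load failed: keep it in the table, error last.
	fmt.Fprintf(tw, "%s\t%s\t%d\t%s\t%s\t%s\n", "-", "foo", 1, "-", "-", "UNKNOWN_TOPIC_OR_PARTITION")
}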

feature: transactions for `kcl produce`

Similar to kafkacat, we can take all messages produced during an invocation of kcl produce and bundle them into a transaction.

As an added bonus, we can have an optional commit-after timer or message count.
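
A rough sketch of what this could look like with franz-go (the broker address, transactional id, and topic are placeholder values, not kcl's actual flags):

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("kcl-produce"), // placeholder transactional id
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Begin one transaction covering everything produced during this invocation.
	if err := cl.BeginTransaction(); err != nil {
		panic(err)
	}
	if err := cl.ProduceSync(ctx, &kgo.Record{Topic: "foo", Value: []byte("a line read from stdin")}).FirstErr(); err != nil {
		panic(err)
	}
	// ...produce more records; an optional timer or message count could commit early...

	// Commit the transaction (kgo.TryAbort would roll it back instead).
	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}
}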

Connect to kafka without config

Hello. I'm used to using the Kafka utilities from the Kafka distribution, like:

./kafka-topics.sh --bootstrap-server hostname:9092 --describe

Is there a way to use kcl without creating any config? Maybe it's just not clear to me how to create this config and how to use it. I just want to connect and get what I want from Kafka with a single command.
Thank you.

Feature Requests: "consume until" and produce tombstones

Hello,

There are a couple of things that would be great to have in this CLI. I'd like to be able to consume from a topic with an "until" setting - this could be either a specific offset or the "end" of the topic partitions (determined by the LSO at the start of the command).

I would also like to be able to produce tombstone messages, which as far as I can tell isn't currently possible. It looks like producing an empty value does not result in a tombstone right now. For example, kafkacat handles this with a -Z producer flag which translates empty values into nulls.

Are those things you would consider adding or would you consider PRs for these two things?
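
For reference, a tombstone at the client level is just a record with a nil (not merely empty) value; a rough franz-go sketch, with placeholder broker and topic:

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Broker address and topic are placeholders.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// A tombstone: a keyed record with a nil value (an empty non-nil slice is not a tombstone).
	tombstone := &kgo.Record{Topic: "foo", Key: []byte("k"), Value: nil}
	if err := cl.ProduceSync(context.Background(), tombstone).FirstErr(); err != nil {
		panic(err)
	}
}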

Support AlterPartitionReassignments cancel

Currently, we can issue an AlterPartitionReassignments request with something like

kcl admin partas alter 'foo:0->0,1;1->2,3;2->1,3'

The docs mention the ability to cancel a reassignment:

If a replica list is empty for a specific partition, this cancels any active
reassignment for that partition

However, it is currently unsupported.
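
For reference, the protocol-level cancel is just a null replica list; a rough sketch with franz-go's kmsg package (broker, topic, and partition are placeholders, and the generated type names are per my reading of kmsg):

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	part := kmsg.NewAlterPartitionReassignmentsRequestTopicPartition()
	part.Partition = 0
	part.Replicas = nil // a null replica list cancels any active reassignment for this partition

	topic := kmsg.NewAlterPartitionReassignmentsRequestTopic()
	topic.Topic = "foo"
	topic.Partitions = append(topic.Partitions, part)

	req := kmsg.NewAlterPartitionReassignmentsRequest()
	req.TimeoutMillis = 60000
	req.Topics = append(req.Topics, topic)

	resp, err := req.RequestWith(context.Background(), cl)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", resp)
}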

feature: start & end times for consuming

kcl can support consuming from specific timestamps.

The syntax for this should be s@<microsecond> and e@<microsecond> (where <microsecond> is a unix microsecond timestamp), and it can be folded into the existing -o flag. Either one or both can be specified.

The logic is to first get the metadata for all consumed topics, then issue a ListOffsets request with Timestamp set to the start timestamp. While consuming, once a partition's record timestamps surpass the end timestamp, stop consuming that partition.
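
A rough sketch of the start-offset lookup via a timestamped ListOffsets request (broker, topic, and timestamps are placeholders; Kafka timestamps are unix milliseconds, so the proposed microsecond input would be converted, and the end-timestamp check would live in the consume loop):

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	startMillis := int64(1600000000000) // placeholder start timestamp (unix ms)

	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// ListOffsets with a timestamp returns the first offset at or after that timestamp.
	part := kmsg.NewListOffsetsRequestTopicPartition()
	part.Partition = 0
	part.Timestamp = startMillis

	topic := kmsg.NewListOffsetsRequestTopic()
	topic.Topic = "foo"
	topic.Partitions = append(topic.Partitions, part)

	req := kmsg.NewListOffsetsRequest()
	req.Topics = append(req.Topics, topic)

	resp, err := req.RequestWith(context.Background(), cl)
	if err != nil {
		panic(err)
	}
	for _, t := range resp.Topics {
		for _, p := range t.Partitions {
			fmt.Printf("start consuming %s[%d] at offset %d\n", t.Topic, p.Partition, p.Offset)
		}
	}
	// While consuming, stop a partition once a record's timestamp passes the end timestamp.
}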
