twmb / kcl
Your one stop shop to do anything with Kafka. Producing, consuming, transacting, administrating; 0.8.0 through 3.2+
License: BSD 3-Clause "New" or "Revised" License
It's often useful to be able to list all available topics. kcl could support this with a kcl topic list command:
kcl topic list
Topic Partitions Internal?
topic1 3 false
topic2 3 false
topic3 3 false
It could also include optional internal topic filtering. What do you think?
There's some potential complication with handling Kafka versions < 0.10.0 due to kmsg.MetadataRequest.Topics taking nil or an empty slice to indicate all topics, but the gist of the command is something like:
func topicListCommand(cl *client.Client) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "list",
		Short: "List all topics (Kafka 0.10.0.0+)",
		Run: func(_ *cobra.Command, topics []string) {
			req := kmsg.NewMetadataRequest()
			kresp, err := cl.Client().Request(context.Background(), &req)
			out.MaybeDie(err, "unable to list topics: %v", err)
			resp := kresp.(*kmsg.MetadataResponse)
			if cl.AsJSON() {
				out.ExitJSON(resp.Topics)
			}
			tw := out.BeginTabWrite()
			defer tw.Flush()
			fmt.Fprintln(tw, "Topic\tPartitions\tInternal?")
			for _, topicResp := range resp.Topics {
				fmt.Fprintf(tw, "%s\t%d\t%t\n", topicResp.Topic, len(topicResp.Partitions), topicResp.IsInternal)
			}
		},
	}
	return cmd
}
Happy to submit a PR for implementation if desired 😄
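The optional internal-topic filtering mentioned in this request could look roughly like the sketch below. The topicInfo type and the filterTopics helper are hypothetical stand-ins for the metadata response fields; they are not part of kcl or kmsg.

```go
package main

// topicInfo is a hypothetical, simplified stand-in for one topic entry in a
// metadata response. It is not a kcl or kmsg type.
type topicInfo struct {
	Name       string
	Partitions int
	IsInternal bool
}

// filterTopics returns the topics to list. Internal topics are dropped unless
// includeInternal is true.
func filterTopics(topics []topicInfo, includeInternal bool) []topicInfo {
	kept := make([]topicInfo, 0, len(topics))
	for _, t := range topics {
		if t.IsInternal && !includeInternal {
			continue
		}
		kept = append(kept, t)
	}
	return kept
}
```

A flag on the list command could then decide the value passed as includeInternal before the table is printed.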
Command:
/kcl --config-path config.toml admin user-scram alter --set user=user1,mechanism=scram-sha-512,iterations=8192,password=123456
Error:
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
kcl 0.11.0 doesn't work on Amazon Linux 2 anymore due to incompatibility with the version of glibc on that system.
kcl 0.10.0 and earlier are still working on that distribution.
$ wget https://github.com/twmb/kcl/releases/download/v0.11.0/kcl_linux_amd64.gz
--2023-03-08 16:47:42-- https://github.com/twmb/kcl/releases/download/v0.11.0/kcl_linux_amd64.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
[...]
Saving to: ‘kcl_linux_amd64.gz’
2023-03-08 16:47:43 (111 MB/s) - ‘kcl_linux_amd64.gz’ saved [12819405/12819405]
$ gunzip kcl_linux_amd64.gz
$ chmod +x kcl_linux_amd64
$ ./kcl_linux_amd64
./kcl_linux_amd64: /lib64/libc.so.6: version `GLIBC_2.32' not found (required by ./kcl_linux_amd64)
./kcl_linux_amd64: /lib64/libc.so.6: version `GLIBC_2.34' not found (required by ./kcl_linux_amd64)
$ rpm -q glibc
glibc-2.26-62.amzn2.x86_64
Hey!
Can you please tag current master with the next version?
We are currently using the master branch to work with AWS IAM.
Hi @twmb -
I believe I'm hitting the issue described in https://issues.apache.org/jira/browse/KAFKA-12671 and I'm attempting to resolve it without entirely deleting the topic. Just for clarity, the situation I'm in is:
read_uncommitted, but nothing with read_committed.
I came across your notes in that Kafka issue and I'm hitting an error when attempting to resolve with kcl admin txn unstick-lso:
kcl admin txn unstick-lso <topic>
Issuing Metadata request for the topic to determine its number of partitions...
Metadata indicates that there are 15 partitions in the topic. Issuing ListOffsets request to determine which partitions may be stuck (i.e., have records)...
Looking for the last stable offset on a partition in <topic>...
Found partition 7 stuck at offset 144, looking for producer that caused this...
Found causing producer ID and epoch 2000/0, looking in __transaction_state to find its transactional id...
Found topic <topic> stuck with transactional id <txn-id>.
At this point we can attempt to unhang the transaction.
Issue AddPartitionsToTxn followed by EndTxn? [Y/n] Y
Proceeding to unstick...
Issuing AddPartitionsToTxn request to fake a new transaction...
partition in AddPartitionsToTxn had error: INVALID_PRODUCER_ID_MAPPING: The producer attempted to use a producer id which is not currently assigned to its transactional id.
I'm unclear how to proceed from here and was hoping you might have some ideas. I've disabled the producer that was using this transaction ID, waited for longer than transactional.id.expiration.ms, and retried it with the same result.
Some cloud service providers (such as alibabacloud) haven't implemented OffsetsForLeaderEpoch.
franz-go uses OffsetsForLeaderEpoch to fetch the leader epoch and end offset, and compares the consumer's offset against what the leader reports. After a restart, this triggers the offset-reset behaviour.
[root@xxxx dig]# ./kcl_linux_amd64 misc list-offsets TARGET-TOPIC
BROKER TOPIC PARTITION START END ERROR
101 TARGET-TOPIC 0 170 270
102 TARGET-TOPIC 1 100 100
103 TARGET-TOPIC 2 100 100
101 TARGET-TOPIC 3 100 100
102 TARGET-TOPIC 4 36 100
103 TARGET-TOPIC 5 0 85
[root@xxxx dig]# ./kcl_linux_amd64 misc offset-for-leader-epoch TARGET-TOPIC
BROKER TOPIC PARTITION LEADER EPOCH END OFFSET ERROR
101 TARGET-TOPIC 0 0 0
101 TARGET-TOPIC 3 0 0
102 TARGET-TOPIC 1 0 0
102 TARGET-TOPIC 4 0 0
103 TARGET-TOPIC 2 0 0
103 TARGET-TOPIC 5 0 0
[root@xxxx dig]# ./kcl_linux_amd64 group describe -v xxxx
GROUP xxxx
COORDINATOR 103
STATE Stable
BALANCER cooperative-sticky
MEMBERS 3
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG MEMBER-ID CLIENT-ID HOST
TARGET-TOPIC 0 270 270 0 kgo-xxxx kgo /xxxx
TARGET-TOPIC 1 100 100 0 kgo-xxxx kgo /xxxxx
TARGET-TOPIC 2 100 100 0 kgo-xxxx kgo /xxxx
TARGET-TOPIC 3 100 100 0 kgo-xxxx kgo /xxxx
TARGET-TOPIC 4 100 100 0 kgo-xxxx kgo /xxxx
TARGET-TOPIC 5 85 85 0 kgo-xxxx kgo /xxxxx
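To illustrate why a zeroed OffsetsForLeaderEpoch response can lead to a reset, here is a minimal sketch of a truncation check. It assumes truncation is inferred whenever the broker's reported end offset is below the consumer's offset; the real franz-go logic is more involved, and the type and function names are hypothetical.

```go
package main

// epochEndOffset is a hypothetical, simplified view of one partition's
// OffsetsForLeaderEpoch result. It is not a franz-go type.
type epochEndOffset struct {
	LeaderEpoch int32
	EndOffset   int64
}

// looksTruncated reports whether a consumer positioned at consumerOffset
// would conclude that the log was truncated, under the assumption that an
// end offset below the consumer's offset signals truncation.
func looksTruncated(consumerOffset int64, resp epochEndOffset) bool {
	return resp.EndOffset < consumerOffset
}
```

With the output above, partition 0 has a committed offset of 270 while the broker reports an end offset of 0, so a check of this shape would fire.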
If some partitions cannot be requested due to metadata load failures, we currently print a log saying some things failed, and then continue on with what was successful. Instead, we can print the failures in our printed table at the end, with the failure in the error column.
Similar to kafkacat, we can take all messages produced during an invocation of kcl produce and bundle them into a transaction.
As an added bonus, we can have an optional commit-after timer or message count.
Hello. I'm used to using the Kafka utilities from the Kafka distribution, like
./kafka-topics.sh --bootstrap-server hostname:9092 --describe
Is there a way to use kcl without creating any config? It's not clear to me how to create this config or how to use it. I just want to connect and get what I want from Kafka with a single command.
Thank you.
kcl should support using the Kafka schema registry to pretty-print consumed records.
We currently abort transactions by inspecting __transaction_state. We can use new Kafka APIs to provide a faster way of aborting txns.
Hello,
There are a couple of things that would be great to have in this CLI. I'd like to be able to consume from a topic with an "until" setting. This could be either a specific offset or the "end" of the topic partitions (determined by the LSO at the start of the command).
I would also like to be able to produce tombstone messages, which as far as I can tell isn't currently possible. It looks like producing an empty value does not result in a tombstone right now. For example, kafkacat handles this with a -Z producer flag, which translates empty values into nulls.
Are those things you would consider adding or would you consider PRs for these two things?
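Both requests come down to small checks, sketched below. The function names are hypothetical, and treating the "until" bound as an exclusive end offset is an assumption rather than something the request specifies.

```go
package main

// tombstoneValue maps an empty value to nil when nullEmpty is set, mirroring
// the behaviour described for kafkacat's -Z flag. A nil value is what Kafka
// treats as a tombstone; an empty, non-nil value is not.
func tombstoneValue(value []byte, nullEmpty bool) []byte {
	if nullEmpty && len(value) == 0 {
		return nil
	}
	return value
}

// reachedUntil reports whether a partition's next offset to consume has
// reached the until bound. The bound is assumed to be exclusive, for example
// the last stable offset captured when the command started.
func reachedUntil(nextOffset, until int64) bool {
	return nextOffset >= until
}
```

A consumer would stop polling a partition once reachedUntil returns true for it, and exit when every assigned partition has stopped.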
Currently, we can issue an AlterPartitionReassignments request with something like
kcl admin partas alter 'foo:0->0,1;1->2,3;2->1,3'
The docs mention the ability to cancel a request:
If a replica list is empty for a specific partition, this cancels any active reassignment for that partition
However, it is currently unsupported.
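One way to support cancellation would be to accept an empty replica list in the existing argument syntax. The sketch below parses a single-topic spec and treats an entry such as 1-> as a cancel; that empty-list spelling, the reassignment type, and parseReassignments are all assumptions for illustration, not kcl's actual parser.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// reassignment is a hypothetical parsed form of one partition entry.
// A nil Replicas slice means "cancel any active reassignment".
type reassignment struct {
	Partition int32
	Replicas  []int32
}

// parseReassignments parses a spec such as "foo:0->0,1;1->2,3" into a topic
// and its per-partition entries. An entry with nothing after "->" is treated
// as a cancellation (an assumed spelling).
func parseReassignments(spec string) (string, []reassignment, error) {
	topic, rest, ok := strings.Cut(spec, ":")
	if !ok || topic == "" {
		return "", nil, fmt.Errorf("missing topic in %q", spec)
	}
	var out []reassignment
	for _, entry := range strings.Split(rest, ";") {
		partStr, replicaStr, ok := strings.Cut(entry, "->")
		if !ok {
			return "", nil, fmt.Errorf("missing -> in %q", entry)
		}
		p, err := strconv.ParseInt(strings.TrimSpace(partStr), 10, 32)
		if err != nil {
			return "", nil, fmt.Errorf("bad partition in %q: %w", entry, err)
		}
		r := reassignment{Partition: int32(p)}
		if strings.TrimSpace(replicaStr) != "" {
			for _, s := range strings.Split(replicaStr, ",") {
				id, err := strconv.ParseInt(strings.TrimSpace(s), 10, 32)
				if err != nil {
					return "", nil, fmt.Errorf("bad replica in %q: %w", entry, err)
				}
				r.Replicas = append(r.Replicas, int32(id))
			}
		}
		out = append(out, r)
	}
	return topic, out, nil
}
```

Entries with nil Replicas would then be sent with an empty replica list in the AlterPartitionReassignments request.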
kcl can support consuming from specific timestamps.
The syntax for this should be s@<microsecond> and e@<microsecond> (where <microsecond> refers to a unix microsecond timestamp), and this can be pieced into the existing -o flag. Both or just one can be specified.
The logic for this is to basically first get the metadata for all consumed topics, and then issue a list offsets request with Timestamp being the start timestamp. While consuming, if the record timestamp for a partition surpasses the end timestamp, stop.