
kafkat's Introduction


kafkat

Simplified command-line administration for Kafka brokers.

Contact

Let us know! If you fork this, or if you use it, or if it helps in any way, we'd love to hear from you! [email protected]

License & Attributions

This project is released under the Apache License Version 2.0 (APLv2).

How to release

  • update the version number in lib/kafkat/version.rb
  • execute bundle exec rake release

Usage

  • Install the gem.
gem install kafkat
  • Create a new configuration file to match your deployment.
{
  "kafka_path": "/srv/kafka/kafka_2.10-0.8.1.1",
  "log_path": "/mnt/kafka-logs",
  "zk_path": "zk0.foo.ca:2181,zk1.foo.ca:2181,zk2.foo.ca:2181/kafka"
}

Kafkat searches for this file in two places: ~/.kafkatcfg and /etc/kafkatcfg.
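For example, a per-user config could be created like this (the paths below are the same illustrative values as above and should be adjusted to your deployment):

$ cat > ~/.kafkatcfg <<'EOF'
{
  "kafka_path": "/srv/kafka/kafka_2.10-0.8.1.1",
  "log_path": "/mnt/kafka-logs",
  "zk_path": "zk0.foo.ca:2181,zk1.foo.ca:2181,zk2.foo.ca:2181/kafka"
}
EOF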

  • At any time, you can run kafkat to get a list of available commands and their arguments.
$ kafkat
kafkat 0.0.10: Simplified command-line administration for Kafka brokers
usage: kafkat [command] [options]

Here's a list of supported commands:

  brokers                                                             Print available brokers from Zookeeper.
  clean-indexes                                                       Delete untruncated Kafka log indexes from the filesystem.
  controller                                                          Print the current controller.
  elect-leaders [topic]                                               Begin election of the preferred leaders.
  partitions [topic]                                                  Print partitions by topic.
  partitions [topic] --under-replicated                               Print partitions by topic (only under-replicated).
  partitions [topic] --unavailable                                    Print partitions by topic (only unavailable).
  reassign [topic] [--brokers <ids>] [--replicas <n>]                 Begin reassignment of partitions.
  resign-rewrite <broker id>                                          Forcibly rewrite leaderships to exclude a broker.
  resign-rewrite <broker id> --force                                  Same as above but proceed if there are no available ISRs.
  set-replication-factor [topic] [--newrf <n>] [--brokers id[,id]]    Set the replication factor of a topic.
  shutdown <broker id>                                                Gracefully remove leaderships from a broker (requires JMX).
  topics                                                              Print all topics.
  drain <broker id> [--topic <t>] [--brokers <ids>]                   Reassign partitions from a specific broker to other brokers.
  
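For example, a typical health check followed by draining a broker might look like this (the broker ids are illustrative; the commands and flags are taken from the help text above):

$ kafkat brokers                          # list brokers registered in Zookeeper
$ kafkat partitions --under-replicated    # show only under-replicated partitions
$ kafkat drain 3 --brokers 1,2            # move partitions off broker 3 onto brokers 1 and 2
$ kafkat elect-leaders                    # re-run preferred leader election afterwards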

Important Note

The gem needs read/write access to the Kafka log directory for some operations (e.g. clean-indexes).
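For instance, clean-indexes may need to run as the user that owns log_path (assuming the broker runs as a user named kafka; adjust to your setup):

$ sudo -u kafka kafkat clean-indexes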

kafkat's People

Contributors

alexism, lifeng5042, lucaluo, nnalam, pjawahar, saulius


kafkat's Issues

Kafkat commands cause errors/exception in ZooKeeper.

Hello,

Any command run with kafkat causes errors/exceptions in the ZooKeeper logs, e.g.

kafkat partitions

Causes in ZK logs:

2017-01-10 15:46:20,636 [myid:1] - WARN  [NIOServerCxn.Factory:/10.0.0.1:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x158abc892551348, likely client has closed socket
	at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
	at java.lang.Thread.run(Thread.java:745)
2017-01-10 15:46:20,636 [myid:1] - INFO  [NIOServerCxn.Factory:/10.0.0.1:2181:NIOServerCnxn@1007] - Closed socket connection for client /10.0.0.4:37103 which had sessionid 0x158abc892551348
2017-01-10 15:46:20,646 [myid:1] - ERROR [CommitProcessor:1:NIOServerCnxn@178] - Unexpected Exception: 
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
	at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151)
	at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081)
	at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:404)
	at org.apache.zookeeper.server.quorum.CommitProcessor.run(CommitProcessor.java:74)

Nodes:

10.0.0.1 => zk node.
10.0.0.4 => kafka/kafkat node.

Versions:

Kafkat (0.3.0)
ZooKeeper (3.4.6)
Kafka (0.10.0.0)

Connection to node -1 could not be established.

When I try to start the Docker container, this error message appears in the log and it cannot connect to Kafka:

2020-10-15 14:12:38.893 WARN 1 [| kafdrop-admin] o.a.k.c.NetworkClient : [AdminClient clientId=kafdrop-admin] Connection to node -1 (/192.168.37.31:9092) could not be established. Broker may not be available.

I have also tried with localhost:9092, without success.

I find it strange that "node -1" appears.

[Idea] Add option to display command outputs as JSON

I'm writing a script automating the restart of a kafka cluster, using kafkat restart-cluster at its center. It's calling kafkat commands via subprocesses, and needs to parse the output. It would be super convenient to have a --json (or --format=json) flag we could pass to the command-line to make it output JSON instead of human-readable text.

What do you think?

Thanks!

Shutdown functionality not working on newer Kafka versions

It seems the following class is referenced by the 'shutdown' command:
kafka.admin.ShutdownBroker
This class does not exist in Kafka versions > 0.8.1.1:
0.8.1.1:
$ ls /tmp/kafka_2.10-0.8.1.1/libs/*.jar | xargs -I INPUT jar tvf INPUT | grep -i 'kafka/admin/ShutdownBroker\.class'
  4650 Tue Apr 22 19:25:10 UTC 2014 kafka/admin/ShutdownBroker.class
$
0.11.0.0:
$ ls /tmp/kafka_2.11-0.11.0.0/libs/*.jar | xargs -I INPUT jar tvf INPUT | grep -i 'kafka/admin/ShutdownBroker\.class'
$
This causes the following error when running kafkat against any Kafka version higher than 0.8.1.1:
Error: Could not find or load main class kafka.admin.ShutdownBroker

Support for multiple clusters

As far as I can tell, there is no way to switch between multiple Kafka clusters.
It would be nice if kafkat could switch between clusters.
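As a stopgap (this is only a workaround based on the documented config locations, not a built-in feature), one config file per cluster can be kept and swapped into place before running kafkat:

$ ln -sf ~/kafkat-configs/cluster-a.kafkatcfg ~/.kafkatcfg   # hypothetical per-cluster config files
$ kafkat brokers
$ ln -sf ~/kafkat-configs/cluster-b.kafkatcfg ~/.kafkatcfg
$ kafkat brokers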

Add support for showing marked for deletion

It would be great if kafkat could detect whether a topic is marked for deletion.

kafka output

$ /bin/kafka-topics.sh --list --zookeeper zoo.example.com/kafka
foo - marked for deletion
bar

kafkat output

$ kafkat topics
foo
bar

This is on Kafka 0.8.2, which has improved support for deleting topics.

`shutdown` command does not work

Using kafkat version 0.0.10:

$ kafkat | grep kafkat
kafkat 0.0.10: Simplified command-line administration for Kafka brokers
usage: kafkat [command] [options]

and given a cluster with brokers [0,1,2], kafkat shutdown always fails:

$ kafkat shutdown 2
This operation gracefully removes leaderships from broker '2'.
Proceed (y/n)?
y

Beginning shutdown.
Error: Could not find or load main class kafka.admin.ShutdownBroker

Kafka is at version 0.8.2.1.

Trollop has been deprecated

Trollop has been deprecated and is being replaced with optimist. Every time I run kafkat I get the error:

[DEPRECATION] This gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.

justified output needs some spaces between columns

With long topic names, adjacent columns run into each other. For example, from kafkat set-replication-factor:

very.very.very.very.very.very.very.long.name64          [10130493, 263605528]
very.very.very.very.very.very.very.long.name65          [101304171, 382818071]

kafkat command not available

After installing kafkat using gem install, I try to run the command kafkat and I get an error saying it doesn't exist. What am I missing here?
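A common cause (an assumption here, since the report gives no details) is that RubyGems' executable directory is not on the PATH; the directory can be found and added like this:

$ gem environment | grep 'EXECUTABLE DIRECTORY'   # note the directory RubyGems prints
$ export PATH="$PATH:<executable directory from above>"
$ kafkat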

Kafkat is too slow on large clusters/topics with large number of partitions

I believe this piece is to blame (lib/kafkat/interface/zookeeper.rb):

        partitions = []
        topic_string = pool.with_connection { |cnx| cnx.get(path1).first }
        partition_ids = pool.with_connection { |cnx| cnx.children(path2) }

        topic_json = JSON.parse(topic_string)

        threads = partition_ids.map do |id|
          id = id.to_i

          Thread.new do
            path3 = topic_partition_state_path(name, id)
            partition_string = pool.with_connection { |cnx| cnx.get(path3).first }
            partition_json = JSON.parse(partition_string)
            replicas = topic_json['partitions'][id.to_s]
            leader = partition_json['leader']
            isr = partition_json['isr']

            partition_queue << Partition.new(name, id, replicas, leader, isr)
          end
        end
        threads.map(&:join)

It basically creates a thread for every partition it has to work on. That feels like overkill, and spawning 500 threads when working on a topic with 500 partitions causes real problems.
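A bounded worker pool is one way to avoid the thread-per-partition pattern. Below is a minimal sketch, reusing the pool, partition_ids, topic_json, partition_queue, and helper names from the snippet above; it is untested against the kafkat codebase and the worker count is an assumption:

        # Process partitions with a fixed pool of workers instead of one thread per partition.
        worker_count = 16  # assumption: enough concurrency to hide Zookeeper round-trip latency

        work_queue = Queue.new
        partition_ids.each { |id| work_queue << id.to_i }

        workers = worker_count.times.map do
          Thread.new do
            loop do
              begin
                id = work_queue.pop(true)  # non-blocking pop; raises ThreadError when the queue is empty
              rescue ThreadError
                break
              end

              path3 = topic_partition_state_path(name, id)
              partition_string = pool.with_connection { |cnx| cnx.get(path3).first }
              partition_json = JSON.parse(partition_string)

              replicas = topic_json['partitions'][id.to_s]
              partition_queue << Partition.new(name, id, replicas,
                                               partition_json['leader'],
                                               partition_json['isr'])
            end
          end
        end
        workers.each(&:join)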

Reassign failed with error

The kafkat reassign command fails on Cloudera Kafka 0.8.2.0-1.kafka1.3.1.p0.9:

Beginning.
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/zookeeper/Watcher : Unsupported major.minor version 51.0
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:40)
    at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
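For reference, class file version 51.0 corresponds to Java 7, so this error usually means the java binary that kafkat invokes is Java 6 or older while the ZooKeeper/Kafka classes on the classpath were compiled for Java 7. Checking which JVM is picked up first is a quick way to confirm (commands shown only as an illustration):

$ which java
$ java -version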
