
kafka-spout's Introduction

This project has been archived and is no longer being maintained.


Kafka spout

A Storm spout implementation that reads messages from a Kafka topic and emits them as single-field tuples into a Storm topology. Documentation is available on the wiki.

Development

This implementation was created by the Netherlands Forensic Institute and is still under development. Contributions are welcome; please read the contribution guidelines.

License

This work is licensed under the Apache License, Version 2.0. See LICENSE for details.

kafka-spout's People

Contributors

akaidiot, jramsdale, sandonjacobs, sherlocknl

kafka-spout's Issues

Update to 'new' kafka client API

After updating the Kafka dependency version, much if not all of the client code this project uses has been deprecated. The spout should read messages from Kafka using the new API.
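
On the configuration side, such a migration might look roughly like the sketch below. It builds the properties that the newer consumer (`org.apache.kafka.clients.consumer.KafkaConsumer`) reads, using only `java.util.Properties`. The class `NewConsumerConfigSketch`, the method `newConsumerProperties`, the broker address and the group id are all hypothetical and not part of this project; how the spout would actually consume with the new API is left open.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of kafka-spout.
final class NewConsumerConfigSketch {

    // Builds a minimal property set for the newer Kafka consumer.
    static Properties newConsumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // The newer consumer talks to the brokers directly instead of going through ZooKeeper.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Raw bytes are kept as-is; decoding is left to downstream code (an assumption of this sketch).
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Broker address and group id are placeholders.
        Properties props = newConsumerProperties("127.0.0.1:9092", "kafka-spout-example");
        props.list(System.out);
    }
}
```

Note that `bootstrap.servers` takes broker addresses, whereas the old high-level consumer was pointed at ZooKeeper.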

Kafka-storm

I am fetching tweets using the twitter4j API. I am able to fetch the tweets, but as soon as fetching starts, the log shows:
2015-06-19 11:55:00 INFO executor:0 - Processing received message source: Preprocesing:4, stream: __ack_ack, id: {}, [5974683017019096545 -7460390597578563993]
2015-06-19 11:55:00 DEBUG ClientCnxn:717 - Got ping response for sessionid: 0x14e0a7dbb2f0001 after 132ms
2015-06-19 11:55:00 INFO executor:0 - Processing received message source: Preprocesing:6, stream: __ack_ack, id: {}, [-1906696367418784568 -8126699447307841495]
2015-06-19 11:55:00 INFO executor:0 - Processing received message source: Preprocesing:6, stream: __ack_ack, id: {}, [1034666498420518616 -5290856887183245792]
2015-06-19 11:55:00 ERROR util:0 - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
at clojure.lang.RestFn.invoke(RestFn.java:423)
at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491)
at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
2015-06-19 11:55:00 INFO task:0 - Emitting direct: 1; __acker __ack_ack [1034666498420518616]
2015-06-19 11:55:00 INFO task:0 - Emitting: Preprocesing default [611760453593788416, personalized fishing lure hand stamped anniversary gift birthday gift great,

Maven repository

Can you update the Maven repository with the latest source code?

Will HolmesNL/Kafka-Spout support Storm clusters?

I am already using nathanmarz/storm-contrib/storm-kafka, and it is working fine for me so far. Recently I heard about HolmesNL/kafka-spout. I would like to know whether kafka-spout supports Storm clusters. If it does, I need a simple document with an example, like the examples you have already given for a local cluster. Please do the needful.
Thank you in advance 👍

Submitting to a distributed Storm cluster: Netty reconnect issue

Hi, I am trying to submit a topology based on your project. It seems to work fine in a LocalCluster; however, in distributed mode the workers do not start and I get the error "Netty reconnect workername" over and over again. Can you please help?

java.lang.NoSuchMethodError whenever I use this spout

I really need a Kafka spout to read messages from Kafka into my Storm topology. I followed the wiki and got an exception. My code is as follows:

    // Spout configuration is passed through the topology config
    TopologyBuilder builder = new TopologyBuilder();
    Config conf = new Config();
    conf.put("kafka.spout.topic", "test");
    conf.put("kafka.zookeeper.connect", "127.0.0.1:2181");
    conf.put("kafka.consumer.timeout.ms", 100);

    // One spout instance feeding one bolt instance
    builder.setSpout("Input", new KafkaSpout(), 1);
    builder.setBolt("output", new Mybolt(), 1).shuffleGrouping("Input");

    // Run the topology in-process on a local cluster
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());

And I get this error [java.lang.NoSuchMethodError]:

7494 [Thread-6] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "test-1-1400576467", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "kafka.zookeeper.connect" "127.0.0.1:2181", "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "kafka.consumer.timeout.ms" 100, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/b4d2a5cc-23ce-4bc3-902c-5e44ab7bbe38", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "kafka.spout.topic" "test", "topology.name" "test", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, 
"drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
7495 [Thread-6] INFO backtype.storm.daemon.worker - Worker b75052a5-2ccd-4908-bd67-450177112b66 for storm test-1-1400576467 on 39bc6b46-d5d3-473c-8fba-0ef03273b500:1024 has finished loading
7619 [Thread-22-Input] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
7620 [Thread-22-Input] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
7671 [Thread-22-Input] INFO backtype.storm.util - Halting process: ("Worker died")

Process finished with exit code 1

Please give me some suggestions. Thank you.
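
A `NoSuchMethodError` on `scala.Predef$.augmentString` often points to a binary-incompatible `scala-library` on the classpath rather than to a bug in the topology code, since `kafka_2.10-0.8.0.jar` is built against Scala 2.10. That is an assumption based on the stack trace, not a confirmed diagnosis. One way to check it, sketched below with only the JDK, is to print which jar a class is actually loaded from. The class `ClasspathOrigin` and the method `describe` are hypothetical helpers, not part of Storm, Kafka or this project.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper for illustration only.
final class ClasspathOrigin {

    // Returns a one-line description of where the named class is loaded from.
    static String describe(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Typically the case for classes that ship with the JVM itself.
                return className + " -> no code source (loaded by the JVM itself)";
            }
            return className + " -> " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not loadable from the classpath";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above.
        System.out.println(describe("scala.Predef"));
        System.out.println(describe("kafka.consumer.ConsumerConfig"));
    }
}
```

Run with the same classpath as the topology, this shows whether `scala.Predef` comes from a 2.10.x `scala-library` jar or from a different version pulled in by another dependency.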
