I need a Kafka spout to read messages from Kafka into my Storm topology. I followed the wiki, but the topology fails with an exception. My code is as follows:
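import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import nl.minvenj.nfi.storm.kafka.KafkaSpout;

TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
// Settings read by the kafka-spout: topic, ZooKeeper address, consumer timeout
conf.put("kafka.spout.topic", "test");
conf.put("kafka.zookeeper.connect", "127.0.0.1:2181");
conf.put("kafka.consumer.timeout.ms", 100);
// One spout executor feeding one bolt executor
builder.setSpout("Input", new KafkaSpout(), 1);
builder.setBolt("output", new Mybolt(), 1).shuffleGrouping("Input");
// Run the topology in-process on a local cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());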
When I run it, the spout thread dies with a java.lang.NoSuchMethodError:
7494 [Thread-6] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "test-1-1400576467", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "kafka.zookeeper.connect" "127.0.0.1:2181", "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "kafka.consumer.timeout.ms" 100, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/b4d2a5cc-23ce-4bc3-902c-5e44ab7bbe38", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "kafka.spout.topic" "test", "topology.name" "test", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, 
"drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
7495 [Thread-6] INFO backtype.storm.daemon.worker - Worker b75052a5-2ccd-4908-bd67-450177112b66 for storm test-1-1400576467 on 39bc6b46-d5d3-473c-8fba-0ef03273b500:1024 has finished loading
7619 [Thread-22-Input] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
7620 [Thread-22-Input] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
7671 [Thread-22-Input] INFO backtype.storm.util - Halting process: ("Worker died")
Process finished with exit code 1
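The method descriptor in the error says that kafka_2.10-0.8.0.jar expects scala.Predef$.augmentString to take a String and return a String, and no such method was found at runtime. In case it helps narrow this down, here is a minimal sketch of a check that could be run with the same classpath as the topology. It prints which jar scala.Predef$ is loaded from and which return types its augmentString methods have. The class name ClasspathCheck and its two helper methods are hypothetical names I made up for this check; they are not part of Storm, Kafka, or the kafka-spout.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Storm, Kafka, or kafka-spout.
public class ClasspathCheck {

    // Describes where the named class is loaded from, or reports that it is missing.
    public static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializer
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return className + " loaded from "
                    + (src == null ? "bootstrap/unknown" : String.valueOf(src.getLocation()));
        } catch (ClassNotFoundException e) {
            return className + " not found on classpath";
        }
    }

    // Returns the space-separated return types of all public methods with the given name,
    // or an empty string if the class is missing or has no such method.
    public static String returnTypes(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            StringBuilder sb = new StringBuilder();
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)) {
                    sb.append(m.getReturnType().getName()).append(' ');
                }
            }
            return sb.toString().trim();
        } catch (ClassNotFoundException e) {
            return "";
        }
    }

    public static void main(String[] args) {
        // Which jar provides the Scala runtime that Kafka is linked against at run time?
        System.out.println(locate("scala.Predef$"));
        // The trace expects a variant of augmentString that returns java.lang.String
        System.out.println("augmentString returns: "
                + returnTypes("scala.Predef$", "augmentString"));
        System.out.println(locate("kafka.consumer.ConsumerConfig"));
    }
}
```

If the printed return type is not java.lang.String, the scala-library jar on the classpath presumably does not match the one that kafka_2.10-0.8.0.jar was built against, but I have not confirmed that this is the cause.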
Any suggestions would be appreciated. Thank you.