I am using Wurstmeister's spout 0.4.0.
I configured my kafka spout like the following:
I used wirbelsturm to deploy zookeeper, storm and kafka.
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:host.name=supervisor1
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.version=1.6.0_30
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.vendor=Sun Microsystems Inc.
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/FirehoseTopology_DEV-1-1396002185/stormjar.jar
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.io.tmpdir=/tmp
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:java.compiler=<NA>
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:os.name=Linux
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:os.arch=amd64
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:os.version=2.6.32-431.el6.x86_64
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:user.name=storm
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:user.home=/home/storm
2014-03-28 10:23:19 o.a.z.ZooKeeper [INFO] Client environment:user.dir=/
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:host.name=supervisor1
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.version=1.6.0_30
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.vendor=Sun Microsystems Inc.
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/FirehoseTopology_DEV-1-1396002185/stormjar.jar
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.io.tmpdir=/tmp
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.compiler=<NA>
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.name=Linux
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.arch=amd64
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.version=2.6.32-431.el6.x86_64
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.name=storm
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.home=/home/storm
2014-03-28 10:23:19 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.dir=/
2014-03-28 10:23:23 b.s.d.worker [INFO] Launching worker for FirehoseTopology_DEV-1-1396002185 on e1364c61-e842-40f5-956e-a7cc27adc9d8:6701 with id 431a0794-40a2-4f07-b12f-5e4896297607 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m -Djava.net.preferIPv4Stack=true", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/app/storm", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "nimbus1", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["zookeeper1"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.hostname" "supervisor1", "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2014-03-28 10:23:23 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-03-28 10:23:23 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@241bff0d
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x14508151e480009, negotiated timeout = 20000
2014-03-28 10:23:24 b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] EventThread shut down
2014-03-28 10:23:24 o.a.z.ZooKeeper [INFO] Session: 0x14508151e480009 closed
2014-03-28 10:23:24 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-03-28 10:23:24 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181/storm sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@bdaf1cd
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
2014-03-28 10:23:24 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x14508151e48000a, negotiated timeout = 20000
2014-03-28 10:23:24 b.s.m.TransportFactory [INFO] Storm peer transport plugin:backtype.storm.messaging.netty.Context
2014-03-28 10:23:25 b.s.d.executor [INFO] Loading executor kafkaSpout:[2 2]
2014-03-28 10:23:25 b.s.d.executor [INFO] Loaded executor tasks kafkaSpout:[2 2]
2014-03-28 10:23:25 b.s.d.executor [INFO] Finished loading executor kafkaSpout:[2 2]
2014-03-28 10:23:25 b.s.d.executor [INFO] Opening spout kafkaSpout:(2)
2014-03-28 10:23:25 b.s.d.executor [INFO] Loading executor __system:[-1 -1]
2014-03-28 10:23:25 b.s.d.executor [INFO] Loaded executor tasks __system:[-1 -1]
2014-03-28 10:23:25 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-03-28 10:23:25 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181, sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@4be3eb6a
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181
2014-03-28 10:23:25 b.s.d.executor [INFO] Finished loading executor __system:[-1 -1]
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x14508151e48000b, negotiated timeout = 20000
2014-03-28 10:23:25 b.s.d.executor [INFO] Preparing bolt __system:(-1)
2014-03-28 10:23:25 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-03-28 10:23:25 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@17fca9f9
2014-03-28 10:23:25 b.s.d.executor [INFO] Prepared bolt __system:(-1)
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181
2014-03-28 10:23:25 b.s.d.executor [INFO] Loading executor __acker:[1 1]
2014-03-28 10:23:25 b.s.d.executor [INFO] Loaded executor tasks __acker:[1 1]
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
2014-03-28 10:23:25 b.s.d.executor [INFO] Timeouts disabled for executor __acker:[1 1]
2014-03-28 10:23:25 b.s.d.executor [INFO] Finished loading executor __acker:[1 1]
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x14508151e48000c, negotiated timeout = 20000
2014-03-28 10:23:25 b.s.d.worker [INFO] Launching receive-thread for e1364c61-e842-40f5-956e-a7cc27adc9d8:6701
2014-03-28 10:23:25 b.s.d.executor [INFO] Preparing bolt __acker:(1)
2014-03-28 10:23:25 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092}}
2014-03-28 10:23:25 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-03-28 10:23:25 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@11499ff
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
2014-03-28 10:23:25 b.s.d.executor [INFO] Prepared bolt __acker:(1)
2014-03-28 10:23:25 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x14508151e48000d, negotiated timeout = 20000
2014-03-28 10:23:25 b.s.d.executor [INFO] Opened spout kafkaSpout:(2)
2014-03-28 10:23:25 b.s.d.worker [INFO] Worker has topology config {"storm.id" "FirehoseTopology_DEV-1-1396002185", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m -Djava.net.preferIPv4Stack=true", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/app/storm", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "nimbus1", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["zookeeper1"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "FirehoseTopology_DEV", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.hostname" "supervisor1", "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2014-03-28 10:23:25 b.s.d.executor [INFO] Activating spout kafkaSpout:(2)
2014-03-28 10:23:25 s.k.ZkCoordinator [INFO] Refreshing partition manager connections
2014-03-28 10:23:25 b.s.d.worker [INFO] Worker 431a0794-40a2-4f07-b12f-5e4896297607 for storm FirehoseTopology_DEV-1-1396002185 on e1364c61-e842-40f5-956e-a7cc27adc9d8:6701 has finished loading
2014-03-28 10:23:25 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092}}
2014-03-28 10:23:25 s.k.ZkCoordinator [INFO] Deleted partition managers: []
2014-03-28 10:23:25 s.k.ZkCoordinator [INFO] New partition managers: [Partition{host=kafka1:9092, partition=0}]
2014-03-28 10:23:26 s.k.PartitionManager [INFO] Read partition information from: /stormKafka/StormConsumer/partition_0 --> null
2014-03-28 10:23:26 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.nio.channels.UnresolvedAddressException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:83) ~[stormjar.jar:na]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:45) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) ~[na:na]
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:89) ~[na:1.6.0_30]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:514) ~[na:1.6.0_30]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) ~[stormjar.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:80) ~[stormjar.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:55) ~[stormjar.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:45) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:77) ~[stormjar.jar:na]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:78) ~[stormjar.jar:na]
... 6 common frames omitted
2014-03-28 10:23:26 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.nio.channels.UnresolvedAddressException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:83) ~[stormjar.jar:na]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:45) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) ~[na:na]
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:89) ~[na:1.6.0_30]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:514) ~[na:1.6.0_30]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) ~[stormjar.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:80) ~[stormjar.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:55) ~[stormjar.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:45) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:77) ~[stormjar.jar:na]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:78) ~[stormjar.jar:na]
... 6 common frames omitted
2014-03-28 10:23:26 b.s.util [INFO] Halting process: ("Worker died")
Actually, I am not sure it is a wirbelsturm issue, the problem probably comes from here:
But my topology was working when I used a local(storm)cluster with a remote server who shared kafka and zookeeper...