
flume-kafka's Introduction

Since this plugin is going to be merged into Flume, I've split it into two independent plugins: flume-ng-kafka-source and flume-ng-kafka-sink. The ASFv2 branch still works, but I advise using the new plugins.

Flume-ng Kafka

This project lets flume-ng communicate with Kafka 0.7.2.

As of v0.2, I think the parameters passed to flume-kafka should be handled by users, not by the code. Before this version, I hard-coded many Kafka parameters and their default values. Now, whatever parameters you write in the conf file are passed straight through to the Kafka producer or consumer. I cannot control whether the parameters you write will take effect. The responsibility for using correct parameters, or finding out which parameters to use, is, in my opinion, yours.

On the other hand, this makes things simple when Kafka adds new parameters :).
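For example, because properties are passed straight through, you can set any Kafka 0.7 producer property directly in the sink's section (a sketch; `compression.codec` is a standard Kafka 0.7 producer property, not something this plugin defines, so check your Kafka version's producer configuration):

```
agent_log.sinks.kafka.producer.type = async
# passed through untouched to the Kafka producer (1 = gzip in Kafka 0.7)
agent_log.sinks.kafka.compression.codec = 1
```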

Configuration of Kafka Sink

agent_log.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka.channel = all_channel
agent_log.sinks.kafka.zk.connect = 127.0.0.1:2181
agent_log.sinks.kafka.topic = all
agent_log.sinks.kafka.batchsize = 200
agent_log.sinks.kafka.producer.type = async
agent_log.sinks.kafka.serializer.class = kafka.serializer.StringEncoder

Configuration of Kafka Source

agent_log.sources.kafka0.type = com.vipshop.flume.source.kafka.KafkaSource
agent_log.sources.kafka0.zk.connect = 127.0.0.1:2181
agent_log.sources.kafka0.topic = all
agent_log.sources.kafka0.groupid = es
agent_log.sources.kafka0.channels = channel0

Special Thanks

In fact I'm a newbie in Java. I have learnt a lot from flume-ng-rabbitmq. Thanks to Jeremy Custenborder.


flume-kafka's Issues

kafka 0.8 support

I'd like to ask whether 0.8 support exists for this. From the issues I've read, it sounds like there is a branch that supports 0.8. Is this true?

Zkclient dependency

Hello,

It seems that the project is missing the dependency on zkclient, which is not included in the pom.xml.
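For reference, the missing dependency can be declared like this (a sketch; the `com.101tec` coordinates and version 0.3 match the zkclient jar commonly shipped with Kafka, but verify against the jar in your own Kafka distribution):

```xml
<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.3</version>
</dependency>
```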

By the way, when I try the Flume sink I get this error (the ZooKeeper connection looks OK):

02 Sep 2013 15:36:45,519 ERROR conf-file-poller-0 - Sink loggerSink has been removed due to an error during configuration
java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:454)
at java.lang.Integer.parseInt(Integer.java:527)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:521)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:167)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:163)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo(ZKBrokerPartitionInfo.scala:163)
at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:65)
at kafka.producer.Producer.<init>(Producer.scala:47)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:51)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:56)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

has error

2013-11-27 14:45:15,915 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: kafka, type: com.vipshop.flume.sink.kafka.KafkaSink
2013-11-27 14:45:15,920 (conf-file-poller-0) [INFO - com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:41)] { parameters:{topic=topic1, zkconnect=10.11.68.92:2181, batchsize=200, type=com.vipshop.flume.sink.kafka.KafkaSink, serializer.class=kafka.serializer.StringEncoder, producer.type=async, channel=svc_0_chan} }
2013-11-27 14:45:15,922 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: kafka/javaapi/producer/Producer
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: kafka.javaapi.producer.Producer
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 14 more

Error: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first

Server A runs kafka_2.8.0-0.8.0-beta1; server B runs flume-ng 1.4.
On server B I run:
flume-ng agent -c . -f flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

flume sink config:
a1.sinks.k1.type = com.vipshop.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.zkconnect = 192.168.165.39:2181
a1.sinks.k1.topic = test2
a1.sinks.k1.producer.type = async
a1.sinks.k1.batchsize = 400

a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

error :

2013-11-21 00:13:25,909 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2013-11-21 00:13:25,910 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2013-11-21 00:13:25,910 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:163)] Exec source starting with command:tail -F /var/log/maillog
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SOURCE, name: r1, registered successfully.
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SOURCE, name: r1 started
2013-11-21 00:13:28,924 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
2013-11-21 00:13:34,931 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
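The stack trace points at `KafkaSink.process`, which closes the channel transaction while it is still OPEN. Flume's contract is that every transaction must be committed or rolled back before `close()`. Below is a minimal self-contained sketch of the required pattern; the `Transaction` class here is a hypothetical stand-in mimicking Flume's `BasicTransactionSemantics` state check, not the real Flume API:

```java
// Stand-in for org.apache.flume.Transaction: close() rejects an OPEN transaction,
// just like BasicTransactionSemantics does in the stack trace above.
class Transaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }
    private State state = State.NEW;
    void begin()    { state = State.OPEN; }
    void commit()   { state = State.COMPLETED; }
    void rollback() { state = State.COMPLETED; }
    void close() {
        if (state == State.OPEN)
            throw new IllegalStateException(
                "close() called when transaction is OPEN - you must either commit or rollback first");
        state = State.CLOSED;
    }
}

public class TxnPattern {
    // Processes one batch. On failure it rolls back, so close() never sees OPEN.
    static boolean process(Transaction txn, boolean sendFails) {
        txn.begin();
        try {
            if (sendFails) throw new RuntimeException("send failed");
            txn.commit();
            return true;
        } catch (RuntimeException e) {
            txn.rollback();       // required before close() on any failure path
            return false;
        } finally {
            txn.close();          // safe: state is COMPLETED on both paths
        }
    }

    public static void main(String[] args) {
        System.out.println(process(new Transaction(), false));
        System.out.println(process(new Transaction(), true));
    }
}
```

The key point is the try/catch/finally shape: commit on success, rollback in the catch, and close only in the finally, so no exit path leaves the transaction OPEN.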

kafka 8.2

I am having trouble loading the sink.

I tried stopping Kafka and removing all topics from disk; it did not help.

Flume - version flume-1.4.0

Kafka - kafka_2.8.0

Flume sink config :
applog_agent.sinks = kafka
applog_agent.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
applog_agent.sinks.kafka.channel = C1
applog_agent.sinks.kafka.zk.connect = kafkanode:2181
applog_agent.sinks.kafka.topic = all
applog_agent.sinks.kafka.batchsize = 200
applog_agent.sinks.kafka.producer.type = async

applog_agent.sinks.kafka.serializer.class = kafka.serializer.StringEncoder

Plugin lib:
ls -lrt plugins.d/flume-kafka-plugin/libext
total 18820
-rw-r--r-- 1 kafkausr kafkausr 604182 Nov 26 17:21 zookeeper-3.3.4.jar
-rw-r--r-- 1 kafkausr kafkausr 6160791 Nov 26 17:21 scala-library.jar
-rw-r--r-- 1 kafkausr kafkausr 8671416 Nov 26 17:21 scala-compiler.jar
-rw-r--r-- 1 kafkausr kafkausr 2520145 Nov 26 17:21 kafka_2.8.0-0.8.0.jar
-rw-r--r-- 1 kafkausr kafkausr 53244 Nov 26 17:21 jopt-simple-3.2.jar
-rw-r--r-- 1 kafkausr kafkausr 64009 Nov 26 17:21 zkclient-0.3.jar
-rw-r--r-- 1 kafkausr kafkausr 995968 Nov 26 17:21 snappy-java-1.0.4.1.jar
-rw-r--r-- 1 kafkausr kafkausr 82123 Nov 26 17:21 metrics-core-2.2.0.jar
-rw-r--r-- 1 kafkausr kafkausr 4229 Nov 26 17:21 metrics-annotation-2.2.0.jar
ls -lrt plugins.d/flume-kafka-plugin/lib
total 12

-rw-rw-r-- 1 kafkausr kafkausr 6884 Dec 6 06:33 flume-kafka-plugin.jar

Error in Flume log :

06 Dec 2013 11:09:10,520 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Socket connection established to kafkanode/x.x.x.x:2181, initiating session
06 Dec 2013 11:09:10,536 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Session establishment complete on server kafkanode/x.x.x.x:2181, sessionid = 0x42c8a2118b0004, negotiated timeout = 6000
06 Dec 2013 11:09:10,540 INFO conf-file-poller-0-EventThread - zookeeper state changed (SyncConnected)
06 Dec 2013 11:09:10,633 ERROR conf-file-poller-0 - Sink kafka has been removed due to an error during configuration
java.lang.NumberFormatException: For input string: "-1, "port""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:458)
at java.lang.Integer.parseInt(Integer.java:499)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.cluster.Broker$.createBroker(Broker.scala:28)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:195)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:193)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo(ZKBrokerPartitionInfo.scala:193)
at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:67)
at kafka.producer.Producer.<init>(Producer.scala:47)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:432) - Sink kafka has been removed due to an error during configuration

27 Dec 2013 11:30:49,194 ERROR conf-file-poller-0 - Sink kafka has been removed due to an error during configuration
kafka.common.InvalidConfigException: At least one of zk.connect or broker.list must be specified
at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:51)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

java.lang.NoSuchMethodError: org.apache.flume.ChannelFactory.getClass(Ljava/lang/String;)Ljava/lang/Class;

I'm attempting to use the kafka source functionality.

After building the jar (with dependencies) and copying it into the Flume directory alongside all the other source/sink .jar files, I get this error when running flume-ng:

https://gist.github.com/jconerly/81dffe5c8e9f59760a98

The following is the source configuration:

agent.sources.kafka_source.type       = com.vipshop.flume.source.kafka.KafkaSource
agent.sources.kafka_source.zk.connect = 127.0.0.1:2181
agent.sources.kafka_source.topic      = data
agent.sources.kafka_source.groupid    = data-client
agent.sources.kafka_source.batch.size = 100

I had to modify the pom.xml to include the maven-assembly-plugin so I could build a jar that included all the dependencies. I tried deploying the regular jar first and flume complained of missing dependencies like so:

13/11/26 21:23:27 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: kafka/consumer/ConsumerConfig

I am running Flume 1.3.1, and Kafka 0.7.2-incubating

Please advise, I'd love to get this up and running.
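For anyone hitting the same missing-dependency error, the maven-assembly-plugin stanza mentioned above typically looks like this (a sketch; the plugin version shown is an assumption, not taken from this project's pom.xml):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.4</version>
  <configuration>
    <descriptorRefs>
      <!-- bundles all transitive dependencies into one jar -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
```

Then build with `mvn clean package assembly:single` and deploy the resulting `*-jar-with-dependencies.jar`.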

Flume agent checking conf file in a loop

Naming the components on the current agent.

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

Describing/Configuring the source

TwitterAgent.sources.Twitter.type=flume.mytwittersource.MyTwitterSourceForFlume
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=**********
TwitterAgent.sources.Twitter.consumerSecret=**********
TwitterAgent.sources.Twitter.accessToken=**********
TwitterAgent.sources.Twitter.accessTokenSecret=**********
TwitterAgent.sources.Twitter.keywords=docker,intel

Describing/Configuring the sink

TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://quickstart.cloudera:8020/user/Flume/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000

Describing/Configuring the channel

TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=1000

When I execute this file, I get the following message in a loop without fetching any tweets. Can you please help me identify the error and why it is not fetching any tweets?

(conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-twitter.conf for changes

(conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-twitter.conf for changes

Thanks in advance

Build issue

Hello,

I tried to build the project with maven 3.1 but it is missing 2 dependencies:

com.linkedin.kafka:kafka:jar:0.7.2, com.vipshop.scala:scala:jar:2.8.0

I was able to find com.linkedin.kafka, but the other one seems closely tied to your own code.

Would it be possible to include these two dependencies so the project builds?

Anyway, thanks a lot for your piece of code!

org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:149

Error info:
2014-08-20 18:55:51,755 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:149)] Unhandled error
java.lang.NoSuchMethodError: scala.math.LowPriorityOrderingImplicits.ordered()Lscala/math/Ordering;
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:172)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:163)

I use the Kafka sink functionality, with kafka-0.7.2.jar and flume-kafka-0.2.jar on the Flume lib path. The Kafka version is 0.7.2 and the Flume version is 1.5.0.

flume configuration:
agent_log.sources = r1
agent_log.sinks = kafka
agent_log.channels = c1

agent_log.sources.r1.type = exec
agent_log.sources.r1.channels = c1
agent_log.sources.r1.command = tail -f /var/log/test.log

agent_log.channels.c1.type = memory
agent_log.channels.c1.capacity = 1000
agent_log.channels.c1.transactionCapacity = 100

agent_log.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka.channel = c1
agent_log.sinks.kafka.zk.connect = kafkaNode:2181
agent_log.sinks.kafka.topic = my-replicated-topic
agent_log.sinks.kafka.batchsize = 200
agent_log.sinks.kafka.producer.type = async
agent_log.sinks.kafka.serializer.class = kafka.serializer.StringEncoder
