Giter Site home page Giter Site logo

flume-ng-rabbitmq's Introduction

Flume-ng RabbitMQ

This project provides both a RabbitMQ source and sink for Flume-NG. To use this plugin with your Flume installation, build from source using

mvn package

and put the resulting jar file in the lib directory in your flume installation.

This project is available under the Apache License.

Configuration of RabbitMQ Source

The configuration of RabbitMQ sources requires that you either declare an exchange name or a queue name.

The exchange name option is helpful if you have declared an exchange in RabbitMQ, but want to use a default named queue. If you have a predeclared queue that you want to receive events from, then you can simply declare the queue name and leave the exchange name out. Another optional configuration option is the declaration of topic routing keys that you want to listen to. This is a comma-delimited list.

Minimal Config Example

agent1.sources.rabbitmq-source1.channels = ch1  
agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource  
agent1.sources.rabbitmq-source1.hostname = 10.10.10.173  

agent1.sources.rabbitmq-source1.queuename = log_jammin 
OR
agent1.sources.rabbitmq-source1.exchangename = log_jammin_exchange

Full Config Example

agent1.sources.rabbitmq-source1.channels = ch1  
agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource  
agent1.sources.rabbitmq-source1.hostname = 10.10.10.173  

agent1.sources.rabbitmq-source1.queuename = log_jammin
OR
agent1.sources.rabbitmq-source1.exchangename = log_jammin_exchange

agent1.sources.rabbitmq-source1.topics = topic1,topic2
agent1.sources.rabbitmq-source1.username = rabbitmquser
agent1.sources.rabbitmq-source1.password = p@$$w0rd!
agent1.sources.rabbitmq-source1.port = 12345
agent1.sources.rabbitmq-source1.virtualhost = virtualhost1

RabbitMQ Sink

Minimal Config Example

agent1.sinks.rabbitmq-sink1.channels = ch1  
agent1.sinks.rabbitmq-sink1.type = org.apache.flume.sink.rabbitmq.RabbitMQSink  
agent1.sinks.rabbitmq-sink1.hostname = 10.10.10.173  
agent1.sinks.rabbitmq-sink1.queuename = log_jammin

Full Config Example

agent1.sinks.rabbitmq-sink1.channels = ch1  
agent1.sinks.rabbitmq-sink1.type = org.apache.flume.sink.rabbitmq.RabbitMQSink  
agent1.sources.rabbitmq-source1.hostname = 10.10.10.173  
agent1.sources.rabbitmq-source1.queuename = log_jammin
agent1.sources.rabbitmq-source1.username = rabbitmquser
agent1.sources.rabbitmq-source1.password = p@$$w0rd!
agent1.sources.rabbitmq-source1.port = 12345
agent1.sources.rabbitmq-source1.virtualhost = virtualhost1
agent1.sources.rabbitmq-source1.exchange = exchange1

flume-ng-rabbitmq's People

Contributors

dstendardi avatar jcustenborder avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

flume-ng-rabbitmq's Issues

Issue with RabbitMQSource exception during nextDelivery

Consumer reports an error during consuming rabbit messages, but the exception handling mechanism went into infinite recursion loop.

2018-03-06 12:48:45,930 ERROR org.apache.flume.source.rabbitmq.RabbitMQSource: sourceName - Exception thrown while pulling from queue. java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:214) at org.apache.flume.source.rabbitmq.RabbitMQSource.doProcess(RabbitMQSource.java:117) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137) at java.lang.Thread.run(Thread.java:745) 2018-03-06 12:48:45,932 WARN org.apache.flume.source.rabbitmq.RabbitMQSource: sourceName - Closing RabbitMQ connection and channel due to exception. 2018-03-06 12:48:46,943 ERROR org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Unhandled error java.lang.StackOverflowError at org.apache.flume.source.BasicSourceSemantics.stop(BasicSourceSemantics.java:96) at org.apache.flume.source.rabbitmq.RabbitMQSource.doStop(RabbitMQSource.java:178) at org.apache.flume.source.BasicSourceSemantics.stop(BasicSourceSemantics.java:96) at org.apache.flume.source.rabbitmq.RabbitMQSource.doStop(RabbitMQSource.java:178) at org.apache.flume.source.BasicSourceSemantics.stop(BasicSourceSemantics.java:96) at org.apache.flume.source.rabbitmq.RabbitMQSource.doStop(RabbitMQSource.java:178) at org.apache.flume.source.BasicSourceSemantics.stop(BasicSourceSemantics.java:96) at org.apache.flume.source.rabbitmq.RabbitMQSource.doStop(RabbitMQSource.java:178) at org.apache.flume.source.BasicSourceSemantics.stop(BasicSourceSemantics.java:96) at org.apache.flume.source.rabbitmq.RabbitMQSource.doStop(RabbitMQSource.java:178) ...

Recursion look is caused by below code: (RabbitMQSource.java line 176)

@Override protected void doStop() throws FlumeException { RabbitMQUtil.close(_Connection, _Channel); super.stop(); }

Where super.stop(); will call doStop internally.

Issues with read performance

I'm not sure if I'm missing something but I'm only able to pull somewhere between 10-50 messages/second from my RMQ source but I'm able to get in the range of several thousand per second by pulling directly from RMQ or using other RMQ plugins. Any thoughts? Thanks.

Consumer not reset

I found that in situations where the _Connection and _Channel are reset (and set to null), the _Consumer is not also getting reset. For me this caused a loop in connectionReset after _Consumer.nextDelivery(). Resetting the _Consumer when the _Channel is recreated solved this for me. If I find the time to learn github pull requests, I may submit my code, but it's pretty straightforward (single line).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.