kafka-connect-rabbitmq's Introduction

Introduction

Source Connectors

RabbitMQSourceConnector

This connector reads messages from a RabbitMQ queue or topic.

Configuration

kafka.topic

Importance: High

Type: String

Kafka topic to write the messages to.

rabbitmq.queue

Importance: High

Type: List

RabbitMQ queue(s) to read the messages from.

rabbitmq.host

Importance: High

Type: String

Default Value: localhost

The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->_

rabbitmq.password

Importance: High

Type: String

Default Value: guest

The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->_

rabbitmq.username

Importance: High

Type: String

Default Value: guest

The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->_

rabbitmq.virtual.host

Importance: High

Type: String

Default Value: /

The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->_

message.converter

Importance: Medium

Type: String

Default Value: com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter

Converter to compose the Kafka message.

rabbitmq.port

Importance: Medium

Type: Int

Default Value: 5672

The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->_

rabbitmq.prefetch.count

Importance: Medium

Type: Int

Default Value: 0

Maximum number of messages that the server will deliver, 0 if unlimited. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->_

rabbitmq.prefetch.global

Importance: Medium

Type: Boolean

Default Value: false

True if the settings should be applied to the entire channel rather than each consumer. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->_

rabbitmq.automatic.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->_

rabbitmq.connection.timeout.ms

Importance: Low

Type: Int

Default Value: 60000

TCP connection establishment timeout in milliseconds. Zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->_

rabbitmq.handshake.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->_

rabbitmq.network.recovery.interval.ms

Importance: Low

Type: Int

Default Value: 10000

Interval between automatic connection recovery attempts, in milliseconds. See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->_

rabbitmq.requested.channel.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->_

rabbitmq.requested.frame.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->_

rabbitmq.requested.heartbeat.seconds

Importance: Low

Type: Int

Default Value: 60

Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->_
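The heartbeat rule described above can be sketched in a few lines. This is only an illustration of the wording in this section, not the RabbitMQ client's actual negotiation code; the function names `effective_heartbeat` and `frame_interval` are hypothetical, and the sketch deliberately covers only positive client values because the text does not describe what a client value of zero means.

```python
def effective_heartbeat(client_seconds, server_seconds):
    """Return the heartbeat timeout implied by the description above.

    Assumption: only positive client values are handled, since the text
    does not say how a client value of zero is treated.
    """
    if client_seconds <= 0:
        raise ValueError("this sketch only covers positive client values")
    if server_seconds > 0:
        # A non-zero server value can only be lowered by the client.
        return min(client_seconds, server_seconds)
    # Server value is zero, so whatever the client requests is used.
    return client_seconds


def frame_interval(timeout_seconds):
    """Heartbeat frames are sent at about half the timeout interval."""
    return timeout_seconds / 2


# Connector default of 60 seconds against a hypothetical server value of 30.
timeout = effective_heartbeat(60, 30)
interval = frame_interval(timeout)
```

With the connector default of 60 seconds and a server that does not set a heartbeat (zero), the same functions give a 60 second timeout and frames roughly every 30 seconds.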

rabbitmq.shutdown.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->_

rabbitmq.topology.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->_
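As a rough sketch, the defaults listed above can be merged with user-supplied values and rendered as a properties file. The helper names `resolve_source_config` and `to_properties` are hypothetical and are not part of the connector; the default values are copied from this section, and the two required keys are the ones marked as required in the examples.

```python
# Defaults copied from the source connector settings listed above.
SOURCE_DEFAULTS = {
    "rabbitmq.host": "localhost",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.virtual.host": "/",
    "message.converter": "com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter",
    "rabbitmq.port": 5672,
    "rabbitmq.prefetch.count": 0,
    "rabbitmq.prefetch.global": False,
    "rabbitmq.automatic.recovery.enabled": True,
    "rabbitmq.connection.timeout.ms": 60000,
    "rabbitmq.handshake.timeout.ms": 10000,
    "rabbitmq.network.recovery.interval.ms": 10000,
    "rabbitmq.requested.channel.max": 0,
    "rabbitmq.requested.frame.max": 0,
    "rabbitmq.requested.heartbeat.seconds": 60,
    "rabbitmq.shutdown.timeout.ms": 10000,
    "rabbitmq.topology.recovery.enabled": True,
}

# Settings without a default value in the list above.
REQUIRED = ("kafka.topic", "rabbitmq.queue")


def resolve_source_config(overrides):
    """Merge user overrides onto the defaults and check required keys."""
    missing = [key for key in REQUIRED if not overrides.get(key)]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    merged = dict(SOURCE_DEFAULTS)
    merged.update(overrides)
    return merged


def to_properties(config):
    """Render a config dict as key=value lines for a .properties file."""
    lines = []
    for key in sorted(config):
        value = config[key]
        if isinstance(value, bool):
            # Check bool before anything else: Python bools are also ints.
            value = "true" if value else "false"
        elif isinstance(value, (list, tuple)):
            # List-typed settings are written as comma-separated values.
            value = ",".join(value)
        lines.append(f"{key}={value}")
    return "\n".join(lines)


# Hypothetical topic and queue names, for illustration only.
config = resolve_source_config(
    {"kafka.topic": "example-topic", "rabbitmq.queue": ["queue-a", "queue-b"]}
)
properties_text = to_properties(config)
```

The standalone worker only needs the keys that differ from the defaults, so writing out the full merged set is optional; it is shown here mainly to make the effective values visible.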

Examples

Standalone Example

This configuration is typically used with standalone mode.

name=RabbitMQSourceConnector1
connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >

Distributed Example

This configuration is typically used with distributed mode. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect worker(s).

{
  "config" : {
    "name" : "RabbitMQSourceConnector1",
    "connector.class" : "com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "rabbitmq.queue" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

Create a new instance.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing instance.

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config
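The two calls can also be prepared from a script. The sketch below builds the requests without sending them. It assumes the request shapes described in the Kafka Connect REST documentation: POST /connectors takes an object with `name` and `config` fields, while PUT /connectors/{name}/config takes the configuration map itself. Check this against your Connect version. The function names, topic, and queue values are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_create_request(base_url, name, config):
    """Build POST /connectors with a {"name": ..., "config": {...}} body."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return Request(
        base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_update_request(base_url, name, config):
    """Build PUT /connectors/{name}/config with the flat config map as body."""
    body = json.dumps(config).encode("utf-8")
    return Request(
        base_url.rstrip("/") + "/connectors/" + quote(name, safe="") + "/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Hypothetical topic and queue names, for illustration only.
config = {
    "connector.class": "com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "example-topic",
    "rabbitmq.queue": "example-queue",
}

create = build_create_request("http://localhost:8083/", "RabbitMQSourceConnector1", config)
update = build_update_request("http://localhost:8083/", "RabbitMQSourceConnector1", config)

# To actually send a request against a running worker:
#   from urllib.request import urlopen
#   with urlopen(create) as response:
#       print(response.status)
```

Note that the connector name in the update URL must match the name used when the connector was created.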

Sink Connectors

RabbitMQSinkConnector

This connector reads data from a Kafka topic and publishes it to a RabbitMQ exchange and routing key pair.

Configuration

rabbitmq.exchange

Importance: High

Type: String

Exchange to publish the messages on.

rabbitmq.routing.key

Importance: High

Type: String

Routing key used for publishing the messages.

topics

Importance: High

Type: String

Kafka topic to read the messages from.

rabbitmq.host

Importance: High

Type: String

Default Value: localhost

The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->_

rabbitmq.password

Importance: High

Type: String

Default Value: guest

The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->_

rabbitmq.username

Importance: High

Type: String

Default Value: guest

The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->_

rabbitmq.virtual.host

Importance: High

Type: String

Default Value: /

The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->_

rabbitmq.port

Importance: Medium

Type: Int

Default Value: 5672

The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->_

rabbitmq.automatic.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->_

rabbitmq.connection.timeout.ms

Importance: Low

Type: Int

Default Value: 60000

TCP connection establishment timeout in milliseconds. Zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->_

rabbitmq.handshake.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->_

rabbitmq.network.recovery.interval.ms

Importance: Low

Type: Int

Default Value: 10000

Interval between automatic connection recovery attempts, in milliseconds. See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->_

rabbitmq.requested.channel.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->_

rabbitmq.requested.frame.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->_

rabbitmq.requested.heartbeat.seconds

Importance: Low

Type: Int

Default Value: 60

Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->_

rabbitmq.shutdown.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->_

rabbitmq.topology.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->_

Examples

Standalone Example

This configuration is typically used with standalone mode.

name=RabbitMQSinkConnector1
connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector
tasks.max=1
topics=< Required Configuration >
rabbitmq.exchange=< Required Configuration >
rabbitmq.routing.key=< Required Configuration >

Distributed Example

This configuration is typically used with distributed mode. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect worker(s).

{
  "config" : {
    "name" : "RabbitMQSinkConnector1",
    "connector.class" : "com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >",
    "rabbitmq.exchange" : "< Required Configuration >",
    "rabbitmq.routing.key" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

Create a new instance.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing instance.

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSinkConnector1/config

kafka-connect-rabbitmq's People

Contributors

areimus, arkadiuszbicz, jcustenborder, jonahharris, kyumarss


kafka-connect-rabbitmq's Issues

Source Connector doesn't support DLX

Hello, I'm really new to Kafka Connect and RabbitMQ, so sorry for any mistakes I may write.

I can't make this connector work with a RabbitMQ queue with DLX. It works without a DLX, but we need a DLX in order to send messages to our DLQ.

This is the error I get.
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'myqueue' in vhost '/': received none but current is the value 'myqueue.dlx' of type..., class-id=50, method-id=10)

We tried the Confluent connector and it works with the DLX, but we can't afford to pay for a license.

I could contribute to this project, but I don't know what is necessary in order to make it work. I'm open to learning about it.

Thanks in advance.

Source connector support rabbitmq.routing.key and rabbitmq.exchange

The connector currently only supports basic consume from a queue and does not support consuming based on a routing key.

Currently the routing strategy of the queue needs to be defined in advance. We need to create the binding between the exchange and the queue first, and then create the RabbitMQ source connector. This is inconvenient for later operation and maintenance. Could you provide the relevant support?

Error on built ExtractHeaderTest.java

Dear friends,

I built this project, but the build fails in ExtractHeaderTest.java with the error log below. Could you please let me know how to fix it? My environment: mvn 3.6.3, java-openjdk 1.8.0, centos 7

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.047 sec <<< FAILURE! - in com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeaderTest
apply()  Time elapsed: 0.047 sec  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/kafka/connect/header/Header
        at com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeaderTest.apply(ExtractHeaderTest.java:21)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.header.Header
        at com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeaderTest.apply(ExtractHeaderTest.java:21)


Results :

Tests in error:
  ExtractHeaderTest.apply:21 » NoClassDefFound org/apache/kafka/connect/header/H...

Tests run: 30, Failures: 0, Errors: 1, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.063 s
[INFO] Finished at: 2020-05-29T17:14:07+07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.19.1:test (default-test) on project kafka-connect-rabbitmq: There are test failures.
[ERROR]
[ERROR] Please refer to /root/jonahharris_kafka-connect-rabbitmq/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

Thank you so much

On kafka-connect-activemq sink of ActiveMQ

Hello, I am reading the official website for 'kafka-connect-activemq sink'. There is no information or documentation about the sink, but I need information similar to 'kafka-connect-rabbitmq sink'. I hope you can help me. Thank you!

How to build this plugin?

How can I build this plugin?
Just running maven install fails:
Failure to find com.palantir.docker.compose:docker-compose-rule-core:pom:0.34.0
