
Kafka mongo connector (deeply inspired by https://github.com/DataReply/kafka-connect-mongodb)

License: Apache License 2.0


kafka-connect-mongo's Introduction

kafka-connect-mongo


WARNING: when upgrading from 1.5.x to 1.6.0, please read the notes below!

Version 1.6.0 changed the package name from org.apache.kafka to com.teambition, so when you upgrade from 1.5.x you may find your connectors broken. Please:

  1. Save your connectors' configs to local files. For example, save each config as a local curl script like so (a script that dumps every config is sketched after this list):
curl -X PUT -H "Content-Type: application/json" http://192.168.0.22:38083/connectors/mongo_source_test/config -d '{
  "connector.class": "MongoSourceConnector",
  "databases": "teambition.tasks",
  "initial.import": "true",
  "topic.prefix": "mongo_test",
  "tasks.max": "8",
  "batch.size": "100",
  "schema.name": "mongo_test_schema",
  "name": "mongo_source_test",
  "mongo.uri": "mongodb://root:[email protected]:27017/?authSource=admin",
  "additional.filter": ""
}'
  2. Delete your connectors via curl -XDELETE http://192.168.0.22:38083/connectors/your_connector_name. This does not delete your offsets, so you need not worry about losing them.
  3. Upgrade your kafka-connect-mongo cluster to 1.6.0.
  4. Recreate your connectors (with the saved curl scripts).
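
If you run many connectors, step 1 can be scripted against the Connect REST API. A minimal sketch; the worker address and the use of jq are assumptions about your setup:

# Dump every connector's config to a local JSON file before the upgrade.
# GET /connectors and GET /connectors/<name>/config are standard Kafka Connect REST endpoints.
CONNECT_URL=http://192.168.0.22:38083
for name in $(curl -s "$CONNECT_URL/connectors" | jq -r '.[]'); do
  curl -s "$CONNECT_URL/connectors/$name/config" > "$name.config.json"
done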

Mongo connector (source)

What's a kafka connector?

Config example

name=mongo-source-connector
connector.class=MongoSourceConnector
tasks.max=1
mongo.uri=mongodb://127.0.0.1:27017
batch.size=100
schema.name=mongo_local_schema
topic.prefix=mongo_local
databases=test.users
# If this option is set to true, the source connector will analyze the schema of the real documents and map them to top-level schema types
# WARNING: the mongo connector infers the schema from the structure of each document, so it cannot guarantee the schema always stays consistent.
# If you get a `Schema being registered is incompatible with an earlier schema` error from the schema registry, set the `avro.compatibility.level` option of the schema registry to `none`
analyze.schema=false
schema.registry.url=http://127.0.0.1:8080
# This option works in export mode (use MongoExportSourceConnector), which exports the whole database with these filter conditions
additional.filter=
# If using SSL, pass the trust store and key store to the JVM as system properties, e.g. `-Djavax.net.ssl.trustStore=/secrets/truststore.jks -Djavax.net.ssl.trustStorePassword=123456 -Djavax.net.ssl.keyStore=/secrets/keystore.jks -Djavax.net.ssl.keyStorePassword=123456`
#mongo.uri=mongodb://user:[email protected]:27017/?ssl=true&authSource=admin&replicaSet=rs0&sslInvalidHostNameAllowed=true
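
In distributed mode the same configuration can be submitted through the Connect REST API instead of a properties file. A sketch; the worker address and port are assumptions:

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8083/connectors -d '{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "MongoSourceConnector",
    "tasks.max": "1",
    "mongo.uri": "mongodb://127.0.0.1:27017",
    "batch.size": "100",
    "schema.name": "mongo_local_schema",
    "topic.prefix": "mongo_local",
    "databases": "test.users"
  }
}'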

Sink (Experimental)

name=mongo-sink-connector
connector.class=MongoSinkConnector
tasks.max=1
mongo.uri=mongodb://root:[email protected]:27017/?authSource=admin
topics=topic1,topic2
databases=mydb.topic1,mydb.topic2

For now, the mongo sink connector is only intended as a restore tool: it can restore data from Kafka topics that were produced by the mongo source connector.

The messages should contain `object` and `id` fields.
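
For illustration, a record with the expected shape might look like the following. Only the `object` and `id` fields are documented; the exact serialization of `object` (shown here as the Mongo document rendered as a JSON string) is an assumption:

{
  "id": "58b81def81b610c6168b4568",
  "object": "{\"_id\": \"58b81def81b610c6168b4568\", \"test\": \"new record\"}"
}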

LICENSE

Apache License 2.0

kafka-connect-mongo's People

Contributors

sabinthomas, sailxjx


kafka-connect-mongo's Issues

record loss because of maxMessageSize?

EDIT: I've edited my earlier question after reviewing the code some more and understanding the purpose of the TailableAwait cursor.

Hi,
I believe I am losing records when using Mongo as a source.

I reviewed the code in DatabaseReader.kt

                // Stop pulling data when length of message is too large!
                while (messages.size > maxMessageSize) {
                    log.warn("Message overwhelm! database {}, docs {}, messages {}",
                            db,
                            count,
                            messages.size)
                    Thread.sleep(500)
                }

I believe this section of code, where the thread is put to sleep once the buffer exceeds maxMessageSize (currently set to 2000), causes records inserted into Mongo to be lost. Is there any alternative besides Thread.sleep()?
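
For reference, one alternative to the sleep/poll loop is back-pressure through a bounded queue, so the reading thread blocks instead of busy-waiting and no records are dropped. This is only a sketch of the idea, not the project's implementation:

import java.util.concurrent.ArrayBlockingQueue

// Capacity plays the role of maxMessageSize; put() blocks the reader thread
// until the consuming side (poll) has drained space, so nothing is lost.
val messages = ArrayBlockingQueue<Any>(2000)

fun enqueue(doc: Any) {
    messages.put(doc)
}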

Which version of Kafka Connect does this require?

I'm using Connect 3.2.0. With connector.class=org.apache.kafka.connect.mongo.MongoCronSourceConnector I can import data into Kafka, but with connector.class=org.apache.kafka.connect.mongo.MongoSourceConnector no topic is ever produced.

Issue using JsonConverter for kafka-connect-mongo as Sink

Hi

Using the Kafka MongoDB connector as a sink for a Kafka topic, I'm getting the following error:

kafka-connect_1  | [2017-03-02 13:32:32,050] DEBUG Receive records 1 (org.apache.kafka.connect.mongo.MongoSinkTask)
kafka-connect_1  | [2017-03-02 13:32:32,051] TRACE Put record: SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='rh_stats', kafkaPartition=0, key=null, value={schema={optional=false, type=struct, fields=[{field=type, optional=false, type=string}, {field=id, optional=false, type=string}, {field=date, optional=false, type=string}]}, payload={date=2017-03-02T15:28:15+02:00, id=58b81def81b610c6168b4568, type=rh_agenda_agenda_created}}, timestamp=-1} (org.apache.kafka.connect.mongo.MongoSinkTask)
kafka-connect_1  | [2017-03-02 13:32:32,053] ERROR Task mongo-stats-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
kafka-connect_1  | java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
kafka-connect_1  |      at org.apache.kafka.connect.mongo.MongoSinkTask.put(MongoSinkTask.kt:48)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
kafka-connect_1  |      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1  |      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1  |      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect_1  |      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect_1  |      at java.lang.Thread.run(Thread.java:745)
kafka-connect_1  | [2017-03-02 13:32:32,067] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)

I tried several different payloads, from the simple:

"type":"rh_agenda_agenda_created","id":"58b81e9e81b610c8168b456b","date":"2017-03-02T15:31:11+02:00"}}

to the one seen in the error message with a schema specified, but to no avail; the same error always happens.
I was wondering if this is a known limitation or if you have ideas on how to fix it.

The connector is created with:

curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "mongo-stats-connector", "config": {"connector.class":"org.apache.kafka.connect.mongo.MongoSinkConnector", "tasks.max":"1", "mongo.uri":"mongodb://mongo:27017", "topics": "rh_stats", "databases": "test.stats"}}' http://kafka-connect:8083/connectors

ENVIRONMENT variables for the kafka-connect container:

      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_REST_ADVERTISED_HOST_NAME: 192.168.1.139
      CONNECT_REST_ADVERTISED_PORT: 8083
      CONNECT_REST_PORT: 8083
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG_LEVEL: TRACE
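
One possible cause (an assumption, not confirmed by the maintainers): with schemas.enable=false the JsonConverter deserializes record values into a java.util.HashMap, while MongoSinkTask casts the value to a Connect Struct, hence the ClassCastException. Enabling schemas on the value converter and producing the schema/payload envelope should let the cast succeed:

      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"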

Logging Error

I built the package using the steps listed in the documentation, but I'm facing this error in spite of changing the log4j properties file. Here is the error description:

log4j:WARN No appenders could be found for logger (org.apache.kafka.connect.mongo.tools.ImportJob).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
	at java.util.Collections$SingletonList.get(Collections.java:4815)
	at org.apache.kafka.connect.mongo.tools.ImportDB.<init>(ImportData.kt:119)
	at org.apache.kafka.connect.mongo.tools.ImportJob.start(ImportData.kt:56)
	at org.apache.kafka.connect.mongo.tools.ImportDataKt.main(ImportData.kt:260)

Could you help me out with this?
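
For reference, the log4j:WARN lines only mean that no log4j configuration was found on the classpath of the import tool; the IndexOutOfBoundsException is a separate problem. A minimal log4j.properties that logs to the console looks like this (if it is not on the classpath, point the JVM at it with -Dlog4j.configuration=file:/path/to/log4j.properties; the path is an assumption):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n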

Source Connector connected but not fetching data from mongodb

Hello, I'm trying to run kafka-connect-mongo but I'm not seeing any records being fetched from mongodb after I insert them.

I'm just doing everything locally. Here's what my connect config looks like:

name=mongo-source-connector
connector.class=org.apache.kafka.connect.mongo.MongoSourceConnector
tasks.max=1
batch.size=1

bootstrap.servers=localhost:9092
compression.type=none
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
mongo.uri=mongodb://localhost:27017/

schema.name=submissions_schema
topic.prefix=submissions
databases=test.submissions
initial.import=true

Then I run the connector:

CLASSPATH=....(all classes copied from kafka-connect-mongo script)... \
/usr/local/Cellar/kafka/0.11.0.1/bin/connect-standalone \ 
/usr/local/etc/kafka/connect-standalone.properties \
/usr/local/etc/kafka/connect-mongo-source.properties

I can see things are running and data gets imported initially:

[2018-02-16 16:06:49,490] INFO Opened connection [connectionId{localValue:1, serverValue:13}] to localhost:27017 (org.mongodb.driver.connection:71)
[2018-02-16 16:06:49,491] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 2]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=702961} (org.mongodb.driver.cluster:71)
[2018-02-16 16:06:49,496] INFO Opened connection [connectionId{localValue:2, serverValue:14}] to localhost:27017 (org.mongodb.driver.connection:71)
[2018-02-16 16:06:49,508] INFO Import Task finished, database test.submissions, count 4 (org.apache.kafka.connect.mongo.DatabaseReader:125)

I insert data just through the mongo console:

➜  ~ mongo
MongoDB shell version v3.6.2
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.2
> show dbs
test    0.000GB
> use test
switched to db test
> show collections
submissions
> db.submissions.insert({  test: "new record"    })
WriteResult({ "nInserted" : 1 })

But I don't see that new record get streamed to Kafka. I've read that it might be the batch.size setting, but I've tried setting that to 1 and it's still not working. Am I missing something?

Thank you!
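
For reference: the connector reads changes through a tailable cursor (see the DatabaseReader discussion above), which suggests it is tailing the oplog, and the startup log above shows the server running as type=STANDALONE. A standalone mongod has no oplog, so only the initial import can succeed. Running mongod as a single-node replica set is one thing to try; the dbpath and replica-set name below are assumptions:

mongod --dbpath /usr/local/var/mongodb --replSet rs0
# then, once, from the mongo shell:
mongo --eval 'rs.initiate()'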

Using org.apache.kafka.connect as your package namespace raises problems when using the connector with Landoop's Lenses.

When trying to use the connector with Landoop's Lenses ([email protected]), the connector is blacklisted by Kafka's package name check that is run @ https://github.com/apache/kafka/blob/0.11.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

If the package name is org.apache.kafka, then when trying to create a new connector Kafka Connect will fail and exit @ https://github.com/apache/kafka/blob/0.11.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L161

After renaming the package to a different name and compiling the project, everything worked as expected.

java.lang.NoClassDefFoundError: kotlin/text/StringsKt

Hello, I've tried to build the mongo connector from tag 1.3.0
and I'm having issues with it.
On the first attempt, when I tried to put the configuration through the API, it threw this error:
Caused by: java.lang.NoClassDefFoundError: kotlin/jvm/internal/Intrinsics

After some googling I figured out that kotlin-runtime.jar should be available on the classpath. I did so, and now I'm getting the error in the subject:
java.lang.NoClassDefFoundError: kotlin/text/StringsKt
at org.apache.kafka.connect.mongo.MongoSourceConnector.taskConfigs(MongoSourceConnector.kt:47)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:230)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:967)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:918)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$800(DistributedHerder.java:101)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:931)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:928)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:250)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:200)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Please advise. Any help is highly appreciated (maybe I need a list of jars to include in the classpath, or something else)...
Many thanks in advance!
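
For reference, kotlin/text/StringsKt lives in kotlin-stdlib (kotlin-runtime alone only provides classes such as kotlin.jvm.internal.Intrinsics), so adding that jar to the worker classpath as well is one way forward; the path below is an assumption:

# kotlin-stdlib provides kotlin.text.StringsKt; put it next to kotlin-runtime on the classpath
export CLASSPATH="$CLASSPATH:/opt/kotlin/lib/kotlin-stdlib.jar"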

Missing required import setting for Mongo source

Running against release [v1.4.1](https://github.com/teambition/kafka-connect-mongo/releases/tag/1.4.1), the config file is missing the 'initial.import' setting.

[2017-11-10 15:42:21,654] INFO Instantiated connector mongo-source-connector with version 0.10.2.1 of type org.apache.kafka.connect.mongo.MongoSourceConnector (org.apache.kafka.connect.runtime.Worker:176)
[2017-11-10 15:42:21,661] ERROR Error while starting connector mongo-source-connector (org.apache.kafka.connect.runtime.WorkerConnector:108)
org.apache.kafka.connect.errors.ConnectException: Missing initial.import config
	at org.apache.kafka.connect.mongo.MongoSourceConnector.getRequiredProp(MongoSourceConnector.kt:74)
	at org.apache.kafka.connect.mongo.MongoSourceConnector.start(MongoSourceConnector.kt:39)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:100)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:125)
	at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:182)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:178)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:250)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
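
The connector treats initial.import as a required setting; adding it to the source connector's properties file (with the value used elsewhere in this document) resolves the error:

initial.import=true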

Failed to find any class that implements Connector

Hello.

I placed the .jar (with or without dependencies) into the plugins folder, but when I try to create a connector it gives me this error in the response:

    "error_code": 500,
    "message": "Failed to find any class that implements Connector and which name matches org.apache.kafka.connect.mongo.MongoSourceConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='4.0.0', encodedVersion=4.0.0, type=sink, typeName='sink', location='file:/usr/share/java/connector-plugins/connector-elasticsearch/'}, PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='0.3.0', encodedVersion=0.3.0, type=source, typeName='source', location='file:/usr/share/java/connector-plugins/debezium-connector-mongodb/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mongo.MongoCronSourceConnector, name='org.apache.kafka.connect.mongo.MongoCronSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='file:/usr/share/java/connector-plugins/kafka-connect-mongo/'}, PluginDesc{klass=class org.apache.kafka.connect.mongo.MongoSinkConnector, name='org.apache.kafka.connect.mongo.MongoSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='file:/usr/share/java/connector-plugins/kafka-connect-mongo/'}, PluginDesc{klass=class org.apache.kafka.connect.mongo.MongoSourceConnector, name='org.apache.kafka.connect.mongo.MongoSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='file:/usr/share/java/connector-plugins/kafka-connect-mongo/'}, PluginDesc{klass=class org.apache.kafka.connect.mongodb.MongodbSinkConnector, name='org.apache.kafka.connect.mongodb.MongodbSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='file:/usr/share/java/connector-plugins/connect-mongodb-1.1-jar-with-dependencies.jar'}, PluginDesc{klass=class org.apache.kafka.connect.mongodb.MongodbSourceConnector, name='org.apache.kafka.connect.mongodb.MongodbSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='file:/usr/share/java/connector-plugins/connect-mongodb-1.1-jar-with-dependencies.jar'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, 
name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}"
}

The most interesting thing is that this class name is present in the list above.

For comparison, the Mongo connector from this pack: https://github.com/Landoop/stream-reactor
works (and is created) fine.
kafka-connect-mongodb doesn't work either, failing with the same error.

Docker Compose .yml file: docker-compose.yml.txt

The build was successful.
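
Note that this looks related to the package-name check described in the previous issue: the class appears in the plugin listing, but classes under org.apache.kafka are filtered by Connect's plugin isolation. Since 1.6.0 the package was renamed to com.teambition, so upgrading and referencing the connector by the short name used in this README's examples is one thing to try:

connector.class=MongoSourceConnector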
