hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector

License: Apache License 2.0

Java 100.00%
kafka kafka-connect mongodb sink-connector sink connector change-data-capture cdc avro json

kafka-connect-mongodb's People

Contributors

adamarla, hpgrahsl, sfmontyo, soumabrata-chakraborty, victorgp


kafka-connect-mongodb's Issues

mongodb.delete.on.null.values not working

Hi,

Thanks in advance for your help.

I'm using your connector 1.2 (the latest version), and "Insert" and "Update" work for me as stated in your documentation. Unfortunately, I cannot get document "Deletion" in MongoDB to work.

My configuration:
{
  "name": "mongodb-sink-dds",
  "config": {
    ...

    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "mongodb.field.renamer.regexp": "[]",
    "mongodb.max.batch.size": "0",
    "mongodb.max.num.retries": "3",
    "mongodb.change.data.capture.handler": "",
    "tasks.max": "1",
    "mongodb.connection.uri": "mongodb://192.168.59.101:27017/kafkaconnect?w=1&journal=true",
    "topics": "ot1",
    "mongodb.field.renamer.mapping": "[]",
    "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy",
    "mongodb.post.processor.chain": "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder",
    "mongodb.delete.on.null.values": true,
    "mongodb.document.id.strategies": "",
    "mongodb.value.projection.type": "none",
    "mongodb.value.projection.list": "",
    "mongodb.key.projection.type": "none",
    "mongodb.key.projection.list": "",
    "name": "mongodb-sink-dds",
    "mongodb.collection": "kafkatopicdds",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInKeyStrategy",
    "mongodb.retries.defer.timeout": "5000"
  }
}

Now, assume the following record in MongoDB:

{
  "_id": NumberLong("1"),
  "name": "JohnDoe",
  "address": "some address",
  "_insertedTS": ISODate("2018-10-30T12:29:11.095Z"),
  "_modifiedTS": ISODate("2018-10-30T12:29:11.095Z")
}

Nevertheless, the following messages have been sent to Kafka using Kafka Streams to delete this record, with no success.

This does nothing and does not delete the record in MongoDB:
{"_id":1} null

This sets name and address to null but does not delete the document in MongoDB:
{"_id":1} {"_id":null,"name":null,"address":null}

I'm trying to understand your statement:
"The idea is that the sink connector will try to delete a mongodb document from the collection iff the whole value struct is null"
from here:
https://github.com/hpgrahsl/kafka-connect-mongodb/issues/47

Also,
"For these cases the sink connector can be configured to delete records in MongoDB whenever it encounters sink records which exhibit null values"
from here:
https://github.com/hpgrahsl/kafka-connect-mongodb#convention-based-deletion-on-null-values

Obviously, I'm somehow using your connector incorrectly.

Could you please clarify what you mean by deleting the record in MongoDB using convention?
Could you please include in your documentation an example of a message to send into Kafka that would delete a document?
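For reference, the convention-based deletion described in the README expects a tombstone record, i.e. a record with a non-null key and a null value. A minimal sketch of producing such a tombstone with a plain Java producer, assuming the topic ot1 and the key format used above (the broker address is illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // non-null key identifying the document, null value = tombstone;
            // with mongodb.delete.on.null.values=true this should trigger a delete
            producer.send(new ProducerRecord<>("ot1", "{\"_id\":1}", null));
            producer.flush();
        }
    }
}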

Many Thanks,

Kind Regards,
Luis

Convert CurrentBSONType is DOCUMENT, not when CurrentBSONType is INT32

Even though I used the following configuration, I still get the error below.

distributed.properties

CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"

connector config

"connector.class":"at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
      
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",

data

{ "name": "Anonymous", \n" +
" "age": 42,\n" +
" "active": true, \n" +
" "address": {"city": "Unknown", "country": "NoWhereLand"},\n" +
" "food": ["Austrian", "Italian"],\n" +
" "data": [{"k": "foo", "v": 1}],\n" +
" "lut": {"key1": 12.34, "key2": 23.45}\n" +
"}

error

Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is INT32.
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:692)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)

drop SinkDocument in PostProcessor

Hey there,
first, thank you for this Kafka MongoDB connector; the code is easy to understand.

I have a question about the PostProcessor interface. I want to implement a PostProcessor that drops the SinkDocument when the process(..) internals fail. What's the best way to implement this? In the MongoDbSinkTask I see that the valueDoc is only added to the docsToWrite list if a valueDoc is present. I think what I could do now is call getNext() in my PostProcessor, like this:

getNext().ifPresent(pp -> {
    BsonDocument keyDoc = doc.getKeyDoc().isPresent() ? doc.getKeyDoc().get().clone() : null;
    pp.process(new SinkDocument(keyDoc, null), orig);
});

In order to create a null valueDoc for the next PostProcessor in the chain. From my understanding, however, this only works if I put another PostProcessor at the end of the chain, am I right? So, e.g., I would put the DocumentIdAdder at the end to make sure that the valueDoc is null if the preceding PostProcessor (my implementation) failed.

Wouldn't it make sense to change the PostProcessor interface to return an Optional<SinkDocument> rather than leave it as public void process(SinkDocument doc, SinkRecord orig)? Then MongoDbSinkTask.buildWriteModel(..) would need to evaluate the return value before the next processor receives the evaluated SinkDocument. In that case you might not even need to use getNext() in every PostProcessor implementation anymore.
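A minimal sketch of the proposed contract, purely illustrative and not part of the connector (the SinkDocument import assumes the converter package used elsewhere in this project):

import java.util.Optional;

import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical variant of the PostProcessor contract suggested above: each processor
// returns the resulting document instead of mutating it and delegating to getNext()
// itself; Optional.empty() would mean "drop this record".
public interface FilteringPostProcessor {

    Optional<SinkDocument> process(SinkDocument doc, SinkRecord orig);

    // The task would then drive the chain, e.g.:
    //   Optional<SinkDocument> current = Optional.of(doc);
    //   for (FilteringPostProcessor pp : chain) {
    //       if (!current.isPresent()) break;
    //       current = pp.process(current.get(), record);
    //   }
}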

This is more a question, rather than an issue right now. I would be happy if you share your thoughts about this.

Best regards,
Constantin

[feature] support JMX metrics

Similar to what other connectors offer, the MongoDB sink connector should expose JMX metrics in order to be able to monitor the processing of Kafka Connect Sink Records.

Among others, a few useful metrics could be (see the sketch after this list):

  • the latest SinkRecord / BsonDocument (per topic?)
  • progress, i.e. latest offset processed (per topic and partition?)
  • total number of processed SinkRecords (per topic?)
  • avg time in millis per processed SinkRecord (per topic?)
  • etc.
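A minimal sketch of how one such counter could be exposed via the platform MBean server; the JMX object name and class names are illustrative, not part of the connector:

import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.ObjectName;

public class SinkMetricsExample {

    // The *MXBean naming convention makes this a standard MXBean interface.
    public interface RecordsProcessedMXBean {
        long getRecordsProcessed();
    }

    public static class RecordsProcessed implements RecordsProcessedMXBean {
        private final AtomicLong count = new AtomicLong();
        public void increment() { count.incrementAndGet(); }
        @Override public long getRecordsProcessed() { return count.get(); }
    }

    // Registers one counter per topic under a hypothetical JMX domain.
    public static RecordsProcessed registerForTopic(String topic) throws Exception {
        RecordsProcessed bean = new RecordsProcessed();
        ObjectName name = new ObjectName(
                "at.grahsl.kafka.connect.mongodb:type=sink-task-metrics,topic=" + topic);
        ManagementFactory.getPlatformMBeanServer().registerMBean(bean, name);
        return bean;
    }
}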

allow projection of fields for documents within arrays

While the current field projection mechanisms allow projecting fields anywhere within nested documents, it's not possible to project on fields found within documents which are themselves contained in arrays.

It should work like the following, based on this structure:

{ "array": [ {"k":123,"v":"abc"}, {"k":234,"v":"cde"}, {"k":345,"v":"def"} ] }

  • blacklisting on "array.k"

{ "array": [ {"v":"abc"}, {"v":"cde"}, {"v":"def"} ] }

  • whitelisting on "array.k"

{ "array": [ {"k":123}, {"k":234}, {"k":345} ] }

Connector Plugin not loaded

I have Kafka running on a K8s cluster using the confluent.io helm charts.

I then shelled into my kafka-connect pod and installed the MongoDB connector. The installation works fine and my plugin path is automatically added to all the properties files. However, when I curl the /connector-plugins API endpoint, it does not return the Mongo connector in the list.
Env is AKS (Azure)
K8s version

Client Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.3", GitCommit:"2bba0127d85d5a46ab4b778548be28623b32d0b0", GitTreeState:"clean", BuildDate:"2018-05-21T09:17:39Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"windows/amd64"}
Server Version: version.Info{Major:"1", Minor:"11", GitVersion:"v1.11.3", GitCommit:"a4529464e4629c21224b3d52edfe0ea91b072862", GitTreeState:"clean", BuildDate:"2018-09-09T17:53:03Z", GoVersion:"go1.10.3", Compiler:"gc", Platform:"linux/amd64"}

Anything I am missing?

Sundip

kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar contains superfluous dependencies

Hi @hpgrahsl, it appears as if the dependencies could be optimized a bit, as there are some Maven plug-in related things in there. This seems to be caused by this dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-maven-plugin</artifactId>
    <version>${confluent.connect.plugin.version}</version>
</dependency>

Is this actually needed as a project dependency or should it rather be configured as a plug-in?
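For comparison, a sketch of what declaring it as a build plug-in instead of a dependency could look like; the executions/goal shown are an assumption about how the Confluent plug-in is typically wired in, not taken from this project's pom.xml:

<build>
  <plugins>
    <plugin>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-connect-maven-plugin</artifactId>
      <version>${confluent.connect.plugin.version}</version>
      <executions>
        <execution>
          <goals>
            <goal>kafka-connect</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>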

check CDC compatibility with Debezium Oracle connector events

Debezium introduced at least preliminary/preview support in the 0.8 release, see:

Set up a running local Oracle (XStream) environment to test compatibility with the current CDC events. The following links should be helpful:

License

Thanks for the work on this nice little driver.

Would you mind clarifying the license you're releasing this under? Sorry if it's already mentioned and I missed it.

simple config option to delete records if value is null

As described in the README.md, the sink connector can be operated in CDC mode. Sometimes the handling of records produced by a fully fledged CDC source connector (like Debezium) isn't needed.

Instead, the sink connector should just offer a simple configuration option which allows deleting sink records. A reasonable and simple convention - which is in accordance with e.g. Kafka's own topic compaction or Debezium's tombstone events - would be to perform a deletion if the value of a sink record is null.
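Such an option exists as mongodb.delete.on.null.values (see the first issue above); a sketch of a minimal sink config using it, with illustrative topic and collection names:

connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
topics=my-topic
mongodb.connection.uri=mongodb://localhost:27017/kafka?w=1&journal=true
mongodb.collection=mytopic
# convention-based deletion: a tombstone record (non-null key, null value)
# deletes the matching document
mongodb.delete.on.null.values=true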

Parse Json data in PostProcessor

Dear Sir,
Thanks for the great product. We tried it and were able to sink data to MongoDB successfully. Below is the data in MongoDB.
{
"_id" : ObjectId("5a692f9742dc219e1e9ffcf5"),
"gateway_id" : "XX:XX:XX:XX:XX:XX",
"gateway_ip" : "10.0.0.1",
"device_id" : "YY:YY:YY:YY:YY:YY",
"device_type" : "bp",
"jdata" : "{"patientid":"1234","systolic":87,"diastolic":74,"pulserate":67}",
"data_timestamp" : "2018-01-01 09:15:00"
}

You can see the "jdata" is in a string in json format. The source data is from third part and we can't change it. So, I would like to use the PostProcess capability of your connector to parse, convert and update it before writing to Mongodb.

I have studied the class DocumentIdAdder.java but am not so sure how to proceed, e.g. I don't know how to use SinkDocument and SinkRecord.
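A minimal sketch of what such a post processor could look like, assuming the PostProcessor base class behaves as described in the other issues here (a constructor taking the connector config, a void process(SinkDocument, SinkRecord) method, a getNext() chain, and Optional accessors for the key/value BSON documents); the field name and parsing logic are specific to the example above:

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import at.grahsl.kafka.connect.mongodb.processor.PostProcessor;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.bson.BsonString;

// Hypothetical post processor: replaces the "jdata" string field with the parsed
// BSON document it contains, then hands the record to the next processor in the chain.
public class JdataParsingPostProcessor extends PostProcessor {

    public JdataParsingPostProcessor(MongoDbSinkConnectorConfig config) {
        super(config);
    }

    @Override
    public void process(SinkDocument doc, SinkRecord orig) {
        // getValueDoc() is assumed to return Optional<BsonDocument>, analogous to
        // the getKeyDoc() accessor shown in the PostProcessor issue above
        doc.getValueDoc().ifPresent(valueDoc -> {
            if (valueDoc.get("jdata") instanceof BsonString) {
                String json = valueDoc.getString("jdata").getValue();
                valueDoc.put("jdata", BsonDocument.parse(json));
            }
        });
        getNext().ifPresent(pp -> pp.process(doc, orig));
    }
}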

Regards,
wwm

The Renamer post processor doesn't call the next ones

Hi,

I've seen that when we use RenameByMapping, which extends the Renamer PostProcessor, it won't call the following post processors (if they exist).

I think this is a bug.

As the fix seems easy, I can provide a PR.

How to reproduce :

  • Create a sample post processor
  • Use this line when creating the connector : "mongodb.post.processor.chain":"at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByMapping,your.sample.TestPostProcessor"

Expected Behaviour

The TestPostProcessor process method should be called after the process of RenameByMapping

Actual Behaviour

The TestPostProcessor process isn't called

Example for Java

Hello

I have Kafka and MongoDB installed and running.
Can you help me with a small Java class example to run in order to send messages from MyTopic to Mongo?
(I do not have Confluent, simply Kafka.)
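For what it's worth, the sink connector itself runs inside Kafka Connect, so on the application side only a producer writing JSON to the topic is needed; a minimal sketch, assuming the topic MyTopic, a local broker, and JsonConverter with schemas disabled on the connector side:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // plain JSON value; the sink connector configured for MyTopic picks it up
            // and writes it to the configured MongoDB collection
            producer.send(new ProducerRecord<>("MyTopic", "{\"name\":\"JohnDoe\",\"age\":42}"));
            producer.flush();
        }
    }
}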

thank you

SerializationException

Hi,

I am following this blog post on medium and I'd like to persist data on MongoDB instead of Postgres.

When the connector tries to read the topic data, an error occurs like the one below:

{ "state": "FAILED", "trace": "org.apache.kafka.connect.errors.DataException: recent-stats io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n", "id": 2, "worker_id": "127.0.1.1:8083" }

But when I run the kafka-avro-consumer, the data is read successfully.

Has anybody had the same problem?

ReplaceOneBusinessKeyFilterStrategy (Avro-Schema) - compound keys do not seem to be working correctly

I created a schema that looks something like this

schema = {
    "type": "record",
    "name": "TestPricingData",
    "namespace": "com.runtitle.data.kafka.avro",
    "doc": "Test Pricing Record",
    "fields": [
        {"name": "source_system_id", "type": "string"},
	{"name": "source_record_id", "type": "string"},
        {"name": "indexes", "type": { "type": "array", "items": "string"}},
        {"name": "raw_acres", "type": "int"}
    ]
}

The first two fields, source_system_id and source_record_id, together form a BusinessKey. My Mongo sink config looks like this:

config = {
  "name": "mongo-sink-pricing-data",
  "config": {
    "topics": "pricing-data",
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://kafka.schema:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka.schema:8081",
    "mongodb.connection.uri": "mongodb://docker.for.mac.localhost:27017/titletime?w=1&journal=true",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
    "mongodb.key.projection.list": "source_system_id, source_record_id",
    "mongodb.key.projection.type": "whitelist",
    "mongodb.collection": "pricingData",
    "mongodb.replace.one.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy"
  }
}

However, if I send two successive messages where I keep the same source_system_id and change the source_record_id, it updates the existing record instead of creating a new one. For example:
Message 1 - {source_system_id: "SYS1", source_record_id: "REC1"}
results in DB record
{_id: ObjectId("5ac704819b0241d8b69ad276"), source_system_id: "SYS1", source_record_id: "REC1"}
However, if I now send
Message 2 - {source_system_id: "SYS1", source_record_id: "REC2"}
instead of creating a new record, it updates the same record to:
{_id: ObjectId("5ac704819b0241d8b69ad276"), source_system_id: "SYS1", source_record_id: "REC2"}

Recording the issue here, I will be looking into this at my end as well.

BSON dependency missing in pom.xml

Hi HP,

I think your current master is missing the following dependency in pom.xml:

<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>bson</artifactId>
  <version>${mongodb.driver.version}</version>
</dependency>

Best,
Jurgis

MongoDBConnector not assigned any tasks

Hi,

I have the following problem: I have 4 different topics, each connected to the same MongoDB instance via a separate MongoDBConnector. 2 out of the 4 connectors are running fine and have one task each assigned. The other 2 connectors don't have any task assigned (visible via the REST API, GET /connectors/{my_connector}/status).

  • Restarting the connector via POST /connectors/{my_connector}/restart does not help, after restart the connector still has no tasks
  • Completely deleting and re-creating the connector does not help either

Is there by chance any stale data on the Connect worker that needs to be cleaned? Looking at the logs of the Kafka Connect Docker container is overwhelming, since the other 2 working connectors produce a lot of output and I don't know what to grep for.

Update: If I delete the original connector and re-create it under a different name, it seems to work: The connector assigns one task and works normally.

Any idea how to go about this?

Mongo DB connection URI for SSL and sslAllowInvalidCertificates

Mongo DB connectivity with Shell is working fine.

> mongo localhost:37017/admin -u <username> -p <password> --authenticationDatabase admin --ssl --sslAllowInvalidCertificates

I'm not able to connect to MongoDB with a connection URI like the one below:

> mongo mongodb://<username>:<password>@localhost:27017/admin?ssl=true&sslAllowInvalidCertificates=true&authSource=admin

Error

2019-04-22T11:17:45.342-0400 E NETWORK  [thread1] SSL peer certificate validation failed: self signed certificate in certificate chain
2019-04-22T11:17:45.342-0400 E QUERY    [thread1] Error: socket exception [CONNECT_ERROR] for SSL peer certificate validation failed: self signed certificate in certificate chain :
connect@src/mongo/shell/mongo.js:251:13
@(connect):1:6
exception: connect failed

[1]-  Exit 1                  mongo mongodb://<username>:<password>@localhost:27017/admin?ssl=true
[2]+  Done                 sslAllowInvalidCertificates=true

Because the MongoDB Kafka connector only provides a URI-format connection string for connectivity, can you please tell me how to use the sslAllowInvalidCertificates property in the connection string?

BUILD FAILURE

[WARNING] Error injecting: org.apache.avro.mojo.SchemaMojo
com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error injecting constructor, java.lang.NoClassDefFoundError: org/codehaus/jackson/JsonPar
  at org.apache.avro.mojo.SchemaMojo.<init>(Unknown Source)
  while locating org.apache.avro.mojo.SchemaMojo

1 error
    at com.google.inject.internal.InjectorImpl$2.get (InjectorImpl.java:1025)
    at com.google.inject.internal.InjectorImpl.getInstance (InjectorImpl.java:1051)
    at org.eclipse.sisu.space.AbstractDeferredClass.get (AbstractDeferredClass.java:48)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.600 s
[INFO] Finished at: 2018-04-17T12:42:59+03:00
[INFO] Final Memory: 22M/207M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.avro:avro-maven-plugin:1.8.2:schema (default) on project kafka-connect-mongodb: Execution default of goal org.apache.avro:avro-maven-plugin:1.8.2:schema failed: Unable to load the mojo 'schema' in the plugin 'org.apache.avro:avro-maven-plugin:1.8.2'. A required class is missing: org/codehaus/jackson/JsonParseException
[ERROR] -----------------------------------------------------
[ERROR] realm =    plugin>org.apache.avro:avro-maven-plugin:1.8.2
[ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
[ERROR] urls[0] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro-maven-plugin/1.8.2/avro-maven-plugin-1.8.2.jar
[ERROR] urls[1] = file:/C:/Users/koa/.m2/repository/org/codehaus/plexus/plexus-interpolation/1.1/plexus-interpolation-1.1.jar
[ERROR] urls[2] = file:/C:/Users/koa/.m2/repository/org/codehaus/plexus/plexus-utils/1.5.5/plexus-utils-1.5.5.jar
[ERROR] urls[3] = file:/C:/Users/koa/.m2/repository/junit/junit/3.8.1/junit-3.8.1.jar
[ERROR] urls[4] = file:/C:/Users/koa/.m2/repository/org/apache/maven/shared/file-management/1.2.1/file-management-1.2.1.jar
[ERROR] urls[5] = file:/C:/Users/koa/.m2/repository/org/apache/maven/shared/maven-shared-io/1.1/maven-shared-io-1.1.jar
[ERROR] urls[6] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro-compiler/1.8.2/avro-compiler-1.8.2.jar
[ERROR] urls[7] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro/1.8.2/avro-1.8.2.jar
[ERROR] urls[8] = file:/C:/Users/koa/.m2/repository/com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.jar
[ERROR] urls[9] = file:/C:/Users/koa/.m2/repository/org/xerial/snappy/snappy-java/1.1.1.3/snappy-java-1.1.1.3.jar
[ERROR] urls[10] = file:/C:/Users/koa/.m2/repository/org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar
[ERROR] urls[11] = file:/C:/Users/koa/.m2/repository/org/tukaani/xz/1.5/xz-1.5.jar
[ERROR] urls[12] = file:/C:/Users/koa/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar
[ERROR] urls[13] = file:/C:/Users/koa/.m2/repository/org/apache/velocity/velocity/1.7/velocity-1.7.jar
[ERROR] urls[14] = file:/C:/Users/koa/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar
[ERROR] urls[15] = file:/C:/Users/koa/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
[ERROR] urls[16] = file:/C:/Users/koa/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
[ERROR] urls[17] = file:/C:/Users/koa/.m2/repository/joda-time/joda-time/2.7/joda-time-2.7.jar
[ERROR] urls[18] = file:/C:/Users/koa/.m2/repository/org/slf4j/slf4j-simple/1.7.7/slf4j-simple-1.7.7.jar
[ERROR] Number of foreign imports: 1
[ERROR] import: Entry[import  from realm ClassRealm[maven.api, parent: null]]
[ERROR]

Support for collection.name={$topic}

Hello!

I used another Kafka-Mongo sink connector a few months ago, and it had a feature to set the collection name to sink to as "{$topic}", so I could listen to several topics and sink them to multiple collections.

If I'm not wrong, this connector does not have that feature. Is this doable? It would be a great help in my projects.

BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING

Data from source system -
# kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TTDF.TCDCPOC_DATA_TYPES --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

"57508564"      {"data":{"SEQNO":{"int":57508564},"TEXT":{"string":"Lorem ipsum dolor sit amet,"},"BIGNUM":{"long":11122233344447},"BINOBJ":{"bytes":"#~¦`¬| DATA IS STORED AS BINARY|>"},"CHAROBJ":{"string":"<text>THIS DATA IS STORED AS CLOB</text>"},"FLOATNUM":{"double":6.62607015E-34},"CHARVAR":{"string":"consectetur adipiscing elit,sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}},"headers":{"operation":"REFRESH","changeSequence":"","timestamp":"","streamPosition":"","transactionId":"","changeMask":null,"columnMask":null}}

^CProcessed a total of 6 messages

Schema registry -

{
  "subject": "TTDF.TCDCPOC_DATA_TYPES-value",
  "version": 3,
  "id": 12,
  "schema": "{"type":"record","name":"DataRecord","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"SEQNO","type":["null","int"],"default":null},{"name":"TEXT","type":["null","string"],"default":null},{"name":"BIGNUM","type":["null","long"],"default":null},{"name":"BINOBJ","type":["null","bytes"],"default":null},{"name":"CHAROBJ","type":["null","string"],"default":null},{"name":"FLOATNUM","type":["null","double"],"default":null},{"name":"CHARVAR","type":["null","string"],"default":null}]}},{"name":"headers","type":{"type":"record","name":"Headers","fields":[{"name":"operation","type":{"type":"enum","name":"operation","symbols":["INSERT","UPDATE","DELETE","REFRESH"]}},{"name":"changeSequence","type":"string"},{"name":"timestamp","type":"string"},{"name":"streamPosition","type":"string"},{"name":"transactionId","type":"string"},{"name":"changeMask","type":["null","bytes"]},{"name":"columnMask","type":["null","bytes"]}]}}]}"
}

Errors -

[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache
.kafka.connect.runtime.WorkerSinkTask:584)
org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:690)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:111)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:110)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:109)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)

Config file -

{
   "name": "mongo",
   "config": {

        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter.schemas.enable":"false",
        "key.converter.schemas.enable": false,
        "key.ignore":"true",

        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "internal.value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": true,
        "internal.value.converter.schemas.enable":"true",

        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter.schema.registry.url":"http://localhost:8081",


        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "topics":"TTDF.TCDCPOC_DATA_TYPES",
        "mongodb.connection.uri":"mongodb://xxxx:Password1@xxxx:27017/testdb?authSource=xxx",
        "mongodb.collection":"TCDCPOC_DATA_TYPES",

        "_comment":"transforms\":\"createKey",
        "_comment":"transforms.createKey.type:org.apache.kafka.connect.transforms.Flatten$Value",
        "_comment":"transforms.Flatten.delimiter:_",
        "_comment":"transforms.createKey.type:io.confluent.connect.transforms.Drop$Key",
        "_comment":"transforms.createKey.skip.missing.or.null\":\"true",
        "_comment":"transforms.createKey.type\":\"org.apache.kafka.connect.transforms.ValueToKey",
        "_comment":"transforms.createKey.fields\":\"data.SEQNO",
        "_comment":"transforms.createKey.static.key:test"
        }
}

Schema registry URL setting from control center is missing.

Hello Team,

I tried configuring the mongoDB sink connector through Confluent Control Center. I could not see "value.converter.schema.registry.url" property to set the schema registry URL. I had to use the connect REST API to update this property at sink connector instance level.

Not sure if this is an issue or expected behavior. Please help.

Update and Delete Operation for CDC Handler

Hi,

I've noticed that Debezium MongoDB has been updated to version 0.6.0 and that they have differentiated the _id field type in the key's payload. Is it now possible for you to implement the "update" and "delete" operations for the Debezium MongoDB CDC handler?

Thanks.

Cannot insert a field with '.' ('dot') in field name

Hello,
I'm not able to insert records whose field names contain a '.' (dot):
java.lang.IllegalArgumentException: Invalid BSON field name kubernetes.io/config.source

Since MongoDB 3.6, field names containing dots and dollar signs are allowed: https://docs.mongodb.com/manual/reference/limits/#Restrictions-on-Field-Names

I also tried to work around this by using the "mongodb.field.renamer.regexp" post processor, but the post processor (as the name says) does its processing after converting the record to BSON, so it breaks before the post processor itself.

Connector config -

{
  "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
  "topics": "topic-1",
  "mongodb.connection.uri": "mongodb://localhost:27017/test",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "mongodb.collection": "mdb-sink-test"
}

Full stack trace -

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid BSON field name scheduler.alpha.kubernetes.io/critical-pod
   at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:532)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:114)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
   at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
   at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
   at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
   at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:60)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
   at com.mongodb.operation.BulkWriteBatch$WriteRequestEncoder.encode(BulkWriteBatch.java:398)
   at com.mongodb.operation.BulkWriteBatch$WriteRequestEncoder.encode(BulkWriteBatch.java:377)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
   at com.mongodb.internal.connection.BsonWriterHelper.writeDocument(BsonWriterHelper.java:75)
   at com.mongodb.internal.connection.BsonWriterHelper.writePayload(BsonWriterHelper.java:59)
   at com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:146)
   at com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)
   at com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:60)
   at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:244)
   at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
   at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
   at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
   at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
   at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
   at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
   at com.mongodb.operation.MixedBulkWriteOperation.executeCommand(MixedBulkWriteOperation.java:433)
   at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:259)
   at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
   at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
   at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
   at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
   at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
   at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
   at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
   at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
   at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
   at java.util.ArrayList.forEach(ArrayList.java:1257)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
   at java.util.HashMap.forEach(HashMap.java:1289)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   ... 10 more

Issue with CDC Sink Connector from Debezium MongoDB Connector

Using the example for Change Data Capture Mode from the README, we are seeing an issue that throws an unrecoverable exception right after the connector config is posted.
The connector definition was posted as a screenshot (not reproduced here).

The error thrown was also posted as a screenshot (not reproduced here).

The collection exists in mongo and mongo is reachable from kafka connect. The confluent schema registry is running and is reachable from kafka connect. Also, other kafka streams applications are able to deserialize these debezium-generated avro messages fine from the same topic the failing connector is reading from. Please let us know if you need any more information to help us solve. Thank you.

[feature] exponential backoff for retries

The current behaviour of the connector w.r.t. retries is pretty naive. It would make a lot of sense to implement an exponential backoff mechanism as a more robust retry policy. All errors for which it is reasonable to retry in the first place should respect new retry settings that allow for non-linear timeouts, ideally including randomized jitter. This new approach to dealing with timeouts on retries can/should completely replace what's currently there.
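A minimal sketch of the kind of delay computation described here (exponential growth capped at a maximum, with full random jitter); the parameter names are illustrative:

import java.util.concurrent.ThreadLocalRandom;

public final class RetryBackoff {

    // Exponential backoff with full jitter: the upper bound grows as
    // baseDelayMs * 2^attempt (capped at maxDelayMs) and the actual delay is drawn
    // uniformly from [0, upperBound] so that concurrent retries spread out.
    public static long nextDelayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long upperBound = Math.min(maxDelayMs, baseDelayMs * (1L << Math.min(attempt, 30)));
        return ThreadLocalRandom.current().nextLong(upperBound + 1);
    }
}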

Key Renaming and skip null values are not Working

Hello,

Something is not OK when I make the corresponding changes for the MongoDB sink, because:

  1. JSON keys are not renamed
  2. Denying inserts into Mongo of JSON with null values is not working, even though it is set to true

Below you find my conf.properties:

name=MyMongoDbSinkConnector
topics=test,testkafka
tasks.max=1
#key.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter=io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
#specific MongoDB sink connector props
#listed below are the defaults
mongodb.connection.uri=mongodb://localhost:27017/kafka?w=1&journal=true
mongodb.collection=kafkatopic
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000
mongodb.value.projection.type=none
mongodb.value.projection.list=
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy
mongodb.document.id.strategies=
mongodb.key.projection.type=none
mongodb.key.projection.list=
mongodb.field.renamer.mapping=[{"oldName":"key.myta","newName":"key.batteryVoltage"}]
mongodb.field.renamer.regexp=[]
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByM$
#mongodb.field.renamer.regexp=[{"regexp":"^key\..my.$","pattern":"my","replace":""},{"regexp$
mongodb.change.data.capture.handler=
mongodb.delete.on.null.values=true
mongodb.replace.one.strategy=at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy

I also tried
mongodb.field.renamer.mapping=[{"oldName":"myta","newName":"batteryVoltage"}]
but still nothing; the JSON data stays the same.

What I send to the topic is:
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"myta"}]},"payload":{"myta": "myta"}

Can someone please help me with this matter?
I must say that, apart from the JSON data not being post-processed by the connector, the data fetched from those 2 topics is inserted into the collection successfully.

I also have a question regarding how this connector inserts data into MongoDB.
Does the connector create an insert command for each message that appears in the broker topic? Or is the data received on the topic stored in a buffer, for example for 10 seconds, and then pushed to the database with a bulk write operation?

If bulk writes are used, is there a chance to change the parameters, e.g. the batch size or the time until a batch expires and its data is bulk-written to the database?

Thank you in advance,

How to make it work faster?

Hi.
I need to thank you for this project. It works well in our company.
But I need to know if there is a way to make it transfer data faster or not.
Here is a screenshot of our Control Center (not reproduced here).

As you can see, it is over 9 million records behind the producer.
So, how can I make it work faster?
Maybe by having multiple sinks that work as a consumer group? I don't know.
Your advice can really help us here.
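For reference, sink throughput is usually scaled by giving the source topic enough partitions and raising tasks.max so that Kafka Connect runs several sink tasks as one consumer group; a sketch of the relevant settings with illustrative values (mongodb.max.batch.size is this connector's batching option, discussed further below):

# with e.g. 4 or more partitions on the topic, Connect can run 4 parallel sink tasks
tasks.max=4
# write at most 500 records per bulk operation
mongodb.max.batch.size=500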

Regards

[feature] rate limiting throughput

Actual throughput rate of the sink connector can already be controlled at least to some degree:

Both settings help to influence the write load that hits the sink data store, but at the same time this is not flexible enough to achieve a somewhat predictable rate limiting behaviour. A new configuration option could offer something similar to the following:

  1. Introduce a timeout parameter specified either in long ms or maybe in Java 8 style Durations. This would introduce an "artificial pause" for the worker thread during the put() call - either directly or by signalling it to the sink connector's context.

  2. Introduce a related parameter (int n >= 1) which specifies after how many successfully processed batches within one and the same put() call the timeout should take effect, e.g. "pause" for 100 ms after every N-th batch for a topic/collection within the put() call (see the sketch below).
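A minimal sketch of what such a pause could look like inside the sink task, using the standard SinkTaskContext.timeout(..) hook; parameter names are illustrative:

import org.apache.kafka.connect.sink.SinkTaskContext;

public final class RateLimitingHelper {

    // Hypothetical helper: after every N-th successfully processed batch within put(),
    // signal the Connect framework to pause for pauseMs before the next poll.
    public static void maybePause(SinkTaskContext context, long batchesProcessed,
                                  int everyNthBatch, long pauseMs) {
        if (everyNthBatch >= 1 && batchesProcessed % everyNthBatch == 0) {
            context.timeout(pauseMs);
        }
    }
}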

[feature] fallback to kafka topic name

The current collection/topic aware configuration options expect an explicit mapping between any Kafka topic name and its corresponding MongoDB collection name. Besides, there is the option to specify a default MongoDB collection name in case no explicit mapping is provided, but this means that all sink records of potentially different Kafka topics are written to the same MongoDB collection. Complex connector pipelines would almost always specify proper mappings for all Kafka topics in question, especially since different sink connector behaviour is most likely needed for processing the contained sink records.

However, for very simple pass-through kinds of streaming ETL pipelines from Kafka to MongoDB it might be useful not to fall back to a single default collection name but to use the name of the Kafka topic as-is for the MongoDB collection name.

support for DBZ postgres CDC

Currently the connector is able to process DBZ MySQL events. Initial support for Postgres is based on the observation that DBZ basically produces similar CDC events for MySQL and Postgres. Thus both can for now be supported based on the same code written for MySQL. Refactorings would be great to avoid redundancy. Until differences are found, this should be the way to go for now.

[feature request] support multiple topics

One of the most requested features based on user feedback so far is support for storing Kafka records from different topics in separate MongoDB collections, see e.g. [#45, #48].

There are basically two ways to achieve that:

  1. The naive way is to simply write every record to a MongoDB collection with the same name as the one specified in the sink record. The problem is that this would most likely not suffice, since we have to expect different Kafka topics to contain completely different data, and therefore a lot of configuration settings might need to be different as well.

  2. A bit more complex: enable a more flexible / sophisticated way to define all relevant configuration options at the topic level so that users can fully benefit from this feature. Ideally there is an overriding mechanism in place which allows changing settings specifically on a per-topic basis. At the same time this means that there should be fallback settings for when nothing is defined at the topic level.

Implementing the 2nd option should be the best way to go :)

Is a connection with a MongoDB replica set possible?

Hi
I'm currently looking for a way to get data from my Kafka topics into my MongoDB replica set.

I've seen, you have defined the property "mongodb.connection.uri".
Is it possible to provide a URI string for a MongoDB replica set here, like this:

mongodb://[USER]:[PASSWORD]@host_1:27017,host_2:27017,host_3:27017/dbname?replicaSet=replica-set-name

any list config settings aren't cleaned from whitespaces

Currently, list config settings are only split by the "," character but the resulting strings aren't trimmed of whitespace. This should be addressed so that users don't face weird / unexpected behavior because e.g. spaces may end up being part of field names (see the related issue #29).

JsonParseException with a string key

Hi,

I am trying to sink to mongodb from a kafka topic that has a string key (simple text) and raw JSON as the message, and I get an exception when I post my connector config to kafka-connect. (I have data in the topic already so the connector processing starts as soon as I post to kafka-connect.) Any advice would be appreciated. Note that I'm not running with confluent platform, just straight kafka. In this example, the keys for all my kafka messages are "SZ0".

The library version tested is v1.0.0.

Exception:

[2018-05-04 13:58:39,898] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        ... 10 more
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-04 13:58:39,902] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:182)                       

Here is the config I used:

{
    "name": "bare-mongodbsink.json",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
          
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": false,
          
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
          
        "topics": "sensed-metrics",

        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy",
        
        "mongodb.connection.uri": "mongodb://localhost:27017/testdb?w=1&journal=true",
        "mongodb.collection": "sensedMetricsTest"
    }
}

And here is the kafka connect properties file:

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000



plugin.path=/opt/kafka-connectors

Thanks

Sinking aggregated data does not work

Hi. I have an issue with sinking data to MongoDB using your code.
I read similar issues like #55 and #36 and I know possible solutions, but I couldn't make them work in my situation.
I have a stream whose values I aggregate within a window using KSQL. Here is my query:

CREATE TABLE WATCHTIME_AGG AS \
 SELECT ID, MAX(progress) AS progress, MAX(position) AS position, MAX(timestamp) AS timestamp \
   FROM WATCHTIME \
   WINDOW TUMBLING (SIZE 1 HOUR) \
   GROUP BY ID;

It creates a table whose ROWKEYs look like this:

  • 149988257825337_6696933 : Window{start=1540024620000 end=-}
  • null_11646895 : Window{start=1540024620000 end=-}
  • 153974245826038_6222197 : Window{start=1540024620000 end=-}
  • 151415963389431_11197398 : Window{start=1540024620000 end=-}
  • 153173851492214_9207426 : Window{start=1540024620000 end=-}

But it can't be sinked because of a JSON error when the key type is JSON:
com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0x90\n at [Source: (byte[])\"null_12208937\u0000\u0000\u0001f\ufffd\u0007\ufffd\u0000\"; line: 1, column: 19]

And this error when key type is String:
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'null_12208937'

The key should be a valid JSON or Avro if I'm not wrong, but window aggregation won't produce JSON and won't let me change it to a valid one.

What can I do to make it work?
It's a really simple use-case and should be straightforward. But at least I could not make it work.

README section for Logical Types Conversion

Hi,

While working with logical types, I noticed that Confluent have changed their AvroConverter to rely on the property "logicalType" instead of "connect.name".
This is with reference to this commit confluentinc/schema-registry@da4d548

So, for example, for Date logical type, I can now mention "logicalType":"date" in the relevant schema field instead of "connect.name":"org.apache.kafka.connect.data.Date"

The README explicitly advises to use the connect.name instead of logicalType. Is there some aspect I am missing?

cannot see kafka topic in mongodb. question about the connection issue

Hello,
Thank you for your article and help! I followed your instructions to set up the connection between Kafka and MongoDB (sink). After I run ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/sink-connect-mongodb.properties and then run curl, I get the correct response, {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}, and can see "avrotest" in the Kafka topics. However, I get the error below and cannot find the "avrotest" collection in Mongo:
......
[2018-06-25 15:03:25,419] INFO 0 have been processed (org.radarcns.connect.mongodb.MongoDbSinkTask:57)
[2018-06-25 15:03:25,420] ERROR WorkerSinkTask{id=kafka-connector-mongodb-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NoClassDefFoundError: com/mongodb/MongoException

at org.radarcns.connect.mongodb.MongoDbSinkTask.createMongoDbWriter(MongoDbSinkTask.java:114)
at org.radarcns.connect.mongodb.MongoDbSinkTask.start(MongoDbSinkTask.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:281)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.MongoException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
[2018-06-25 15:03:25,421] ERROR WorkerSinkTask{id=kafka-connector-mongodb-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-06-25 15:03:25,421] INFO Stopping MongoDBSinkTask (org.radarcns.connect.mongodb.MongoDbSinkTask:163)
[2018-06-25 15:03:25,421] INFO Stopped MongoDBSinkTask (org.radarcns.connect.mongodb.MongoDbSinkTask:185).

I know the connection between Kafka and MongoDB failed, but I cannot figure out why. Can you give me some suggestions? Do I need to set up the plugin, configuration, workers, etc. specifically before running connect-standalone? What parts did I miss?

Thank you very much for your time and help! my email is [email protected]
gw

add configurable batching support

Similar to what other connectors allow, there should be optional batching support during the processing of sink records. Such a feature could be used to rate limit, or at least throttle, throughput towards the sink. In that sense it's an upper bound defining that at most N records are written towards the sink in one go (see the sketch after the list below).

  • introduce new configuration option for batching e.g. mongodb.max.batch.size = N
  • make it optional and backwards compatible thus a value of 0 is neutral and does no batching at all
  • any value N >= 1 batches the list of sink records handed to the put() method accordingly
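A minimal sketch of the batch partitioning this describes (generic, not the connector's actual implementation):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class BatchingExample {

    // Splits the records handed to put() into sub-lists of at most maxBatchSize elements;
    // a value of 0 (or below) keeps a single batch, i.e. no batching at all.
    public static <T> List<List<T>> partition(List<T> records, int maxBatchSize) {
        if (maxBatchSize <= 0 || records.size() <= maxBatchSize) {
            return Collections.singletonList(records);
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += maxBatchSize) {
            batches.add(records.subList(start, Math.min(start + maxBatchSize, records.size())));
        }
        return batches;
    }
}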

Exactly once semantics and DLQ configuration

Hi,

We are trying to use Kafka Connect with the MongoDB sink connector in order to sink the generated event messages to MongoDB. Below are a few of our concerns, and we need your suggestions on them.

  1. Does Connect and the connector support idempotence and exactly-once semantics? I tried to pass producer and consumer config through the sink properties, but Connect starts with its default producer and consumer config. I used consumer.enable.auto.commit=false and so on.
  2. In case the MongoDB server is down, Connect retries and throws the error 'Task is being killed and will not recover until manually restarted'. Do we have a way to control this with some configuration? If the MongoDB server comes back up, shouldn't it reconnect automatically? Or write to a DLQ in case of failure due to a Mongo or connector error? I tried errors.deadletterqueue.topic.name=my-connector-errors (see the sketch below).
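For reference, the dead letter queue and retry options are provided by the Kafka Connect framework itself (for sink connectors since Apache Kafka 2.0) and are spelled as follows; a sketch with illustrative values, independent of this connector:

errors.tolerance=all
errors.deadletterqueue.topic.name=my-connector-errors
errors.deadletterqueue.context.headers.enable=true
errors.retry.timeout=300000
errors.retry.delay.max.ms=60000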

Thanks

allow configuration / customization for write model filters

The current behavior used for the ReplaceOneModel upsert semantics is fine for typical use cases but a bit too rigid for others. Thus, there should be a customization option in order for users to employ individual write model strategies.

End2End Test failing because kafka-connect cannot see MongoDbSinkConnector plugin

When attempting to execute the end2end test MinimumViableIT, MongoDbSinkConnector is not listed as an available connector in the list. Part of error HTTP response body is:

"error_code":500,"message":"Failed to find any class that implements Connector and which name matches at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector, available connectors are: ...
It goes on to list all of the other connectors (FileStreamSinkConnector, FileStreamSourceConnector, etc.) that are available.

Development machine is a Mac running OS X High Sierra with Docker Version 18.03.0-ce-mac60.

Added following to /etc/hosts
127.0.0.1 mongodb zookeeper kafkabroker schemaregistry kafkaconnect

Some sort of a bootstrapping issue? Something simple that I am missing?
