Giter Site home page Giter Site logo

marklogic / kafka-marklogic-connector Goto Github PK

View Code? Open in Web Editor NEW
8.0 11.0 16.0 33.99 MB

A Kafka connector for subscribing to topics and streaming messages into MarkLogic

Home Page: https://marklogic.github.io/kafka-marklogic-connector/

License: Other

Java 99.47% JavaScript 0.53%
kafka marklogic marklogic-database kafka-connector topic marklogic-server marklogic-data-hub stream-processing streaming

kafka-marklogic-connector's Introduction

MarkLogic Kafka Connector

The MarkLogic Kafka connector is a Kafka Connect sink and source connector. It supports writing data from a topic to a MarkLogic database as well as reading data from a MarkLogic database via an Optic query and sending the result to a topic.

Please see the User Guide for more information.

For developing and testing the MarkLogic Kafka connector, please see the guide on contributing. That guide also includes detailed instructions on how to use both Confluent Platform and Apache Kafka which may be useful if you are new to either product.

kafka-marklogic-connector's People

Contributors

bgeorge111 avatar billfarber avatar dependabot[bot] avatar iowusu avatar mitchshepherd avatar rahulvudutala avatar rjrudin avatar sameerapriyathamtadikonda avatar stuartmoorhouse avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-marklogic-connector's Issues

Temporal Support.

The connector should be able to write documents to temporal collections that are already created in the target database.

Refactor id.strategy package to Kafka-specific package

The com.marklogic.client.id.strategy package is specific to Kafka, as the main public interface method in IdStrategy depends on Kafka-specific inputs such as topic and partition. This should be moved to com.marklogic.kafka.connect.

Null objects break the the connector

When a null object is sent to the connect it throws this error

ERROR WorkerSinkTask{id=marklogic-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.NullPointerException
at com.marklogic.kafka.connect.sink.DefaultSinkRecordConverter.toContent(DefaultSinkRecordConverter.java:90)
at com.marklogic.kafka.connect.sink.DefaultSinkRecordConverter.convert(DefaultSinkRecordConverter.java:68)
at com.marklogic.kafka.connect.sink.MarkLogicSinkTask.lambda$put$0(MarkLogicSinkTask.java:123)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.marklogic.kafka.connect.sink.MarkLogicSinkTask.put(MarkLogicSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Ability to apply transformation and data ingestion through custom Rest API endpoint

We have started exploring MarkLogic Kafka Connector and have few queries mentioned below.

  • We store the data in ML in XML format whereas in Kafka the data is in Json. So is there any transformation mechanism/tool which allows us to convert the Json data to XML before sending to ML.
  • As the data is stored in ML through our own API’s but we didn’t find any properties to handle the API endpoint path although there are properties to define the host and port.

Provide example of properties in JSON format

If you want to add a connector via the rest endpoint you have to provide the configuration in yaml format.
Could we also provide a set of configurations in yaml format?

example of selecting connctor via rest api:

curl -d @./connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

whereas connect-mqtt-source.json contains the properties in json format

Simple SSL not working for MarkLogic

Issue
When SSL is turn on MarkLogic appserver, the kafka-marklogic-connector fails to start.

Error
ERROR WorkerSinkTask{id=marklogic-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.marklogic.client.ForbiddenUserException: Local message: User is not allowed to read resource at internal/forestinfo. Server Message: Server (not a REST instance?) did not respond with an expected REST Error message.
at com.marklogic.client.impl.OkHttpServices.checkStatus(OkHttpServices.java:4321)
at com.marklogic.client.impl.OkHttpServices.getResource(OkHttpServices.java:3056)
at com.marklogic.client.datamovement.impl.DataMovementServices.readForestConfig(DataMovementServices.java:51)
at com.marklogic.client.datamovement.impl.DataMovementManagerImpl.readForestConfig(DataMovementManagerImpl.java:160)
at com.marklogic.client.datamovement.impl.DataMovementManagerImpl.getForestConfig(DataMovementManagerImpl.java:155)
at com.marklogic.client.datamovement.impl.DataMovementManagerImpl.newWriteBatcher(DataMovementManagerImpl.java:103)
at com.marklogic.kafka.connect.sink.MarkLogicSinkTask.start(MarkLogicSinkTask.java:37)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-10-17 11:22:28,920] ERROR WorkerSinkTask{id=marklogic-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

Recreate the Error
set up marklogic appServer with SSL turn on
set up ml.connection.simpleSsl=true on config/marklogic-sink.properties
start the kakfka-marklogic-connector

Query for documents based on a query stored in MarkLogic

This is a nice-to-have after the MarkLogicSourceTask (in #4) is first released.

The idea is that instead of having to write a query in the connector config properties file, a URI could instead be provided. At startup, the connector would then retrieve that URI, which is expected to be a serialized CTS query.

So for example, we'd have a property like this in marklogic-source.properties:

ml.source.queryURI=/my-serialized-query.json

Standalone solution, with multiple tasks and multiple partitions of a Kafka topic, does not work properly

I have changed tasks.max=3 in marklogic-sink.properties and created 3 partitions in a Kafka topic.
When I have run this standalone solution, I have noticed the following behavior.

1- 3 connector-consumer-marklogic-sink are created.
2- 3 Partitions of kafka topic available.
3- Only 1 connector-consumer-marklogic-sink out of 3 consumes the messages from one partition of Kafka topic. While other connector-consumer-marklogic-sink are idle and messages in other partitions of topic could''t be consumed.

I can configure ml.dmsdk.threadCount for Marklogic but I want parallelism for kafka topic consumer side so I thought to run distributed solution but could not find configuration for marklogic-connect-distributed.properties.

Please provide the configuration about marklogic-connect-distributed.properties

Thanks,
Anil Kumar

Capability for configuring URIs other than UUIDs

Have some configurable URI customisation strategies. Couples of ideas are

  1. URI from the value of the document in a JSON path
  2. URI from the hash of the values from the document in a JSON path
  3. URI from the Kafka meta data values (///)

Notes:
a) All the strategies should be for the uri parts between URIPrefix and URISuffix
b) The uniqueness of the URI should be on the customer/administrator who configures the connector. If the URI is not unique, the connector will silently over-write the existing document.

Read data from ML via a source task

Similar to the sink task we have already, should be straightforward to integrate a QueryBatcher into a Kafka source task - https://docs.confluent.io/current/connect/concepts.html#connect-tasks - so that data can be read from ML into a Kafka queue.

The main point of interest is how a user can configure a query for the QueryBatcher to run. This should be very similar though to how the QueryMarkLogic NiFi processor works - just look at the different ways that a QueryBatcher can execute a query, and determine how to support each approach via Kafka properties.

marklogic-sink.properties doesn't contain new security options properties

marklogic-sink.properties is missing something like:

ml.connection.enableCustomSsl=false
ml.connection.customSsl.hostNameVerifier=ANY
ml.connection.customSsl.tlsVersion=TLSv1
ml.connection.customSsl.mutualAuth=false

When I completed the Quick Start on the README, the command bin/connect-standalone.sh config/marklogic-connect-standalone.properties config/marklogic-sink.properties throws errors about those missing properties.

I could do a pull request to add those properties, if they are the correct defaults for the Quick Start to work. Or update the Quick Start instructions to include adding them?

Allow the topic to be added as collections

As a kafka-marklogic-connector user I will like to add to like to have the option to the topics which the message was sent to as a collection name in MarkLogic.

On marklogic_sink.properties I should be able to set TopicsAsCollection=true/false to allow the topic which the message sent on to be added to the MarkLogic collection name.

Document how ID strategies are used

We're getting more complete docs together for the 1.7.0 release, but this will be handled separately. Need to do some testing of this feature first to ensure it is working properly and can thus be adequately documented.

Ability to configure DHF connection properties

With the current implementation of flow runner it depends on the database names being the standard MarkLogic Data Hub names. I'd like to request the ability to configure the names of the DB being used just incase there are non standard names in use.

Specify a transform and transform options

When constructing the WriteBatcher, we should allow for configuration properties that specify the name of an ML REST transform to use. This includes the name of a transform and N sets of parameters.

The properties could be defined like this:

ml.dmsdk.transform.name=name of server transform
ml.dmsdk.transform.parameters=param1name,param1value,param2name,param2value
ml.dmsdk.transform.parameters.delimiter=,

Add validators for certain config properties

"certain config properties" = need to analyze which ones will benefit from a validator. Would include properties with bounded sets of values, such as ml.connection.securityContextType.

Validators can be specified in the MarkLogicSinkConfig class.

Support Java 11

Version 1.2.1 from hub.confluent.io throws the following error upon initialization: java.lang.UnsupportedOperationException: clientBuilder.sslSocketFactory(SSLSocketFactory) not supported on JDK 9+

Full error log:


Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,578] INFO Kafka commitId: dad78e2df6b714e3 (org.apache.kafka.common.utils.AppInfoParser:118)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,578] INFO Kafka startTimeMs: 1600980358578 (org.apache.kafka.common.utils.AppInfoParser:119)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,579] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1132)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,579] INFO [Consumer clientId=connector-consumer-testMarkLogic-0, groupId=connect-testMarkLogic] Subscribed to topic(s): test (org.apache.kafka.clients.consumer.KafkaConsumer:974)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,579] INFO Starting (com.marklogic.kafka.connect.sink.MarkLogicSinkTask:35)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: [2020-09-24 16:45:58,580] ERROR WorkerSinkTask{id=testMarkLogic-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: java.lang.UnsupportedOperationException: clientBuilder.sslSocketFactory(SSLSocketFactory) not supported on JDK 9+

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at okhttp3.internal.platform.Jdk9Platform.trustManager(Jdk9Platform.java:81)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at okhttp3.internal.platform.Platform.buildCertificateChainCleaner(Platform.java:176)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at okhttp3.OkHttpClient$Builder.sslSocketFactory(OkHttpClient.java:673)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at com.marklogic.client.impl.OkHttpServices.connect(OkHttpServices.java:344)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at com.marklogic.client.DatabaseClientFactory.newClient(DatabaseClientFactory.java:1218)

Sep 24 16:45:58 my.domain.com connect-distributed[29018]: at com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory.newDatabaseClient(DefaultConfiguredDatabaseClientFactory.java:76)```

Upgrade to new jar versions.

The connector is using older versions of data hub and client api jars. Also, the kafka connect-api and connect-json jars are also old by view versions. The ticket is to build a newer connector version with new versions of dependent jars.

While locally working on newer versions against latest confluent kafka distribution, faced a problem of jackson version mismatch.
The solution was to exclude jackson modules in the connector jar. This aspect also might need to looked into. All kafka distributions might not have these jars provided during run time.

Log which record failed when a batch fails

The connector should handle errors that may occur on the writing of records to MarkLogic.

Minimally the error should be logged, and the number (and id's?) of the records which failed to commit should be output.

Ideally, the failed records would be retried a configurable number of times. The failed records' payloads could optionally be saved to disk or logged.

Deprecate ml.dmsdk.* properties in favor of ml.sink.dmsdk.*

I'm realizing that "ml.dmsdk" is too generic for the properties that are used for ingesting data. "ml.sink.dmsdk." would be more meaningful, and it also then allows for DMSDK to be used for other use cases - namely, export (for which we'd likely do "ml.export.*").

Support Kafka's dead letter queue

This is done by accessing the ErrantRecordReporter via the SinkTaskContext that is passed into the initialize method.

More info at https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/

One challenge here is that we may not be able to do this when using a WriteBatcher since each DocumentWriteOperation does not have the associated SinkRecord. So we likely can only do this when an error occurs before the SinkRecord is converted into a DocumentWriteOperation, which would still be useful - e.g. for conversion errors.

Must also account for supporting Kafka < 2.6 per this info in the Kafka javadocs:

This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to maintain backward 
compatibility so they can also be deployed to older Connect runtimes should guard the call to this method 
with a try-catch block, since calling this method will result in a NoSuchMethodException or 
NoClassDefFoundError when the sink connector is deployed to Connect runtimes older than Kafka 2.6. 
For example:

      ErrantRecordReporter reporter;
      try {
          reporter = context.errantRecordReporter();
      } catch (NoSuchMethodError | NoClassDefFoundError e) {
          reporter = null;
      }

Document SSL configuration within Kafka

The marklogic-config-standalone.properties file has the config shown below for configuring SSL within Kafka. That file isn't really an appropriate place though, as it significantly increases the size of the file and makes it more difficult for a user to configure. Configuring SSL within Kafka should be addressed by the user guide for the connector, and ideally that's done by linking to something in the Kafka docs.

# The next two sections are necessary to establish an SSL connection to the Kafka servers.
# To enable SSL, uncomment and customize the lines that start with "#* " in those two sections.

# SSL connection properties
# For more information, see https://docs.confluent.io/current/kafka/encryption.html#encryption-ssl-connect
#   These top-level settings are used by the Connect worker for group coordination and to read and write to the internal
#   topics which are used to track the cluster's state (e.g. configs and offsets).
#* security.protocol=SSL
# You must create a truststore that contains either the server certificate or a trusted CA that signed the server cert.
# This is how I did it with keytools:
#   keytool -keystore kafka.truststore.jks -alias caroot -import -file ca-cert -storepass "XXXXX" -keypass "XXXXX" -noprompt
#* ssl.truststore.location=/secure/path/to/certs/kafka.client.truststore.jks
#* ssl.truststore.password=truststorePassphrase
# For now, turn off hostname verification since we're using self-signed certificates
# This might also be fixable by fixing the "Subject Alternative Name (SAN)", but I'm not a cert expert.
#* ssl.endpoint.identification.algorithm=

# Yes, both of these sections are required.
#   Connect workers manage the producers used by source connectors and the consumers used by sink connectors.
#   So, for the connectors to leverage security, you also have to override the default producer/consumer
#   configuration that the worker uses.
#* consumer.bootstrap.servers=localhost:9093
#* consumer.security.protocol=SSL
#* consumer.ssl.truststore.location=/secure/path/to/certs/kafka.client.truststore.jks
#* consumer.ssl.truststore.password=truststorePassphrase
#* consumer.ssl.endpoint.identification.algorithm=

The marklogic-sink.properties file likewise has config that is generic to configuring SSL and isn't specific to the connector. This should be in a user guide as well, or linked to via the user guide.

# You must also ensure that the server cert or the signing CA cert is imported in the JVMs cacerts file.
# These commands may be used to get the server cert and to import it into your cacerts file.
# Don't forget to customize the commands for your particular case.
#   openssl x509 -in <(openssl s_client -connect <server>:8004 -prexit 2>/dev/null) -out ~/example.crt
#   sudo keytool -importcert -file ~/example.crt -alias <server> -keystore /path/to/java/lib/security/cacerts -storepass <storepass-password>

Code refactoring: Apply factory / strategy pattern in handling additional formats.

Apply factory / strategy pattern in handling additional formats. Now, the DefaultSinkRecordConverter is handling all formats with if loops which can be made better in code maintenance by applying a strategy (or factory as it applies) pattern. This issue should not add / subtract any core features of the product and is only a tech debt.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.