ibm-messaging / kafka-connect-mq-source

This repository contains a Kafka Connect source connector for copying data from IBM MQ into Apache Kafka.

License: Apache License 2.0

Topics: ibm-mq, kafka-connect

kafka-connect-mq-source's Introduction

Kafka Connect source connector for IBM MQ

kafka-connect-mq-source is a Kafka Connect source connector for copying data from IBM MQ into Apache Kafka.

The connector is supplied as source code which you can easily build into a JAR file.

Note: A sink connector for IBM MQ is also available on GitHub.

Building the connector

To build the connector, you must have the following installed:

  • git
  • Java 8 or later
  • Maven 3.0 or later

Clone the repository with the following command:

git clone https://github.com/ibm-messaging/kafka-connect-mq-source.git

Change directory into the kafka-connect-mq-source directory:

cd kafka-connect-mq-source

Build the connector using Maven:

mvn clean package

Once built, the output is a single JAR called target/kafka-connect-mq-source-<VERSION>-jar-with-dependencies.jar which contains all of the required dependencies.

Running the connector

For step-by-step instructions, see the sections below on running the connector in standalone mode, in distributed mode, with Docker, on Kubernetes, and on OpenShift using Strimzi.

To run the connector, you must have:

  • The JAR from building the connector
  • A properties file containing the configuration for the connector
  • Apache Kafka 2.6.2 or later, either standalone or included as part of an offering such as IBM Event Streams
  • IBM MQ v9 or later, or the IBM MQ on Cloud service

The connector can be run in a Kafka Connect worker in either standalone (single process) or distributed mode. It's a good idea to start in standalone mode.

Running in standalone mode

You need two configuration files, one for the configuration that applies to all of the connectors such as the Kafka bootstrap servers, and another for the configuration specific to the MQ source connector such as the connection information for your queue manager. For the former, the Kafka distribution includes a file called connect-standalone.properties that you can use as a starting point. For the latter, you can use config/mq-source.properties in this repository.

The connector connects to MQ using either a client or a bindings connection. For a client connection, you must provide the name of the queue manager, the connection name (one or more host/port pairs) and the channel name. In addition, you can provide a user name and password if the queue manager is configured to require them for client connections. If you look at the supplied config/mq-source.properties, you'll see how to specify the configuration required. For a bindings connection, you must provide the name of the queue manager and also run the Kafka Connect worker on the same system as the queue manager.
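
As a minimal sketch of a client-mode connector configuration (the queue manager, host, port, channel and queue names below are placeholders for your own environment, not values from this repository):

name=mq-source
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasks.max=1
topic=TSOURCE
mq.queue.manager=QM1
mq.connection.mode=client
mq.connection.name.list=localhost(1414)
mq.channel.name=MYSVRCONN
mq.queue=MYQSOURCE
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder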

To run the connector in standalone mode from the directory into which you installed Apache Kafka, you use a command like this:

bin/connect-standalone.sh connect-standalone.properties mq-source.properties
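
The worker also needs to be able to load the connector class. One way, shown in several of the issues below and assuming a Unix shell, is to put the JAR on the CLASSPATH when starting the worker; alternatively, set plugin.path in connect-standalone.properties to the directory containing the JAR:

CLASSPATH=target/kafka-connect-mq-source-<VERSION>-jar-with-dependencies.jar bin/connect-standalone.sh connect-standalone.properties mq-source.properties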

Running in distributed mode

You need an instance of Kafka Connect running in distributed mode. The Kafka distribution includes a file called connect-distributed.properties that you can use as a starting point, or follow Running with Docker or Deploying to Kubernetes.

To start the MQ connector, you can use config/mq-source.json in this repository after replacing all placeholders and use a command like this:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/mq-source.json"
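
As a sketch, the JSON body follows the standard Kafka Connect REST format; the angle-bracket values are placeholders to replace with your own:

{
  "name": "mq-source",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "tasks.max": "1",
    "topic": "<KAFKA_TOPIC>",
    "mq.queue.manager": "<QUEUE_MANAGER>",
    "mq.connection.name.list": "<HOST>(<PORT>)",
    "mq.channel.name": "<CHANNEL>",
    "mq.queue": "<QUEUE>",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"
  }
}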

Running with Docker

This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ source connector as an available connector plugin. It uses the default connect-distributed.properties and connect-log4j.properties files.

  1. mvn clean package
  2. docker build -t kafkaconnect-with-mq-source:<TAG> .
  3. docker run -p 8083:8083 kafkaconnect-with-mq-source:<TAG>

Substitute <TAG> with the version of the connector or latest to use the latest version.

NOTE: To provide custom properties files, create a folder called config containing the connect-distributed.properties and connect-log4j.properties files, and use a Docker volume to make them available when running the container, like this:

docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:<TAG>

To start the MQ connector, you can use config/mq-source.json in this repository after replacing all placeholders and use a command like this:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/mq-source.json"

Deploying to Kubernetes

This repository includes a Kubernetes yaml file called kafka-connect.yaml. This will create a deployment to run Kafka Connect in distributed mode and a service to access the deployment.

The deployment assumes the existence of a Secret called connect-distributed-config and a ConfigMap called connect-log4j-config. These can be created using the default files in your Kafka install; however, it is easier to edit them later if comments and whitespace are trimmed before creation.

Creating Kafka Connect configuration Secret and ConfigMap

Create Secret for Kafka Connect configuration:

  1. cp kafka/config/connect-distributed.properties connect-distributed.properties.orig
  2. sed '/^#/d;/^[[:space:]]*$/d' < connect-distributed.properties.orig > connect-distributed.properties
  3. kubectl -n <namespace> create secret generic connect-distributed-config --from-file=connect-distributed.properties

Create ConfigMap for Kafka Connect Log4j configuration:

  1. cp kafka/config/connect-log4j.properties connect-log4j.properties.orig
  2. sed '/^#/d;/^[[:space:]]*$/d' < connect-log4j.properties.orig > connect-log4j.properties
  3. kubectl -n <namespace> create configmap connect-log4j-config --from-file=connect-log4j.properties

Creating Kafka Connect deployment and service in Kubernetes

NOTE: You will need to build the Docker image and push it to your Kubernetes image repository. Remember that the supplied Dockerfile is just an example and you will have to modify it for your needs. You might need to update the image name in the kafka-connect.yaml file.

  1. Update the namespace in kafka-connect.yaml
  2. kubectl -n <namespace> apply -f kafka-connect.yaml
  3. curl <serviceIP>:<servicePort>/connector-plugins to see whether the MQ source connector is available to use

Deploying to OpenShift using Strimzi

This repository includes a Kubernetes yaml file called strimzi.kafkaconnector.yaml for use with the Strimzi operator. Strimzi provides a simplified way of running the Kafka Connect distributed worker, by defining either a KafkaConnect resource or a KafkaConnectS2I resource.

The KafkaConnectS2I resource provides a convenient way to have OpenShift do all the work of building the Docker images for you. This combines particularly well with the KafkaConnector resource that represents an individual connector.

The following instructions assume you are running on OpenShift and have Strimzi 0.16 or later installed.

Start a Kafka Connect cluster using KafkaConnectS2I

  1. Create a file called kafka-connect-s2i.yaml containing the definition of a KafkaConnectS2I resource. You can use the examples in the Strimzi project to get started; a minimal sketch also follows this list.
  2. Configure it with the information it needs to connect to your Kafka cluster. You must include the annotation strimzi.io/use-connector-resources: "true" to configure it to use KafkaConnector resources so you can avoid needing to call the Kafka Connect REST API directly.
  3. oc apply -f kafka-connect-s2i.yaml to create the cluster, which usually takes several minutes.
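
A minimal sketch of such a resource, assuming Strimzi 0.16-era API versions; the cluster name and bootstrap address are placeholders (check the Strimzi examples for the full set of fields):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: <kafkaConnectClusterName>
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: <kafka-bootstrap-address>:9092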

Add the MQ source connector to the cluster

  1. mvn clean package to build the connector JAR.
  2. mkdir my-plugins
  3. cp target/kafka-connect-mq-source-*-jar-with-dependencies.jar my-plugins
  4. oc start-build <kafkaconnectClusterName>-connect --from-dir ./my-plugins to add the MQ source connector to the Kafka Connect distributed worker cluster. Wait for the build to complete, which usually takes a few minutes.
  5. oc describe kafkaconnects2i <kafkaConnectClusterName> to check that the MQ source connector is in the list of available connector plugins.

Start an instance of the MQ source connector using KafkaConnector

  1. cp deploy/strimzi.kafkaconnector.yaml kafkaconnector.yaml
  2. Update the kafkaconnector.yaml file to replace all of the values in <>, adding any additional configuration properties. A sketch of a completed file follows this list.
  3. oc apply -f kafkaconnector.yaml to start the connector.
  4. oc get kafkaconnector to list the connectors. You can use oc describe to get more details on the connector, such as its status.
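
A hedged sketch of a completed kafkaconnector.yaml, assuming the Strimzi KafkaConnector format of that era and using this connector's configuration options; the angle-bracket values are placeholders:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mq-source
  labels:
    strimzi.io/cluster: <kafkaConnectClusterName>
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    topic: <KAFKA_TOPIC>
    mq.queue.manager: <QUEUE_MANAGER>
    mq.connection.name.list: "<HOST>(<PORT>)"
    mq.channel.name: <CHANNEL>
    mq.queue: <QUEUE>
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder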

Data formats

Kafka Connect is very flexible but it's important to understand the way that it processes messages to end up with a reliable system. When the connector encounters a message that it cannot process, it stops rather than throwing the message away. Therefore, you need to make sure that the configuration you use can handle the messages the connector will process.

This is rather complicated and it's likely that a future update of the connector will simplify matters.

Each message in Kafka Connect is associated with a representation of the message format known as a schema. Each Kafka message actually has two parts, key and value, and each part has its own schema. The MQ source connector does not currently make much use of message keys, but some of the configuration options use the word Value because they refer to the Kafka message value.

When the MQ source connector reads a message from MQ, it chooses a schema to represent the message format and creates an internal object called a record containing the message value. This conversion is performed using a record builder. Each record is then processed using a converter which creates the message that's published on a Kafka topic.

There are two record builders supplied with the connector, although you can write your own. The basic rule is that if you just want the message to be passed along to Kafka unchanged, the default record builder is probably the best choice. If the incoming data is in JSON format and you want to use a schema based on its structure, use the JSON record builder.

There are three converters built into Apache Kafka. You need to make sure that the incoming message format, the setting of the mq.message.body.jms configuration, the record builder and converter are all compatible. By default, everything is just treated as bytes but if you want the connector to understand the message format and apply more sophisticated processing such as single-message transforms, you'll need a more complex configuration. The following table shows the basic options that work.

Record builder class | Incoming MQ message | mq.message.body.jms | Converter class | Outgoing Kafka message
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | Any | false (default) | org.apache.kafka.connect.converters.ByteArrayConverter | Binary data
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS BytesMessage | true | org.apache.kafka.connect.converters.ByteArrayConverter | Binary data
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS TextMessage | true | org.apache.kafka.connect.storage.StringConverter | String data
com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder | JSON, may have schema | Not used | org.apache.kafka.connect.json.JsonConverter | JSON, no schema

There's no single configuration that will always be right, but here are some high-level suggestions.

  • Pass unchanged binary (or string) data as the Kafka message value
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  • Message format is MQSTR, pass string data as the Kafka message value
mq.message.body.jms=true
value.converter=org.apache.kafka.connect.storage.StringConverter
  • Messages are JMS BytesMessage, pass byte array as the Kafka message value
mq.message.body.jms=true
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  • Messages are JMS TextMessage, pass string data as the Kafka message value
mq.message.body.jms=true
value.converter=org.apache.kafka.connect.storage.StringConverter
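
For the JSON row of the table above, a combined sketch; schemas.enable is a standard JsonConverter option shown here as an assumption about what you may want, not something this README mandates:

mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false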

The gory detail

The messages received from MQ are processed by a record builder which builds a Kafka Connect record to represent the message. There are two record builders supplied with the MQ source connector. The connector has a configuration option mq.message.body.jms that controls whether it interprets the MQ messages as JMS messages or regular MQ messages.

Record builder class | mq.message.body.jms | Incoming message body | Value schema | Value class
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | false (default) | Any | OPTIONAL_BYTES | byte[]
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | true | JMS BytesMessage | null | byte[]
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | true | JMS TextMessage | null | String
com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | true | Everything else | EXCEPTION | EXCEPTION
com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder | Not used | JSON | Depends on message | Depends on message

You must then choose a converter that can handle the value schema and class. There are three basic converters built into Apache Kafka; the table below shows their outputs, of which only some combinations are likely to be useful.

Converter class | Output for byte[] | Output for String | Output for compound schema
org.apache.kafka.connect.converters.ByteArrayConverter | Binary data | EXCEPTION | EXCEPTION
org.apache.kafka.connect.storage.StringConverter | Works, not useful | String data | Works, not useful
org.apache.kafka.connect.json.JsonConverter | Base-64 JSON String | JSON String | JSON data

Key support and partitioning

By default, the connector does not use keys for the Kafka messages it publishes. It can be configured to use the JMS message headers to set the key of the Kafka records. You could use this, for example, to use the MQMD correlation identifier as the partitioning key when the messages are published to Kafka. There are four valid values for the mq.record.builder.key.header configuration that controls this behavior.

mq.record.builder.key.header | Key schema | Key class | Recommended value for key.converter
JMSMessageID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter
JMSCorrelationID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter
JMSCorrelationIDAsBytes | OPTIONAL_BYTES | byte[] | org.apache.kafka.connect.converters.ByteArrayConverter
JMSDestination | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter

In MQ, the message ID and correlation ID are both 24-byte arrays. As strings, the connector represents them using a sequence of 48 hexadecimal characters.
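
For example, to partition by correlation ID you might combine the options documented above (a sketch, not the only valid pairing):

mq.record.builder.key.header=JMSCorrelationID
key.converter=org.apache.kafka.connect.storage.StringConverter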

Accessing MQMD fields

If you write your own RecordBuilder, you can access the MQMD fields of the MQ messages as JMS message properties. By default, only a subset of the MQMD fields are available, but you can get access to all of them by setting the configuration mq.message.mqmd.read. For more information, see JMS message object properties in the MQ documentation.
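
A minimal sketch of such a builder, assuming the RecordBuilder interface matches the toSourceRecord call visible in the commit() excerpt quoted in the issues below (check the com.ibm.eventstreams.connect.mqsource.builders package for the exact contract); the class name and the key-less SourceRecord construction are illustrative only:

package com.example.builders;

import java.util.Collections;
import java.util.Map;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilder;

public class MqmdAwareRecordBuilder implements RecordBuilder {

    // No-op configuration hook, included in case the interface declares one
    public void configure(Map<String, String> props) {
    }

    public SourceRecord toSourceRecord(JMSContext context, String topic,
            boolean messageBodyJms, Message message) throws JMSException {
        // MQMD Format field, exposed as a JMS message property
        String format = message.getStringProperty("JMS_IBM_Format");
        // Assumes a text payload; a real builder would handle bytes too
        String body = message.getBody(String.class);
        return new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                topic, Schema.STRING_SCHEMA, format + ":" + body);
    }
}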

Security

The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate and mutual authentication with client-side certificates. You can also choose whether to use connection security parameters (MQCSP) depending on the security settings you're using in MQ.

Setting up MQ connectivity using TLS with a server-side certificate

To enable use of TLS, set the configuration mq.ssl.cipher.suite to the name of the cipher suite which matches the CipherSpec in the SSLCIPH attribute of the MQ server-connection channel. Use the table of supported cipher suites for MQ 9.1 here as a reference. Note that the names of the CipherSpecs as used in the MQ configuration are not necessarily the same as the cipher suite names that the connector uses. The connector uses the JMS interface so it follows the Java conventions.

You will need to put the public part of the queue manager's certificate in the JSSE truststore used by the Kafka Connect worker that you're using to run the connector. If you need to specify extra arguments to the worker's JVM, you can use the EXTRA_ARGS environment variable.
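
For example (a sketch; the path and password are placeholders, and this mirrors the EXTRA_ARGS usage shown in the issues below):

export EXTRA_ARGS="-Djavax.net.ssl.trustStore=/path/to/truststore.jks -Djavax.net.ssl.trustStorePassword=<PASSWORD>"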

Setting up MQ connectivity using TLS for mutual authentication

You will need to put the public part of the client's certificate in the queue manager's key repository. You will also need to configure the worker's JVM with the location and password for the keystore containing the client's certificate. Alternatively, you can configure a separate keystore and truststore for the connector.
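
Using the connector-level options from the configuration table below, a separate keystore and truststore for the connector can be sketched as (all values are placeholders):

mq.ssl.cipher.suite=<CIPHER_SUITE>
mq.ssl.keystore.location=/path/to/keystore.jks
mq.ssl.keystore.password=<KEYSTORE_PASSWORD>
mq.ssl.truststore.location=/path/to/truststore.jks
mq.ssl.truststore.password=<TRUSTSTORE_PASSWORD>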

Security troubleshooting

For troubleshooting, or to better understand the handshake performed by the IBM MQ Java client application in combination with your specific JSSE provider, you can enable debugging by setting javax.net.debug=ssl in the JVM environment.

Configuration

The configuration options for the Kafka Connect source connector for IBM MQ are as follows:

Name | Description | Type | Default | Valid values
topic | The name of the target Kafka topic | string | | Topic name
mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name
mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings
mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...]
mq.channel.name | The name of the server-connection channel | string | | MQ channel name
mq.queue | The name of the source MQ queue | string | | MQ queue name
mq.user.name | The user name for authenticating with the queue manager | string | | User name
mq.password | The password for authenticating with the queue manager | string | | Password
mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true |
mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file
mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder
mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false |
mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination
mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false |
mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite
mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern
mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file
mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | |
mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file
mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | |
mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | |
mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater
mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false |

Using a CCDT file

Some of the connection details for MQ can be provided in a CCDT file by setting mq.ccdt.url in the MQ source connector configuration file. If you use a CCDT file, the mq.connection.name.list and mq.channel.name configuration options are not required.
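
For example (a sketch; the file URL is a placeholder, and AMQCLCHL.TAB is just the conventional CCDT file name):

mq.queue.manager=<QUEUE_MANAGER>
mq.ccdt.url=file:///path/to/AMQCLCHL.TAB
mq.queue=<QUEUE>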

Externalizing secrets

KIP 297 introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors.

Example: externalizing secrets with FileConfigProvider

Given a file mq-secrets.properties with the contents:

secret-key=password

Update the worker configuration file to specify the FileConfigProvider which is included by default:

# Additional properties for the worker configuration to enable use of ConfigProviders
# multiple comma-separated provider types can be specified here
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

Update the connector configuration file to reference secret-key in the file:

mq.password=${file:mq-secrets.properties:secret-key}

Using FileConfigProvider in Kubernetes

To use a file for the mq.password in Kubernetes, you create a Secret using the file as described in the Kubernetes docs.
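
A hedged sketch of the flow (the Secret name and mount path are placeholders; the mount path must match the path used in the connector configuration):

kubectl -n <namespace> create secret generic mq-secrets --from-file=mq-secrets.properties

Mount the Secret as a volume in the Kafka Connect deployment, then reference the mounted file from the connector configuration:

mq.password=${file:/opt/kafka/external-configuration/mq-secrets.properties:secret-key}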

Troubleshooting

Unable to connect to Kafka

You may receive an org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. If the error is caused by the exception Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found., Java may be replacing the IP address of your cluster with the corresponding hostname in your /etc/hosts file. For example, to push Docker images to a custom Docker repository, you may have added an entry in this file which corresponds to the IP of your repository, e.g. 123.456.78.90 mycluster.icp. To fix this, you can comment out this line in your /etc/hosts file.

Unsupported cipher suite

When configuring TLS connection to MQ, you may find that the queue manager rejects the cipher suite, in spite of the name looking correct. There are two different naming conventions for cipher suites (https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q113220_.htm). Setting the configuration option mq.ssl.use.ibm.cipher.mappings=false often resolves cipher suite problems.

Support

Commercial support for this connector is available for customers with a support entitlement for IBM Event Automation or IBM Cloud Pak for Integration.

Issues and contributions

For issues relating specifically to this connector, please use the GitHub issue tracker. If you do want to submit a Pull Request related to this connector, please read the contributing guide first to understand how to sign your commits.

License

Copyright 2017, 2020, 2023 IBM Corporation

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

kafka-connect-mq-source's People

Contributors

a-c-brown, ajborley, andrewdunnings, andrewjschofield, bltavares, dalelane, dependabot[bot], emmahumber, gmcrobert, jhughes24816, joel-hanson, katheris, khetanshu, mbechto, mimaison, slachiewicz, tagarr


kafka-connect-mq-source's Issues

Integration with Schema registry and JSON to AVRO Conversion

Hi Team,

We are using this connector with a distributed worker. There is a use-case to take incoming JSON messages from IBM MQ, integrate the worker with the Hortonworks schema registry, and send the data in Avro format to Kafka. Is this possible with this connector (we are using v1.2.0)?

Thanks & Regards

Unable to start the connector from a remote server

Hello,

I am trying to connect to a Kafka broker on a remote server over TCP. The Kafka server/ZooKeeper is running on server A and I am trying to start the adapter on server B. The attempt times out with the below error (for this testing we have opened all traffic between the two servers A and B):

[2019-07-09 14:22:03,168] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:122)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.

This works if I start the adapter on the server running the Kafka server. I am not sure why the remote connection is giving an error. Could you please help here?

Regards

Running Kafka Connector in docker

Hello,

Actually I'm running both Kafka and IBM MQ as Docker containers. I followed all the steps but I'm blocked at this one: CLASSPATH=/target/kafka-connect-mq-source-0.5-SNAPSHOT-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties ~/mq-source.properties, since I don't know how to specify the classpath for my container. I have tried adding the path using plugin.path in the connect-standalone.properties file and running the following command:

docker run -v /home/pfe/tmp:/kafka/tmp --rm --network network-test ches/kafka connect-standalone.sh config/connect-standalone.properties /kafka/tmp/mq-source.properties

And this gives the following error:

[2018-05-22 10:02:49,463] ERROR Failed to create job for /kafka/tmp/mq-source.properties (org.apache.kafka.connect.cli.ConnectStandalone:88)
[2018-05-22 10:02:49,464] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name
matches com.ibm.mq.kafkaconnect.MQSourceConnector, available connectors are: org.apache.kafka.connect.source.SourceConnector,
org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector,
org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.sink.SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector,
org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, org.apache.kafka.connect.tools.MockConnector
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.ibm.mq.kafkaconnect.MQSourceConnector,
available connectors are: org.apache.kafka.connect.source.SourceConnector, org.apache.kafka.connect.tools.MockSinkConnector,
org.apache.kafka.connect.tools.VerifiableSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector,
org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.sink.SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector,
org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, org.apache.kafka.connect.tools.MockConnector
at org.apache.kafka.connect.runtime.ConnectorFactory.getConnectorClass(ConnectorFactory.java:85)
at org.apache.kafka.connect.runtime.ConnectorFactory.newConnector(ConnectorFactory.java:38)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:336)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:235)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)

Actually, I don't know how to resolve this problem since it's my first experience working with both technologies, Kafka and IBM MQ, and I would really appreciate your help.
Thank you!

Connecting to a Kafka topic remotely

Hello,

Is it possible to run this adapter on a server other than the Kafka installation and connect to the Kafka instance remotely over TCP? Say, instead of running it on the Kafka installation server and connecting to MQ remotely over a client connection, we install it on the server hosting the MQ installation and connect to the Kafka topic remotely over TCP.

I did a quick POC by having a Kafka installation on server A (since the connector needs the Kafka classes at runtime), setting up the connector on server A to connect to a Kafka topic on server B. It worked well. I just wanted to verify in case I missed anything here or should take care of any further configuration.

Also, how will this connector connect to the Kafka installation over TLS? When this application connects to the Kafka installation, it would be acting as the SSL client and the Kafka installation would be the SSL server.

Message Ordering

Hi, we are using the MQ Source Connector and have a question related to getting messages in physical order from a queue.

We have a CDC process running on a source system that publishes changes (messages) into MQ and have strict requirements to consume the messages in the physical order they are published.

In terms of connectors and tasks - can you tell me what the scaling limitations are so that I maintain the requirement of getting messages in physical order?

I am trying to avoid the situation where I would have multiple threads consuming messages from MQ if we scale the number of tasks, etc.; even though messages are being consumed in order from the queue, one of the threads might publish faster and potentially publish a message out of order onto the Kafka topic. I'm not 100% sure if this is a scenario that would happen, so I'm just trying to validate. We haven't seen this occur yet in our limited testing.

On the publisher side, we are publishing messages to Kafka using message keys, with a Connect SMT (ValueToKey) transformation that builds the key from a unique field in the message header so that order is guaranteed in Kafka for the consumer.

When I read your write-up on Medium, I realized that we may see an occasional duplicate message, which could impact order if two changes very near each other happened on the same key and a republish occurred due to a cluster or network issue. Any insight would be very helpful. Thanks!

MQ message persistence behavior

Hello,

I had posted a query for the sink adapter regarding message persistence and I discovered that the sink adapter may send duplicate messages to avoid losing messages. Is the same behavior valid for the source adapter as well?

Read the message from the MQ queue ----> deliver the message to the Kafka topic ----> once the message has been delivered to the Kafka topic, issue a COMMIT for the MQ GET.

In this case, if the adapter gets killed just before issuing the COMMIT (and after the message has been delivered to the Kafka topic), the message would be rolled back to the queue and the adapter would re-process the same message upon restarting.

Please let me know if my understanding is correct.

Running multiple adapter processes for Multiple Queues

Hello,

I have a use case wherein I need to collect messages from two queues on two queue managers and post the messages to a Kafka topic. I tried executing two adapter processes by passing two different mq-source.properties files (one property file per queue/queue manager pair). Running the adapter processes (standalone mode) on the same server failed on the second attempt with an "Address is already in use" error:

org.apache.kafka.connect.errors.ConnectException: Unable to start REST server
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:214)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:95)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:331)
at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:299)
at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:235)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.server.Server.doStart(Server.java:398)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:212)

What sort of workaround should I put in place to achieve my requirement here ?

Thanks & Regards

JMS 2.0 API Issue

Hi,

I am using this repository to build a JAR to connect to a BMC WMQ. The queue manager runs version 8.0.0.13. But I still get the following error while submitting a connector:

"JMSCC5007: Use of 'createContext()' is not supported with this instance of this connection. Only connections with a correct type of connection can support using the JMS 2.0 API"

Hashed/encoded passwords & Connection Handling

Hello,

I am running the connector on a distributed worker. The worker connects to the Kafka infrastructure over TLS and SCRAM authentication. Since these security mechanisms are in place, I have updated the password-specific details below in the connect-distributed.properties file:

  1. the password for connecting to the Kafka broker in the sasl.jaas.config property
  2. the passwords for the keystore and truststore being used

Similarly, for accessing the connector over the REST API interface I have enabled userid/password based authentication, which also has the password stored in the text file.
Is there a way these passwords can be encoded, with the JAR accepting the encoded format?

My second query is how the connection to IBM MQ is handled. Does the connection remain active 24x7? Or does it remain idle if there is a period of inactivity and consume messages as soon as they arrive?

Thanks & Regards

MQ Messages result in Duplicate Kafka Messages

Kaiser has been running long-term volume tests with the IBM MQ Source Connector. Occasionally we'll see one MQ message produce two Kafka messages. When this happens we see this in the logs:

Rolling back in-flight transaction

That would indicate that the session was rolled back, and any uncommitted messages were placed back on the queue, where they could be re-read. The code looks like this:

public void commit() {
    log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());

    if (!connectInternal()) {
        return;
    }

    try {
        if (inflight) {
            inflight = false;

            if (inperil) {
                inperil = false;
                log.debug("Rolling back in-flight transaction");
                jmsCtxt.rollback();
            }
            else {
                jmsCtxt.commit();
            }
        }
    }
    catch (JMSRuntimeException jmse) {
        log.error("JMS exception {}", jmse);
        handleException(jmse);
    }

    log.trace("[{}]  Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
}

The inperil flag can be set here:

        if (m != null) {
            inflight = true;

            // We've received a message in a transacted session so we must only permit the transaction
            // to commit once we've passed it on to Kafka. Temporarily mark the transaction as "in-peril"
            // so that any exception thrown will result in the transaction rolling back instead of committing.
            inperil = true;
            
            sr = builder.toSourceRecord(jmsCtxt, topic, messageBodyJms, m);
            inperil = false;
        }

I checked the threads doing the work, and in our configuration it looks like multiple threads can collide on the value of inperil. Our duplicates are caused by messages making it to Kafka, but then that batch being rolled back in MQ and re-read.

A fragment of our configuration looks like this. Would tasks.max > 1 introduce additional threads?

{
  "name": "MQConnector_DAKPXB30_QL.KP.AUDIT.LFS.KAFKA.EVN",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "tasks.max": "3",
    "topic": "TP.MQ.TO.SPK.LFS.AUDIT",
    "mq.connection.name.list": "szaqiexx.ssdc.kp.org(51446)",
    "mq.queue.manager": "DAKPXB30",
    "mq.channel.name": "EISDS.SVRCONN",
    "mq.queue": "QL.KP.AUDIT.LFS.KAFKA.EVN",
    "mq.record.builder": "com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder",
    "mq.message.body.jms": "true"
  }
}

"trace":"java.lang.NoSuchMethodError

Hi,

I just recently built the connector, but when I connect to our MQ source, I am getting the following error:

"trace":"java.lang.NoSuchMethodError: javax.jms.Connection.createSession(I)Ljavax/jms/Session;\n\tat com.ibm.msg.client.jms.admin.JmsConnectionFactoryImpl.createContext(JmsConnectionFactoryImpl.java:531)\n\tat com.ibm.eventstreams.connect.mqsource.JMSReader.connect(JMSReader.java:191)\n\tat com.ibm.eventstreams.connect.mqsource.MQSourceTask.start(MQSourceTask.java:84)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"

I am unsure of which package I should be uploading, since a lot of the posts I've found online say to update to JMS 1.2, but in the current pom.xml it is already at 2.0.

Thanks

The specified value is not allowed for 'XMSC_WMQ_CHANNEL'

Hello, I am new to IBM MQ. I am trying to read messages from a queue into Kafka with kafka-connect-mq-source. I hit this error:

The specified value 'TESTQUEUE.STANDARD.RATES.RTO.TOPIC' is not allowed for 'XMSC_WMQ_CHANNEL'/
The given value is not allowed for the property specified.
Change the value to a value that is supported for the property.

I am running the connector in standalone mode. Connector properties:

name=mq-IBM_TEST_MQ_1
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasks.max=1
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
key.converted=org.apache.kafka.connect.storage.StringConverter
value.converted=org.apache.kafka.connect.storage.StringConverter

topic=TEST_IBM_MQ_1
mq.queue.manager=MY.TEST.MANAGER
mq.connection.name.list=myserver.mydomain(1418)
mq.channel.name=TESTQUEUE.STANDARD.RATES.RTO.TOPIC
mq.queue=K3.TESTQUEUE.RTO.STANDARD.RATES.Q

Do you have any solutions?

Connection problem to broker on OpenShift

Hi,
I'm deploying the connector on the OpenShift platform. Once the pod is up and running, I see that the connector is not able to connect to the Kafka broker:

WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:725)

The reason is that the bootstrap config in distributed.properties is localhost:9092. To override this (to use my broker URL, i.e., 1.1.2.3:9092), I provided

env: - name: KAFKA_CONNECT_BOOTSTRAP_SERVERS

in my *.yaml file. But I'm still getting the connection issue. Could you please let me know where I am going wrong, or whether the ENV variable is correct?

Thanks

Unrecognized character escape 'P' (code 80) error thrown by IBM MQ source connector

We are trying to use the IBM MQ source connector in our environment; the mainframe sends messages to MQ and the MQ source connector picks them up. For some of the messages we are facing the following issue. Attached is the error log for your reference.

Below is the message format we receive.

"DB;0100;000;000;4134522063660201 ;000000;458;00000000010.00;458;00000000010.00;08122020;095458;458;5311;54 ;+004; ;022944840343;000000001988363;010;N; ; ;M; ;027007008188 ;75000361 ;MAS-MCVC TEST \K KL MY ;MY ;002025713;ECM; ;DCS001 ;Y;A;VISA DEBIT ;TEST ;20201208095458; ; ; ; ;Y;000001988363; ;SSL ;4134522063660201;164128127886;test ;12548K ;123 ; ;01; "

MQ_SOURCE_Conector_errorlog.txt

If the message contains such a character, the connector throws an error and fails.

Is there any way the connector can have some tolerance level, so that when this kind of issue happens it can proceed with the next record instead of bringing down the entire connector?

Issue: no mqjbnd in java.library.path

When I use the bindings connection mode ("mq.connection.mode": "bindings") in the connector, I get the error Caused by: java.lang.UnsatisfiedLinkError: no mqjbnd in java.library.path. I am using the MQ client 9.x in the classpath.

Suggest the approach for a new RecordBuilder?

Hi Andrew,
We tried this connector repo code and it's running successfully.

Thanks for your valuable connector code.
The given record builders (default, JSON) are not suitable for our requirement, so can you please suggest the best approach/changes to be made in the code base to fulfill the requirement below? Thanks.

IBM MQ example message (input): John31New York20180123
(we know the start and end index of each field in the message)

Expected json record on kafka topic(output) example:
{ "name": [{name: "John",type: string} ],
"age":[{name:31, type:int32}],
"city":[{name: "New York", type:string}],
"BirthDate": [{name:"20180123", type:date}]
}

I believe we can fulfill the requirement by creating a subclass of the builder.

Please suggest or provide an approach for the above JSON conversion.

userid is getting changed at the MQ side

Hi,
I'm seeing an issue where the supplied 'used-id' is getting modified to the 'system user' and authentication failed with the following error :

The MQ Admin reported that I'm supplying different user id.
I've configured to send "MQUser" as userid(I'm executing this connector on linux server). But when it reaches MQ, the userid is modified to wljay (which is the user I logged in)

ERROR MQ error: CompCode 2, Reason 2035 MQRC_NOT_AUTHORIZED (com.ibm.eventstreams.connect.mqsource.JMSReader:413)
INFO Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
ERROR JMS exception {} (com.ibm.eventstreams.connect.mqsource.JMSReader:362)

com.ibm.msg.client.jms.DetailedJMSSecurityRuntimeException: JMSWMQ2008: Failed to open MQ queue 'LOC.E.KAFKA.QUEUE'.
JMS attempted to perform an MQOPEN, but IBM MQ reported an error.
Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.
        at com.ibm.msg.client.jms.DetailedJMSSecurityException.getUnchecked(DetailedJMSSecurityException.java:270)
        at com.ibm.msg.client.jms.internal.JmsErrorUtils.convertJMSException(JmsErrorUtils.java:173)
        at com.ibm.msg.client.jms.internal.JmsContextImpl.createConsumer(JmsContextImpl.java:416)
        at com.ibm.eventstreams.connect.mqsource.JMSReader.connectInternal(JMSReader.java:342)
        at com.ibm.eventstreams.connect.mqsource.JMSReader.receive(JMSReader.java:225)
        at com.ibm.eventstreams.connect.mqsource.MQSourceTask.poll(MQSourceTask.java:124)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2035' ('MQRC_NOT_AUTHORIZED').
        at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
        at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:222)
        at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:156)
        at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1176)
        at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:133)
        at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:486)
        at com.ibm.msg.client.wmq.internal.WMQSession.createConsumer(WMQSession.java:918)
        at com.ibm.msg.client.jms.internal.JmsSessionImpl.createConsumer(JmsSessionImpl.java:1032)
        at com.ibm.msg.client.jms.internal.JmsSessionImpl.createConsumer(JmsSessionImpl.java:1116)
        at com.ibm.msg.client.jms.internal.JmsContextImpl$SessionWrapper.createConsumer(JmsContextImpl.java:1991)
        at com.ibm.msg.client.jms.internal.JmsContextImpl.createConsumer(JmsContextImpl.java:403)

Is this a known issue?

By the way, I'm trying to connect to IBM WebSphere MQ for z/OS V7.1.0.

Tagging your repositories

It would be amazing if you could put the right tags (kafka, mq, etc.) on your repositories. We developed the same connector (and would rather have used this one :)

Read Message Header from MQ

Can I read the message header information from IBM MQ and put it into Kafka along with the message body? What will the format be in Kafka when I read the data from the topic using a consumer?

JMS connection with IBM-MQ is not getting closed when a stop() signal is received due to an internal exception

Hello, I have encountered an issue wherein the connection with IBM MQ remains inactive (not closed) when an internal exception occurs in Kafka Connect. Due to this issue, the batch of messages read in the previous poll() remains uncommitted in IBM MQ.

Steps to reproduce:

  1. Configure producer.max.request.size=1048576 (1 MB) in worker.properties
  2. Now send messages in IBM-MQ with size >= 1MB
  3. Check the logs (you will find an exception stating that the configured max.request.size is smaller than the message received, along with the statement "Task is being killed and will not recover until manually restarted")
  4. Check the IBM-MQ (you will find the connection is still present; however, its state is Inactive)

Analysis:

Based on further analysis, I found that when stop() in MQSourceTask is executed in this scenario, the JMSReader never gets closed.

As per the existing stop() implementation, reader.close() is only called when receivingMessages == TRUE, which in this case will never happen, as receivingMessages would be FALSE.

However, after negating the condition, the issue seems to be resolved, i.e., reader.close() is called when receivingMessages == FALSE.

Existing logic:

synchronized (this) {
    if (receivingMessages.get()) {
        log.debug("Will close connection");
        willClose = true;
    }
}
if (willClose) {
    // Close the connection to MQ to clean up
    if (reader != null) {
        reader.close();
    }
}

New Logic:

synchronized (this) {
    if (!receivingMessages.get()) {    // ** added NOT here **
        log.debug("Will close connection");
        willClose = true;
    }
}
if (willClose) {
    // Close the connection to MQ to clean up
    if (reader != null) {
        reader.close();
    }
}

Please share your opinion on this and let me know if it makes sense to make the changes. Thank you!

java.lang.ClassCastException: class com.ibm.eventstreams.connect.mqsource.MQSourceConnector Error in distributed mode

Hello,

I am trying to run the MQ-Source adapter in distributed mode. The connector is created from the current master branch. The version is kafka-connect-mq-source-1.1.0-jar-with-dependencies.jar.

I have used the default connect-distributed.sh and connect-distributed.properties files. Below is the command used to run the adapter:

CLASSPATH=/opt/kafka-connect-mq-source-master/target/kafka-connect-mq-source-1.1.0-jar-with-dependencies.jar bin/connect-distributed.sh config/connect-distributed.properties
The adapter process is started and I can see the process listening on port 8083.

However, the REST command errors out when executed:

curl -X POST -H "Content-Type: application/json" --data '{"name": "mq-source1", "config": {"connector.class":"com.ibm.eventstreams.connect.mqsource.MQSourceConnector", "tasks.max":"1", "mq.queue":"TEST", "mq.queue.manager":"TEST", "mq.channel.name":"SYSTEM_DEF_SVRCONN", "mq.record.builder":"com.ibm.eventstreams.connect.mqsource.MQSourceConnector", "topic":"TSOURCE" }}' http://localhost:8083/connectors

Error:

[2019-07-25 16:59:56,078] ERROR WorkerSourceTask{id=mq-source1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.ClassCastException: class com.ibm.eventstreams.connect.mqsource.MQSourceConnector
at java.lang.Class.asSubclass(Class.java:3404)
at com.ibm.eventstreams.connect.mqsource.JMSReader.configure(JMSReader.java:175)
at com.ibm.eventstreams.connect.mqsource.MQSourceTask.start(MQSourceTask.java:81)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-07-25 16:59:56,079] ERROR WorkerSourceTask{id=mq-source1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Connector fails with more than 1 partition where queue sets MaxUncommittedMsgs: 10,000

The connector errored out when we increased the partitions for our topic to 2:

JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2024' ('MQRC_SYNCPOINT_LIMIT_REACHED')

The threads are not committing the reads and have reached the syncpoint limit. Is it possible to issue the MQCMIT call at less than the default queue value (10,000 calls)?

Facing authorization issue while connecting to mq manager. But there is no ssl enabled on the IBM Mq side.

Java: 1.8
IBM MQ version: 7.5
OS version: Red Hat Enterprise Linux Server release 5.11

[2018-01-19 07:22:12,477] ERROR MQ error: CompCode 2, Reason 2035 MQRC_NOT_AUTHORIZED (com.ibm.mq.kafkaconnect.JMSReader:309)
[2018-01-19 07:22:12,477] ERROR Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask:387)
org.apache.kafka.connect.errors.RetriableException: com.ibm.msg.client.jms.DetailedJMSSecurityRuntimeException: JMSWMQ2013: The security authentication was not valid that was supplied for QueueManager '*******' with connection mode 'Client' and host name '$$$$$$$($$$$)'.
Please check if the supplied username and password are correct on the QueueManager to which you are connecting.
at com.ibm.mq.kafkaconnect.JMSReader.handleException(JMSReader.java:346)
at com.ibm.mq.kafkaconnect.JMSReader.connectInternal(JMSReader.java:269)

I am trying to test the connector on Ubuntu Standalone. (host issue)

Hi Andrew,

Thanks for the connector code. I hope you are having a great day. My query is kind of trivial, but I am somehow not able to make this connector work.
I followed the steps mentioned (from scratch).
My Kafka is working fine. I was able to configure IBM MQ 8.0 as per the steps mentioned (working fine). I followed all the steps, but somehow the connector is not connecting to the queue.
The errors that I get are as follows:

JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2538' ('MQRC_HOST_NOT_AVAILABLE').

AMQ9204: Connection to host 'localhost:1414(1414)' rejected. [1=com.ibm.mq.jmqi.JmqiException[CC=2;RC=2538;AMQ9205: The host name supplied is not valid. [3=localhost:1414,4=TCP]],3=localhost:1414(1414),5=RemoteTCPConnection.resolveHostname]

AMQ9205: The host name supplied is not valid

JMSWMQ0018: Failed to connect to queue manager 'MYQM' with connection mode 'Client' and host name 'localhost:1414(1414)'.
Check the queue manager is started and if running in client mode, check there is a listener running.

I tried to debug my configuration details, i.e., those mentioned here.

I think the issue is with this statement: "These instructions set up a queue manager that uses the local operating system to authenticate the user ID and password, and the user ID is called alice and the password is passw0rd."

I created a new user 'alice' with the password as mentioned, and added alice to the mqm group.

If possible, please let me know if I may be making a mistake, or point me to any resource that can be helpful.

Warm Regards.

Backward compatibility

Hi there, we are currently using MQ version 7.1.0.7, but I can see from your readme that you only support down to 8.0. What makes this connector incompatible with older versions of MQ? Would it be possible to extend this repo for use with older versions? Thank you.

Error while connecting to MQ version 7.5.0.8

First, I would like to thank you for sharing this MQ connector repo on GitHub.

We are using MQ version 7.5.0.8 and facing the below errors while running it. Can you please share/suggest the changes required in the code base to make it work? Thank you.

{"name":"kafka-mq-connector", "connector":{"state":"RUNNING", "worker_id":"191.111.222.75:8083"}, "tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException:com.ibm.msg.client.jms.DetailedJMSException: JMSCC0091: The provider factory for connection type 'com.ibm.msg.client.wmq' could not be loaded.\n\tat com.mqkafka.kafkaconnector.MQReader.configure(MQReader.java:122).\n\tat com.mqkafka.kafkaconnector.MQSourceTask.start(MQSourceTask.java:36).\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execut(WorkerSourceTask.java:157).\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170).\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214).\n\tat

kafka connector log4j appender file?

Hi Andrew,
I hope you are doing well. We require the kafka-connect-mq-source logs to identify connection issues/failures, so we are planning to persist the logs in a file. Hence I modified log4j.properties as below, but it is not writing anything to the my_log target file. What is the issue here? What repository changes are required to make it run? Is the properties file in the right location or folder? Please help with the log4j.properties file and the steps to make it run on both Windows and Unix. Thanks.

I don't see PropertyConfigurator.configure("log4j.properties") in the Java code base. Does it require this configuration?

What changes are required in the repository to make it work?

# Root logger option
log4j.rootLogger=INFO, file

# Direct log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender

# Redirect to Tomcat logs folder
#log4j.appender.file.File=${catalina.home}/logs/logging.log

log4j.appender.file.File=my_log.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

One more thing: this connector runs fine even with IBM MQ 7.5.x, so why not update the help file?

I am not able to figure out the root cause of the issue. Also, I don't see a META-INF/MANIFEST.MF in this JAR, i.e. the file which should have (at least) this one line:
Main-Class: com.mypackage.MyClass
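On the logging question: Kafka Connect does not call PropertyConfigurator itself. The connect-standalone and connect-distributed scripts point log4j at config/connect-log4j.properties inside the Kafka distribution (via the log4j.configuration system property), so that is the file to edit rather than a log4j.properties in this repository. A minimal sketch of adding a rolling file appender there; the log file name and path are illustrative:

# config/connect-log4j.properties in the Kafka distribution
log4j.rootLogger=INFO, stdout, file

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# Hedged addition: rolling file appender so connector logs persist on disk
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/connect-mq-source.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

The same file should be picked up on both Windows (connect-standalone.bat) and Unix, so no changes to the connector code itself ought to be needed.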

Setting up Mutual TLS with MQ failed

Hello,

Working with a distributed worker, where the distributed worker properties file has the keystore and truststore with the certs needed for Kafka broker connectivity. While setting up mutual TLS for MQ, I am getting:

At the queue manager: AMQ9637E: Channel is lacking a certificate.
At the distributed worker: AMQ9503: Channel negotiation failed.

I started the distributed worker process without any -Djavax.net.ssl.keyStore or -Djavax.net.ssl.trustStore attributes specified in the EXTRA_ARGS environment variable, and only added mq.ssl.keystore.location/mq.ssl.truststore.location (along with the password attributes) to the connector [start the distributed worker process, then issue the curl input to create a connector, and in this input pass the keystore and truststore with certs specific to MQ].

This attempt fails with the above error.

However, if I do not specify any SSL-related information in the connector via the variables mq.ssl.keystore.location/mq.ssl.truststore.location, and only specify the keystore/truststore details for the worker process as below, the mutual SSL works:

export EXTRA_ARGS="-Dcom.ibm.mq.cfg.useIBMCipherMappings=false -Djavax.net.ssl.keyStore='key.jks' -Djavax.net.ssl.keyStorePassword='' -Djavax.net.debug=all -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword="

Then start the distributed worker, create a new connector, and do not specify the keystore/truststore details via the mq.ssl.keystore.location/mq.ssl.truststore.location attributes. The connector works and the MQ channel is started in mutual SSL mode.

Do we still need to export the keystore and truststore with EXTRA_ARGS if we are already passing the truststore and keystore on the connector? The documentation says:

"You will need to put the public part of the client's certificate in the queue manager's key repository. You will also need to configure the worker's JVM with the location and password for the keystore containing the client's certificate. Alternatively, you can configure a separate keystore and truststore for the connector."

I would guess that if I am passing the keystore and truststore in the connector, there should ideally be no requirement to pass the same via EXTRA_ARGS.
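For reference, the worker-level JVM keystore flags and the connector-level mq.ssl.* properties are independent mechanisms: the mq.ssl.* properties configure the JMS connection to MQ only, so when they are set, EXTRA_ARGS should not be needed for the MQ side (JVM-wide flags such as -Dcom.ibm.mq.cfg.useIBMCipherMappings=false still have to go through EXTRA_ARGS, since they are not connector properties). A hedged sketch of the connector JSON; the names, paths, passwords and cipher suite are illustrative:

{
  "name": "mq-source",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "mq.queue.manager": "MYQM",
    "mq.connection.name.list": "mqhost(1414)",
    "mq.channel.name": "MYSVRCONN",
    "mq.queue": "MYQUEUE",
    "mq.ssl.cipher.suite": "TLS_RSA_WITH_AES_128_CBC_SHA256",
    "mq.ssl.keystore.location": "/path/to/key.jks",
    "mq.ssl.keystore.password": "********",
    "mq.ssl.truststore.location": "/path/to/truststore.jks",
    "mq.ssl.truststore.password": "********",
    "topic": "TSOURCE"
  }
}

If AMQ9637E persists with this configuration, it is worth double-checking the property spellings: the truststore property is mq.ssl.truststore.location, which is easy to mistype.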

Connector for Native MQ

Fantastic first steps

Please forgive my ignorance, but this connector seems to be for JMS/MQ.

Will this work for native MQ connections? I want to consume binary and/or XML messages from clients putting to an MQ queue (not JMS).

thanks
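For reference, the connector does not require the putting application to use JMS. A hedged sketch of properties for consuming raw (non-JMS) binary or XML payloads and passing them to Kafka unchanged:

# Treat the message body as raw bytes rather than a JMS-typed payload
mq.message.body.jms=false
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
# Hand the bytes to Kafka without conversion
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter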

Use of Glassfish application server

Hello,

This is more of a query than an issue. I am working with a Kafka Connect worker in distributed mode. While going through the logs after I started the worker, I see some log entries mentioning GlassFish. My understanding was that the worker is a standalone Java process which needs only a Java runtime to execute, rather than a fully fledged application server; the worker uses the Java runtime to start and creates a REST service that by default listens for requests on port 8083. I am not sure how GlassFish comes into the picture. My understanding may be completely wrong, but it would help to have some insight on this.

Thanks & Regards

MQ Source and Sink connector logs not getting displayed

MQ Source and Sink connector debug logs (or any other form of logs) are not getting displayed in a file or on the console, even though both the source and sink start successfully and are able to connect to IBM MQ and Kafka respectively. I tried running connect-standalone.bat from a Windows machine.

Is there any setting that has to be done explicitly to enable writing the logs to the console or a file? Please advise.

ByteArrayConverter and Avro Output

We are currently using kafka-connect-mq-source to consume from MQ and write EBCDIC bytes out to a topic. We are investigating the possibility of writing the bytes out in an Avro message (with the schema pre-registered); however, I am having trouble getting it to work.

Relevant properties for our bytes messages (which work):
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
"mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"
"mq.message.body.jms": "true"

When I try to use the AvroConverter as the value.converter (with schema.registry.url set), we get an error:
Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is ......value=[B@2w1de971.....
org.apache.kafka.connect.errors.DataException: Unknown Java type for schemaless data: class [B

Any suggestions?

Huge connector output when queue manager connection broken

I was using the connector to move messages from an MQ queue to a Kafka topic when I ended my queue manager. I wanted to test what would happen and check that the connector would successfully reconnect when I restarted the queue manager. I was redirecting the output from the connector to a file on disk. When the queue manager was stopped, the connector continuously output a large number of exceptions, so that the connector output file grew by about half a gigabyte every minute. Luckily I noticed the problem before it filled my disk, but had I not noticed, it would not have taken long. Does the connector need to retry connecting to the queue manager in a tight loop, or would it be sufficient to retry every 10 seconds or so? Thanks

Getting [Topic authorization failed.] Error

Hello,

We have configured the Kafka Connect source connector with SCRAM authentication to connect to a Kafka infrastructure in distributed mode. Below are the properties defined in the connect-distributed.properties file:

bootstrap.servers=XXXXX:9092,YYYYY:9092,ZZZZZZ:9092
offset.storage.topic=abc-connect-offsets
offset.storage.replication.factor=1
config.storage.topic=abc-connect-configs
config.storage.replication.factor=1
status.storage.topic=abc-connect-status
status.storage.replication.factor=1
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required
username=""
password="";
ssl.keystore.location=..../../...
ssl.truststore.password=.../../../
ssl.truststore.location=.../.../..
ssl.truststore.password=.../.../

And a similar set of producer.<> properties:
producer.bootstrap.servers
producer.security.protocol
producer.ssl.truststore.location
producer.ssl.truststore.password
producer.ssl.keystore.password
producer.ssl.truststore.type
producer.ssl.keystore.type
producer.sasl.mechanism
producer.sasl.jaas.config

I ran a quick test with kafka-console-consumer.sh for all three topics (abc-connect-offsets, abc-connect-configs and abc-connect-status) and was able to connect and consume the messages already present on them. That helped me verify the TLS and SCRAM authentication.
However, when we start the connector we get the error below:

[2020-01-19 18:52:30,817] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:228)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'abc-connect-offsets'
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:174)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:109)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:215)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
... 11 more
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2020-01-19 18:52:31,080] INFO Started o.e.j.s.ServletContextHandler@eb507b9{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:851)

Could you please let me know why the connector is failing when kafka-console-consumer.sh seems to be working fine?
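For reference, kafka-console-consumer.sh only needs Read access to existing topics, while the worker's internal admin client also attempts to create and describe its internal topics, which requires extra ACLs for the SCRAM principal. A hedged sketch, assuming ACL-based authorization; the principal name and the admin.properties file are illustrative:

bin/kafka-acls.sh --bootstrap-server XXXXX:9092 --command-config admin.properties \
  --add --allow-principal User:connect-user \
  --operation Create --operation Describe --operation Read --operation Write \
  --resource-pattern-type prefixed --topic abc-connect-

Here admin.properties would carry the same SASL_SSL/SCRAM settings as the worker, and connect-user stands in for the actual SCRAM username.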

Add additional configuration for SSL keystores

When connecting to a secure MQ Channel, a personal SSL certificate (private key) may be required to establish TLS.
This should be supported in some way.

I suggest providing two JKS files (as paths on the local filesystem), each with its respective password, to serve as the truststore and keystore. These will be optional in the configuration, so the change will be backward compatible.

I am providing a pull request with my implementation.

Event ordering in case of multiple workers added for same MQ queue

Hello Andrew, I am looking for details on the behaviour of the MQ connector in distributed mode when there are multiple workers acting on the same queue and publishing to the same topic. In such a case, would the workers retain the order in which events are received when publishing to the topic?

MQ Connection Handling Post Queue Manager Restarts

Hello,

We are currently using another MQ source connector provided by a different vendor; however, we are looking at this one as a potential alternative. One of the issues we face with our existing implementation is that reconnection attempts never succeed for the connector task(s) following queue manager restarts, which requires the intervention of restarting all connector tasks.

We will be doing some testing, but does this connector provide detection and connection recovery for queue manager restarts?

Thanks in advance!

Kafka Message Formatting - JMS Attributes/MQMD Fields as Part of Value Payload

Hello,

We are currently using an MQ source connector provided by another vendor and use the Value-to-Key single message transform to key Kafka messages based on the replyTo.name field, as we have an ordering guarantee that needs to be satisfied. We are looking into this connector as an open-source alternative. Here's the general format of the data we get using the other connector:

{
  "messageID": "ID:###############",
  "messageType": "bytes",
  "timestamp": EPOCH TIME,
  "deliveryMode": 1,
  "correlationID": null,
  "replyTo": {
    "destinationType": "queue",
    "name": "queue://#############/########?targetClient=1"
  },
  "destination": null,
  "redelivered": false,
  "type": null,
  "expiration": 0,
  "priority": 0,
  "properties": {
    Other_JMS_Attributes_Go_Here
  },
  "bytes": "BASE 64 ENCODED JSON STRING",
  "map": null,
  "text": null
}

We've tried a multitude of config combinations to get the JMS and MQMD fields as part of the value payload, but have had no success. It seems we can only get JMS attributes and MQMD fields as part of the headers, using the mq.jms.properties.copy.to.kafka.headers configuration.

What are our options if we want to continue keying Kafka messages off the information our MQ provider gives us in the replyTo field? Would this be possible with the default or JSON record builders?
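For reference, neither built-in record builder puts the JMS header fields into the value, but if the reply-to information reaches a Kafka record header (as described above), a small custom single message transform could promote that header to the record key. A hedged sketch, not part of this connector: the class, its header.name config, and whatever header name you pass are all illustrative.

package com.example;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Illustrative SMT: copies the value of a named Kafka record header
 * (e.g. one populated from a JMS field) into the record key as a string.
 */
public class HeaderToKey<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("header.name", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH, "Name of the header to promote to the key");

    private String headerName;

    @Override
    public void configure(Map<String, ?> configs) {
        headerName = (String) configs.get("header.name");
    }

    @Override
    public R apply(R record) {
        Header header = record.headers().lastWithName(headerName);
        if (header == null) {
            return record; // header absent: leave the record unkeyed
        }
        // Rebuild the record with the header value as a string key;
        // topic, partition, value and timestamp are carried over unchanged
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.OPTIONAL_STRING_SCHEMA, String.valueOf(header.value()),
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

It would then be enabled in the connector config with something like "transforms": "headerToKey", "transforms.headerToKey.type": "com.example.HeaderToKey", and "transforms.headerToKey.header.name" set to the header carrying the reply-to value.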

"Failed to flush" "timed out while waiting for producer to flush" & "Failed to commit offsets" error with SSL setup

Hello,

I am trying to set up SSL between:

  • This connector and MQ (one-way SSL)
  • This connector and the Kafka server (one-way SSL, to begin with)

I have made the necessary changes in connect-standalone.properties (for the worker) and server.properties (for the Kafka server). The connector successfully makes SSL connections to IBM MQ: the corresponding channel starts in SSL mode once the SSL configuration is in place. The adapter is also able to connect to Kafka (I don't see any errors while starting the connector).
Connector_error.log

To verify the Kafka server SSL setup, I tested with kafka-console-consumer.sh and kafka-console-producer.sh in SSL mode and it worked fine. I am able to make SSL connections and push/consume messages using the console producer and consumer.

The issue, however, comes when I use the connector in SSL mode. When a message is placed on the input queue, it is not delivered to the Kafka topic. The message remains in an uncommitted state on the queue, and I keep getting the entries below in the connector log. No such error occurs if I start the adapter with SSL disabled; in that mode the message is successfully delivered to the topic. Log entries:

[2019-07-16 12:15:22,315] INFO WorkerSourceTask{id=mq-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2019-07-16 12:15:27,316] ERROR WorkerSourceTask{id=mq-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:419)
[2019-07-16 12:15:27,316] ERROR WorkerSourceTask{id=mq-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)

Below are the SSL configuration details from the connect-standalone.properties and server.properties files:

connect-standalone.properties
security.protocol=SSL
ssl.truststore.location=new_connector_ts.jks
ssl.keystore.location=new_connector_ks.jks
ssl.truststore.password=XXXXXXX
ssl.keystore.password=XXXXXXX
ssl.endpoint.identification.algorithm=

server.properties
listeners=PLAINTEXT://172.31.46.133:9092,SSL://172.31.46.133:9093
advertised.listeners=PLAINTEXT://172.31.46.133:9092,SSL://172.31.46.133:9093
listener.security.protocol.map=SSL:SSL,PLAINTEXT:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
ssl.truststore.location=new_kafka_ts.jks
ssl.truststore.password=XXXXXX
ssl.keystore.location=new_kafka_ks.jks
ssl.keystore.password=XXXXX
ssl.key.password=XXXXX
ssl.client.auth=none
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=

Could you please assist. Attaching the connector log. Kindly let me know if any further info is required on this.

Thanks & Regards

Part of the message consumed from MQ is string

Hi,
I'm consuming from MQ using the following settings:

{
  "name": "MQ_Source_Connector",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "tasks.max": "1",
    "topic": "mq_kafka_topic",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "mq.queue.manager": "MQ2",
    "mq.connection.name.list": "1.1.1.1(1111)",
    "mq.channel.name": "KH.TEST",
    "mq.queue": "KH.LOCAL",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder",
    "mq.user.name": "ABCD"
  }
}

What I've observed is that part of the message, when I view it on the Kafka topic, is in string format, and the other part is in bytes format.
E.g., 5088888�������������@�����@����������@�����

And when I changed the converter to String as follows:

    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "mq.message.body.jms": "true",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder",

I see the opposite in the topic:
�������EC990TESTING3300

Any idea why this behaviour occurs?

No support for Custom Partition key?

I wanted to check whether there is any provision for a custom partition key. We want to read messages from MQ and have them partitioned based on a key field in the MQ message before pushing them to Kafka.
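For reference, the connector can key records from certain JMS header fields via the mq.record.builder.key.header property; keying off an arbitrary field inside the message body would need a custom record builder or a single message transform. A hedged sketch (the converter choice is illustrative):

# Use the JMS correlation ID of each MQ message as the Kafka record key.
# Supported values include JMSMessageID, JMSCorrelationID,
# JMSCorrelationIDAsBytes and JMSDestination.
mq.record.builder.key.header=JMSCorrelationID
key.converter=org.apache.kafka.connect.storage.StringConverter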

Multiple adapter processes polling the same Queue

Hello,

To add HA for this adapter, can we run the adapter process (polling the same queue on the same queue manager) on two servers? In case one of the servers goes down, or the adapter process on one of the servers is killed, we would still have one active adapter process.

Can two such adapter processes (connecting over TCP) consuming messages from the same queue cause any issues (message duplication, corruption, etc.)?

Thanks
