snowflake-kafka-connector's Issues

java.lang.NoSuchMethodError: 'org.codehaus.jackson.JsonNode org.apache.avro.Schema$Field.defaultValue()'

I am using the Snowflake sink connector with Avro serialization. When registering the connector, it connects to Snowflake successfully, but I then hit the following error at the bottom of this log:

2020-01-08 16:29:38,614 INFO   ||
[SF_KAFKA_CONNECTOR] pipe: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0 - service started   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,616 INFO   ||
[SF_KAFKA_CONNECTOR] create new task in com.snowflake.kafka.connector.internal.SnowflakeSinkService - table: customers, topic: foo_debezium.debezium.customers, partition: 0   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder]
2020-01-08 16:29:38,618 INFO   ||
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkService created   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder]
2020-01-08 16:29:38,881 INFO   ||
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:close   [com.snowflake.kafka.connector.SnowflakeSinkTask]
2020-01-08 16:29:38,883 INFO   ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: cleaner terminated   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,884 INFO   ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: flusher terminated   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,885 INFO   ||
[SF_KAFKA_CONNECTOR] IngestService Closed   [com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1]
2020-01-08 16:29:38,971 INFO   ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: service closed   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,972 ERROR  ||  WorkerSinkTask{id=snowflake_test-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.NoSuchMethodError: 'org.codehaus.jackson.JsonNode org.apache.avro.Schema$Field.defaultValue()'
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1652)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1511)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1220)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:100)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:485)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:485)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-08 16:29:38,973 ERROR  ||  WorkerSinkTask{id=snowflake_test-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
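
A likely cause (an inference on my part; the issue itself doesn't state it): Schema.Field.defaultValue(), which returns an org.codehaus.jackson.JsonNode, exists in Avro 1.8.x but was removed in Avro 1.9, so an AvroConverter built against Avro 1.8 throws NoSuchMethodError when a newer Avro jar wins on the plugin classpath. A minimal diagnostic sketch to check which Avro jar and API shape the worker actually resolved:

import org.apache.avro.Schema;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // Location of the Avro jar actually resolved on the classpath
        System.out.println(Schema.class.getProtectionDomain().getCodeSource().getLocation());
        try {
            // Present in Avro <= 1.8, removed in Avro >= 1.9
            Schema.Field.class.getMethod("defaultValue");
            System.out.println("Avro <= 1.8: Schema.Field.defaultValue() is present");
        } catch (NoSuchMethodException e) {
            System.out.println("Avro >= 1.9: Schema.Field.defaultValue() was removed");
        }
    }
}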

Here is my config file:

{
    "name": "snowflake_test",
    "config": {
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "topics": "foo_debezium.debezium.customers",
        "snowflake.topic2table.map": "foo_debezium.debezium.customers:customers",
        "snowflake.url.name": "https://foo.snowflakecomputing.com:443",
        "snowflake.user.name": "debezium",
        "snowflake.private.key": "....",
        "snowflake.private.key.passphrase": "....",
        "snowflake.database.name": "debezium_dev",
        "snowflake.schema.name": "debezium",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

Some more details about how to reproduce:
I am running Debezium Postgres Source -> Snowflake Sink using docker. Here is my docker-compose:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - schema-registry
    volumes:
     - $PWD/connect-plugins:/kafka/connect
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

My $PWD/connect-plugins directory:

$ ls -1 connect-plugins/
debezium-connectors
snowflakeinc-snowflake-kafka-connector-1.0.0
$ ls -1 connect-plugins/debezium-connectors/
debezium-connector-postgres-1.0.0.Final.jar
debezium-core-1.0.0.Final.jar
postgresql-42.2.9.jar
protobuf-java-3.8.0.jar
$ ls -1 connect-plugins/snowflakeinc-snowflake-kafka-connector-1.0.0/
assets
doc
lib
manifest.json

Reproduce:

# Containers up
$ docker-compose -f my-docker-compose.yaml up

# Register Snowflake Connector
$ curl -X POST -H "Content-Type: application/json" --data @register-sf.json http://localhost:8083/connectors
# Everything works fine here, but there is still no Avro data in Kafka

# Now add the Debezium Postgres connector - this starts populating data in Kafka in Avro format
$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-on-prem-avro.json

# There is schema registry data
$ curl -X GET http://localhost:8081/subjects/
["__debezium-heartbeat.foo_debezium-value","foo_debezium.debezium.customers-value","__debezium-heartbeat.foo_debezium-key","foo_debezium.debezium.dummy_data-key","foo_debezium.debezium.customers-key","foo_debezium.debezium.dummy_data-value"]

Any insight much appreciated!

Why does the connector name have to be a valid Snowflake identifier?

cf. this issue: #61

Why does the connector enforce that its name (an internal Kafka Connect identifier) be a valid Snowflake identifier? If the connector uses the connector name for something on the Snowflake side, then perhaps it should perform the appropriate transformation itself to sanitise the name as required.
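
A hypothetical sketch (the class and method are mine, not the connector's) of the kind of transformation the issue suggests: map an arbitrary Kafka Connect connector name onto a valid Snowflake identifier (letters, digits, and underscores, not starting with a digit).

public final class NameSanitizer {
    private NameSanitizer() {}

    public static String toSnowflakeIdentifier(String connectorName) {
        // Replace every character outside [A-Za-z0-9_] with an underscore
        String sanitized = connectorName.replaceAll("[^A-Za-z0-9_]", "_");
        // Identifiers must not start with a digit; prefix an underscore if needed
        if (!sanitized.isEmpty() && Character.isDigit(sanitized.charAt(0))) {
            sanitized = "_" + sanitized;
        }
        return sanitized;
    }
}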

Where is Broken Input going


As per the 0.4.0 release, the connector can handle invalid message types (not sure if it can still handle nulls), but I don't know where these invalid messages are being sent. What does 'table stage' mean, as mentioned in the release?

Invalid messages aren't showing up in the internal stage when I send them, and I don't see any additional tables, stages, or topics being created in Snowflake to handle the invalid messages.

The Snowflake Avro Value Converter silently skips messages it can't decode

As far as I can tell, if a Snowflake connector configured to use the Snowflake Avro converter encounters a message it can't decode, nothing is written to the logs, and the message is skipped with no external indication. This is extremely frustrating for debugging.

Even just a message that said something like "the message at topic/partition/offset was discarded because it couldn't be decoded" would be tremendously helpful. There is no need to include the message contents, since they might be huge, and the topic/partition/offset combination lets me investigate if I want to.
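
A sketch of the requested logging (DecodeFailureLogger is a hypothetical helper of mine, not connector code), identifying the record by topic/partition/offset without dumping its contents:

import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DecodeFailureLogger {
    private static final Logger LOG = LoggerFactory.getLogger(DecodeFailureLogger.class);

    // Log enough to locate the discarded record without its (possibly huge) payload
    public static void logDiscarded(SinkRecord record, Exception cause) {
        LOG.warn("[SF_KAFKA_CONNECTOR] message at {}/{}/{} was discarded because it couldn't be decoded",
                record.topic(), record.kafkaPartition(), record.kafkaOffset(), cause);
    }
}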

java.lang.NoClassDefFoundError: org/bouncycastle/jce/provider/BouncyCastleProvider

Install connector plugin

confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.1 

Create connector

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/test/config \
    -d '{
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max": 1,
        "topics": "mysql-01-asgard.demo.transactions",
        "snowflake.topic2table.map": "mysql-01-asgard.demo.transactions:TRANSACTIONS",
        "snowflake.url.name": "asdf.us-east-1.snowflakecomputing.com",
        "snowflake.user.name": "kafka",
        "snowflake.user.role": "SYSADMIN",
        "snowflake.private.key": "xxx",
        "snowflake.database.name": "DEMO_DB",
        "snowflake.schema.name": "PUBLIC",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
        "value.converter.schema.registry.url": "https://${CCLOUD_SCHEMA_REGISTRY_URL}",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}",
        "offset.flush.interval.ms": "60000",
        "offset.flush.timeout.ms": "10000",
        "buffer.count.records": "100",
        "buffer.size.bytes": "65536"
    }'

Fails:

[2019-10-17 08:05:39,689] INFO [test|worker] Creating connector test of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker:246)
[2019-10-17 08:05:39,696] INFO [test|worker] Instantiated connector test with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker:249)
[2019-10-17 08:05:39,700] INFO [test|worker]
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils:80)
[2019-10-17 08:05:39,759] INFO [test|worker]
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector:87)
[2019-10-17 08:05:39,759] INFO [test|worker]
[SF_KAFKA_CONNECTOR] buffer.flush.time set to default 30 seconds (com.snowflake.kafka.connector.SnowflakeSinkConnector:126)
[2019-10-17 08:05:39,764] ERROR [test|worker] WorkerConnector{id=test} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.NoClassDefFoundError: org/bouncycastle/jce/provider/BouncyCastleProvider
  at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
  at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
  at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
  at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
  at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.bouncycastle.jce.provider.BouncyCastleProvider
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 14 more
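
The BouncyCastle classes are not bundled in the connector jar itself; the install guide ships them as separate FIPS jars that must sit next to the connector on the plugin path. A sketch of fetching them into the plugin directory (the path and version numbers below are assumptions; check the install guide for the exact ones):

# Assumed confluent-hub plugin location; adjust to your installation
cd /usr/share/confluent-hub-components/snowflakeinc-snowflake-kafka-connector/lib
curl -O https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
curl -O https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar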

Connectors that read topics with a dot in their name do not write any data to the Snowflake database

Below is the configuration

name=SnowflakeConnector
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.topic2table.map=devdb.engines:engines
tasks.max=1
topics=devdb.engines
snowflake.url.name=***
snowflake.database.name=mydb
snowflake.schema.name=public
buffer.count.records=10000
snowflake.user.name=***
snowflake.private.key=***
snowflake.user.role=***
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
buffer.size.bytes=5242880

The connector above runs normally without any issue, but no data ever appears in Snowflake.

If I copy the data from the above topic to a new topic called devdb_engines and update the connector's configuration with that topic name, then the data is written to Snowflake normally.

String Format Decimals Converting To Unknown Codes

We are using SnowflakeAvroConverter with a defined schema registry. The connector is decoding these values into unknown string formats when storing the data in Snowflake. We are on 1.0.0 in prod, but the latest version locally seems to give the same error. We tried the available SQL functions within Snowflake to decode these values, but to no avail. The data is in an unusable state.

For example:
"0.2700" in Kafka is stored in the Snowflake DB as "\n�"
"0.3500" in Kafka is stored in the Snowflake DB as "\r¬"

Schema registry definition:

{
  "name": "percent_field",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 4,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "4"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
},
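
The stored garbage is consistent with Connect's Decimal encoding: the bytes are the two's-complement unscaled value, so "0.2700" at scale 4 is 2700 = 0x0A8C, i.e. the "\n" (0x0A) plus one non-printable byte shown above. A minimal sketch (my own, not connector code) of decoding such bytes back into a decimal:

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalDecode {
    // Connect's org.apache.kafka.connect.data.Decimal stores the two's-complement
    // bytes of the unscaled value; the scale comes from the schema parameters.
    public static BigDecimal fromConnectBytes(byte[] unscaled, int scale) {
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // 0x0A 0x8C are the raw bytes behind the garbled "0.2700" above
        byte[] bytes = {0x0A, (byte) 0x8C};
        System.out.println(fromConnectBytes(bytes, 4)); // prints 0.2700
    }
}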

Message: unable to decode base64 string: invalid characters encountered in base64 data

Hitting the following exception with snowflake-kafka-connector-1.2.3.jar.

The private key was generated following the steps mentioned here: https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#using-key-pair-authentication

On further analysis of the code, I can see we are appending the BEGIN/END ENCRYPTED PRIVATE KEY markers, which will not work with Base64.decode:

https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/internal/EncryptionUtils.java#L24

Just to do a small test, I wrote a small Java program to decode the key. It fails initially, and passes when I comment out the append statements that add the BEGIN and END encrypted-key text.

The code is at the bottom of the page.

"trace": "com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: \n[SF_KAFKA_CONNECTOR] Exception: Invalid encrypted private key or passphrase\n[SF_KAFKA_CONNECTOR] Error Code: 0018\n[SF_KAFKA_CONNECTOR] Detail: failed to decrypt private key. Please verify input private key and passphrase. Snowflake Kafka Connector only supports encryption algorithms in FIPS 140-2\n[SF_KAFKA_CONNECTOR] Message: unable to decode base64 string: invalid characters encountered in base64 data\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.encoders.Base64.decode(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.io.pem.PemReader.loadObject(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.io.pem.PemReader.readPemObject(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.openssl.PEMParser.readObject(PEMParser.java:92)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:40)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:140)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.FutureTask.run(FutureTask.java:266)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n[SF_KAFKA_CONNECTOR] java.lang.Thread.run(Thread.java:748)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)\n\tat com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:51)\n\tat com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n\tat com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n\tat com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:140)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat 
org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
  },
import org.bouncycastle.util.encoders.Base64;

public class Main {

    public static void main(String[] args) {
        String userData;
        try {
            String key = "-----BEGIN ENCRYPTED PRIVATE KEY-----\n" +
                    "MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIPR0Nozv8sKECAggA\n" +
                    "MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBAY+DZPwO+29JoC8l7NdMtVBIIE\n" +
                    "0PDqdGdVo2XnKfaF3PI+CVjY0pXMHgCPrcKNmDpnjvan5nmiWLWO+sNRDpywsNwD\n" +
                    "qifKnCXD6lPl+IkRDNl0yw6Yk6rkva8FZOY5MlwcTnkuBvvpxegKAcW1vAcJ28An\n" +
                    "H3Nl+RbS4NZ4bc+omuvqNuiid6b51Rx3FP9cC5OazVfAt/ocWmUraIxp1siPtdJb\n" +
                    "/6H7k2e5fj4Q57EdDh8SCr5stmdcv04F78K/xiwtW3DHD46d3uNNVS7LVclB+XcR\n" +
                    "6nntthXa9RggRPSBTzCDMVacnxOeoryRs6aY70pmdMo9O8VeGf1yZUJD8ZRfR995\n" +
                    "JkZYsqro3YF8MW9EJaRAInUfoUcLli3U7yYsLxfCmk6wFvg6QPsBU9jc7NWlimyQ\n" +
                    "ZKhgrWqdTV2nXPberR1TPp9tHJu+REHtJvU1eqLsYPfBJCzBgVeZTXePGXhYZTJG\n" +
                    "Q/A5/yx4bwOATmeAolpNs5fFk45lqblzFYScYNDrq0qZ9bGGE0q9/R4gjf0RL4wb\n" +
                    "Jbqf1Kplv/GZMC/Tlj7SaZpoxyKLLzJGOxbE8HjIc+idYQuVnlwfINnLMcaDCube\n" +
                    "Xctyi5ot0lHao6YRb25v4BeBDyG81gJ7IUKULXmn13ZBH6LvKKsa2KwCQj8mtJ9o\n" +
                    "ZQ6I6DmxkQFWSOvUNihWq0+Lc/VGyrY78LZWVVKKaEc9VlLy/VcK7oijIuKZROLH\n" +
                    "L0BsJWF/eh9xWhRxSa+GpdLZL5bbG7UKu5VdYRjA54+tL0jJ+oZMhDQO1Da/F0h2\n" +
                    "u8dmK2JFu4MmH3TPDHZihr1bg3nDmda7wcigW4d+CnY3xfoqSPRMK/eyaIlKC+vG\n" +
                    "T9qchSRCY4YTd+xD/hgZApQSi756ro559FddFVdnF9xet4PpEaZxftswCq2wBBwu\n" +
                    "r93ql3HvqGqkMVz6llfb/lOvYD3+84rM/zzdqB+85EEnvFDNcZeNlhGsfvjNn8IP\n" +
                    "fxr89NJOQFreHJHw3yKRPyGkbQd+eVzZGQv1RgMJbfXD3tslvZwgpQ2lrP7EvHmn\n" +
                    "pNFdK4PZIxXV0j8eiNIHf9Uw2apcbSul/e5IM04p7nlaKNQfD3wLTM37zbe0tU2D\n" +
                    "GxZ3b5G4tABXdq2q699vcFnObqANeG5zC10xohFYuExjttplqFdNTuS1ffhumNiA\n" +
                    "582Dd2/CXwTlORacZ7T3mE6CqujZ/iXM93zjmKUIp/RiR+G5nV44uXR2VxtXEWc+\n" +
                    "E6bG8i+X+ogjB645Mw1OxYbgb3d8eyQby+JIgHN3hRUvQJsRe/Q+BRf0cG+wTMAf\n" +
                    "m+DKQxooe9ABkypET2rtlwMGb4/ffOS1etzP8hnG7vh+f2ElqG+iQIEffwHguUj0\n" +
                    "QnQFWWkRVfsVVOocIuar74I+tZ5JpPPedMrskSBeNwZXp0EM+drNUJb0PlT9X9Bz\n" +
                    "xK6Z9GZ9MjR6gpPznhsG8uyrby2Y19H/YfqVF6PfqqXMJSD1Ya3Q4lg1GM4ZqYBO\n" +
                    "o70B4QZRFtQSm4gHEnLkkK6agcn7ULQP5JZDP+3oz71qz44oFYcE6X6Vanf21TCD\n" +
                    "FeGDhA6ZM9Q6GYnhwb6cWtPn6bsP+emCrXmO6LuT66wEjxi7c/yxjIbL8uvDzFGw\n" +
                    "cNMoLbqHuJJj0igJfekMjkcVfY/kP+4X/wVUnKIxdMf3\n" +
                    "-----END ENCRYPTED PRIVATE KEY-----";

            // Strip the BEGIN/END marker lines and all whitespace, leaving the raw base64 body
            key = key.replaceAll("-+[A-Za-z ]+-+", "");
            key = key.replaceAll("\\s", "");

            // Rebuild the key, wrapping at 64 characters per line
            StringBuilder builder = new StringBuilder();
            // Comment this line for success
            builder.append("-----BEGIN ENCRYPTED PRIVATE KEY-----");
            for (int i = 0; i < key.length(); i++) {
                if (i % 64 == 0) {
                    builder.append("\n");
                }
                builder.append(key.charAt(i));
            }
            // Comment this line for success
            builder.append("\n-----END ENCRYPTED PRIVATE KEY-----");
            key = builder.toString();

            // Base64.decode fails here while the marker lines are present
            userData = new String(Base64.decode(key));
            System.out.println(userData);
        } catch ( Exception e ) {
            e.printStackTrace();
        }
    }
}

30X Snowpipe usage compared to AWS Snowpipe

I've tried the Snowflake Kafka connector and it works great with version 0.5.5, but the imported data is not as well compressed as the data imported via S3 (60 B per row via S3 vs 144 B per row via the connector, calculated from total data pools of 1.7 TB and 20 GB respectively), and the Snowpipe credit usage is 30x what it is when using AWS Snowpipe following this tutorial: https://docs.snowflake.net/manuals/user-guide/data-load-snowpipe-auto-s3.html

Right now we're spending slightly over 1 credit per day on average to load the data via S3 through our old pipeline, but running the Snowflake Kafka connector for 8 hours has already cost us close to 10 credits. Prorated over a full day (10 credits / 8 h × 24 h), that is 30 credits, nearly a 30x increase.

Are there any optimizations that can be made to avoid this and make the data transfer more efficient? The connector appears to create its own stage on S3, so I'm a little confused by the vast increase in cost.

SinkTask hasn't been initialized before calling PUT function

This is an exact dup of #77
Snowflake connector version 1.0.0
Kafka connect version 2.1.2

The error persists over connector restarts and triggers each time. I have not managed to successfully get a single message into Snowflake.

[2020-01-09 13:31:00,983] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to put records
[SF_KAFKA_CONNECTOR] Error Code: 5014
[SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:362)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:321)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.getSink(SnowflakeSinkTask.java:94)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-09 13:31:00,990] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to put records
[SF_KAFKA_CONNECTOR] Error Code: 5014
[SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:362)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:321)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.getSink(SnowflakeSinkTask.java:94)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	... 10 more

Production ready status?

Hey! Thanks for supporting the Kafka Connect platform.

Just wondering: is there any version that I can deploy to production? After a quick evaluation, v0.4.0 fails at some point with a non-recoverable error (failed to drop stage).

I tried v0.5.0 but got a NullPointerException and was not able to solve the problem. We are using topic2table mapping, so according to the latest changes it seems a bug was introduced there.

So, back to the original question: is there any version (maybe v0.5.1?) that can be considered stable enough for production use?

topic2table map is not being parsed correctly

topic2table map is not being parsed correctly. In this call, the property name (i.e. snowflake.topic2table.map), not its value, is being passed to Utils.parseTopicToTableMap().

Utils.parseTopicToTableMap(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)

Below is the call stack

  [1] com.snowflake.kafka.connector.Utils.parseTopicToTableMap (Utils.java:408)
  [2] com.snowflake.kafka.connector.SnowflakeSinkTask.start (SnowflakeSinkTask.java:85)
  [3] org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart (WorkerSinkTask.java:300)
  [4] org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:189)
  [5] org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:177)
  [6] org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:227)
  [7] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
  [8] java.util.concurrent.FutureTask.run (FutureTask.java:266)
  [9] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
  [10] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
  [11] java.lang.Thread.run (Thread.java:745)

Note that an earlier call is being made correctly:

parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))==null)
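
A sketch of the implied fix (the variable name topic2table is illustrative; the two calls are the ones quoted above):

// Buggy call from SnowflakeSinkTask.start: passes the property-name constant,
// i.e. the literal string "snowflake.topic2table.map", instead of its value
topic2table = Utils.parseTopicToTableMap(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP);

// Fixed call: passes the property's configured value, matching the earlier correct invocation
topic2table = Utils.parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP));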

Error message should be more specific

Snowflake Kafka Connector Version: 0.5.1

When running the connector I get the following error. It would be really useful if the connector could report specific details about which configuration it is expecting, which values are missing, or which values are incorrect. As it is, the user just has to guess :)

[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
  at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
  at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
  at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
  at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
  at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
  at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
  at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Questions about Fault-Tolerance and Exactly-Once Delivery mechanisms

The current docs for this connector state

Messages will neither be duplicated nor silently dropped. Messages will be delivered exactly once, or an error message will be generated. If an error is detected while loading a record (for example, the record was expected to be a well-formed JSON or Avro record, but wasn't well-formed), then the record is not loaded; instead, an error message is returned.

I have several questions about this:

  1. Can you explain what the mechanism is for ensuring that rows are only inserted once into the ingest table?
  2. How is that mechanism tolerant of tasks crashing and restarting at a particular offset? Or of having that partition assigned to another task while the first is down?
  3. How can I see these error messages?
  4. Will one row in a batch causing an error stop the rest of the batch?

Later in the same section you call attention to

Instances of the Kafka connector do not communicate with each other. If you start multiple instances of the connector on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the connector.

  1. When you use the word "instance" here, does that mean connector registration, or connector task? (I'm pretty sure I know the answer to this one, but I just want to have it explicitly confirmed).

Offset not being set/read correctly

  • Connector version : 1.0.0
  • Kafka Connect confluent-5.3.0-2.12

Hi,

We are facing multiple issues related to reliability of messages being sent to snowflake with the connector. Raising ticket for one such issue.

Problem:

  • We stopped the job and reset the offset to an earlier one in order to reload some messages. However, we got this warning multiple times and missed more messages than before.

[2020-02-17 17:10:31,751] WARN WorkerSinkTask{id=snowflake_gis_stats_v5-0} Ignoring invalid task provided offset gis-stats-v5-0/OffsetAndMetadata{offset=9223372036854775807, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=9223372036854775807 currentOffset=8825666 (org.apache.kafka.connect.runtime.WorkerSinkTask:416)

This does not happen often, but we encountered it yesterday. The taskOffset looks weird: 9223372036854775807 is Long.MAX_VALUE.

Snowflake Connector not shutting down properly

After upgrading to 0.4.0, the connector seems to shut down improperly: it fails to drop the internal stage. When I shut down the connector, I get the following error message:

[2019-09-10 15:39:42,757] ERROR Graceful stop of task snowflake_sink-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-09-10 15:39:43,314] ERROR [SF_KAFKA_CONNECTOR] Failed to drop empty internal stage: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4. It can safely be removed. Exception: java.lang.IllegalStateException: Shutdown in progress (com.snowflake.kafka.connector.SnowflakeSinkConnector)

When I check in Snowflake, the internal stage still appears to be there (just as the error message states). When I run the connector again with the internal stage still present, it actually deletes the stage but still throws the following error and crashes:

[2019-09-10 15:41:36,622] ERROR WorkerConnector{id=snowflake_sink} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Connector stopped due to invalid configuration
[SF_KAFKA_CONNECTOR] Error Code: 5006
[SF_KAFKA_CONNECTOR] Detail: Exception while creating or validating table, stage, or pipe.
[SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Incompatible stage
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 5004
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Stage contains files that were not created by the Snowflake Kafka Connector. The stage needs a careful cleanup.
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message: Stage name: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:217)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:231)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
        at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
        at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:323)
        at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:231)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
        at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
        at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:231)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)

When I shut down the connector and restart it once more (after getting this error), it works and I can continue sending messages as usual. I'm curious as to why the internal stage isn't being properly dropped when I stop the connector.

ERROR Failed to create job for snowflake.properties

Hi team,

Can you please help? I am using standalone.sh and getting the error below. The config files follow.

[2019-07-24 07:47:41,918] ERROR Failed to create job for /Users/skumar/kafka/code/snowflake.properties (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-07-24 07:47:41,918] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)

connect-standalone.sh config/connect-standalone.properties snowflake.properties

Here is the snowflake.properties file:

{
"name":"XYZCompanySensorData",
"Config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"snowflake_test_topic",
"snowflake.topic2table.map": "snowflake_test_topic:snowflake_test_topic",
"buffer.count.records":"5",
"buffer.size.bytes":"2000",
"snowflake.url.name":"xxxxx.snowflakecomputing.com",
"snowflake.user.name":"xxxx",
"snowflake.private.key":"NDEJrJRQquVqg==", -- private key
"snowflake.database.name":"xxxxx",
"snowflake.schema.name":"xxxxx",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
}
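
A likely cause (an assumption on my part; the issue doesn't include a resolution): connect-standalone.sh expects each connector config file in Java key=value properties format, while the JSON shape above, with its "name"/"config" wrapper, is what the Connect REST API accepts. A properties-format sketch of the same settings:

name=XYZCompanySensorData
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=snowflake_test_topic
snowflake.topic2table.map=snowflake_test_topic:snowflake_test_topic
buffer.count.records=5
buffer.size.bytes=2000
snowflake.url.name=xxxxx.snowflakecomputing.com
snowflake.user.name=xxxx
snowflake.private.key=NDEJrJRQquVqg==
snowflake.database.name=xxxxx
snowflake.schema.name=xxxxx
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter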

------- connect-standalone.properties file -------

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/Users/skumar/kafka/kafka_connect/connectors/snowflake
#,/Users/skumar/kafka/kafka_connect/connectors/kafka-connect-twitter-0.2.26/usr/share/kafka-connect/kafka-connect-twitter

Connector fails with unrecoverable exception (Error Code: 3001)

Snowflake Kafka Connector fails sporadically with the error message below:
"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception."
[...]
[SF_KAFKA_CONNECTOR] Exception: Failed to ingest file
[SF_KAFKA_CONNECTOR] Error Code: 3001

For some reason, restarting tasks or the whole connector sometimes helps, but it starts failing again after a few hours or days.
We are using the connector to ingest Avro-formatted messages from several Kafka topics into Snowflake. The versions we are facing the issue with are 1.1.0 and even 1.2.3.

Our configuration looks as follows:
connector_config.txt

Log message from the Connect worker:
log_msg.txt

v0.5.1 does not work in Distributed Connector mode

[2019-10-21 11:54:21,875] ERROR WorkerSinkTask{id=MY_TOPIC-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

java.lang.NoSuchMethodError: org.bouncycastle.crypto.CryptoServicesRegistrar.isInApprovedOnlyMode

Hitting the following exception with snowflake-kafka-connector-1.0.0.jar:
java.lang.NoSuchMethodError: org.bouncycastle.crypto.CryptoServicesRegistrar.isInApprovedOnlyMode

Confluent Docker:

git clone https://github.com/confluentinc/examples
cd examples
git checkout 5.3.1-post    # Version number may be different

Snowflake Kafka Connector: snowflake-kafka-connector-1.0.0.jar

(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ docker exec 780e0268538d ls /usr/share/confluent-hub-components/
confluentinc-kafka-connect-datagen
confluentinc-kafka-connect-gcs
snowflake-kafka-connector-1.0.0.jar

Connector Config:

(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ cat ../../../kafka/snowflake_connector.config 
{
  "name":"snowflakeConnector",
  "config": {
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"users",
    "snowflake.url.name":"xyz.snowflakecomputing.com",
    "snowflake.user.name":"balbir",
    "snowflake.private.key":"/Users/balbirsinghmatharu/Desktop/DataDrive/phData/Snowflakes/rsa_key.p8",
    "snowflake.private.key.passphrase":"XXXXXXXX",
    "snowflake.database.name":"USER_balbir",
    "snowflake.schema.name":"KAFKA",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "value.converter.schema.registry.url":"http://localhost:8081"
  }
}

Exception when verifying the connector status:

(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ curl http://localhost:8083/connectors/snowflakeConnector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2110  100  2110    0     0   294k      0 --:--:-- --:--:-- --:--:--  294k
{
  "name": "snowflakeConnector",
  "connector": {
    "state": "FAILED",
    "worker_id": "connect:8083",
    "trace": "java.lang.NoSuchMethodError: org.bouncycastle.crypto.CryptoServicesRegistrar.isInApprovedOnlyMode()Z\n\tat org.bouncycastle.jcajce.provider.ProvSecureHash$MD5.configure(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:35)\n\tat com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n\tat com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n\tat com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
  },
  "tasks": [],
  "type": "sink"
}

0.4.0 release?

Don't mean to be pushy, especially since you've been so accommodating on the 2 feature requests I made, but I'd really like to use said features! Any chance you can release them as 0.4.0?

RequestBuilder Insert returns 500 Internal Server Error

Our user connects successfully using key-pair auth, as expected. The connector gets as far as creating the stage, the pipe, and the destination table, but fails after the REST API call Created Insert Request:

connect_1             | [SF_KAFKA_CONNECTOR] Creating new stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers.   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1             | 2020-01-10 14:58:04,187 INFO   ||
connect_1             | [SF_KAFKA_CONNECTOR] create stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers   [com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1]
connect_1             | 2020-01-10 14:58:07,646 INFO   ||
connect_1             | [SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers retrieved 0 file names   [com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1]
connect_1             | 2020-01-10 14:58:07,650 INFO   ||
connect_1             | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0, recovered from existing pipe   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1             | 2020-01-10 14:58:07,658 INFO   ||
connect_1             | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0: cleaner started   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1             | 2020-01-10 14:58:07,664 INFO   ||
connect_1             | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0: flusher started   [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1             | 2020-01-10 14:58:21,609 INFO   ||  WorkerSinkTask{id=snowflake_test-0} Committing offsets asynchronously using sequence number 1: {foo_debezium.debezium.customers-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Sending Request UUID -
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.connection.RequestBuilder - Created Insert Request : https://foodev.snowflakecomputing.com:443/v1/data/pipes/debezium_dev.debezium.SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0/insertFiles?requestId=1170b5ba-b709-4848-b5d3-48fe62ea2e3b
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 2000
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 4000
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 8000
connect_1             | 2020-01-10 14:58:49,843 INFO   ||  WorkerSourceTask{id=foo-debezium-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1             | 2020-01-10 14:58:49,849 INFO   ||  WorkerSourceTask{id=foo-debezium-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1             | 2020-01-10 14:58:49,900 INFO   ||  WorkerSourceTask{id=foo-debezium-connector-0} Finished commitOffsets successfully in 52 ms   [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Reach the max retry time.
connect_1             | [pool-15-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall insert response - HttpResponseProxy{HTTP/1.1 500 Internal Server Error [Cache-Control: must-revalidate,no-cache,no-store, Content-Type: text/html;charset=ISO-8859-1, Date: Fri, 10 Jan 2020 14:58:55 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Content-Length: 1469, Connection: keep-alive] ResponseEntityProxy{[Content-Type: text/html;charset=ISO-8859-1,Content-Length: 1469,Chunked: false]}}
connect_1             | [pool-15-thread-1] WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallInsert Response  - 500
connect_1             | [pool-15-thread-1] ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 500 found in response from service
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.connection.RequestBuilder - Final History URIBuilder - https://foodev.snowflakecomputing.com:443/v1/data/pipes/debezium_dev.debezium.SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0/insertReport?requestId=bb62fa74-62cf-40d8-b94a-2c6597458181
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 2000
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 4000
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 8000
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Reach the max retry time.
connect_1             | [pool-14-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall history response - HttpResponseProxy{HTTP/1.1 500 Internal Server Error [Cache-Control: must-revalidate,no-cache,no-store, Content-Type: text/html;charset=ISO-8859-1, Date: Fri, 10 Jan 2020 14:59:23 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Content-Length: 1470, Connection: keep-alive] ResponseEntityProxy{[Content-Type: text/html;charset=ISO-8859-1,Content-Length: 1470,Chunked: false]}}
connect_1             | [pool-14-thread-1] WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallHistoryResponse - 500
connect_1             | [pool-14-thread-1] ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 500 found in response from service

EDIT: We have also tried giving admin privileges to the user, which has not affected the error.

I see we are not the first to encounter this problem (https://support.snowflake.net/s/question/0D50Z00009VuxuiSAB/snowpipe-rest-api-throwing-http-error-500). We have tried a lot of fixes, but to no avail; this is a hard blocker for us! Thanks!

0.5.2: private key does not validate

Logs are as follows:

[SF_KAFKA_CONNECTOR] Exception: Invalid encrypted private key or passphrase
[SF_KAFKA_CONNECTOR] Error Code: 0018
[SF_KAFKA_CONNECTOR] Detail: failed to decrypt private key
[SF_KAFKA_CONNECTOR] Message: Cannot find any provider supporting 1.2.840.113549.1.5.13
[SF_KAFKA_CONNECTOR] java.base/javax.crypto.Cipher.getInstance(Cipher.java:565)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.parseEncryptedPrivateKey(InternalUtils.java:96)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:197)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)
	at com.snowflake.kafka.connector.internal.InternalUtils.parseEncryptedPrivateKey(InternalUtils.java:109)
	at com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:197)
	at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
	at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
	at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)

I believe this is related to the commit below, which removed the BouncyCastle crypto provider that is suggested by https://stackoverflow.com/questions/46767281/reading-pkcs8-in-pem-format-cannot-find-provider/46770084#46770084

6761699
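
As a stopgap, a minimal sketch of the BouncyCastle-based decryption the Stack Overflow answer points at, assuming the bcprov and bcpkix jars are on the classpath (the key path and passphrase below are placeholders):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.util.Base64;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;

public class DecryptPrivateKeySketch {
    public static void main(String[] args) throws Exception {
        // Register BouncyCastle so PBES2 (OID 1.2.840.113549.1.5.13), used by
        // encrypted PKCS#8 keys, has a supporting provider again.
        Security.addProvider(new BouncyCastleProvider());

        // Strip the PEM armor and decode the base64 body (path is a placeholder).
        String pem = new String(Files.readAllBytes(Paths.get("rsa_key.p8")));
        byte[] der = Base64.getDecoder().decode(
            pem.replaceAll("-+(BEGIN|END) ENCRYPTED PRIVATE KEY-+", "")
               .replaceAll("\\s", ""));

        PKCS8EncryptedPrivateKeyInfo encrypted = new PKCS8EncryptedPrivateKeyInfo(der);
        InputDecryptorProvider decryptor = new JceOpenSSLPKCS8DecryptorProviderBuilder()
            .setProvider("BC")
            .build("my-passphrase".toCharArray()); // placeholder passphrase

        PrivateKey key = new JcaPEMKeyConverter()
            .setProvider("BC")
            .getPrivateKey(encrypted.decryptPrivateKeyInfo(decryptor));
        System.out.println("Decrypted key algorithm: " + key.getAlgorithm());
    }
}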

Missing Metadata

The 0.4.0 release notes stated that a header, key, timestamp, and schema ID would be included in the record_metadata column. Since the 0.3.2 release, I've noticed that a timestamp has been added, but nothing else. Where are the additional metadata elements (especially the Kafka key)?


Getting NPE in Snowflake Kafka connector (version 0.3)

==> Getting the following NPE continuously, and worker threads keep restarting:

[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_ dropped (com.snowflake.kafka.connector.internal.SnowflakeJDBCWrapper)
[2019-12-17 11:03:30,616] ERROR WorkerSinkTask{id=<>-2} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException

[2019-12-17 11:01:22,388] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.snowflake.kafka.connector.SnowflakeSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = CorrigoConnector
tasks.max = 4
topics = [name]
topics.regex =
transforms = []
value.converter = class com.snowflake.kafka.connector.records.SnowflakeAvroConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)

{"id":0,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":1,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":2,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":3,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t...

Crash when trying to handle broken data

After upgrading the connector to 0.4.0, it is constantly crashing, probably while handling broken data (judging from the stack trace).

The configuration includes:

    "errors.tolerance": "all",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",

stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
	at com.snowflake.kafka.connector.records.SnowflakeRecordContent.getBrokenData(SnowflakeRecordContent.java:81)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.writeBrokenDataToTableStage(SnowflakeSinkServiceV1.java:358)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:321)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:167)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:78)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:160)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more

Connector Requires 2 Undocumented Databases

The connector is using two databases that are not documented anywhere, so far as I can see. They seem to follow this naming convention:

  • SNOWFLAKE_KAFKA_CONNECTOR_${name}_PIPE_${snowflake.database.name}
  • SNOWFLAKE_KAFKA_CONNECTOR_${name}_STAGE_${snowflake.database.name}
  1. Why is this? Why can't it simply use ${snowflake.database.name}? Is it configurable?
  2. Can this please be added to the docs?

Snowflake connector 0.5.0 and 0.5.1 both throwing NPE

Stacktrace:

[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:start (com.snowflake.kafka.connector.SnowflakeSinkTask:78)
[2019-10-22 12:07:31,561] INFO
[SF_KAFKA_CONNECTOR] initialized the snowflake connection (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:38)
[2019-10-22 12:07:31,562] INFO WorkerSinkTask{id=kafkaingest-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
[2019-10-22 12:07:31,836] INFO Cluster ID: npJAAcx-SjyK242XsgS1MQ (org.apache.kafka.clients.Metadata:285)
[2019-10-22 12:07:31,838] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] Discovered group coordinator 10.225.171.232:9093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:654)
[2019-10-22 12:07:31,851] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
[2019-10-22 12:07:31,855] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:486)
[2019-10-22 12:07:32,485] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
java.lang.NullPointerException
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2019-10-22 12:07:32,487] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	... 10 more
[2019-10-22 12:07:32,489] ERROR WorkerSinkTask{id=kafkaingest-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-10-22 12:07:32,489] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:stop (com.snowflake.kafka.connector.SnowflakeSinkTask:112)

Configuration:

name=kafkaingest
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=barTopic
buffer.count.records=100
buffer.flush.time=60
buffer.size.bytes=65536
snowflake.url.name=xxx.ap-southeast-2.snowflakecomputing.com
snowflake.user.name=kafka_ingest
snowflake.private.key=xxx
snowflake.private.key.passphrase=xxx
snowflake.database.name=xxx
snowflake.schema.name=xxx
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter

Connector fails to create table

Hi, first of all, thanks a lot for building & maintaining this connector; it will probably help lots of developers & companies like us. What I am trying to do is replicate data from a PostgreSQL database to Snowflake, using Debezium as the source and this connector as the sink. The Debezium part works flawlessly, as it's a bit more mature, but I am having problems with the Snowflake sink connector. According to the logs, it cannot create a table because the session does not have a current database, even though I set the database name while configuring the connector.

Here is the full log:

[SF_KAFKA_CONNECTOR] Creating new table test_source_public_items_196930796. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1)
[SF_KAFKA_CONNECTOR] Exception: Failed to prepare SQL statement
[SF_KAFKA_CONNECTOR] Error Code: 2001
[SF_KAFKA_CONNECTOR] Detail: SQL Exception, reported by Snowflake JDBC
[SF_KAFKA_CONNECTOR] Message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:139)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:505)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:249)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:187)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.execute(SFStatement.java:796)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:308)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakePreparedStatementV1.execute(SnowflakePreparedStatementV1.java:509)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:65)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:79)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.createTableAndStage(SnowflakeSinkServiceV1.java:681)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.init(SnowflakeSinkServiceV1.java:231)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:315)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:172)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:77)
[SF_KAFKA_CONNECTOR] java.util.ArrayList.forEach(ArrayList.java:1257)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
[SF_KAFKA_CONNECTOR] java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[SF_KAFKA_CONNECTOR] java.util.concurrent.FutureTask.run(FutureTask.java:266)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[SF_KAFKA_CONNECTOR] java.lang.Thread.run(Thread.java:748)

And here is the configuration for the connector if it helps;

{
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "name": "test_sink",
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "topics": [
    "test_source.public.items"
  ],
  "snowflake.url.name": "....",
  "snowflake.user.name": "kafka_connect",
  "snowflake.private.key": "....",
  "snowflake.database.name": "kafka_connect_test",
  "snowflake.schema.name": "kafka_schema",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

The version of the Snowflake sink connector I am using is 0.5.4.

Deadlock when initializing converters

Hi,

We have the following issue with the connector. Two instances of the connector are defined: one uses SnowflakeAvroConverter as value.converter, the other SnowflakeJsonConverter. When Kafka Connect initializes everything, it deadlocks like this:

Found one Java-level deadlock:
=============================
"pool-14-thread-6":
  waiting to lock monitor 0x00007f2d980339c8 (object 0x00000000d100e920, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
  which is held by "pool-14-thread-5"
"pool-14-thread-5":
  waiting to lock monitor 0x00007f2d98032b58 (object 0x00000000d2b03558, a org.apache.kafka.connect.runtime.isolation.PluginClassLoader),
  which is held by "pool-14-thread-1"
"pool-14-thread-1":
  waiting to lock monitor 0x00007f2d980339c8 (object 0x00000000d100e920, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
  which is held by "pool-14-thread-5"

Java stack information for the threads listed above:
===================================================
"pool-14-thread-6":
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
"pool-14-thread-5":
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
	- waiting to lock <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
"pool-14-thread-1":
	at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
	- waiting to lock <0x00000000d100e920> (a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
	- locked <0x00000000d045eed8> (a java.lang.Object)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	- locked <0x00000000d045eed8> (a java.lang.Object)
	- locked <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:96)
	- locked <0x00000000d045ef20> (a java.lang.Object)
	- locked <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at com.snowflake.kafka.connector.records.SnowflakeConverter.<clinit>(SnowflakeConverter.java:39)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
	at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:234)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:110)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:924)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:920)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I managed to boil it down to the following reproducing code:

package org.apache.kafka.connect.runtime;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.runtime.isolation.Plugins;

import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_PATH_CONFIG;

public class Xxxxxx {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> props = new HashMap<>();
        props.put(PLUGIN_PATH_CONFIG, "<path>");
        // Load Connect plugins and switch to the delegating classloader,
        // as the Connect runtime does on startup.
        Plugins plugins = new Plugins(props);
        plugins.compareAndSwapWithDelegatingLoader();

        // One thread per converter, mirroring DistributedHerder building
        // ConnectorConfigs for a connector and a task concurrently.
        Thread1 t1 = new Thread1(plugins);
        Thread2 t2 = new Thread2(plugins);

        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    private static class Thread1 extends Thread {
        private final Plugins plugins;

        private Thread1(final Plugins plugins) {
            super("Thread-1");
            this.plugins = plugins;
        }

        @Override
        public void run() {
            try {
                // Small stagger between the two initializations makes the
                // classloader lock interleaving (and hence the deadlock) more likely.
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Map<String, String> props = new HashMap<>();
            props.put("name", "snowflake_avro");
            props.put("connector.class", "com.snowflake.kafka.connector.SnowflakeSinkConnector");
            props.put("value.converter", "com.snowflake.kafka.connector.records.SnowflakeAvroConverter");
            ConnectorConfig connectorConfig = new ConnectorConfig(plugins, props);
        }
    }

    private static class Thread2 extends Thread {
        private final Plugins plugins;

        private Thread2(final Plugins plugins) {
            super("Thread-2");
            this.plugins = plugins;
        }

        @Override
        public void run() {
            Map<String, String> props = new HashMap<>();
            props.put("name", "snowflake_json");
            props.put("connector.class", "com.snowflake.kafka.connector.SnowflakeSinkConnector");
            props.put("value.converter", "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
            ConnectorConfig connectorConfig = new ConnectorConfig(plugins, props);
        }
    }
}

(reproduces in ~40% of runs).

I'm not absolutely sure, but this may be caused by https://issues.apache.org/jira/browse/KAFKA-7421. That issue may be fixed at some point, but who knows when, and whether it will be backported.

I propose to address this with #24. The idea is very simple: delay ObjectMapper initialization to object creation by making it non-static. That's not far (in real time) from the class loading, but it's enough to break the lock. The change is very small and introduces close to zero overhead.
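
A minimal sketch of the idea (class and field names here are illustrative; the actual diff is in #24):

import com.fasterxml.jackson.databind.ObjectMapper;

public abstract class ConverterSketch {
    // Before: a static field forces Jackson class loading during static
    // initialization, which participates in the classloader lock cycle above.
    //   private static final ObjectMapper MAPPER = new ObjectMapper();

    // After: an instance field defers that class loading to object
    // construction, which happens outside the locked initialization path.
    protected final ObjectMapper mapper = new ObjectMapper();
}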

Avro dependencies should be provided by runtime

Version 1.1.0 has org.apache.avro 1.9.0 and io.confluent.kafka-connect-avro-converter 5.3.0 (which has org.apache.avro 1.8.1 as a compile-time dependency) as compile-time dependencies. The Installation Prerequisites document says that you can use Avro 1.8.2+ and Kafka Connect Avro Converter 5.0.0+.

Since Avro 1.9.0 ships with the artifact, it overrides the Avro dependency at runtime. Due to API changes in 1.9.0, this causes issues for other connectors that use it.

In our case, we have Avro 1.8.2 and Kafka Avro Converter 5.1.2 in our Kafka Connect deployment. While running with the Snowflake Kafka Connector, the Debezium connector throws the following:

java.lang.NoSuchMethodError: 'void org.apache.avro.Schema.addProp(java.lang.String, org.codehaus.jackson.JsonNode)'
	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:916)
	at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:900)
	at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:900)
	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:270)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:270)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:294)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The Avro converter is optional, so I think both Avro and the Kafka Avro Converter should be provided by the runtime (i.e., declared with provided scope rather than bundled).

Timeline for getting the connector on Confluent Hub?

It's my understanding that uploading connectors to Confluent Hub isn't as easy as uploading a jar to a package repository, but it would be nice to know your timeline for getting this connector onto Confluent Hub. Are you waiting for 1.0? Is there some approval process in the works but not yet complete? Thanks!

A record_timestamp column

We are using the connector in production, but we find that we have to rely on user-created timestamps or Kafka timestamps for ETL, which isn't great. We want to be able to use the timestamp at which each record gets written down in Snowflake. To do that, we currently have to alter each Kafka Connect-created table in Snowflake, setting default current_timestamp() as the value of record_timestamp. The problem, of course, is that this is an asynchronous operation. We would find it very useful if the connector could create this column by itself, or if there were a mechanism for creating this third column through the connector configuration.

Enhancement Request: Publish committed offsets in a clean & configurable way

I would like to submit an enhancement request to publish committed offsets to a designated topic in Kafka, whereby a microservice could listen for and process them shortly after the records were committed to Snowflake. Some background info on Stack Overflow can be found here:
https://stackoverflow.com/questions/59758541/writing-records-into-snowflake-from-kafka/59808570#59808570

The wrapped technique suggested on Stack Overflow can work, but support directly within the connector would be more appropriate.
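
For reference, a minimal sketch of the wrapped technique (the topic name and producer wiring are hypothetical, and it assumes preCommit is overridable):

import java.util.Map;

import com.snowflake.kafka.connector.SnowflakeSinkTask;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Wraps the sink task and republishes whatever preCommit reports as safely
// committed to Snowflake; a downstream microservice can then consume the
// "snowflake-committed-offsets" topic (name hypothetical).
public class OffsetPublishingSinkTask extends SnowflakeSinkTask {
    private KafkaProducer<String, String> producer; // initialize in start()

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        Map<TopicPartition, OffsetAndMetadata> committed = super.preCommit(currentOffsets);
        committed.forEach((tp, om) -> producer.send(new ProducerRecord<>(
                "snowflake-committed-offsets", tp.toString(), Long.toString(om.offset()))));
        return committed;
    }
}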

WorkerSinkTask Error 5014

[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][2019-10-29 18:57:52,281] ERROR WorkerSinkTask{id=SnowflakeSink-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect]com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Exception: Failed to put records
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Error Code: 5014
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function

Can anyone help me identify the root cause of this error?

Commit offset error

I'm running into an issue when trying to initialize the connector to read from a Kafka topic (running on 2.3.0). The stack trace is below:

[SF_KAFKA_CONNECTOR] Creating new table OS_KAFKA_TEST. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1) [task-thread-KAFKA_SF_TEST-0]
2019-10-16 18:05:14,984 WARN WorkerSinkTask{id=KAFKA_SF_TEST-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-KAFKA_SF_TEST-0]
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-KAFKA_SF_TEST-0]
java.lang.NullPointerException
	at com.snowflake.kafka.connector.SnowflakeSinkTask.lambda$preCommit$1(SnowflakeSinkTask.java:186)
	at java.util.HashMap.forEach(HashMap.java:1289)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.preCommit(SnowflakeSinkTask.java:183)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-KAFKA_SF_TEST-0]
java.lang.NullPointerException
	at com.snowflake.kafka.connector.SnowflakeSinkTask.close(SnowflakeSinkTask.java:153)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-KAFKA_SF_TEST-0]

Task crashes when using Avro message format

Hi, I am using the sink connector version 0.5.4 and everything works as expected if I use JSON as the message format, but when I switch to Avro, the task crashes.

Here is the Debezium source configuration:

{
  "name": "debezium_test_source",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "slot.name": "test_slot",
    "plugin.name": "wal2json",
    "database.server.name": "test_database",
    "database.dbname": "test_database",
    "database.hostname": "db-host-name",
    "database.port": "5432",
    "database.user": "db-user",
    "database.password": "db-password",
    "schema.whitelist": "public",
    "table.whitelist": "public.test_table"
  }
}

and here is the Snowflake sink configuration:

{
  "name": "snowflake_test_sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "key.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topics": "test_database.public.test_table",
    "snowflake.url.name": "....",
    "snowflake.user.name": "....",
    "snowflake.private.key": "....",
    "snowflake.private.key.passphrase": "....",
    "snowflake.database.name": "....",
    "snowflake.schema.name": "...."
  }
}

and finally, here is the log:

[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:close (com.snowflake.kafka.connector.SnowflakeSinkTask)
[2019-11-06 22:53:24,636] ERROR WorkerSinkTask{id=snowflake_test_sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
 at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.stopCleaner(SnowflakeSinkServiceV1.java:266)
 at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.close(SnowflakeSinkServiceV1.java:649)
 at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$1500(SnowflakeSinkServiceV1.java:172)
 at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.lambda$close$0(SnowflakeSinkServiceV1.java:91)
 at java.util.HashMap.forEach(HashMap.java:1289)
 at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.close(SnowflakeSinkServiceV1.java:90)
 at com.snowflake.kafka.connector.SnowflakeSinkTask.close(SnowflakeSinkTask.java:183)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
[2019-11-06 22:53:24,638] ERROR WorkerSinkTask{id=snowflake_test_sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-11-06 22:53:24,639] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:stop (com.snowflake.kafka.connector.SnowflakeSinkTask)

Jar 1.0.2: ClassNotFoundException BouncyCastleFipsProvider

Hi,

Getting the below exception when using an encrypted key.

ERROR:

"Caused by: java.lang.ClassNotFoundException: org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider"

Added bc-fips and made the changes below to add the required classes.

CHANGES IN POM:

<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcpkix-fips</artifactId>
    <version>1.0.3</version>
    <!--scope>provided</scope-->
</dependency>
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bc-fips</artifactId>
    <version>1.0.2</version>
    <!--scope>provided</scope-->
</dependency>

ERROR AFTER ADDING bc-fips:

2020-03-13 17:17:12,296 ERROR WorkerConnector{id=snowflake_sink_messages} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector) [pool-10-thread-1
]
org.bouncycastle.crypto.fips.FipsOperationError: org.bouncycastle.crypto.fips.FipsOperationError: Module checksum failed: expected [d68017e6f45512a51a862c77e4fa2348d0d24d816bc3f76fcf30375052216e86] 
got [f4fd53410d9b8c9c253ec0f349c26f6b19c0c7befabd5d09be48cc857661fc74]

Connector fails to commit offsets if stage has files older than 1 hour

On the 24th of March, the connector failed due to a Snowflake incident, and it was closed for more than 1 hour:

WorkerSinkTask{id=dwh_toyblast_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
[org.apache.kafka.connect.runtime.WorkerSinkTask]
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: 
[SF_KAFKA_CONNECTOR] Exception: Failed to upload file
[SF_KAFKA_CONNECTOR] Error Code: 2003
[SF_KAFKA_CONNECTOR] Detail: Failed to upload file to Snowflake Stage though JDBC
[SF_KAFKA_CONNECTOR] Message: Internal error: Processing aborted due to error 370001:4037195285; incident 8554447.
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:152)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:575)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.parseCommandInGS(SnowflakeFileTransferAgent.java:1391)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.parseCommand(SnowflakeFileTransferAgent.java:1029)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.<init>(SnowflakeFileTransferAgent.java:1003)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeConnectionV1.uploadStreamInternal(SnowflakeConnectionV1.java:1122)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeConnectionV1.compressAndUploadStream(SnowflakeConnectionV1.java:1046)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.put(SnowflakeConnectionServiceV1.java:603)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:453)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:423)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:232)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:88)
[SF_KAFKA_CONNECTOR] java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:203)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[SF_KAFKA_CONNECTOR] java.base/java.lang.Thread.run(Thread.java:834)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)
	at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.put(SnowflakeConnectionServiceV1.java:607)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:453)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:423)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:232)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:88)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

After we restarted it, we noticed that for one of the topics the consumer lag was increasing linearly.

Checking the Kafka Connect logs, it was committing the same offset every interval (an offset that was no longer present due to log retention):

INFO   ||  WorkerSinkTask{id=dwh_toyblast_connector-0} Committing offsets asynchronously using sequence number 42145: {toyblast.toyblast_prod.users-0=OffsetAndMetadata{offset=3013837293, leaderEpoch=null, metadata=''}, toyblast.toyblast_prod.android_payments-0=OffsetAndMetadata{offset=2243967, leaderEpoch=null, metadata=''}, toyblast.toyblast_prod.ios_payments-0=OffsetAndMetadata{offset=4598722, leaderEpoch=null, metadata=''}}  

Even though we could see recent data in the stage tables, their offsets were not committed. After restarting the connector, we noticed the recovery step was re-ingesting all the files in the stage.

The stage for the problematic topic had more than 22,000 files when we noticed the problem, dating back to the 24th of March, and the count kept increasing.

After further debugging, we found that the step that marks files older than one hour as failed, before updating offsets, was getting stuck:

tmpFileNames.forEach(
    name ->
    {
      long time = FileNameUtils.fileNameToTimeIngested(name);
      if (time < currentTime - ONE_HOUR)
      {
        failedFiles.add(name);
        tmpFileNames.remove(name); // mutates the list that forEach is iterating over
      }
      else if (time < currentTime - TEN_MINUTES)
      {
        oldFiles.add(name);
      }
    }
);

In this block the tmpFileNames list is mutated while it is being iterated. Once the remove branch was hit for the first time, the loop never completed, so execution never reached the updateOffset() step.
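
For reference, a mutation-safe variant removes elements through the iterator itself rather than calling remove() on the list mid-iteration; a minimal sketch, assuming the same surrounding fields as the block above (tmpFileNames, failedFiles, oldFiles, ONE_HOUR, TEN_MINUTES, currentTime):

// requires: import java.util.Iterator;
Iterator<String> it = tmpFileNames.iterator();
while (it.hasNext())
{
  String name = it.next();
  long time = FileNameUtils.fileNameToTimeIngested(name);
  if (time < currentTime - ONE_HOUR)
  {
    failedFiles.add(name);
    it.remove(); // safe: removal goes through the iterator, not the list
  }
  else if (time < currentTime - TEN_MINUTES)
  {
    oldFiles.add(name);
  }
}

The same effect could be achieved with tmpFileNames.removeIf(...) if the failed-file bookkeeping is done in a first pass.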

Missed messages on restarting connect job

  • Connector version : 1.0.0
  • Kafka Connect confluent-5.3.0-2.12

Scenario:

  • Kafka connect running in K8s in standalone mode
  • One topic -> one consumer group
  • Kubernetes CronJob (restarts every day - once)
  • buffer.count.records=400000
  • buffer.flush.time=1800

While restarting, the consumer group sometimes rebalances and we lose the last 1800 seconds (buffer.flush.time) of data.

Message:

[2020-02-17 05:30:34,146] INFO [Consumer clientId=connector-consumer-snowflake_gis_stats_v5-1, groupId=connect-snowflake_gis_stats_v5] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:936)

Whenever we encounter this message during a restart, we lose the messages buffered in the last 1800 seconds (buffer.flush.time).
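
In the Connect API, a sink task is notified via close() before its partitions are revoked, which gives it a chance to drain in-memory buffers. A minimal sketch of that kind of safeguard, using a hypothetical flushBuffer helper (illustrative names, not the connector's actual internals):

// requires the Kafka Connect API on the classpath
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class DrainOnRevokeSinkTask extends SinkTask
{
  // hypothetical helper that writes whatever is buffered for a partition
  // to the internal stage; not the connector's real method
  protected abstract void flushBuffer(TopicPartition partition);

  @Override
  public void close(Collection<TopicPartition> partitions)
  {
    // Connect calls close() when partitions are revoked (e.g. on rebalance).
    // Draining the buffer here avoids dropping records that were accepted in
    // put() but not yet flushed when another task takes over the partition.
    for (TopicPartition partition : partitions)
    {
      flushBuffer(partition);
    }
  }
}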

Creating Token... messages are printed at very high frequency in the Connect log

About every 30 minutes the Connect log gets flooded with the following messages from the Snowflake connector, printed at a very high rate within one second.
All the messages in the same one-second burst show the same token / subject / issuer, so perhaps only one needs to be printed.

Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11423)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11424)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11425)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11426)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11427)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11428)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11429)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11430)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11431)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11434)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11433)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11435)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11436)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11437)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11432)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11438)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11440)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11439)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11441)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11442)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11443)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11444)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11445)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11446)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11447)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11448)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11449)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11450)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
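
As a workaround until the logging is deduplicated upstream, the noisy logger can be silenced in Kafka Connect's log4j configuration (e.g. connect-log4j.properties); a minimal sketch, assuming the stock log4j setup:

# Raise the level for the ingest SDK's SecurityManager logger so the
# per-request "Creating Token" INFO lines are suppressed
log4j.logger.net.snowflake.ingest.connection.SecurityManager=WARN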

Error Code: 2001 Failed to Prepare SQL Statement

I'm encountering an error when trying to sink:

[SF_KAFKA_CONNECTOR] Exception: Failed to prepare SQL statement
[SF_KAFKA_CONNECTOR] Error Code: 2001
[SF_KAFKA_CONNECTOR] Detail: SQL Exception, reported by Snowflake JDBC
[SF_KAFKA_CONNECTOR] Message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.

My configuration is as follows:

{
    "name": "SnowflakeSink",
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url": "SCHEMA_URL,
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "SHEMA_USER:SCHEMA_PASS",
    "topics.regex": ".*AVRO.*",
    "errors.retry.timeout": "1000",
    "errors.retry.delay.max.ms": "3000",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "snowflake.url.name": "XXX.snowflakecomputing.com",
    "snowflake.user.name": "YYY",
    "snowflake.private.key": "ZZZ",
    "snowflake.database.name": "KAFKA_EVENTS",
    "snowflake.schema.name": "PUBLIC",
    "buffer.count.records": "50000",
    "buffer.size.bytes": "20000000",
    "buffer.flush.time": "180"
}

I've verified that the database KAFKA_EVENTS has a schema named PUBLIC. Any help here?

Allow ingest on an interval in addition to current methods

To ensure data on low-throughput topics gets written to Snowflake in a timely manner, it would be great if the connector could be configured to ingest messages into Snowflake every N seconds in addition to the current approaches of record count and total record size (in bytes).
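
A minimal sketch of what such a time-based trigger could look like inside the connector, using a hypothetical flushAllPartitions callback and config key (names are illustrative, not the connector's actual API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IntervalFlusher
{
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  // flushIntervalSeconds would come from a new config key,
  // e.g. a hypothetical "buffer.flush.interval"
  public void start(long flushIntervalSeconds, Runnable flushAllPartitions)
  {
    // ingest whatever is buffered every N seconds, regardless of how many
    // records or bytes have accumulated since the last flush
    scheduler.scheduleAtFixedRate(
        flushAllPartitions,
        flushIntervalSeconds,
        flushIntervalSeconds,
        TimeUnit.SECONDS);
  }

  public void stop()
  {
    scheduler.shutdown();
  }
}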

Snowflake Connector log statements are in a different format than the configured Kafka Connect log format

Example of Snowflake Connector log statement:

[SecurityManager-1(2981)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with subject OPTORO.STAGING_KAFKA_CONNECT

Example of Kafka Connect log statement:

{"timestamp":"2020-01-02 11:12:33,806","level":"INFO","logger":"org.apache.kafka.connect.runtime.rest.RestServer","thread":"qtp1871312485-27317","message":"127.0.0.1 - - [02/Jan/2020:11:12:33 +0000] \"GET /connectors HTTP/1.1\" 200 255  1"}

This prevents us from ingesting the Connector's log statements into our logging pipeline.

Worker tasks remain after connector removal

[2020-03-23 00:08:27,731] INFO Successfully processed removal of connector 'snow_sink' (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
[2020-03-23 00:08:27,731] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector snow_sink config removed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-03-23 00:08:27,731] INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by stopping connector snow_sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-03-23 00:08:27,731] INFO Stopping connector snow_sink (org.apache.kafka.connect.runtime.Worker)
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:stop (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2020-03-23 00:08:27,732] INFO Stopped connector snow_sink (org.apache.kafka.connect.runtime.Worker)
[2020-03-23 00:08:28,474] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 216 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7e5cc2d8-a54a-4b0a-95a2-b245b10fce66', leaderUrl='http://ec2-34-213-4-7.us-west-2.compute.amazonaws.com:8083/', offset=304, connectorIds=[], taskIds=[], revokedConnectorIds=[snow_sink], revokedTaskIds=[snow_sink-0, snow_sink-1, snow_sink-3], delay=5179} with rebalance delay: 5179 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:3]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:1]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:3]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)

Deleting the connector doesn't seem to revoke the tasks correctly. Please see log above.
