snowflakedb / snowflake-kafka-connector
Snowflake Kafka Connector (Sink Connector)
License: Apache License 2.0
We use a couple Kafka record headers, and we'd like them to show up in Snowflake. It would be great if the connector could write any message headers (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) present on a Kafka message to the record_metadata
field in the target table.
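A hedged sketch of what this could look like: if the connector merged each header into the metadata JSON, the result might resemble the output below. The "headers" key and the method names here are illustrative assumptions, not the connector's actual schema or API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMetadataSketch {
    // Build a record_metadata-style JSON object that also carries message headers.
    // The "headers" key is hypothetical; the connector does not currently emit it.
    static String buildMetadataJson(String topic, int partition, long offset,
                                    Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"topic\":\"").append(topic)
          .append("\",\"partition\":").append(partition)
          .append(",\"offset\":").append(offset)
          .append(",\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc123");
        System.out.println(buildMetadataJson("customers", 0, 42L, headers));
    }
}
```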
I am using the Snowflake sink connector with Avro serialization. When registering the connector, it successfully connects to Snowflake; however, I then hit the following error at the bottom of this log:
2020-01-08 16:29:38,614 INFO ||
[SF_KAFKA_CONNECTOR] pipe: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0 - service started [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,616 INFO ||
[SF_KAFKA_CONNECTOR] create new task in com.snowflake.kafka.connector.internal.SnowflakeSinkService - table: customers, topic: foo_debezium.debezium.customers, partition: 0 [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder]
2020-01-08 16:29:38,618 INFO ||
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkService created [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder]
2020-01-08 16:29:38,881 INFO ||
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:close [com.snowflake.kafka.connector.SnowflakeSinkTask]
2020-01-08 16:29:38,883 INFO ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: cleaner terminated [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,884 INFO ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: flusher terminated [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,885 INFO ||
[SF_KAFKA_CONNECTOR] IngestService Closed [com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1]
2020-01-08 16:29:38,971 INFO ||
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_customers_0: service closed [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
2020-01-08 16:29:38,972 ERROR || WorkerSinkTask{id=snowflake_test-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.NoSuchMethodError: 'org.codehaus.jackson.JsonNode org.apache.avro.Schema$Field.defaultValue()'
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1652)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1511)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1220)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:100)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:485)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:485)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-08 16:29:38,973 ERROR || WorkerSinkTask{id=snowflake_test-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
Here is my config file:
{
"name": "snowflake_test",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "foo_debezium.debezium.customers",
"snowflake.topic2table.map": "foo_debezium.debezium.customers:customers",
"snowflake.url.name": "https://foo.snowflakecomputing.com:443",
"snowflake.user.name": "debezium",
"snowflake.private.key": "....",
"snowflake.private.key.passphrase": "....",
"snowflake.database.name": "debezium_dev",
"snowflake.schema.name": "debezium",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Some more details about how to reproduce:
I am running Debezium Postgres Source -> Snowflake Sink using docker. Here is my docker-compose:
version: '2'
services:
zookeeper:
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
schema-registry:
image: confluentinc/cp-schema-registry
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- schema-registry
volumes:
- $PWD/connect-plugins:/kafka/connect
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
My $PWD/connect-plugins directory:
$ ls -1 connect-plugins/
debezium-connectors
snowflakeinc-snowflake-kafka-connector-1.0.0
$ ls -1 connect-plugins/debezium-connectors/
debezium-connector-postgres-1.0.0.Final.jar
debezium-core-1.0.0.Final.jar
postgresql-42.2.9.jar
protobuf-java-3.8.0.jar
$ ls -1 connect-plugins/snowflakeinc-snowflake-kafka-connector-1.0.0/
assets
doc
lib
manifest.json
Reproduce:
# Containers up
$ docker-compose -f my-docker-compose.yaml up
# Register Snowflake Connector
$ curl -X POST -H "Content-Type: application/json" --data @register-sf.json http://localhost:8083/connectors
# Everything works fine here, but there is still no Avro data in Kafka
# Now add the Debezium Postgres connector - this starts populating data in Kafka in Avro format
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-on-prem-avro.json
# There is schema registry data
$ curl -X GET http://localhost:8081/subjects/
["__debezium-heartbeat.foo_debezium-value","foo_debezium.debezium.customers-value","__debezium-heartbeat.foo_debezium-key","foo_debezium.debezium.dummy_data-key","foo_debezium.debezium.customers-key","foo_debezium.debezium.dummy_data-value"]
Any insight much appreciated!
c.f. this issue #61
Why is the connector enforcing that its name (an internal Kafka Connect identifier) must be a valid Snowflake identifier? If the connector uses the connector name for something on the Snowflake side, then it should arguably perform the appropriate transformation itself to sanitise the name as required.
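One possible sanitisation is sketched below: keep letters, digits, and underscores, and prefix names that start with a digit so the result is a valid unquoted Snowflake identifier. This is an illustration of what the connector could do, not its actual behaviour.

```java
public class IdentifierSanitizer {
    // Map an arbitrary connector name to a valid unquoted Snowflake identifier:
    // letters, digits, and underscores only, not starting with a digit.
    // A sketch of what the connector *could* do, not what it does.
    static String sanitize(String name) {
        String cleaned = name.replaceAll("[^A-Za-z0-9_]", "_");
        if (cleaned.isEmpty() || Character.isDigit(cleaned.charAt(0))) {
            cleaned = "_" + cleaned;
        }
        return cleaned;
    }

    public static void main(String[] args) {
        System.out.println(sanitize("my-connector.v2")); // my_connector_v2
        System.out.println(sanitize("1st-connector"));   // _1st_connector
    }
}
```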
As of the 0.4.0 release, the connector can handle invalid message types (I'm not sure whether it can handle nulls yet), but I don't know where these invalid messages are being sent. What does 'table stage' mean, as mentioned in the release notes?
Invalid messages aren't showing up in the internal stage when I send them, and I don't see any additional tables, stages, or topics being created in Snowflake to handle them.
As far as I can tell, if a Snowflake connector configured to use the Snowflake Avro converter encounters a message it can't decode, nothing is written to the logs; the message is skipped with no external indication. This makes debugging extremely frustrating.
Even just a message like "the message at topic/partition/offset was discarded because it couldn't be decoded" would be tremendously helpful. There is no need to include the message contents (which might be huge); the topic/partition/offset combination lets me investigate it myself if I want to.
Install connector plugin
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.1
Create connector
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/test/config \
-d '{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":1,
"topics":"mysql-01-asgard.demo.transactions",
"snowflake.topic2table.map": "mysql-01-asgard.demo.transactions:TRANSACTIONS",
"snowflake.url.name":"asdf.us-east-1.snowflakecomputing.com",
"snowflake.user.name":"kafka",
"snowflake.user.role":"SYSADMIN",
"snowflake.private.key":"xxx",
"snowflake.database.name":"DEMO_DB",
"snowflake.schema.name":"PUBLIC",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"https://${CCLOUD_SCHEMA_REGISTRY_URL}",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}",
"offset.flush.interval.ms":"60000",
"offset.flush.timeout.ms":"10000",
"buffer.count.records":"100",
"buffer.size.bytes":"65536"
}'
Fails:
[2019-10-17 08:05:39,689] INFO [test|worker] Creating connector test of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker:246)
[2019-10-17 08:05:39,696] INFO [test|worker] Instantiated connector test with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker:249)
[2019-10-17 08:05:39,700] INFO [test|worker]
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils:80)
[2019-10-17 08:05:39,759] INFO [test|worker]
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector:87)
[2019-10-17 08:05:39,759] INFO [test|worker]
[SF_KAFKA_CONNECTOR] buffer.flush.time set to default 30 seconds (com.snowflake.kafka.connector.SnowflakeSinkConnector:126)
[2019-10-17 08:05:39,764] ERROR [test|worker] WorkerConnector{id=test} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.NoClassDefFoundError: org/bouncycastle/jce/provider/BouncyCastleProvider
at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.bouncycastle.jce.provider.BouncyCastleProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more
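The root cause is that the BouncyCastle provider class is not on the plugin's classpath. A quick diagnostic sketch (assuming the class name is not shaded) that reports whether the class is loadable from the current JVM:

```java
public class BouncyCastleCheck {
    // Returns true if the BouncyCastle provider class is loadable from the
    // current classpath; the NoClassDefFoundError above means it is not.
    static boolean bouncyCastlePresent() {
        try {
            Class.forName("org.bouncycastle.jce.provider.BouncyCastleProvider");
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("BouncyCastle present: " + bouncyCastlePresent());
    }
}
```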
Below is the configuration
name=SnowflakeConnector
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.topic2table.map=devdb.engines:engines
tasks.max=1
topics=devdb.engines
snowflake.url.name=***
snowflake.database.name=mydb
snowflake.schema.name=public
buffer.count.records=10000
snowflake.user.name=***
snowflake.private.key=***
snowflake.user.role=***
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
buffer.size.bytes=5242880
The connector above runs normally without an issue, but no data ever appears in Snowflake.
If I copy the data from the above topic to a new topic called devdb_engines and update the connector's configuration with that topic name, the data is written to Snowflake normally.
We are using SnowflakeAvroConverter with a defined schema registry. The connector is storing decimal values in Snowflake in an undecodable binary form. We are using 1.0.0 in production, but the latest version locally gives the same result. We tried the SQL functions available within Snowflake to decode these values, without success. The data is in an unusable state.
For example:
"0.2700" in Kafka is stored in Snowflake as "\n�"
"0.3500" in Kafka is stored in Snowflake as "\r¬"
Schema registry definition:
{
"name": "percent_field",
"type": [
"null",
{
"type": "bytes",
"scale": 4,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "4"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
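The bytes are the standard Connect Decimal encoding: a big-endian two's-complement unscaled integer, combined with the schema's scale. For example, 0.2700 at scale 4 has unscaled value 2700 = 0x0A8C, i.e. a 0x0A ("\n") byte followed by 0x8C, which matches the garbled strings above. Decoding it back is a one-liner with the JDK (a sketch of the decoding, not a fix for the converter itself):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class ConnectDecimalDecoder {
    // Decode Connect's Decimal logical type: the bytes hold the big-endian
    // two's-complement unscaled value; the scale comes from the Avro schema.
    static BigDecimal decode(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    public static void main(String[] args) {
        // 0x0A8C == 2700; with scale 4 this is 0.2700
        System.out.println(decode(new byte[]{0x0A, (byte) 0x8C}, 4));
        // 0x0DAC == 3500; with scale 4 this is 0.3500
        System.out.println(decode(new byte[]{0x0D, (byte) 0xAC}, 4));
    }
}
```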
Hitting the following exception with snowflake-kafka-connector-1.2.3.jar.
The private key was generated following the steps here: https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#using-key-pair-authentication
On further analysis of the code, I can see that we are appending the BEGIN/END ENCRYPTED PRIVATE KEY markers, which will not work with Base64.decode.
As a small test, I wrote a short Java program to decode the key; it fails initially, and passes when I comment out the append statements that add the BEGIN and END marker lines.
The code is at the bottom of the page.
"trace": "com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: \n[SF_KAFKA_CONNECTOR] Exception: Invalid encrypted private key or passphrase\n[SF_KAFKA_CONNECTOR] Error Code: 0018\n[SF_KAFKA_CONNECTOR] Detail: failed to decrypt private key. Please verify input private key and passphrase. Snowflake Kafka Connector only supports encryption algorithms in FIPS 140-2\n[SF_KAFKA_CONNECTOR] Message: unable to decode base64 string: invalid characters encountered in base64 data\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.encoders.Base64.decode(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.io.pem.PemReader.loadObject(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.util.io.pem.PemReader.readPemObject(Unknown Source)\n[SF_KAFKA_CONNECTOR] org.bouncycastle.openssl.PEMParser.readObject(PEMParser.java:92)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:40)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:140)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)\n[SF_KAFKA_CONNECTOR] 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)\n[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.FutureTask.run(FutureTask.java:266)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n[SF_KAFKA_CONNECTOR] java.lang.Thread.run(Thread.java:748)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)\n\tat com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)\n\tat com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:51)\n\tat com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n\tat com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n\tat com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:140)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)\n\tat 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
},
import org.bouncycastle.util.encoders.Base64;

public class Main {
    public static void main(String[] args) {
        try {
            String key = "-----BEGIN ENCRYPTED PRIVATE KEY-----\n" +
                "MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIPR0Nozv8sKECAggA\n" +
                "MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBAY+DZPwO+29JoC8l7NdMtVBIIE\n" +
                "0PDqdGdVo2XnKfaF3PI+CVjY0pXMHgCPrcKNmDpnjvan5nmiWLWO+sNRDpywsNwD\n" +
                "qifKnCXD6lPl+IkRDNl0yw6Yk6rkva8FZOY5MlwcTnkuBvvpxegKAcW1vAcJ28An\n" +
                "H3Nl+RbS4NZ4bc+omuvqNuiid6b51Rx3FP9cC5OazVfAt/ocWmUraIxp1siPtdJb\n" +
                "/6H7k2e5fj4Q57EdDh8SCr5stmdcv04F78K/xiwtW3DHD46d3uNNVS7LVclB+XcR\n" +
                "6nntthXa9RggRPSBTzCDMVacnxOeoryRs6aY70pmdMo9O8VeGf1yZUJD8ZRfR995\n" +
                "JkZYsqro3YF8MW9EJaRAInUfoUcLli3U7yYsLxfCmk6wFvg6QPsBU9jc7NWlimyQ\n" +
                "ZKhgrWqdTV2nXPberR1TPp9tHJu+REHtJvU1eqLsYPfBJCzBgVeZTXePGXhYZTJG\n" +
                "Q/A5/yx4bwOATmeAolpNs5fFk45lqblzFYScYNDrq0qZ9bGGE0q9/R4gjf0RL4wb\n" +
                "Jbqf1Kplv/GZMC/Tlj7SaZpoxyKLLzJGOxbE8HjIc+idYQuVnlwfINnLMcaDCube\n" +
                "Xctyi5ot0lHao6YRb25v4BeBDyG81gJ7IUKULXmn13ZBH6LvKKsa2KwCQj8mtJ9o\n" +
                "ZQ6I6DmxkQFWSOvUNihWq0+Lc/VGyrY78LZWVVKKaEc9VlLy/VcK7oijIuKZROLH\n" +
                "L0BsJWF/eh9xWhRxSa+GpdLZL5bbG7UKu5VdYRjA54+tL0jJ+oZMhDQO1Da/F0h2\n" +
                "u8dmK2JFu4MmH3TPDHZihr1bg3nDmda7wcigW4d+CnY3xfoqSPRMK/eyaIlKC+vG\n" +
                "T9qchSRCY4YTd+xD/hgZApQSi756ro559FddFVdnF9xet4PpEaZxftswCq2wBBwu\n" +
                "r93ql3HvqGqkMVz6llfb/lOvYD3+84rM/zzdqB+85EEnvFDNcZeNlhGsfvjNn8IP\n" +
                "fxr89NJOQFreHJHw3yKRPyGkbQd+eVzZGQv1RgMJbfXD3tslvZwgpQ2lrP7EvHmn\n" +
                "pNFdK4PZIxXV0j8eiNIHf9Uw2apcbSul/e5IM04p7nlaKNQfD3wLTM37zbe0tU2D\n" +
                "GxZ3b5G4tABXdq2q699vcFnObqANeG5zC10xohFYuExjttplqFdNTuS1ffhumNiA\n" +
                "582Dd2/CXwTlORacZ7T3mE6CqujZ/iXM93zjmKUIp/RiR+G5nV44uXR2VxtXEWc+\n" +
                "E6bG8i+X+ogjB645Mw1OxYbgb3d8eyQby+JIgHN3hRUvQJsRe/Q+BRf0cG+wTMAf\n" +
                "m+DKQxooe9ABkypET2rtlwMGb4/ffOS1etzP8hnG7vh+f2ElqG+iQIEffwHguUj0\n" +
                "QnQFWWkRVfsVVOocIuar74I+tZ5JpPPedMrskSBeNwZXp0EM+drNUJb0PlT9X9Bz\n" +
                "xK6Z9GZ9MjR6gpPznhsG8uyrby2Y19H/YfqVF6PfqqXMJSD1Ya3Q4lg1GM4ZqYBO\n" +
                "o70B4QZRFtQSm4gHEnLkkK6agcn7ULQP5JZDP+3oz71qz44oFYcE6X6Vanf21TCD\n" +
                "FeGDhA6ZM9Q6GYnhwb6cWtPn6bsP+emCrXmO6LuT66wEjxi7c/yxjIbL8uvDzFGw\n" +
                "cNMoLbqHuJJj0igJfekMjkcVfY/kP+4X/wVUnKIxdMf3\n" +
                "-----END ENCRYPTED PRIVATE KEY-----";

            // Strip the PEM markers and all whitespace, leaving only base64 data.
            key = key.replaceAll("-+[A-Za-z ]+-+", "");
            key = key.replaceAll("\\s", "");

            StringBuilder builder = new StringBuilder();
            // Comment this line out for success: the marker is not base64 data.
            builder.append("-----BEGIN ENCRYPTED PRIVATE KEY-----");
            // Re-wrap the base64 body at 64 characters per line.
            for (int i = 0; i < key.length(); i++) {
                if (i % 64 == 0) {
                    builder.append("\n");
                }
                builder.append(key.charAt(i));
            }
            // Comment this line out for success: the marker is not base64 data.
            builder.append("\n-----END ENCRYPTED PRIVATE KEY-----");
            key = builder.toString();

            // Base64.decode fails on the marker lines with
            // "invalid characters encountered in base64 data".
            String userData = new String(Base64.decode(key));
            System.out.println(userData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
I've tried the Snowflake Kafka connector and it works great with version 0.5.5, but the data it loads is not as compressed as the data imported via S3 (60 B vs 144 B per row, calculated over total data pools of 1.7 TB and 20 GB respectively), and the Snowpipe credit usage is about 30x what it is when loading via S3 auto-ingest following this tutorial: https://docs.snowflake.net/manuals/user-guide/data-load-snowpipe-auto-s3.html
Right now we spend slightly over 1 credit per day on average to load the data via S3 through our old pipeline, but running the Snowflake Kafka connector for 8 hours has already cost us close to 10 credits; prorated over a full day that would be about 30 credits, a nearly 30x increase.
Are there any optimizations that can be made to avoid this and make the data transfer more efficient? The connector appears to create a stage on S3, so I'm a little confused by the vast increase in cost.
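One lever worth trying (an assumption, not a guaranteed fix): Snowpipe billing scales with the number of files loaded, so larger, less frequent flushes mean fewer files and fewer Snowpipe calls. The connector's buffer settings control this; for example:

```properties
# Flush less often and in larger batches so fewer, larger files hit Snowpipe.
# Values below are illustrative; tune for your throughput and latency needs.
buffer.flush.time=120
buffer.count.records=10000
buffer.size.bytes=5242880
```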
This is an exact dup of #77
Snowflake connector version 1.0.0
Kafka connect version 2.1.2
The error persists over connector restarts and triggers each time. I have not managed to successfully get a single message into Snowflake.
[2020-01-09 13:31:00,983] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to put records
[SF_KAFKA_CONNECTOR] Error Code: 5014
[SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:362)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:321)
at com.snowflake.kafka.connector.SnowflakeSinkTask.getSink(SnowflakeSinkTask.java:94)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-09 13:31:00,990] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to put records
[SF_KAFKA_CONNECTOR] Error Code: 5014
[SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:362)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:321)
at com.snowflake.kafka.connector.SnowflakeSinkTask.getSink(SnowflakeSinkTask.java:94)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Hi. We are using the Kafka connector, and the current release is based on the ingest-sdk library, which had a bug fixed in its latest release. Correspondingly, we need a release of the Kafka connector that uses the updated library.
Hey! Thanks for supporting kafka-connect platform.
Just wondering, is there any version that I can deploy to production?
After a quick evaluation, v0.4.0 fails at some point with a non-recoverable error (failed to drop stage).
I've tried v0.5.0, but got a NullPointerException and was not able to solve the problem.
We are using topic2table mapping, so according to the latest changes it seems a bug was introduced there.
So, back to the original question: is there any version (maybe v0.5.1?) that can be considered stable for production use?
The topic2table map is not being parsed correctly. In this call, the property name (i.e. snowflake.topic2table.map), not its value, is being passed to Utils.parseTopicToTableMap().
Below is the call stack
[1] com.snowflake.kafka.connector.Utils.parseTopicToTableMap (Utils.java:408)
[2] com.snowflake.kafka.connector.SnowflakeSinkTask.start (SnowflakeSinkTask.java:85)
[3] org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart (WorkerSinkTask.java:300)
[4] org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:189)
[5] org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:177)
[6] org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:227)
[7] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[8] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[9] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[10] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[11] java.lang.Thread.run (Thread.java:745)
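For reference, the parsing itself is straightforward; below is a minimal sketch of what parseTopicToTableMap is expected to do with the property's value (illustrative, not the connector's actual code). Passing the property name instead of its value yields no valid topic:table pairs, consistent with the bug above.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicToTableMapSketch {
    // Parse "topic1:table1,topic2:table2" into a map. Passing the property
    // *name* ("snowflake.topic2table.map") instead of its value produces no
    // valid pairs, matching the misbehaviour described above.
    static Map<String, String> parse(String value) {
        Map<String, String> result = new HashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.split(":");
            if (kv.length == 2) {
                result.put(kv[0].trim(), kv[1].trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("devdb.engines:engines"));   // {devdb.engines=engines}
        System.out.println(parse("snowflake.topic2table.map")); // {}
    }
}
```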
Note that an earlier call is being made correctly:
Snowflake Kafka Connector Version: 0.5.1
When running the connector I get the following error. It would be really useful if the connector could report specifically which configuration value it is expecting, which is missing, or which has an incorrect value. As it is, the user just has to guess :)
[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The current docs for this connector state:
Messages will neither be duplicated nor silently dropped. Messages will be delivered exactly once, or an error message will be generated. If an error is detected while loading a record (for example, the record was expected to be a well-formed JSON or Avro record, but wasn't well-formed), then the record is not loaded; instead, an error message is returned.
I have several questions about this:
Later in the same section you call attention to
Instances of the Kafka connector do not communicate with each other. If you start multiple instances of the connector on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the connector.
Hi,
We are facing multiple issues related to the reliability of messages being sent to Snowflake with the connector. Raising a ticket for one such issue.
Problem:
[2020-02-17 17:10:31,751] WARN WorkerSinkTask{id=snowflake_gis_stats_v5-0} Ignoring invalid task provided offset gis-stats-v5-0/OffsetAndMetadata{offset=9223372036854775807, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=9223372036854775807 currentOffset=8825666 (org.apache.kafka.connect.runtime.WorkerSinkTask:416)
This does not happen often, but we encountered it yesterday. The taskOffset value looks weird.
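For what it's worth, that taskOffset does not look like a real offset at all: 9223372036854775807 is Java's Long.MAX_VALUE, which suggests the task reported a sentinel/uninitialized value rather than a consumed position. A quick arithmetic check:

```python
# 9223372036854775807 from the log equals Java's Long.MAX_VALUE (2^63 - 1),
# i.e. a sentinel value rather than a real consumed offset.
java_long_max = 2**63 - 1
print(java_long_max)  # 9223372036854775807
```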
After upgrading to 0.4.0, the connector no longer shuts down cleanly: it fails to drop the internal stage. When I shut down the connector, I get the following error message:
[2019-09-10 15:39:42,757] ERROR Graceful stop of task snowflake_sink-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-09-10 15:39:43,314] ERROR [SF_KAFKA_CONNECTOR] Failed to drop empty internal stage: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4. It can safely be removed. Exception: java.lang.IllegalStateException: Shutdown in progress (com.snowflake.kafka.connector.SnowflakeSinkConnector)
When I check in Snowflake, the internal stage still appears to be there (just as the error message states). When I run the connector again with the internal stage still present, it actually deletes the stage but still throws the following error and crashes:
[2019-09-10 15:41:36,622] ERROR WorkerConnector{id=snowflake_sink} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Connector stopped due to invalid configuration
[SF_KAFKA_CONNECTOR] Error Code: 5006
[SF_KAFKA_CONNECTOR] Detail: Exception while creating or validating table, stage, or pipe.
[SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Incompatible stage
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 5004
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Stage contains files that were not created by the Snowflake Kafka Connector. The stage needs a careful cleanup.
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message: Stage name: SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:217)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:231)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:323)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:231)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:231)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
When I shut down the connector and restart it once more (after getting this error), it works and I can continue sending messages as usual. I'm curious as to why the internal stage isn't being properly dropped when I stop the connector.
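As the first error message itself notes, the leftover stage "can safely be removed" by hand before restarting. A sketch using standard Snowflake SQL, with the stage name taken from the error above:

```sql
-- Inspect what, if anything, is left in the connector's internal stage
LIST @SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4;

-- Then remove it before restarting the connector
DROP STAGE IF EXISTS SNOWFLAKE_KAFKA_CONNECTOR_snowflake_sink_STAGE_Test4;
```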
Hi team,
Can you please help? I am using connect-standalone.sh and getting the error below. The config files follow.
[2019-07-24 07:47:41,918] ERROR Failed to create job for /Users/skumar/kafka/code/snowflake.properties (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-07-24 07:47:41,918] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
connect-standalone.sh config/connect-standalone.properties snowflake.properties
Here is the snowflake.properties file:
{
"name":"XYZCompanySensorData",
"Config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"snowflake_test_topic",
"snowflake.topic2table.map": "snowflake_test_topic:snowflake_test_topic",
"buffer.count.records":"5",
"buffer.size.bytes":"2000",
"snowflake.url.name":"xxxxx.snowflakecomputing.com",
"snowflake.user.name":"xxxx",
"snowflake.private.key":"NDEJrJRQquVqg==", -- private key
"snowflake.database.name":"xxxxx",
"snowflake.schema.name":"xxxxx",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
}
Here is the connect-standalone.properties file:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/Users/skumar/kafka/kafka_connect/connectors/snowflake
#,/Users/skumar/kafka/kafka_connect/connectors/kafka-connect-twitter-0.2.26/usr/share/kafka-connect/kafka-connect-twitter
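One likely problem with the setup above: connect-standalone.sh expects each connector config as a Java properties file (key=value lines), while the JSON wrapper with name/config fields is the payload format for the Connect REST API in distributed mode (note also the capitalized "Config" key, which the REST API would reject as well). A hedged sketch of the equivalent properties file, reusing the values shown above:

```properties
name=XYZCompanySensorData
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=snowflake_test_topic
snowflake.topic2table.map=snowflake_test_topic:snowflake_test_topic
buffer.count.records=5
buffer.size.bytes=2000
snowflake.url.name=xxxxx.snowflakecomputing.com
snowflake.user.name=xxxx
snowflake.private.key=NDEJrJRQquVqg==
snowflake.database.name=xxxxx
snowflake.schema.name=xxxxx
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
```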
Snowflake Kafka Connector fails sporadically with the error message below:
"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception."
[...]
[SF_KAFKA_CONNECTOR] Exception: Failed to ingest file
[SF_KAFKA_CONNECTOR] Error Code: 3001
For some reason, it sometimes helps to restart tasks / the whole connector but it starts failing again after a few hours or days.
We are using the connector to ingest AVRO formatted messages from several Kafka topics into Snowflake. The versions we are facing the issues with are 1.1.0 and even 1.2.3.
Our configuration looks as follows:
connector_config.txt
Log message from the Connect worker:
log_msg.txt
[2019-10-21 11:54:21,875] ERROR WorkerSinkTask{id=MY_TOPIC-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Hitting the following exception with snowflake-kafka-connector-1.0.0.jar:
java.lang.NoSuchMethodError: org.bouncycastle.crypto.CryptoServicesRegistrar.isInApprovedOnlyMode
Confluent Docker:
git clone https://github.com/confluentinc/examples
cd examples
git checkout 5.3.1-post   # version number may be different
Snowflake Kafka Connector: snowflake-kafka-connector-1.0.0.jar
(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ docker exec 780e0268538d ls /usr/share/confluent-hub-components/
confluentinc-kafka-connect-datagen
confluentinc-kafka-connect-gcs
snowflake-kafka-connector-1.0.0.jar
Connector Config:
(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ cat ../../../kafka/snowflake_connector.config
{
"name":"snowflakeConnector",
"config": {
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"users",
"snowflake.url.name":"xyz.snowflakecomputing.com",
"snowflake.user.name":"balbir",
"snowflake.private.key":"/Users/balbirsinghmatharu/Desktop/DataDrive/phData/Snowflakes/rsa_key.p8",
"snowflake.private.key.passphrase":"XXXXXXXX",
"snowflake.database.name":"USER_balbir",
"snowflake.schema.name":"KAFKA",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}
}
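Two things stand out here. First, a NoSuchMethodError on org.bouncycastle.crypto.CryptoServicesRegistrar usually indicates a BouncyCastle version clash on the Connect worker classpath (the connector's FIPS-flavored classes resolving against a different bcprov jar also on the classpath). Second, snowflake.private.key in the config above is set to a file path, but this property takes the key contents themselves: the base64 body of the PEM with the header/footer lines and newlines stripped. A hypothetical helper (the function name is mine) illustrating that transformation:

```python
def pem_body(pem_text: str) -> str:
    """Strip PEM header/footer lines and join the base64 body onto one line."""
    return "".join(
        line for line in pem_text.splitlines()
        if line and not line.startswith("-----")
    )

# Illustrative input only; a real key body is much longer.
example = (
    "-----BEGIN ENCRYPTED PRIVATE KEY-----\n"
    "QUJDREVG\n"
    "R0hJSktM\n"
    "-----END ENCRYPTED PRIVATE KEY-----\n"
)
print(pem_body(example))  # QUJDREVGR0hJSktM
```

The resulting single-line string is what goes in the snowflake.private.key property.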
Exception when verifying the connector status:
(base) Balbirs-MBP:cp-all-in-one balbirsinghmatharu$ curl http://localhost:8083/connectors/snowflakeConnector/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2110 100 2110 0 0 294k 0 --:--:-- --:--:-- --:--:-- 294k
{
"name": "snowflakeConnector",
"connector": {
"state": "FAILED",
"worker_id": "connect:8083",
"trace": "java.lang.NoSuchMethodError: org.bouncycastle.crypto.CryptoServicesRegistrar.isInApprovedOnlyMode()Z\n\tat org.bouncycastle.jcajce.provider.ProvSecureHash$MD5.configure(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider.<init>(Unknown Source)\n\tat com.snowflake.kafka.connector.internal.EncryptionUtils.parseEncryptedPrivateKey(EncryptionUtils.java:35)\n\tat com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:162)\n\tat com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)\n\tat com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
},
"tasks": [],
"type": "sink"
}
Don't mean to be pushy, especially since you've been so accommodating on the two feature requests I made, but I'd really like to use said features! Any chance you can release them as 0.4.0?
Our user connects successfully using key-pair auth, as expected. The connector gets as far as creating the stage, the pipe, and the destination table, but fails after the REST API call Created Insert Request:
connect_1 | [SF_KAFKA_CONNECTOR] Creating new stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers. [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1 | 2020-01-10 14:58:04,187 INFO ||
connect_1 | [SF_KAFKA_CONNECTOR] create stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers [com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1]
connect_1 | 2020-01-10 14:58:07,646 INFO ||
connect_1 | [SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_STAGE_debezium_dev.debezium.customers retrieved 0 file names [com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1]
connect_1 | 2020-01-10 14:58:07,650 INFO ||
connect_1 | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0, recovered from existing pipe [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1 | 2020-01-10 14:58:07,658 INFO ||
connect_1 | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0: cleaner started [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1 | 2020-01-10 14:58:07,664 INFO ||
connect_1 | [SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0: flusher started [com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1]
connect_1 | 2020-01-10 14:58:21,609 INFO || WorkerSinkTask{id=snowflake_test-0} Committing offsets asynchronously using sequence number 1: {foo_debezium.debezium.customers-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=''}} [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Sending Request UUID -
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.connection.RequestBuilder - Created Insert Request : https://foodev.snowflakecomputing.com:443/v1/data/pipes/debezium_dev.debezium.SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0/insertFiles?requestId=1170b5ba-b709-4848-b5d3-48fe62ea2e3b
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 2000
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 4000
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 8000
connect_1 | 2020-01-10 14:58:49,843 INFO || WorkerSourceTask{id=foo-debezium-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | 2020-01-10 14:58:49,849 INFO || WorkerSourceTask{id=foo-debezium-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | 2020-01-10 14:58:49,900 INFO || WorkerSourceTask{id=foo-debezium-connector-0} Finished commitOffsets successfully in 52 ms [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Reach the max retry time.
connect_1 | [pool-15-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall insert response - HttpResponseProxy{HTTP/1.1 500 Internal Server Error [Cache-Control: must-revalidate,no-cache,no-store, Content-Type: text/html;charset=ISO-8859-1, Date: Fri, 10 Jan 2020 14:58:55 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Content-Length: 1469, Connection: keep-alive] ResponseEntityProxy{[Content-Type: text/html;charset=ISO-8859-1,Content-Length: 1469,Chunked: false]}}
connect_1 | [pool-15-thread-1] WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallInsert Response - 500
connect_1 | [pool-15-thread-1] ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 500 found in response from service
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.connection.RequestBuilder - Final History URIBuilder - https://foodev.snowflakecomputing.com:443/v1/data/pipes/debezium_dev.debezium.SNOWFLAKE_KAFKA_CONNECTOR_snowflake_test_PIPE_debezium_dev.debezium.customers_0/insertReport?requestId=bb62fa74-62cf-40d8-b94a-2c6597458181
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 2000
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 4000
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Sleep time in millisecond: 8000
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.utils.HttpUtil - Reach the max retry time.
connect_1 | [pool-14-thread-1] INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall history response - HttpResponseProxy{HTTP/1.1 500 Internal Server Error [Cache-Control: must-revalidate,no-cache,no-store, Content-Type: text/html;charset=ISO-8859-1, Date: Fri, 10 Jan 2020 14:59:23 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Content-Length: 1470, Connection: keep-alive] ResponseEntityProxy{[Content-Type: text/html;charset=ISO-8859-1,Content-Length: 1470,Chunked: false]}}
connect_1 | [pool-14-thread-1] WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallHistoryResponse - 500
connect_1 | [pool-14-thread-1] ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 500 found in response from service
EDIT: We have also tried giving admin privileges to the user, which has not affected the error.
I see we are not the first to encounter this problem (https://support.snowflake.net/s/question/0D50Z00009VuxuiSAB/snowpipe-rest-api-throwing-http-error-500). We have tried a lot of fixes, but to no avail; this is a hard blocker for us! Thanks!
Logs are as follows:
[SF_KAFKA_CONNECTOR] Exception: Invalid encrypted private key or passphrase
[SF_KAFKA_CONNECTOR] Error Code: 0018
[SF_KAFKA_CONNECTOR] Detail: failed to decrypt private key
[SF_KAFKA_CONNECTOR] Message: Cannot find any provider supporting 1.2.840.113549.1.5.13
[SF_KAFKA_CONNECTOR] java.base/javax.crypto.Cipher.getInstance(Cipher.java:565)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.parseEncryptedPrivateKey(InternalUtils.java:96)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:197)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)
at com.snowflake.kafka.connector.internal.InternalUtils.parseEncryptedPrivateKey(InternalUtils.java:109)
at com.snowflake.kafka.connector.internal.InternalUtils.createProperties(InternalUtils.java:197)
at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.setProperties(SnowflakeConnectionServiceFactory.java:40)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:137)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:289)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:205)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
I believe this is related to this commit, which removed the BouncyCastle crypto provider; using that provider is the fix suggested in https://stackoverflow.com/questions/46767281/reading-pkcs8-in-pem-format-cannot-find-provider/46770084#46770084.
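For context, the OID in the "Cannot find any provider supporting 1.2.840.113549.1.5.13" message identifies the encryption scheme of the private key, not anything Snowflake-specific: it is PBES2 from PKCS#5 v2.0, which some default JCE configurations cannot handle without an extra provider such as BouncyCastle. A small lookup table (the dict is illustrative, not an API):

```python
# PKCS#5 v2.0 object identifiers relevant to encrypted PKCS#8 keys.
PKCS5_OIDS = {
    "1.2.840.113549.1.5.12": "id-PBKDF2",
    "1.2.840.113549.1.5.13": "id-PBES2",
}
print(PKCS5_OIDS["1.2.840.113549.1.5.13"])  # id-PBES2
```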
Getting the following NPE continuously, and worker threads keep restarting:
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_ dropped (com.snowflake.kafka.connector.internal.SnowflakeJDBCWrapper)
[2019-12-17 11:03:30,616] ERROR WorkerSinkTask{id=<>-2} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
[2019-12-17 11:01:22,388] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.snowflake.kafka.connector.SnowflakeSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = CorrigoConnector
tasks.max = 4
topics = [name]
topics.regex =
transforms = []
value.converter = class com.snowflake.kafka.connector.records.SnowflakeAvroConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
{"id":0,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":1,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":2,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n"},{"id":3,"state":"FAILED","worker_id":"0.0.0.0:443","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat java.nio.ByteBuffer.wrap(ByteBuffer.java:396)\n\tat com.snowflake.kafka.connector.records.SnowflakeAvroConverter.toConnectData(SnowflakeAvroConverter.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t...
After this change, the project doesn't compile with Java 11 (Java 8 is fine). The module com.sun.corba was deprecated in Java 9 and removed in Java 11. Could you please reconsider using it?
After upgrading the connector to 0.4.0, it is constantly crashing, probably while handling broken data (judging from the stack trace).
The configuration includes:
"errors.tolerance": "all",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
stack trace:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at com.snowflake.kafka.connector.records.SnowflakeRecordContent.getBrokenData(SnowflakeRecordContent.java:81)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.writeBrokenDataToTableStage(SnowflakeSinkServiceV1.java:358)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:321)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:167)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:78)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:160)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
The connector creates two objects (a pipe and a stage) that are not documented anywhere, as far as I can see. They seem to follow this naming convention:
SNOWFLAKE_KAFKA_CONNECTOR_${name}_PIPE_${snowflake.database.name}
SNOWFLAKE_KAFKA_CONNECTOR_${name}_STAGE_${snowflake.database.name}
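For anyone who needs to locate or clean up these objects, the names can be reconstructed from the connector name. A minimal sketch of the observed convention (the suffix here is an assumption based on the names above, not the connector's actual code; the real connector may append table and partition identifiers instead):

```java
public class InternalNames {
    // Builds the internal pipe/stage names following the observed convention.
    // The suffix argument is whatever follows the PIPE_/STAGE_ marker.
    public static String pipeName(String connectorName, String suffix) {
        return "SNOWFLAKE_KAFKA_CONNECTOR_" + connectorName + "_PIPE_" + suffix;
    }

    public static String stageName(String connectorName, String suffix) {
        return "SNOWFLAKE_KAFKA_CONNECTOR_" + connectorName + "_STAGE_" + suffix;
    }

    public static void main(String[] args) {
        // prints SNOWFLAKE_KAFKA_CONNECTOR_kafkaingest_PIPE_MYDB
        System.out.println(pipeName("kafkaingest", "MYDB"));
    }
}
```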
Stacktrace:
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:start (com.snowflake.kafka.connector.SnowflakeSinkTask:78)
[2019-10-22 12:07:31,561] INFO
[SF_KAFKA_CONNECTOR] initialized the snowflake connection (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:38)
[2019-10-22 12:07:31,562] INFO WorkerSinkTask{id=kafkaingest-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
[2019-10-22 12:07:31,836] INFO Cluster ID: npJAAcx-SjyK242XsgS1MQ (org.apache.kafka.clients.Metadata:285)
[2019-10-22 12:07:31,838] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] Discovered group coordinator 10.225.171.232:9093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:654)
[2019-10-22 12:07:31,851] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
[2019-10-22 12:07:31,855] INFO [Consumer clientId=consumer-2, groupId=connect-kafkaingest] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:486)
[2019-10-22 12:07:32,485] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:168)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2019-10-22 12:07:32,487] ERROR WorkerSinkTask{id=kafkaingest-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:168)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
[2019-10-22 12:07:32,489] ERROR WorkerSinkTask{id=kafkaingest-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-10-22 12:07:32,489] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:stop (com.snowflake.kafka.connector.SnowflakeSinkTask:112)
Configuration:
name=kafkaingest
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=barTopic
buffer.count.records=100
buffer.flush.time=60
buffer.size.bytes=65536
snowflake.url.name=xxx.ap-southeast-2.snowflakecomputing.com
snowflake.user.name=kafka_ingest
snowflake.private.key=xxx
snowflake.private.key.passphrase=xxx
snowflake.database.name=xxx
snowflake.schema.name=xxx
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
Hi, I wanted to cross-reference this issue:
https://community.snowflake.com/s/question/0D50Z00009UvjWGSAZ/build-a-combined-docker-image-for-snowflakekafkaconnector-with-cpkafkaconnectbase-to-deploy-on-kafka-connect-cluster
I am not sure how to troubleshoot it; could you take a look if you have a minute?
Hi, first of all, thanks a lot for building & maintaining this connector; it will probably help lots of developers & companies like us. What I am trying to do is replicate data from one PostgreSQL database to Snowflake, using Debezium as the source and this connector as the sink. The Debezium part works flawlessly, as it's a bit more mature, but I am having problems with the Snowflake sink connector. According to the logs, it cannot create a table because the session does not have a current database, even though I set the database name while configuring the connector.
Here is the full log:
[SF_KAFKA_CONNECTOR] Creating new table test_source_public_items_196930796. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1)
[SF_KAFKA_CONNECTOR] Exception: Failed to prepare SQL statement
[SF_KAFKA_CONNECTOR] Error Code: 2001
[SF_KAFKA_CONNECTOR] Detail: SQL Exception, reported by Snowflake JDBC
[SF_KAFKA_CONNECTOR] Message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:139)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:505)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:249)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:187)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.execute(SFStatement.java:796)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:308)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakePreparedStatementV1.execute(SnowflakePreparedStatementV1.java:509)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:65)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:79)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.createTableAndStage(SnowflakeSinkServiceV1.java:681)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.init(SnowflakeSinkServiceV1.java:231)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:315)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:172)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:77)
[SF_KAFKA_CONNECTOR] java.util.ArrayList.forEach(ArrayList.java:1257)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
[SF_KAFKA_CONNECTOR] java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[SF_KAFKA_CONNECTOR] java.util.concurrent.FutureTask.run(FutureTask.java:266)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[SF_KAFKA_CONNECTOR] java.lang.Thread.run(Thread.java:748)
And here is the configuration for the connector, if it helps:
{
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "name": "test_sink",
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "topics": [
    "test_source.public.items"
  ],
  "snowflake.url.name": "....",
  "snowflake.user.name": "kafka_connect",
  "snowflake.private.key": "....",
  "snowflake.database.name": "kafka_connect_test",
  "snowflake.schema.name": "kafka_schema",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
The version of Snowflake sink connector I am using is 0.5.4.
Hi,
We have the following issue with the connector. Two instances of the connector are defined: one uses SnowflakeAvroConverter as its value.converter, the other SnowflakeJsonConverter. When Kafka Connect initializes everything, it deadlocks like this:
Found one Java-level deadlock:
=============================
"pool-14-thread-6":
waiting to lock monitor 0x00007f2d980339c8 (object 0x00000000d100e920, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
which is held by "pool-14-thread-5"
"pool-14-thread-5":
waiting to lock monitor 0x00007f2d98032b58 (object 0x00000000d2b03558, a org.apache.kafka.connect.runtime.isolation.PluginClassLoader),
which is held by "pool-14-thread-1"
"pool-14-thread-1":
waiting to lock monitor 0x00007f2d980339c8 (object 0x00000000d100e920, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
which is held by "pool-14-thread-5"
Java stack information for the threads listed above:
===================================================
"pool-14-thread-6":
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-14-thread-5":
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
- waiting to lock <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-14-thread-1":
at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
- waiting to lock <0x00000000d100e920> (a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
- locked <0x00000000d045eed8> (a java.lang.Object)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
- locked <0x00000000d045eed8> (a java.lang.Object)
- locked <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:96)
- locked <0x00000000d045ef20> (a java.lang.Object)
- locked <0x00000000d2b03558> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.snowflake.kafka.connector.records.SnowflakeConverter.<clinit>(SnowflakeConverter.java:39)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:205)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:201)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:234)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:924)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:920)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I managed to boil it down to the following reproducing code:
package org.apache.kafka.connect.runtime;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.runtime.isolation.Plugins;

import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_PATH_CONFIG;

public class Xxxxxx {

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> props = new HashMap<>();
        props.put(PLUGIN_PATH_CONFIG, "<path>");
        Plugins plugins = new Plugins(props);
        plugins.compareAndSwapWithDelegatingLoader();
        Thread1 t1 = new Thread1(plugins);
        Thread2 t2 = new Thread2(plugins);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

    private static class Thread1 extends Thread {
        private final Plugins plugins;

        private Thread1(final Plugins plugins) {
            super("Thread-1");
            this.plugins = plugins;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Map<String, String> props = new HashMap<>();
            props.put("name", "snowflake_avro");
            props.put("connector.class", "com.snowflake.kafka.connector.SnowflakeSinkConnector");
            props.put("value.converter", "com.snowflake.kafka.connector.records.SnowflakeAvroConverter");
            ConnectorConfig connectorConfig = new ConnectorConfig(plugins, props);
        }
    }

    private static class Thread2 extends Thread {
        private final Plugins plugins;

        private Thread2(final Plugins plugins) {
            super("Thread-2");
            this.plugins = plugins;
        }

        @Override
        public void run() {
            Map<String, String> props = new HashMap<>();
            props.put("name", "snowflake_json");
            props.put("connector.class", "com.snowflake.kafka.connector.SnowflakeSinkConnector");
            props.put("value.converter", "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
            ConnectorConfig connectorConfig = new ConnectorConfig(plugins, props);
        }
    }
}
(This reproduces in ~40% of runs.)
I'm not absolutely sure, but this may be caused by https://issues.apache.org/jira/browse/KAFKA-7421. That issue may be fixed at some point, but who knows when, and whether the fix will be backported.
I propose to address this with #24. The idea is very simple: delay ObjectMapper initialization until object creation by making it non-static. That is not far (in real time) from the class loading, but it's enough to break the lock. The change is very small and introduces close to zero overhead.
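The reason making ObjectMapper non-static helps is a matter of when initialization runs: a static field is initialized in the class's <clinit>, which executes while the class loader's lock is held, whereas an instance field is initialized at construction time, after loading has completed. A self-contained illustration of that ordering (not the connector's actual code):

```java
public class InitTiming {
    public static final StringBuilder LOG = new StringBuilder();

    static class EagerHolder {
        // Static field: initialized during class loading (<clinit>),
        // i.e. while the class loader lock may be held.
        static final Object MAPPER = record("static-init");
    }

    static class LazyHolder {
        // Instance field: initialized only at construction time,
        // after class loading has finished.
        final Object mapper = record("instance-init");
    }

    static Object record(String what) {
        LOG.append(what).append(';');
        return new Object();
    }

    public static void main(String[] args) {
        new LazyHolder();              // triggers instance initialization
        Object o = EagerHolder.MAPPER; // first access triggers <clinit>
        System.out.println(LOG);       // prints instance-init;static-init;
    }
}
```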
Version 1.1.0 has org.apache.avro 1.9.0 and io.confluent.kafka-connect-avro-converter 5.3.0 (which itself has org.apache.avro 1.8.1 as a compile-time dependency) as compile-time dependencies. The Installation Prerequisites document says that you can use Avro 1.8.2+ and Kafka Connect Avro Converter 5.0.0+.
Since Avro 1.9.0 comes bundled with the artifact, it overrides the Avro dependency at runtime. Due to API changes in version 1.9.0, this causes issues for other connectors that use it.
In our case, we have Avro 1.8.2 and Kafka Avro Converter 5.1.2 in our Kafka Connect deployment. While running alongside the Snowflake Kafka Connector, the Debezium connector throws the following:
java.lang.NoSuchMethodError: 'void org.apache.avro.Schema.addProp(java.lang.String, org.codehaus.jackson.JsonNode)'
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:916)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:900)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:900)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:294)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Since the Avro converter is optional, I think both Avro and the Kafka Avro Converter should be provided by the runtime.
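One way to express that suggestion in the build (a hypothetical pom.xml fragment, not the project's actual build file) is to mark both artifacts as provided, so the connector jar does not bundle them and the versions already on the Connect worker's classpath win at runtime:

```xml
<!-- Hypothetical: declare Avro and the Confluent Avro converter as
     provided instead of bundling them with the connector. -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-avro-converter</artifactId>
    <version>5.3.0</version>
    <scope>provided</scope>
</dependency>
```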
It's my understanding that uploading Connectors to Confluent Hub isn't as easy as uploading a jar to a package repository, but it would be nice to know what your timeline for getting this Connector onto Confluent Hub is. Are you waiting for 1.0? Is there some approval process that's in the works but not yet complete? Thanks!
We are using the connector in production, but we find that we have to rely on user-created timestamps or Kafka timestamps for ETL, which isn't great. We want to be able to use the timestamp at which each record is written to Snowflake. To do that, we currently have to alter each table the connector creates in Snowflake, adding default current_timestamp() as the value of a record_timestamp column. The problem, of course, is that this is an asynchronous operation. We would find it very useful if the connector could create this column by itself, or if there were a mechanism for creating this third column through the connector configuration.
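For reference, the kind of table shape being asked for looks like this (a hypothetical example; the table name is ours, and only record_metadata and record_content come from the connector):

```sql
-- Hypothetical: the two variant columns are what the connector creates;
-- record_timestamp is the server-side ingestion timestamp we want added.
create or replace table my_topic_table (
    record_metadata  variant,
    record_content   variant,
    record_timestamp timestamp_ltz default current_timestamp()
);
```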
I would like to submit an enhancement request: publish committed offsets to a designated Kafka topic, so that a microservice could listen for and process them shortly after the records are committed to Snowflake. Some background info on Stack Overflow can be found here:
https://stackoverflow.com/questions/59758541/writing-records-into-snowflake-from-kafka/59808570#59808570
The wrapper technique suggested on Stack Overflow can work, but support directly within the connector would be more appropriate.
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][2019-10-29 18:57:52,281] ERROR WorkerSinkTask{id=SnowflakeSink-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect]com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Exception: Failed to put records
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Error Code: 5014
[kafkaconnect-9e4947f-2]2019-10-29T18:57:52.282205[kafka-connect][SF_KAFKA_CONNECTOR] Detail: SinkTask hasn't been initialized before calling PUT function
Can anyone help me identify the root cause of this error?
I'm running into an issue when trying to initialize the connector to read from a Kafka topic (running Kafka 2.3.0). The stack trace is below:
[SF_KAFKA_CONNECTOR] Creating new table OS_KAFKA_TEST. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1) [task-thread-KAFKA_SF_TEST-0]
2019-10-16 18:05:14,984 WARN WorkerSinkTask{id=KAFKA_SF_TEST-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-KAFKA_SF_TEST-0]
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-KAFKA_SF_TEST-0]
java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.lambda$preCommit$1(SnowflakeSinkTask.java:186)
at java.util.HashMap.forEach(HashMap.java:1289)
at com.snowflake.kafka.connector.SnowflakeSinkTask.preCommit(SnowflakeSinkTask.java:183)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-KAFKA_SF_TEST-0]
java.lang.NullPointerException
at com.snowflake.kafka.connector.SnowflakeSinkTask.close(SnowflakeSinkTask.java:153)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-10-16 18:05:14,984 ERROR WorkerSinkTask{id=KAFKA_SF_TEST-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-KAFKA_SF_TEST-0]
Hi, I am using sink connector version 0.5.4, and everything works as expected when I use JSON as the message format, but when I switch to Avro the task crashes.
Here is the Debezium source configuration:
{
  "name": "debezium_test_source",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "slot.name": "test_slot",
    "plugin.name": "wal2json",
    "database.server.name": "test_database",
    "database.dbname": "test_database",
    "database.hostname": "db-host-name",
    "database.port": "5432",
    "database.user": "db-user",
    "database.password": "db-password",
    "schema.whitelist": "public",
    "table.whitelist": "public.test_table"
  }
}
and here is the Snowflake sink configuration:
{
  "name": "snowflake_test_sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "key.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topics": "test_database.public.test_table",
    "snowflake.url.name": "....",
    "snowflake.user.name": "....",
    "snowflake.private.key": "....",
    "snowflake.private.key.passphrase": "....",
    "snowflake.database.name": "....",
    "snowflake.schema.name": "...."
  }
}
and finally, here is the log:
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:close (com.snowflake.kafka.connector.SnowflakeSinkTask)
[2019-11-06 22:53:24,636] ERROR WorkerSinkTask{id=snowflake_test_sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.stopCleaner(SnowflakeSinkServiceV1.java:266)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.close(SnowflakeSinkServiceV1.java:649)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$1500(SnowflakeSinkServiceV1.java:172)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.lambda$close$0(SnowflakeSinkServiceV1.java:91)
at java.util.HashMap.forEach(HashMap.java:1289)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.close(SnowflakeSinkServiceV1.java:90)
at com.snowflake.kafka.connector.SnowflakeSinkTask.close(SnowflakeSinkTask.java:183)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-11-06 22:53:24,638] ERROR WorkerSinkTask{id=snowflake_test_sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-11-06 22:53:24,639] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask:stop (com.snowflake.kafka.connector.SnowflakeSinkTask)
Hi,
I'm getting the exception below when using an encrypted key:
"Caused by: java.lang.ClassNotFoundException: org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider"
I added bc-fips and changed the POM as below to pull in the required classes:
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-fips</artifactId>
<version>1.0.3</version>
<!--scope>provided</scope-->
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bc-fips</artifactId>
<version>1.0.2</version>
<!--scope>provided</scope-->
</dependency>
2020-03-13 17:17:12,296 ERROR WorkerConnector{id=snowflake_sink_messages} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector) [pool-10-thread-1]
org.bouncycastle.crypto.fips.FipsOperationError: org.bouncycastle.crypto.fips.FipsOperationError: Module checksum failed: expected [d68017e6f45512a51a862c77e4fa2348d0d24d816bc3f76fcf30375052216e86] got [f4fd53410d9b8c9c253ec0f349c26f6b19c0c7befabd5d09be48cc857661fc74]
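One hypothesis worth checking (not confirmed from this log alone): the bc-fips jar verifies its own checksum at load time, so repackaging it into a shaded/fat jar modifies the module and produces exactly this FipsOperationError. A hedged sketch of the alternative is to keep the FIPS artifact un-shaded, i.e. restore the provided scope and deploy the original, unmodified jar on the Connect plugin classpath:

```xml
<!-- hypothetical: keep bc-fips out of the shaded artifact so its
     self-checksum stays intact; ship the unmodified jar alongside
     the connector instead of bundling it -->
<dependency>
  <groupId>org.bouncycastle</groupId>
  <artifactId>bc-fips</artifactId>
  <version>1.0.2</version>
  <scope>provided</scope>
</dependency>
```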
On the 24th of March, the connector failed due to a Snowflake incident and remained down for more than an hour:
WorkerSinkTask{id=dwh_toyblast_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
[org.apache.kafka.connect.runtime.WorkerSinkTask]
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to upload file
[SF_KAFKA_CONNECTOR] Error Code: 2003
[SF_KAFKA_CONNECTOR] Detail: Failed to upload file to Snowflake Stage though JDBC
[SF_KAFKA_CONNECTOR] Message: Internal error: Processing aborted due to error 370001:4037195285; incident 8554447.
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:152)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:575)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.parseCommandInGS(SnowflakeFileTransferAgent.java:1391)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.parseCommand(SnowflakeFileTransferAgent.java:1029)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeFileTransferAgent.<init>(SnowflakeFileTransferAgent.java:1003)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeConnectionV1.uploadStreamInternal(SnowflakeConnectionV1.java:1122)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeConnectionV1.compressAndUploadStream(SnowflakeConnectionV1.java:1046)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.put(SnowflakeConnectionServiceV1.java:603)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:453)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:423)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:232)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:88)
[SF_KAFKA_CONNECTOR] java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:203)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[SF_KAFKA_CONNECTOR] java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[SF_KAFKA_CONNECTOR] java.base/java.lang.Thread.run(Thread.java:834)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:343)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:330)
at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.put(SnowflakeConnectionServiceV1.java:607)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:453)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:423)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:232)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:88)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
After we restarted it, we noticed that for one of the topics the consumer lag was increasing linearly.
Checking the Kafka Connect logs, it was committing the same offset every interval (an offset that was no longer present due to log retention):
INFO || WorkerSinkTask{id=dwh_toyblast_connector-0} Committing offsets asynchronously using sequence number 42145: {toyblast.toyblast_prod.users-0=OffsetAndMetadata{offset=3013837293, leaderEpoch=null, metadata=''}, toyblast.toyblast_prod.android_payments-0=OffsetAndMetadata{offset=2243967, leaderEpoch=null, metadata=''}, toyblast.toyblast_prod.ios_payments-0=OffsetAndMetadata{offset=4598722, leaderEpoch=null, metadata=''}}
Even though we were able to see recent data in the stage tables, their offsets were not committed. After restarting the connector, we noticed the recovery step was re-ingesting all the files in the stage.
The stage for the problematic topic had more than 22000 files when we noticed the problem, starting from the 24th of March, and it kept increasing.
After further debugging, we found that the step that marks files older than 1 hour as failed before updating offsets was getting stuck.
In this block, the tmpFileNames list gets mutated during iteration. The first hit on the following lines makes the loop block and never reach the updateOffset() step.
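The failure mode described above can be reproduced in miniature. This is a hypothetical sketch, not the connector's actual code (only the tmpFileNames name is borrowed from the report); it shows why structurally mutating a list while a for-each loop iterates it is unsafe, and a snapshot-based alternative:

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class MutationDuringIteration {
    public static void main(String[] args) {
        // Mutating a list while a for-each loop iterates it invalidates the
        // iterator; ArrayList's fail-fast check throws here (and depending on
        // which element is removed, the loop can instead terminate early).
        List<String> tmpFileNames = new ArrayList<>(List.of("f1", "f2", "f3"));
        try {
            for (String name : tmpFileNames) {
                if (name.equals("f1")) {
                    tmpFileNames.remove(name); // structural change mid-iteration
                }
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("iteration aborted: " + e);
        }

        // Safe variant: iterate over a snapshot, mutate the original list.
        List<String> names = new ArrayList<>(List.of("f1", "f2", "f3"));
        for (String name : new ArrayList<>(names)) {
            if (name.equals("f1")) {
                names.remove(name);
            }
        }
        System.out.println("remaining: " + names);
    }
}
```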
Kafka Connect's docs indicate there's a topics.regex configuration parameter that the framework provides for sink connectors. It would be really useful to us if the Snowflake connector honored that parameter: it would let us pick up Kafka topics matching a pattern without having to update our connector configuration.
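For illustration only (the topic pattern is hypothetical and the Snowflake values are placeholders, mirroring the configs earlier in this thread), a sink config using the framework-level topics.regex instead of an explicit topics list might look like:

```json
{
  "name": "snowflake_regex_sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics.regex": "test_database\\.public\\..*",
    "snowflake.url.name": "....",
    "snowflake.user.name": "....",
    "snowflake.private.key": "....",
    "snowflake.database.name": "....",
    "snowflake.schema.name": "...."
  }
}
```

Note that in Kafka Connect, topics and topics.regex are mutually exclusive; a connector config may set one or the other.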
Scenario:
buffer.count.records=400000
buffer.flush.time=1800
While restarting, sometimes the consumer group rebalances and we lose the last 1800 seconds (buffer.flush.time) of data.
Message:
[2020-02-17 05:30:34,146] INFO [Consumer clientId=connector-consumer-snowflake_gis_stats_v5-1, groupId=connect-snowflake_gis_stats_v5] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:936)
Whenever we encounter this message while restarting, we lose the messages collected in the last 1800 seconds (buffer.flush.time).
About every 30 minutes, the Connect log gets flooded with the following messages from the Snowflake connector, printed at a very high rate within one second.
All the messages in the same one-second burst show the same token / subject / issuer; perhaps only one needs to be printed, as they all seem to carry the same information.
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11423)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11424)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
Mar 25 09:34:57 kaf-prd-wrk04 connect-distributed[65848]: [SecurityManager-1(11425)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer ...
[... the same message repeats with incrementing SecurityManager thread IDs up to (11450), all within the same second ...]
I'm encountering an error when trying to sink:
[SF_KAFKA_CONNECTOR] Exception: Failed to prepare SQL statement
[SF_KAFKA_CONNECTOR] Error Code: 2001
[SF_KAFKA_CONNECTOR] Detail: SQL Exception, reported by Snowflake JDBC
[SF_KAFKA_CONNECTOR] Message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.
My configuration is as follows:
{
"name": "SnowflakeSink",
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url": "SCHEMA_URL",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "SCHEMA_USER:SCHEMA_PASS",
"topics.regex": ".*AVRO.*",
"errors.retry.timeout": "1000",
"errors.retry.delay.max.ms": "3000",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"snowflake.url.name": "XXX.snowflakecomputing.com",
"snowflake.user.name": "YYY",
"snowflake.private.key": "ZZZ",
"snowflake.database.name": "KAFKA_EVENTS",
"snowflake.schema.name": "PUBLIC",
"buffer.count.records": "50000",
"buffer.size.bytes": "20000000",
"buffer.flush.time": "180"
}
I've verified that the database KAFKA_EVENTS has a schema named PUBLIC. Any help here?
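The error says the session has no current database, which in practice often points at the connector's role being unable to resolve the configured database rather than the database not existing. As a hedged check (KAFKA_CONNECT_ROLE is a hypothetical role name; the database and schema names are taken from the config above), one can verify the grants from a Snowflake worksheet:

```sql
-- hypothetical role name; database/schema from the config above
GRANT USAGE ON DATABASE KAFKA_EVENTS TO ROLE KAFKA_CONNECT_ROLE;
GRANT USAGE ON SCHEMA KAFKA_EVENTS.PUBLIC TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_EVENTS.PUBLIC TO ROLE KAFKA_CONNECT_ROLE;
```

Case can also matter: if the database was created with a quoted lowercase name, the unquoted KAFKA_EVENTS in the config will not match it.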
To ensure data on low-throughput topics gets written to Snowflake in a timely manner, it would be great if the connector could be configured to ingest messages into Snowflake every N seconds in addition to the current approaches of record count and total record size (in bytes).
Example of Snowflake Connector log statement:
[SecurityManager-1(2981)] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with subject OPTORO.STAGING_KAFKA_CONNECT
Example of Kafka Connect log statement:
{"timestamp":"2020-01-02 11:12:33,806","level":"INFO","logger":"org.apache.kafka.connect.runtime.rest.RestServer","thread":"qtp1871312485-27317","message":"127.0.0.1 - - [02/Jan/2020:11:12:33 +0000] \"GET /connectors HTTP/1.1\" 200 255 1"}
This prevents us from ingesting the Connector's log statements into our logging pipeline.
[2020-03-23 00:08:27,731] INFO Successfully processed removal of connector 'snow_sink' (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
[2020-03-23 00:08:27,731] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector snow_sink config removed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-03-23 00:08:27,731] INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by stopping connector snow_sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-03-23 00:08:27,731] INFO Stopping connector snow_sink (org.apache.kafka.connect.runtime.Worker)
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:stop (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2020-03-23 00:08:27,732] INFO Stopped connector snow_sink (org.apache.kafka.connect.runtime.Worker)
[2020-03-23 00:08:28,474] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 216 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7e5cc2d8-a54a-4b0a-95a2-b245b10fce66', leaderUrl='http://ec2-34-213-4-7.us-west-2.compute.amazonaws.com:8083/', offset=304, connectorIds=[], taskIds=[], revokedConnectorIds=[snow_sink], revokedTaskIds=[snow_sink-0, snow_sink-1, snow_sink-3], delay=5179} with rebalance delay: 5179 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:3]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:1]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:3]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
Deleting the connector doesn't seem to revoke the tasks correctly. Please see log above.