confluentinc / kafka-connect-elasticsearch
Kafka Connect Elasticsearch connector
License: Other
Is it possible to transform from { Lat: valLat, Lon: valLon, name: placeName } to { name: placeName, Location: { Lat: valLat, Lon: valLon } } format using a single message transform with this plugin?
I tried this:
transforms=hoistToStruct
transforms.hoistToStruct.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.hoistToStruct.field=Location
It works, but the record is transformed to { Location: { name: placeName, Lat: valLat, Lon: valLon } }, which is not what I want.
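Neither HoistField nor the other built-in transforms can nest just a subset of fields: HoistField wraps the whole value, as the result above shows. A custom SMT (or an upstream stream processor) would seem to be needed. Here is a minimal sketch of the reshaping logic on a plain map, which a custom transform's apply() would mirror on the Connect Struct (class and method names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class NestLocation {
    // Move Lat/Lon under a new Location object, keeping name at the top level.
    public static Map<String, Object> apply(Map<String, ?> value) {
        Map<String, Object> location = new HashMap<>();
        location.put("Lat", value.get("Lat"));
        location.put("Lon", value.get("Lon"));

        Map<String, Object> out = new HashMap<>();
        out.put("name", value.get("name"));
        out.put("Location", location);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> in = new HashMap<>();
        in.put("Lat", 43.54801);
        in.put("Lon", 10.31061);
        in.put("name", "pisa");
        System.out.println(apply(in));
    }
}
```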
How do you feel about adding an option to treat Kafka Connect logical date fields (date, datetime, timestamp) as Elasticsearch date types, both for schema creation and data conversion?
For background, we are trying to sink records with logical date fields (days since epoch) to an ES index with a corresponding date field. ES doesn't know how to interpret days since epoch and is expecting epoch_millis (or another accepted format).
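For illustration, the conversion such an option would have to perform: Connect's Date logical type carries days since the Unix epoch, while Elasticsearch's epoch_millis format expects milliseconds, so the value needs to be scaled. A sketch of the arithmetic only, not the connector's code:

```java
public class DateConversion {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Connect's Date logical type counts days since the Unix epoch;
    // Elasticsearch's epoch_millis expects milliseconds since the epoch.
    public static long daysToEpochMillis(int epochDays) {
        return epochDays * MILLIS_PER_DAY;
    }

    public static void main(String[] args) {
        // 17246 days since the epoch corresponds to 2017-03-21T00:00:00Z
        System.out.println(daysToEpochMillis(17246));
    }
}
```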
POST or GET on /connectors returns 500 Timeout in distributed mode.
[2017-03-21 21:26:04,794] INFO 127.0.0.1 - - [21/Mar/2017:21:24:34 +0000] "GET /connectors HTTP/1.1" 500 48 90235 (org.apache.kafka.connect.runtime.rest.RestServer:60)
Hi.
We're trying to migrate from Datamountaineer Elastic Sink connector to the official Confluent connector. We're experiencing problems with geo_point data types, correctly working with the previous connector, but obviously it depends on custom connector logic. This is the error, repeated for all points:
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse [geoVal]","caused_by":{"type":"number_format_exception","reason":"For input string: \"43.54801,10.31061\""}}]
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:353)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:326)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This is the avro schema:
{"namespace": "com.streams.avro",
"type": "record",
"name": "Data",
"fields": [
(...)
{"name": "geoVal", "type": "string"},
(...)
]
}
This is the properties file content:
name=kafka-to-elasticsearch-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=MYTOPIC
topic.index.map=MYTOPIC:myindex
key.ignore=true
topic.schema.ignore=MYTOPIC
connection.url=http://elasticsearch:9200
type.name=kafka-connect
This is the connector log initialization:
[2016-12-16 13:59:34,354] INFO Instantiated connector kafka-to-elasticsearch-measm-connector with version 3.1.1 of type class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2016-12-16 13:59:34,356] INFO ElasticsearchSinkConnectorConfig values:
batch.size = 2000
connection.url = http://elasticsearch:9200
flush.timeout.ms = 10000
key.ignore = true
linger.ms = 1
max.buffered.records = 20000
max.in.flight.requests = 5
max.retries = 5
retry.backoff.ms = 100
schema.ignore = false
topic.index.map = [MEASM-TOPIC:measm_by_sensor_param]
topic.key.ignore = []
topic.schema.ignore = [MEASM-TOPIC]
type.name = kafka-connect
(io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig)
[2016-12-16 13:59:34,357] INFO Finished creating connector kafka-to-elasticsearch-connector (org.apache.kafka.connect.runtime.Worker)
This is the field in the Elasticsearch index:
(...)
"geoVal": {
"type": "geo_point"
},
(...)
I'm not sure if it's an Avro schema problem or if we're missing some configuration in the properties file.
Thanks
Luca
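For anyone hitting the same error: since the field arrives as a "lat,lon" string, the usual approach is to disable the connector's schema-based mapping for the topic (as topic.schema.ignore above does) and pre-create the index with an explicit geo_point mapping before starting the connector. A sketch, assuming the index and type names from the config, e.g. PUT /myindex with body:

```json
{
  "mappings": {
    "kafka-connect": {
      "properties": {
        "geoVal": { "type": "geo_point" }
      }
    }
  }
}
```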
Are there any plans to support multiple ES hosts?
I've got a connector set up with a constant low volume stream of messages (10/min). While everything seems to work fine the following warning is being logged at regular intervals:
WARN Commit of WorkerSinkTask{id=elastic-authenticatedcall-sink-2} offsets timed out
The storage topics are configured as specified here http://docs.confluent.io/3.2.0/connect/userguide.html#distributed-worker-configuration
Timeouts are set to the defaults (which seem generous)
The kafka cluster is running 3 nodes and has essentially no load. Everything appears to work fine, and when stopping and starting the connector it appears to pick up where it left off.
Can anyone shed any light on this warning? Should I be concerned?
As reported in #51.
I have used the Kafka connector to write data to ES; the Kafka version is 0.10.1.1 and the ES version is 5.2.1. But when I start connect-standalone I get an error:
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task kafka-to-elasticsearch-1-0 threw an uncaught and unrecoverable exception
com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.JsonParser.parse(JsonParser.java:65)
at com.google.gson.JsonParser.parse(JsonParser.java:45)
at io.searchbox.action.AbstractAction.parseResponseBody(AbstractAction.java:94)
at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:65)
at io.searchbox.action.GenericResultAbstractAction.createNewElasticSearchResult(GenericResultAbstractAction.java:20)
at io.searchbox.client.http.JestHttpClient.deserializeResponse(JestHttpClient.java:131)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:50)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:248)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1573)
at com.google.gson.stream.JsonReader.checkLenient(JsonReader.java:1423)
at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:546)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:429)
at com.google.gson.JsonParser.parse(JsonParser.java:60)
... 28 more
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted
and my worker config is:
bootstrap.servers=es-1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
and the sink connector config is:
name=kafka-to-elasticsearch-1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test-3
topic.index.map=connect-test-3:myindex
key.ignore=true
topic.schema.ignore=ture
connection.url=http://es-2:9200,http://es-3:9200
type.name=kafka-connect
Can anyone tell me what the problem is? Thanks.
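Two details in the sink config above stand out, offered as guesses rather than a confirmed diagnosis: connection.url is single-valued in this version of the connector, so the comma-separated host list may be passed to Jest as one malformed URL, and topic.schema.ignore expects a list of topic names, not a boolean:

```properties
# Single host only; multiple-host support is a separate feature request
connection.url=http://es-2:9200
# topic.schema.ignore takes topic names rather than true/false
topic.schema.ignore=connect-test-3
```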
Is there support for HTTP authentication with an Elasticsearch cluster protected by Shield?
If not, is there a way of handling the authentication elsewhere, e.g. via an Elasticsearch client node or similar?
I am trying to run the ES connector against an Avro-based Kafka topic, which is using the Schema Registry.
Unfortunately when the connector starts it fails fetching the schema from the Schema Registry.
error:
[2016-12-21 21:30:51,955] ERROR Task ES-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 421
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:316)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-12-21 21:30:51,961] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-12-21 21:30:51,961] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:135)
connect-distributed.properties:
bootstrap.servers=10.xx.xx.xx:9090,10.xx.xx.xx:9090
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.xx.xx.xx:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.xx.xx.xx:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
The Schema Registry log shows that it received a request for /subjects/[topicname]-value, but returned 404.
We have been using the Schema Registry for a long time now, but nowhere in its request logs have I ever seen a request for anything like /subjects/[topicname]-value.
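For context on the 404: the AvroConverter derives Schema Registry subject names from the topic, <topic>-key for record keys and <topic>-value for record values, so a lookup of this shape (registry host assumed) is expected whenever the converter handles a value:

```
GET http://<registry-host>:8081/subjects/<topicname>-value/versions/latest
```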
Elasticsearch has support for Ingest Pipelines to perform transformations on data before it is indexed. The indexing API requires a parameter to be passed to use a pipeline:
PUT /myindex/type/1?pipeline=monthlyindex
{
"date1" : "2016-04-25T12:02:01.789Z"
}
Would it be possible to add this as an option to the Kafka Connect Elasticsearch connector?
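For completeness, the pipeline named in the example would be created in Elasticsearch itself; the date_index_name processor below routes documents into monthly indices by date1 (the index prefix is an assumption), e.g. PUT /_ingest/pipeline/monthlyindex with body:

```json
{
  "description": "route documents into monthly indices by date1",
  "processors": [
    {
      "date_index_name": {
        "field": "date1",
        "index_name_prefix": "myindex-",
        "date_rounding": "M"
      }
    }
  ]
}
```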
We aren't currently setting connection and read timeouts on the HTTP client. I believe the default is infinite.
AWS Elasticsearch implements a custom signature method to authenticate users [0].
It would be nice to be able to use this connector to move data into AWS Elasticsearch clusters that require authentication.
Could someone tell users how to use this connector: as a plugin, as an application, or otherwise?
I think we need to provide a simple README; should we?
Hi There,
I am trying to load data from a Kafka topic into an Elasticsearch index with key.ignore=true. On Elasticsearch I see the docs.deleted count increasing and decreasing, but docs.count still looks correct. Am I missing something here? I understand that with Lucene an update happens as a delete plus an insert, but with key.ignore set to true these should not happen. It would be really great if you could help me understand.
Thanks,
Deepti
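One plausible explanation, based on reading the connector source rather than documentation: even with key.ignore=true the sink assigns a deterministic document id derived from the record's coordinates, so a redelivered offset overwrites the same document, and Lucene records each overwrite as a delete plus an insert (hence docs.deleted moving while docs.count stays correct). A sketch of that id scheme:

```java
public class IgnoreKeyId {
    // With key.ignore=true the connector derives the document id from the
    // record's coordinates, so redelivery updates the same document.
    public static String documentId(String topic, int partition, long offset) {
        return topic + "+" + partition + "+" + offset;
    }

    public static void main(String[] args) {
        System.out.println(documentId("mytopic", 0, 42L));
    }
}
```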
Hi.
We have this error trying to run connector 3.1.1 and elasticsearch 5.3 (from official ES docker):
org.apache.kafka.connect.errors.ConnectException: Could not create index:MEASM-TOPIC
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:251)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This is the connector configuration:
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "kafka-connect",
"tasks.max": "1",
"topics": "MEASM-TOPIC",
"name": "ElasticsearchSinkConnector",
"key.ignore": "true",
"connection.url": "http://elasticsearch:9200"
}
It seems the problem is specific to that topic. We tried another, newly created test topic and the connector didn't throw any exception: the index on Elasticsearch was created correctly and data moved from Kafka to Elasticsearch. The stack trace gives no explanation of the cause of the problem.
Thanks
Luca
If I have
key.ignore=true
I get a failure:
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.DataException: STRUCTis not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:80)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:134)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:308)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:129)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The config is definitely being picked up:
[pool-2-thread-1] INFO io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig - ElasticsearchSinkConnectorConfig values:
type.name = kafka-connect
batch.size = 2000
max.retry = 5
key.ignore = true
max.in.flight.requests = 5
retry.backoff.ms = 100
max.buffered.records = 20000
schema.ignore = false
flush.timeout.ms = 10000
topic.index.map = [ORCL.SOE.LOGON:soe.logon]
topic.key.ignore = []
connection.url = http://localhost:9200
topic.schema.ignore = []
linger.ms = 1
If I set the key ignore specifically for the topic:
topic.key.ignore = ORCL.SOE.LOGON
Then processing works fine. Should key.ignore work this way (globally), or am I misunderstanding it?
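For comparison, the two forms as we understand them; whether the global flag should behave as shown is exactly the question above:

```properties
# Global form: ignore record keys for every topic this connector handles
key.ignore=true
# Per-topic form: ignore keys only for the listed topics (comma-separated)
topic.key.ignore=ORCL.SOE.LOGON
```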
Hi There,
First of all thanks for the work on this connector - looking forward to getting it up and running.
In Kafka I have my topics created with an uppercase naming, so when starting up the ES sink connector it fails as per the following...that's ok, I get that:
[2016-07-30 10:49:31,717][DEBUG][action.admin.indices.create] [Mandrill] [REP-SOE.CUSTOMERS] failed to create
[REP-SOE.CUSTOMERS] InvalidIndexNameException[Invalid index name [REP-SOE.CUSTOMERS], must be lowercase]
So I thought to map between the topic and the index as per the following:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=REP-SOE.CUSTOMERS
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
topic.index.map=rep-soe.customers
But I get this error:
(io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig:178)
[2016-07-30 10:53:17,941] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.ArrayIndexOutOfBoundsException: 1
Can you please explain how to configure this mapping properly (that is, between REP-SOE.CUSTOMERS (the topic) and rep-soe.customers (the index))?
Note also that I have not created this index in Elastic as I saw that the sink connector does this in my initial tests. Would it still create a new index when these mappings are used?
Thanks for your help here.
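The ArrayIndexOutOfBoundsException: 1 is consistent with the map entry failing to split into a pair: topic.index.map takes topic:index pairs, so the mapping would presumably need the topic name on the left:

```properties
topic.index.map=REP-SOE.CUSTOMERS:rep-soe.customers
```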
I'm using the TimestampRouter to index documents in rolling daily indexes. It is indexing documents correctly, however, I'm seeing a ton of these WARN messages in my connect logs:
[2017-06-07 20:30:38,932] WARN Ignoring invalid task provided offset test.topic6-20170607-0/OffsetAndMetadata{offset=4, metadata=''} -- partition not assigned (org.apache.kafka.connect.runtime.WorkerSinkTask:337)
Is this something I should be concerned about? And/or should the ES connector be handling this logic differently?
Hi, I saw that the current ES connector is using Jest as the ES client.
I have 2 questions:
According to the documentation, the topic.index.map config defines a mapping between Kafka topics and Elasticsearch indices. I have the following map configured: topic.index.map=mytopic:myindex, so I expect records from mytopic to be added to myindex. But in the io.confluent.connect.elasticsearch.ElasticsearchWriter#write method I see different behavior: the topic name is used for the schema mapping, despite the fact that the mapped index name was used during initialization and index creation. So I always receive index_not_found_exception in io.confluent.connect.elasticsearch.Mapping#createMapping, as long as I have the index myindex but the connector is trying to find mytopic. This quick fix solved the issue for me:
String mappedIndex = topicToIndexMap.get(sinkRecord.topic());
final String index = mappedIndex != null ? mappedIndex : sinkRecord.topic();
...but I'm not sure if I've understood the intent of this config correctly and am using it properly. Could you kindly check whether this is a bug?
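The quick fix above, restated as a self-contained sketch so it can be exercised on its own (resolveIndex is a hypothetical name; the real lookup lives in ElasticsearchWriter#write):

```java
import java.util.Map;

public class IndexResolution {
    // Use the mapped index when topic.index.map has an entry for the topic,
    // and fall back to the topic name only when it does not.
    public static String resolveIndex(Map<String, String> topicToIndexMap, String topic) {
        String mapped = topicToIndexMap.get(topic);
        return mapped != null ? mapped : topic;
    }

    public static void main(String[] args) {
        System.out.println(resolveIndex(Map.of("mytopic", "myindex"), "mytopic"));
    }
}
```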
Thanks
I have just upgraded our Kafka cluster to 3.1.1 to take advantage of the new Elasticsearch connector.
Unfortunately, when I start the Elasticsearch connector it fails with a Schema Registry error, and in the Schema Registry logs I see that the connector made a request to /subjects/[topic]-value, which doesn't exist, so it received a 404, causing the connector to fail.
Why is the connector making a POST request to /subjects/[topic]-value? What is this -value thing, and where is it coming from?
Some of the logs:
[2016-12-16 20:40:28,610] ERROR Task ES-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 441
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:215)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:206)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:200)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:69)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:161)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:149)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-12-16 20:40:28,612] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
The Schema Registry log (observe the 404 response status):
Fri Dec 16 20:40:28 UTC 2016: [2016-12-16 20:40:28,566] INFO 10.206.xx.xx - - [16/Dec/2016:20:40:28 +0000] "POST /subjects/apphits-value HTTP/1.1" 404 51 3 (io.confluent.rest-utils.requests:77)
Connect-distributed.properties:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=10.206.xx.xx:9092,10.206.xx.xx:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=frbi-connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.206.xx.xx:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.206.xx.xx:8081
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":"1",
"topics":"apphits",
"key.ignore":"true",
"connection.url":"http://10.204.xx.xx:9292"
"type.name":"kafka-connect" }}
First, thanks for your work. Frankly, I'm quite surprised at the amount of work required to connect the two (it seemed quite trivial, functionality-wise).
Now, I'm having a really hard time getting this thing to work.
I've tried to follow http://docs.confluent.io/3.2.0/connect/connect-elasticsearch/docs/elasticsearch_connector.html (except that I don't know what -avro- is; it seems to be outdated, so I've just dropped it).
So the connector logs the following:
kafka-elasticsearch-connector_1 | [2017-03-28 15:10:13,079] INFO ConnectorConfig values:
kafka-elasticsearch-connector_1 | connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
kafka-elasticsearch-connector_1 | key.converter = null
kafka-elasticsearch-connector_1 | name = elasticsearch-sink
kafka-elasticsearch-connector_1 | tasks.max = 2
kafka-elasticsearch-connector_1 | value.converter = null
kafka-elasticsearch-connector_1 | (org.apache.kafka.connect.runtime.ConnectorConfig:180)
Note that both topics (from config/quickstart-elasticsearch.properties, which I didn't find in ElasticsearchSinkConnectorConfig) and topic.index.map (which I added to the properties file, and which the config class does have) are absent from the logged values, and it doesn't help if I pre-create the index (which should be unnecessary, I believe).
So I get no error messages from any component and no hits in ES. Please help.
I am seeing this upon indexing any record into ES:
[BulkProcessor@2073013577-1] WARN io.confluent.connect.elasticsearch.bulk.BulkProcessor - Failed to execute batch 1 of 1 records, retrying after 100 ms
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}]
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:353)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:326)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I want the connector to take the incoming payloads (which are string-encoded JSON) and index them as-is into ES, so I have set schema.ignore=true
and value.converter.schemas.enable=false
. I've added some debug info to see what is going wrong and noticed the payload that the ES connector ultimately indexes looks like this:
"{\"entry_point\": \"rabbitmq\", \"ts_posted\": \"2017-04-05T02:02:27.296757\", \"tracer_id\": \"eb1c8284-19a3-11e7-9555-0a7b84c6deb8\"}
The original value in the SinkRecord is a non-escaped String containing the json data, i.e.
{"entry_point": "rabbitmq", "ts_posted": "2017-04-05T02:02:27.296757", "tracer_id": "eb1c8284-19a3-11e7-9555-0a7b84c6deb8"}
ES expects this to be a JSON payload without escaping.
While debugging, I wrote a unit test to reproduce this. In ElasticsearchWriterTest, the test looks like this:
@Test
public void testWriterIgnoreSchemaStringRecord() throws Exception {
  final boolean ignoreKey = true;
  final boolean ignoreSchema = true;
  String value = "{\"foo\":\"bar\"}";
  Collection<SinkRecord> records = Collections.singletonList(
      new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, 0));
  ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema);
  writeDataAndRefresh(writer, records);
  verifySearchResults(records, ignoreKey, ignoreSchema);
}
I dug further in the code and I believe the issue comes from this line:
When the JsonConverter is passed a string value with no schema, it ends up returning an escaped JSON string. If the value in the SinkRecord is already a string, and there is no schema or schemas are ignored, shouldn't the connector skip the JsonConverter?
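The double-encoding described above can be reproduced outside the connector. The sketch below is a standalone illustration, not connector code, and the helper name asJsonStringLiteral is made up; it mimics what happens when a converter treats an already-serialized JSON document as a plain string value:

```java
public class EscapeDemo {
    // Mimics what a JSON converter does when handed a plain String:
    // it emits a JSON string literal, escaping backslashes and quotes,
    // instead of passing the document through as an object.
    static String asJsonStringLiteral(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        String alreadySerialized = "{\"entry_point\": \"rabbitmq\"}";
        // The result is a quoted, escaped string literal, which
        // Elasticsearch rejects because it is not a JSON object.
        System.out.println(asJsonStringLiteral(alreadySerialized));
    }
}
```

This matches the escaped payload shown earlier, which suggests the record value is being JSON-encoded a second time on its way to the bulk request.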
I am using Kafka Connect to write logs into an Elasticsearch index.
Is it possible to write to an index whose name changes daily, i.e. logs-{date}, similar to what Logstash does, to prevent the ES index from getting too big?
I have had some success with the topic.index.map field, but I have only been able to pass in a static string as the index name.
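Depending on the Kafka version, the built-in TimestampRouter single message transform may help here: it rewrites the topic name per record from the record timestamp, and the connector derives the index name from the topic. A sketch, assuming your Connect runtime is new enough to ship this transform:

```properties
transforms=dailyIndex
transforms.dailyIndex.type=org.apache.kafka.connect.transforms.TimestampRouter
# index name becomes e.g. logs-2017.05.04
transforms.dailyIndex.topic.format=logs-${timestamp}
transforms.dailyIndex.timestamp.format=yyyy.MM.dd
```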
I'm trying to store a geo_point field in an Elasticsearch index, but the connector is not able to map the geo_point field. The following is my setup:
The POST body to the connector API:
{
"name": "khermeslocation20",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "khermeslocation20",
"type.name" : "khermeslocation20",
"key.ignore" : "true",
"connection.url" : "http://localhost:9200",
"tasks.max": "1"
}
}
The Avro schema:
{
"type": "record",
"name": "locationschema",
"fields": [{"name": "name","type": "string"},{"name": "location","type":"string"}]
}
The PUT and POST requests used to create the Elasticsearch index and mapping:
curl -XPUT 'localhost:9200/khermeslocation20?pretty' -H 'Content-Type: application/json' -d'
{
"settings" : {
"index" : {
"number_of_shards" : 1,
"number_of_replicas" : 1
}
}
}'
curl -X POST "http://localhost:9200/khermeslocation20/khermeslocation5/_mapping" -d '{
"khermeslocation5" : {
"properties" : {
"name" : { "type" : "string"},
"location" : { "type" : "geo_point"}
}}
}'
Following is the connector exception
[2017-05-04 13:58:06,369] ERROR Task khermeslocation20-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
kafka-connect_1 | org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"khermeslocation20":{"properties":{"name":{"type":"string"},"location":{"type":"string"}}}} -- {"root_cause":[{"type":"illegal_argument_exception","reason":"mapper [location] cannot be changed from type [geo_point] to [string]"}],"type":"illegal_argument_exception","reason":"mapper [location] cannot be changed from type [geo_point] to [string]"}
I've seen that the geo_point type is not included in the switch statement of the Mapping class. Does that mean we can't use geo_point yet? Would it make sense to modify the code to include this type?
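A possible workaround (an untested sketch): pre-create the index and the geo_point mapping in Elasticsearch, as in the curl commands above, and tell the connector not to infer a mapping from the record schema, so it does not try to replace geo_point with string:

```properties
# rely on the mapping pre-created in Elasticsearch
schema.ignore=true
```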
Let's track known issues here.
Mapping Schema.Type.STRING to string fails because string is no longer a valid ES type (https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_50_mapping_changes.html).
I want to use an alias to insert into Elasticsearch, using the pattern described here:
https://www.elastic.co/blog/managing-time-based-indices-efficiently
However I'm finding that the connector fails on startup because it is attempting to create an index with the alias name.
On startup, the connector checks for the existence of an index but it should also check whether an alias with the same name already exists before attempting to create a new index
I can't download the artifact from Confluent's Maven repo (version 3.0.0). Is it uploaded?
This is the URL from which I'm trying to connect.
Hi,
I'm using the Kafka connector to write data to ES. The Kafka version is 0.10.1.1 and the ES version is 5.2.1, but when I start connect-standalone I get this error:
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task kafka-to-elasticsearch-1-0 threw an uncaught and unrecoverable exception
com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.JsonParser.parse(JsonParser.java:65)
at com.google.gson.JsonParser.parse(JsonParser.java:45)
at io.searchbox.action.AbstractAction.parseResponseBody(AbstractAction.java:94)
at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:65)
at io.searchbox.action.GenericResultAbstractAction.createNewElasticSearchResult(GenericResultAbstractAction.java:20)
at io.searchbox.client.http.JestHttpClient.deserializeResponse(JestHttpClient.java:131)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:50)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:248)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1573)
at com.google.gson.stream.JsonReader.checkLenient(JsonReader.java:1423)
at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:546)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:429)
at com.google.gson.JsonParser.parse(JsonParser.java:60)
... 28 more
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted
bootstrap.servers=es-1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
name=kafka-to-elasticsearch-1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test-3
topic.index.map=connect-test-3:myindex
key.ignore=true
topic.schema.ignore=ture
connection.url=http://es-2:9200,http://es-3:9200
type.name=kafka-connect
Can anyone tell me what the problem is? Thanks.
I am trying to use this connector in a setup where a number of Logstash agents send their events to Kafka, and this connector pumps them into Elasticsearch so they are available on a Kibana dashboard. The JSON messages/events vary a great deal, so I do not want any rigid schemas. Here is what I have done as a simple test:
(1) start up ZK & Kafka the usual way
(2)
./bin/connect-standalone etc/test-connect-standalone.properties etc/kafka-connect-elasticsearch/test-elasticsearch.properties
test-connect-standalone.properties has been modified like so:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Without the above I get errors about 'json deserialization...'
test-elasticsearch.properties has been modified like so (not sure if this is actually needed):
schema.ignore=true
(3) As I pump messages into Kafka (via Logstash), they do come up in the Elasticsearch index. But unfortunately, the simple JSON that Logstash puts together ends up under a '_children' sub-object. This means that Kibana does not find the '@timestamp' field and cannot work seamlessly...
...
"_score": 1,
"_source": {
"_children": {
"f1": {
"_value": "value1"
}
},
...
The whole real JSON message sits under this '_children' sub-object. Is there a configurable way to get rid of the '_children' tag and put the message directly under '_source'?
Thanks
Ashok
Hi!
Sometimes it's useful to use the timestamp from Kafka records as the version number instead of the partition offset, for example when you add partitions to a topic.
The following loop never terminates because the middle condition is always true.
Hi,
While running kafka-connect-elasticsearch in distributed mode, we get a lot of warnings about version conflicts:
WARN Ignoring version conflicts for items: [Key{access-logs/from-kafka/normalized-logs+0+7542000}, Key{access-logs/from-kafka/normalized-logs+0+7542001}, Key{access-logs/from-kafka/normalized-logs+0+7542002} (...)
Then, if I check one of the document in elasticsearch, I have:
$ curl 'localhost:9200/access-logs/from-kafka/normalized-logs%2B0%2B7542000?pretty=true'
{
"_index" : "access-logs",
"_type" : "from-kafka",
"_id" : "normalized-logs+0+7542000",
"_version" : 7542000,
"found" : true,
"_source" : {
...
where the version number (which, per the Elasticsearch documentation, tracks how many times the document has been updated) is equal to the offset in the partition (key.ignore=true in our configuration). This is unexpected and difficult to understand.
This is the configuration used to setup kafka-connect-elasticsearch:
$ curl -XPOST -H 'Content-Type: application/json' kafka-connect:8083/connectors -d '{
"name": "kafka-connect-elasticsearch-alogs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topic.index.map": "normalized-logs:access-logs",
"schema.ignore": true,
"key.ignore": true,
"topics": "normalized-logs",
"connection.url": "http://elasticsearch:9200",
"type.name": "from-kafka"
}}'
Any idea why we get such a strange result?
Is it just too early to use the kafka-connect-elasticsearch connector in distributed mode?
(by the way, we index in elasticsearch 5, which is not officially supported by the jest client (but the index api didn't change)).
Maven fails on missing dependencies because the -rc1 version suffix is appended to the Kafka version number in pom.xml. This should be straightforward to fix and would save others from wasting as much time as I did.
I noticed that documents indexed in Elasticsearch have their IDs in the format topic+partition+offset.
I would prefer to use IDs generated by Elasticsearch. It seems topic+partition+offset is not always unique, so I am losing data.
How can I change that?
Hello, is there any plan to support multiple types of Kafka messages from a single topic?
This could be handled by copying the type from the structured Kafka message into the _type meta field while indexing into Elasticsearch.
Using _type effectively namespaces the messages in Elasticsearch, so each type of data can have its own mapping (which can be dynamic). This resolves the mapping-conflict problem while indexing.
This feature would be much appreciated, because it would allow us to add new types of messages without modifying the configuration of this connector or Elasticsearch.
[2016-10-07 08:59:13,507] ERROR Task elastic-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:401)
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"sample":{"type":"string"}} -- {"root_cause":[{"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters: [type : string]"}],"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters: [type : string]"}
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:64)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:200)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I am using ES version 2.3.4 and Kafka 0.10.0.1, so I downgraded the versions of Elasticsearch, Lucene, and Kafka to match my platform and built the JAR. Below are my connector properties:
type.name=sample
topics=sensor-in
key.ignore=true
value.ignore=true
connection.url=http://A.B.C.D:9200/
I tested two cases, but in both cases I get the above exception.
Thank you.
I was trying to use "org.apache.kafka.connect.storage.stringconverter" instead of the JSON converter for the following properties:
key.converter
value.converter
and it throws this exception:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.connect.storage.stringconverter for configuration key.converter: Class org.apache.kafka.connect.storage.stringconverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:690)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:433)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:63)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:163)
at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.<init>(StandaloneConfig.java:43)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:69)
Let me know if it's possible to use the string converter instead of JSON.
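For what it's worth, Java class names are case-sensitive, and the converter class that ships with Kafka is spelled StringConverter. If the lowercase class name was the only problem, a configuration like the following sketch should load:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```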
Hi!
The sink task fails if the data converter throws an exception:
https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java#L209-L211
Sometimes it is very useful to drop this kind of message and write the error to the log.
Is it possible to add an option to change the task behaviour in this situation?
Hello, I've been using this connector, and I need to add or delete topics frequently.
I know one way to update the topic configuration: the Kafka Connect REST API,
"PUT /connectors/{name}/config - update the configuration parameters for a specific connector".
But it takes too much time, because it updates the entire configuration and rebalances the partitions.
I need a faster way to update only the topics, without restarting the connector.
Do you have any idea?
I tried finding a pre-built .jar, but wasn't able to find one. Is there one available?
When the ES sink's bulk insertion and offset commit happen, there are exceptions. I created a gist; please see the exceptions:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
... 11 more
[2016-08-03 03:05:21,942] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:21,942] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
... 11 more
[2016-08-03 03:05:21,953] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
[2016-08-03 03:05:21,953] ERROR Unhandled exception when committing WorkerSourceTask{id=sensor-mqtt-connect-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-08-03 03:05:21,967] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:21,969] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:21,971] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
... (the same OffsetStorageWriter, ConcurrentModificationException, and flush-failure exceptions repeat every ~10 seconds) ...
[2016-08-03 03:05:52,075] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:277)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
	(stack trace identical to the 03:05:42 occurrence above)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
	at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
	... 11 more
[2016-08-03 03:05:52,076] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:52,076] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
	(stack trace identical to the occurrences above)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
	at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
Thanks,
Hi,
Is it possible, instead of passing a fixed list of topics in the "topics" config, to pass a pattern (e.g. event.member.* would match all Kafka topics starting with event.member. and route each to its corresponding index), and have it also pick up newly created topics?
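For what it's worth, recent Kafka Connect runtimes (Apache Kafka 1.1+, via KIP-215) support a framework-level `topics.regex` sink property that does roughly this, including picking up new matching topics. A sketch of what the connector config might look like (connector name and URL are placeholders):

```properties
# Hypothetical sink config: subscribe by pattern instead of a fixed topic list.
# topics.regex (KIP-215, Kafka 1.1+) replaces the topics setting; newly
# created topics matching the pattern are picked up automatically.
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics.regex=event\.member\..*
connection.url=http://localhost:9200
type.name=kafka-connect
```

Note that `topics` and `topics.regex` are mutually exclusive; configure exactly one of them.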
Hi,
I'm keen to try this connector out. I cloned the repo and ran mvn clean package, but hit a failure:
Failed to execute goal on project kafka-connect-elasticsearch:
Could not resolve dependencies for project io.confluent:kafka-connect-elasticsearch:jar:3.1.0-SNAPSHOT:
The following artifacts could not be resolved:
org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT,
org.apache.kafka:connect-json:jar:0.10.1.0-SNAPSHOT:
Could not find artifact org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT in confluent (http://packages.confluent.io/maven/)
I'm new to building packages etc, but happy to give it a go :) - can you point me towards how I can resolve these dependencies? Thanks!
If I create an index template with dynamic mappings, they are ignored if I start the connector from scratch, but honoured if the connector is already running and I delete the index and then create new data to force its recreation.
I'll try to create a test case to reproduce, but is there something about how the connector initialises itself with Elasticsearch at startup that would explain this behaviour?
thanks.
I'm thinking about dynamic index creation. The current config mapping is
topicname -> indexname (auto-inferred ES mapping)
What I need is something like this:
topicname -> indexname-YYYYMMDD (template mapping indexname-*)
It is a sort of daily sharding of the data, useful for dashboards over a large data set. The data are essentially facts, so yesterday's data won't be updated (and consequently reindexed) after midnight.
The field I intend to use for index creation is the timestamp in the Kafka message, so reprocessing data will be solid and safe.
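The stock TimestampRouter SMT shipped with Apache Kafka can approximate this today: it rewrites the topic name from the record timestamp, and since this connector maps topic -> index, the records land in daily indices. A minimal sketch, assuming the Kafka record timestamp is the one you want to shard on:

```properties
# Route each record to <topic>-YYYYMMDD based on the record timestamp,
# so the sink writes to daily indices covered by an indexname-* template.
transforms=dailyIndex
transforms.dailyIndex.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.dailyIndex.topic.format=${topic}-${timestamp}
transforms.dailyIndex.timestamp.format=yyyyMMdd
```

One caveat: TimestampRouter uses the record's Kafka timestamp, not an arbitrary field inside the message payload, so this only matches your requirement if the producer sets the record timestamp to the event time.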
The current master HEAD (0b590cd) does not build. It depends on SNAPSHOT versions of the Confluent Platform libraries (3.2.0-SNAPSHOT) and Kafka (0.10.2.0-SNAPSHOT), neither of which is available from the public repository.