confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

License: Other

confluent kafka kafka-connector kafka-connect elasticsearch

kafka-connect-elasticsearch's Introduction

Kafka Connect Elasticsearch Connector


Changelog for this connector can be found here.

kafka-connect-elasticsearch is a Kafka Connector for copying data between Kafka and Elasticsearch.

Development

To build a development version you'll need a recent version of Kafka as well as a set of upstream Confluent projects, which you'll have to build from their appropriate snapshot branch. See the FAQ for guidance on this process.

You can build kafka-connect-elasticsearch with Maven using the standard lifecycle phases.
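For example, a typical local build might look like this (a sketch; the exact goals and flags depend on your environment):

mvn clean package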

Configuring

Creating an Elasticsearch user and assigning required privileges

Create an Elasticsearch role

curl -u elastic:elastic -X POST "localhost:9200/_security/role/es_sink_connector_role?pretty" -H 'Content-Type: application/json' -d'
{
  "indices": [
    {
      "names": [ "*" ],
      "privileges": ["create_index", "read", "write", "view_index_metadata"]
    }
  ]
}'

Create an Elasticsearch user

curl -u elastic:elastic -X POST "localhost:9200/_security/user/es_sink_connector_user?pretty" -H 'Content-Type: application/json' -d'
{
  "password" : "seCret-secUre-PaSsW0rD",
  "roles" : [ "es_sink_connector_role" ]
}'
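Once the role and user exist, point the connector at them. A minimal sketch of the relevant sink connector properties (assuming a connector version that supports basic authentication via connection.username and connection.password; the values match the user created above):

connection.url=http://localhost:9200
connection.username=es_sink_connector_user
connection.password=seCret-secUre-PaSsW0rD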

Contribute

License

This project is licensed under the Confluent Community License.


kafka-connect-elasticsearch's People

Contributors

amuamushu, andrewegel, arihant-confluent, avocader, c0urante, cjolivier01, confluentjenkins, confluentsemaphore, cyrusv, erdody, ewencp, ishiihara, joel-hamill, kganeshdatta, kkonstantine, maxzheng, ncliang, patrick-premont, pbadani, poojakuntalcflt, purbon, rayokota, rhauch, shikhar, snehashisp, sp-gupta, subhashiyer9, sudeshwasnik, wicknicks, xiangxin72


kafka-connect-elasticsearch's Issues

mapper_parsing_exception with string records containing json

I am seeing this upon indexing any record into ES:

[BulkProcessor@2073013577-1] WARN io.confluent.connect.elasticsearch.bulk.BulkProcessor - Failed to execute batch 1 of 1 records, retrying after 100 ms
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}]
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:353)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:326)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I want the connector to take the incoming payloads (which are string encoded json) and index them as is into ES, so I have set schema.ignore=true and value.converter.schemas.enable=false. I've added some debug info to see what is going wrong and noticed the payload that the ES connector ultimately indexes looks like this:

"{\"entry_point\": \"rabbitmq\", \"ts_posted\": \"2017-04-05T02:02:27.296757\", \"tracer_id\": \"eb1c8284-19a3-11e7-9555-0a7b84c6deb8\"}

The original value in the SinkRecord is a non-escaped String containing the json data, i.e.

{"entry_point": "rabbitmq", "ts_posted": "2017-04-05T02:02:27.296757", "tracer_id": "eb1c8284-19a3-11e7-9555-0a7b84c6deb8"}

ES expects this to be a JSON payload without escaping.
While debugging, I wrote a unit test to reproduce this. In ElasticsearchWriterTest, the test looks like this:

  @Test
  public void testWriterIgnoreSchemaStringRecord() throws Exception {
    final boolean ignoreKey = true;
    final boolean ignoreSchema = true;

    String value = "{\"foo\":\"bar\"}";
    Collection<SinkRecord> records = Collections.singletonList(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, 0));
    ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

I dug further in the code and I believe the issue comes from this line:

final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);

When the JsonConverter is passed a string as a value and no schema, it will end up returning an escaped json string. It seems like if the value in the SinkRecord is already a string, and there is no schema or schemas are ignored, it should not use the JsonConverter?

Match all topics with pattern

Hi,

Is it possible, instead of passing a list of topics in the "topics" config, to pass a pattern (e.g. event.member.* would match all Kafka topics starting with event.member. and place them in the corresponding index), and also have it pick up newly created topics?
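For reference, newer Kafka Connect releases support a topics.regex sink connector setting that selects topics by pattern instead of an explicit list, and a matching pattern also picks up newly created topics. A hedged sketch (the regex value is illustrative, and topics must not be set at the same time):

topics.regex=event\.member\..*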

why this "_children" object?

I am trying to put this connector to use in a situation where a number of Logstash agents send their events to Kafka, and this connector pumps them into Elastic so they are available on a Kibana dashboard... The JSON messages/events vary a great deal, so I do not want any rigid schemas. Here is what I have done as a simple test:

(1) start up ZK & Kafka the usual way

(2)

./bin/connect-standalone etc/test-connect-standalone.properties etc/kafka-connect-elasticsearch/test-elasticsearch.properties

test-connect-standalone.properties has been modified so:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Without the above I get errors about 'json deserialization...'

test-elasticsearch.properties has been modified like so (not sure if this is actually needed):

schema.ignore=true

(3) As I pump messages into Kafka (via Logstash) they do come up in the Elastic index. But unfortunately the simple JSON that Logstash puts together ends up under a '_children' sub-object! This means Kibana does not find the '@timestamp' field and so doesn't work seamlessly...

...
"_score": 1,
"_source": {
"_children": {
"f1": {
"_value": "value1"
}
},
...

The whole real JSON message sits under this '_children' sub-object. Is there a configurable way to get rid of this '_children' tag and put the message directly under '_source'?

Thanks
Ashok

Exception when committing offsets, Bulk Insert

When ES sink bulk insertion and offset commits happen, there are exceptions. I created a gist; please see the exceptions below.

org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:21,942] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:21,942] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:21,953] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
[2016-08-03 03:05:21,953] ERROR Unhandled exception when committing WorkerSourceTask{id=sensor-mqtt-connect-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-08-03 03:05:21,967] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:21,969] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:21,971] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:31,955] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
[2016-08-03 03:05:31,955] ERROR Unhandled exception when committing WorkerSourceTask{id=sensor-mqtt-connect-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-08-03 03:05:31,991] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:277)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:31,991] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:31,991] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:41,956] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
[2016-08-03 03:05:41,956] ERROR Unhandled exception when committing WorkerSourceTask{id=sensor-mqtt-connect-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-08-03 03:05:42,015] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:277)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:42,015] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:42,016] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:42,026] WARN Failed to execute bulk request java.util.ConcurrentModificationException. (io.confluent.connect.elasticsearch.internals.BulkProcessor:159)
[2016-08-03 03:05:51,956] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
[2016-08-03 03:05:51,956] ERROR Unhandled exception when committing WorkerSourceTask{id=sensor-mqtt-connect-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-08-03 03:05:52,075] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:277)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
    ... 11 more
[2016-08-03 03:05:52,076] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
[2016-08-03 03:05:52,076] ERROR Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)

Thanks,

JestClient HTTP timeouts

We aren't currently setting connection and read timeouts on the HTTP client. I believe the default is infinite.
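Later releases of the connector expose these timeouts as configuration; a hedged sketch (property names as documented in later versions, values illustrative):

connection.timeout.ms=3000
read.timeout.ms=10000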

topic to index mapping

Hi There,

First of all thanks for the work on this connector - looking forward to getting it up and running.

In Kafka my topics are created with uppercase names, so when starting up the ES sink connector it fails as per the following... that's OK, I get that:

[2016-07-30 10:49:31,717][DEBUG][action.admin.indices.create] [Mandrill] [REP-SOE.CUSTOMERS] failed to create
[REP-SOE.CUSTOMERS] InvalidIndexNameException[Invalid index name [REP-SOE.CUSTOMERS], must be lowercase]

So I thought to map between the topic and the index as per the following:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=REP-SOE.CUSTOMERS
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
topic.index.map=rep-soe.customers

But I get this error:

 (io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig:178)
[2016-07-30 10:53:17,941] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.ArrayIndexOutOfBoundsException: 1

Can you please explain how to configure this mapping properly (that is, between REP-SOE.CUSTOMERS (topic) and rep-soe.customers (index))?

Note also that I have not created this index in Elastic as I saw that the sink connector does this in my initial tests. Would it still create a new index when these mappings are used?

Thanks for your help here.
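For reference, topic.index.map expects a comma-separated list of topic:index pairs rather than a bare index name, which would explain the ArrayIndexOutOfBoundsException above. A sketch of the mapping described in this issue:

topic.index.map=REP-SOE.CUSTOMERS:rep-soe.customers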

Unexpected error when registering ES connector

I have an MQTT connector configured in a connect-distributed environment. I then tried to set up the ES sink connector and am getting the exception below.
screenshot from 2016-08-03 08 23 19

My ES sink configuration looks like this:
screenshot from 2016-08-03 08 25 22
Because of this, the Connect interface itself is failing. Any pointers on this? Thanks

Create a daily index

I'm thinking about dynamic index creation. The current config mapping is
topicname -> indexname (auto infer ES mapping)

What I need is something like that

topicname -> indexname-YYYYMMDD (template mapping indexname-* )

It is a sort of daily sharding of the data, useful for dashboards over a huge amount of information. The data are essentially facts, so yesterday's data won't be updated (and consequently reindexed) after midnight.

The field I intend to use for the index creation is the timestamp in the Kafka message, so reprocessing data will be solid and safe.
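One way to approximate this with stock Kafka Connect is the TimestampRouter single message transform (also mentioned in a later issue here); a hedged sketch (format values are illustrative, and note it routes on the record timestamp, not on an arbitrary field in the message body):

transforms=routeByDay
transforms.routeByDay.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeByDay.topic.format=${topic}-${timestamp}
transforms.routeByDay.timestamp.format=yyyyMMdd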

Shield Auth Support

Is there support for HTTP authentication with an Elasticsearch cluster protected by Shield?

If not, is there a way of handling the authentication elsewhere, i.e. an Elasticsearch client node or similar?

ES Transport Client for bulkprocessor

Hi, I saw that the current ES connector is using Jest as the ES client.
I have two questions:

  1. Is there any plan to add ES transport-layer-client support?
  2. What about ES SSL-support, e.g. searchguard or shield?

Broken build

The current master HEAD (0b590cd) does not build. It depends on SNAPSHOT versions of the Confluent Platform libraries (3.2.0-SNAPSHOT) and Kafka (0.10.2.0-SNAPSHOT), neither of which are available on the public repository.

Create mapping exception

[2016-10-07 08:59:13,507] ERROR Task elastic-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:401)
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"sample":{"type":"string"}} -- {"root_cause":[{"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters:  [type : string]"}],"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters:  [type : string]"}
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:64)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:200)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I am using ES version 2.3.4 and Kafka 0.10.0.1, so I downgraded the versions of Elasticsearch, Lucene, and Kafka to match my platform and built the JAR. Below are my connector properties:

type.name=sample
topics=sensor-in
key.ignore=true
value.ignore=true
connection.url=http://A.B.C.D:9200/

I tested two cases:

  • The index and type already exist in the ES cluster
  • The connector creates the index and type with a mapping

In both cases I get the above exception.
Thank you

Problems with geo_point

I'm trying to store a geo_point field within an elasticsearch index but the connector is not able to map the geo_point field. Following is my set-up:

POST body to the connector API:

{
    "name": "khermeslocation20",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "khermeslocation20",
        "type.name" : "khermeslocation20",
        "key.ignore" : "true",
        "connection.url" : "http://localhost:9200",
        "tasks.max": "1"
    }
}

Avro config

{
	"type": "record",
	"name": "locationschema",
	"fields": [{"name": "name","type": "string"},{"name": "location","type":"string"}]
}

PUT and POST to create the Elasticsearch index:

curl -XPUT 'localhost:9200/khermeslocation20?pretty' -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "index" : {
            "number_of_shards" : 1, 
            "number_of_replicas" : 1 
        }
    }
}'
curl -X POST "http://localhost:9200/khermeslocation20/khermeslocation5/_mapping" -d '{
   "khermeslocation5" : {
   "properties" : {
   	   "name" : { "type" : "string"},
       "location" : { "type" : "geo_point"}
   }}
}'

Following is the connector exception

[2017-05-04 13:58:06,369] ERROR Task khermeslocation20-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
kafka-connect_1    | org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"khermeslocation20":{"properties":{"name":{"type":"string"},"location":{"type":"string"}}}} -- {"root_cause":[{"type":"illegal_argument_exception","reason":"mapper [location] cannot be changed from type [geo_point] to [string]"}],"type":"illegal_argument_exception","reason":"mapper [location] cannot be changed from type [geo_point] to [string]"}

I've seen that the geo_point type is not included in the switch case of the Mapping class. Does that mean we can't use geo_point yet? Would it make sense to modify the code to include this type?

POST /subjects/[topic]-value request to Schema Registry receive 404

I have just upgraded our Kafka cluster to 3.1.1 to take advantage of the new Elasticsearch connector.
Unfortunately, when I start the Elasticsearch connector it fails with a Schema Registry error. Looking at the Schema Registry logs, I see that the connector made a request to /subjects/[topic]-value, which doesn't exist, so it receives a 404, causing the connector to fail.

Why is the connector making a POST request to /subjects/[topic]-value ?
What is this -value thing, where is it coming from?

Some of the logs:

  1. the connector:
[2016-12-16 20:40:28,610] ERROR Task ES-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 441
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:215)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:206)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:200)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:69)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:161)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:149)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-12-16 20:40:28,612] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
  2. The Schema Registry (observe the 404 response status):
    Fri Dec 16 20:40:28 UTC 2016: [2016-12-16 20:40:28,566] INFO 10.206.xx.xx - - [16/Dec/2016:20:40:28 +0000] "POST /subjects/apphits-value HTTP/1.1" 404 51 3 (io.confluent.rest-utils.requests:77)

  3. Connect-distributed.properties:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=10.206.xx.xx:9092,10.206.xx.xx:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=frbi-connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.206.xx.xx:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.206.xx.xx:8081

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
  4. Connector settings:
{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":"1",
"topics":"apphits",
"key.ignore":"true",
"connection.url":"http://10.204.xx.xx:9292",
"type.name":"kafka-connect"}

How to use this connector?

Could someone tell users how to use this connector: as a plugin, as an application, or something else?
I think we need to provide a simple README, shouldn't we?

Distributed mode and versioning

Hi,

While running kafka-connect-elasticsearch in distributed mode, we get a lot of warnings about version conflicts:
WARN Ignoring version conflicts for items: [Key{access-logs/from-kafka/normalized-logs+0+7542000}, Key{access-logs/from-kafka/normalized-logs+0+7542001}, Key{access-logs/from-kafka/normalized-logs+0+7542002} (...)
Then, if I check one of the document in elasticsearch, I have:

$ curl 'localhost:9200/access-logs/from-kafka/normalized-logs%2B0%2B7542000?pretty=true'
{
  "_index" : "access-logs",
  "_type" : "from-kafka",
  "_id" : "normalized-logs+0+7542000",
  "_version" : 7542000,
  "found" : true,
  "_source" : {
   ...

where the version number (which, according to the Elasticsearch documentation, is the number of versions) is equal to the offset in the partition (key.ignore=true in our configuration). This is unexpected and difficult to understand.

This is the configuration used to setup kafka-connect-elasticsearch:

$ curl -XPOST -H 'Content-Type: application/json' kafka-connect:8083/connectors -d '{
"name": "kafka-connect-elasticsearch-alogs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topic.index.map": "normalized-logs:access-logs",
"schema.ignore": true,
"key.ignore": true,
"topics": "normalized-logs",
"connection.url": "http://elasticsearch:9200",
"type.name": "from-kafka"
}}'

Any idea why we get such a strange result?

Is it just too early to use the kafka-connect-elasticsearch connector in distributed mode?

(by the way, we index in elasticsearch 5, which is not officially supported by the jest client (but the index api didn't change)).

Transform Lat Lon to GeoPoint

Is it possible to transform from { Lat: valLat, Lon: valLon, name: placeName } to { name: placeName, Location: { Lat: valLat, Lon: valLon } } format using a single message transform with this plugin?
I tried this ....
transforms=hoistToStruct
transforms.hoistToStruct.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.hoistToStruct.field=Location

It works, but the problem is that the record is transformed to { Location: { name: placeName, Lat: valLat, Lon: valLon } }, which is not what I want.

topic.index.map is ignored on record put

According to the documentation, the topic.index.map config defines a mapping between Kafka topics and Elasticsearch indices. I have the following map configured: topic.index.map=mytopic:myindex, so I expect records from mytopic to be added to myindex. But in the io.confluent.connect.elasticsearch.ElasticsearchWriter#write method I see different behavior - the topic name is used for the schema mapping, despite the fact that the mapped index name was used during initialization and index creation. So I always receive index_not_found_exception in io.confluent.connect.elasticsearch.Mapping#createMapping: I have the index myindex, but the connector is trying to find a mytopic index. This quick fix solved the issue for me:

String mappedIndex = topicToIndexMap.get(sinkRecord.topic());
final String index = mappedIndex != null ? mappedIndex : sinkRecord.topic();

...but I'm not sure if I've understood the intent of this config correctly and am using it in the proper way. Could you kindly check whether this is a bug or not?

Thanks

Support multiple types

Hello, is there any plan to support multiple types of kafka messages from one single topic?

It can be handled by copying the type from the structured kafka message to the _type meta field while indexing into elasticsearch.

Using the _type acts like namespacing the messages in Elasticsearch, and each type of data can therefore have its own mapping (which can be dynamic). This resolves the mapping conflict problem while indexing.

This feature would be much appreciated because it would allow us to add new types of messages without any modification to the configuration of this connector or to Elasticsearch.

[Question] Support for timestamps

Hi confluent folks,

I noticed that all data written to Elasticsearch is written as strings... I would like to use timestamps (which I include in my originating Kafka messages in a timestamp field).

image

Any advice on how to get this done correctly?

Best regards,
Bart

Support Logical Date Types

How do you feel about adding an option to treat kafka connect logical date fields (date, datetime, timestamp) as Elasticsearch date types? Both for schema creation and data conversion.

For background, we are trying to sink records with logical date fields (days since epoch) to an ES index with a corresponding date field. ES doesn't know how to interpret days since epoch and is expecting epoch_millis (or another accepted format).

documents going under docs.deleted with key.ignore as true

Hi There,

I am trying to load data from a Kafka topic into an Elasticsearch index with key.ignore=true. But on Elasticsearch I see the docs.deleted count getting increased/decreased, while docs.count still seems to be correct. Am I missing something here? I understand that with Lucene, updates take place as delete+insert, but with key.ignore set to true these should not happen. It would be really great if you could help me understand.

Thanks,
Deepti

Sink record timestamp as version number

Hi!
Sometimes it's useful to take the timestamp from Kafka records as the version number instead of the partition offset, for example when you add partitions to a target topic.

Error 500 Timeout

POST or GET on /connectors returns a 500 timeout in distributed mode.

[2017-03-21 21:26:04,794] INFO 127.0.0.1 - - [21/Mar/2017:21:24:34 +0000] "GET /connectors HTTP/1.1" 500 48 90235 (org.apache.kafka.connect.runtime.rest.RestServer:60)

Can't get working at all / config woes?

First, thanks for your work. Frankly, I'm quite surprised at the amount of work required to connect the two (it seemed quite trivial, functionality-wise).

Now, I'm having a really hard time getting this thing to work.
I've tried to follow http://docs.confluent.io/3.2.0/connect/connect-elasticsearch/docs/elasticsearch_connector.html (with the exception that I don't know what -avro- is; it seems to be outdated, so I've just dropped it).
So the connector logs the following:

kafka-elasticsearch-connector_1  | [2017-03-28 15:10:13,079] INFO ConnectorConfig values:
kafka-elasticsearch-connector_1  | 	connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
kafka-elasticsearch-connector_1  | 	key.converter = null
kafka-elasticsearch-connector_1  | 	name = elasticsearch-sink
kafka-elasticsearch-connector_1  | 	tasks.max = 2
kafka-elasticsearch-connector_1  | 	value.converter = null
kafka-elasticsearch-connector_1  |  (org.apache.kafka.connect.runtime.ConnectorConfig:180)

Note that both topics (from config/quickstart-elasticsearch.properties, which I didn't find in ElasticsearchSinkConnectorConfig) and topic.index.map (which I added to the properties file, and which the config class does have) are absent from the logged config, and it doesn't help if I pre-create the index (which should be unnecessary, I believe).

So, I get no error messages from any component and no hits in ES, please help.

Offset Commit Timed Out

I've got a connector set up with a constant low volume stream of messages (10/min). While everything seems to work fine the following warning is being logged at regular intervals:

WARN Commit of WorkerSinkTask{id=elastic-authenticatedcall-sink-2} offsets timed out

The storage topics are configured as specified here http://docs.confluent.io/3.2.0/connect/userguide.html#distributed-worker-configuration

Timeouts are set to the defaults (which seem generous)

The kafka cluster is running 3 nodes and has essentially no load. Everything appears to work fine, and when stopping and starting the connector it appears to pick up where it left off.

Can anyone shed any light on this warning? Should I be concerned?

Seeing "Ignoring invalid task provided offset..." in connect logs when using TimestampRouter

I'm using the TimestampRouter to index documents in rolling daily indexes. It is indexing documents correctly, however, I'm seeing a ton of these WARN messages in my connect logs:

[2017-06-07 20:30:38,932] WARN Ignoring invalid task provided offset test.topic6-20170607-0/OffsetAndMetadata{offset=4, metadata=''} -- partition not assigned (org.apache.kafka.connect.runtime.WorkerSinkTask:337)

Is this something I should be concerned about? And/or should the ES connector be handling this logic differently?

Index creation ignores dynamic templates?

If I create an index template with dynamic mappings, they are ignored if I start the connector from scratch, but honoured if the connector is already running and I delete the index and then create new data to force its recreation.
I'll try and create a test case to reproduce, but is there something about how the connector initialises itself with Elasticsearch at startup that would explain this behaviour?

thanks.

Elasticsearch Sink Connector 3.1.1 and mapping problem for geo_point

Hi.

We're trying to migrate from the Datamountaineer Elastic sink connector to the official Confluent connector. We're experiencing problems with geo_point data types, which worked correctly with the previous connector, but obviously that depended on custom connector logic. This is the error, repeated for all points:

org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse [geoVal]","caused_by":{"type":"number_format_exception","reason":"For input string: \"43.54801,10.31061\""}}]
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:353)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:326)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This is the avro schema:

{"namespace": "com.streams.avro",
 "type": "record",
 "name": "Data",
 "fields": [
(...)
     {"name": "geoVal", "type": "string"},
(...)
 ]
}

This is the properties file content:

name=kafka-to-elasticsearch-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=MYTOPIC
topic.index.map=MYTOPIC:myindex
key.ignore=true
topic.schema.ignore=MYTOPIC
connection.url=http://elasticsearch:9200
type.name=kafka-connect

This is the connector initialization log:

[2016-12-16 13:59:34,354] INFO Instantiated connector kafka-to-elasticsearch-measm-connector with version 3.1.1 of type class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2016-12-16 13:59:34,356] INFO ElasticsearchSinkConnectorConfig values:
    batch.size = 2000
    connection.url = http://elasticsearch:9200
    flush.timeout.ms = 10000
    key.ignore = true
    linger.ms = 1
    max.buffered.records = 20000
    max.in.flight.requests = 5
    max.retries = 5
    retry.backoff.ms = 100
    schema.ignore = false
    topic.index.map = [MEASM-TOPIC:measm_by_sensor_param]
    topic.key.ignore = []
    topic.schema.ignore = [MEASM-TOPIC]
    type.name = kafka-connect
 (io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig)
[2016-12-16 13:59:34,357] INFO Finished creating connector kafka-to-elasticsearch-connector (org.apache.kafka.connect.runtime.Worker)

This is the field in the Elastic index:

(...)
          "geoVal": {
            "type": "geo_point"
          },
(...)

I'm not sure if it's an Avro schema problem or if we're missing some configuration in the properties file.

Thanks
Luca

kafka-connect: does it work with StringConverter?

I was trying to use "org.apache.kafka.connect.storage.stringconverter" instead of the JSON converter for the following properties:

key.converter
value.converter

and it throws this exception:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.connect.storage.stringconverter for configuration key.converter: Class org.apache.kafka.connect.storage.stringconverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:690)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:433)
at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:56)
at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:63)
at org.apache.kafka.connect.runtime.WorkerConfig.(WorkerConfig.java:163)
at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.(StandaloneConfig.java:43)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:69)

Let me know if it's possible to use string instead of JSON.
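For reference, converter class names are case-sensitive, which is why the lowercase class above could not be found. A sketch of the correctly cased worker properties:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter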

Could not create index on elasticsearch

Hi.

We get this error when trying to run connector 3.1.1 against Elasticsearch 5.3 (from the official ES Docker image):

org.apache.kafka.connect.errors.ConnectException: Could not create index:MEASM-TOPIC
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:251)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This is the connector configuration:

{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "kafka-connect",
"tasks.max": "1",
"topics": "MEASM-TOPIC",
"name": "ElasticsearchSinkConnector",
"key.ignore": "true",
"connection.url": "http://elasticsearch:9200"
}

It seems the problem is specific to this topic. We tried another new test topic and the connector didn't throw any exception: the index was correctly created in Elasticsearch and data moved from Kafka to Elasticsearch. The stack trace doesn't explain the cause of the problem.

Thanks
Luca
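
One thing worth checking (an assumption, since the wrapped cause is not shown in the trace): Elasticsearch requires index names to be lowercase, and by default this connector uses the topic name, MEASM-TOPIC, as the index name. A sketch of a workaround using the connector's topic.index.map option to route the topic into a lowercase index:

topic.index.map=MEASM-TOPIC:measm-topic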

Index Creation Attempted When Using Existing Index Via Alias

I want to use an alias to insert into elastic using the pattern described here:

https://www.elastic.co/blog/managing-time-based-indices-efficiently

However, I'm finding that the connector fails on startup because it attempts to create an index with the alias name.

On startup, the connector checks for the existence of an index, but it should also check whether an alias with the same name already exists before attempting to create a new index.
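
For anyone probing this manually, Elasticsearch exposes an alias-exists check that such a startup check could rely on; a minimal curl sketch against a local cluster, with logs-write standing in for a hypothetical write alias:

curl -I "localhost:9200/_alias/logs-write"

A 200 response means an alias with that name exists; a 404 means it does not.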

MalformedJsonException

I have used the Kafka connector to write data to ES. The Kafka version is 0.10.1.1 and the ES version is 5.2.1, but when I start connect-standalone I get an error:
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task kafka-to-elasticsearch-1-0 threw an uncaught and unrecoverable exception
com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.JsonParser.parse(JsonParser.java:65)
at com.google.gson.JsonParser.parse(JsonParser.java:45)
at io.searchbox.action.AbstractAction.parseResponseBody(AbstractAction.java:94)
at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:65)
at io.searchbox.action.GenericResultAbstractAction.createNewElasticSearchResult(GenericResultAbstractAction.java:20)
at io.searchbox.client.http.JestHttpClient.deserializeResponse(JestHttpClient.java:131)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:50)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:248)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1573)
at com.google.gson.stream.JsonReader.checkLenient(JsonReader.java:1423)
at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:546)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:429)
at com.google.gson.JsonParser.parse(JsonParser.java:60)
... 28 more
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted

and my worker config (connect-standalone.properties) is:

bootstrap.servers=es-1:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

and the connector sink config is:
name=kafka-to-elasticsearch-1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test-3
topic.index.map=connect-test-3:myindex
key.ignore=true
topic.schema.ignore=ture
connection.url=http://es-2:9200,http://es-3:9200
type.name=kafka-connect

Can anyone tell me what the problem is? Thanks.
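
Jest (io.searchbox) failing with MalformedJsonException while parsing the response to the create-index call usually means the body it got back was not JSON at all; one common cause (an assumption here, not proven by the trace) is that connection.url points at something other than an Elasticsearch HTTP endpoint, for example the transport port 9300 or a proxy error page. A quick sanity check against the hosts from the config above:

curl http://es-2:9200
curl http://es-3:9200

Each should return the small JSON cluster-info document; anything else would explain the parse failure.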

Artifact in repo?

I can't download the artifact from Confluent's Maven repo (version 3.0.0). Is it uploaded?
This is the URL from which I'm trying to connect.

Global config values for topic-overridable options should act as defaults

If I have

key.ignore=true

I get a failure:

[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
        at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:80)
        at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:134)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:308)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:129)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The config is definitely being picked up:

[pool-2-thread-1] INFO io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig - ElasticsearchSinkConnectorConfig values:
        type.name = kafka-connect
        batch.size = 2000
        max.retry = 5
        key.ignore = true
        max.in.flight.requests = 5
        retry.backoff.ms = 100
        max.buffered.records = 20000
        schema.ignore = false
        flush.timeout.ms = 10000
        topic.index.map = [ORCL.SOE.LOGON:soe.logon]
        topic.key.ignore = []
        connection.url = http://localhost:9200
        topic.schema.ignore = []
        linger.ms = 1

If I set the key ignore specifically for the topic:

topic.key.ignore = ORCL.SOE.LOGON

Then processing works fine. Should key.ignore work this way (globally) or am I misunderstanding it?
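
For readers comparing the two forms, this is the contrast in sketch form, with the topic name taken from the report above:

# Global flag (expected to apply to every topic, but the STRUCT key was still being converted in this report):
key.ignore=true

# Per-topic override (the form that actually took effect in this report):
topic.key.ignore=ORCL.SOE.LOGON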

Schema Registry - Schema not found; error code: 40403

I am trying to run the ES connector against an Avro-based Kafka topic, which is using the Schema Registry.
Unfortunately, when the connector starts, it fails to fetch the schema from the Schema Registry.

error:

[2016-12-21 21:30:51,955] ERROR Task ES-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 421
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:316)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-12-21 21:30:51,961] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-12-21 21:30:51,961] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:135)

connect-distributed.properties:

bootstrap.servers=10.xx.xx.xx:9090,10.xx.xx.xx:9090
group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.xx.xx.xx:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.xx.xx.xx:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

The Schema Registry log shows that it received a request for /subjects/[topicname]-value, but returned 404.
We have been using the Schema Registry for a long time, but nowhere in its request logs have I ever seen a request for anything like /subjects/[topicname]-value.
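
Note that the AvroConverter ultimately resolves the writer schema by its numeric id (421 in the trace above), so that id has to exist in the registry the worker points to. Two quick checks against the documented Schema Registry REST endpoints, using the registry URL from the worker config:

curl http://10.xx.xx.xx:8081/schemas/ids/421
curl http://10.xx.xx.xx:8081/subjects

If the id is missing there, a likely explanation is that the worker is pointed at a different Schema Registry (or environment) than the producer that wrote the topic.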

Topic addition and deletion

Hello, I've been using this connector, and I need to add or delete topics frequently.

I know of one way to update the topic configuration.

That is to use the Kafka Connect REST API:
"PUT /connectors/{name}/config - update the configuration parameters for a specific connector".

But it takes too much time, because it updates the whole configuration and rebalances the partitions.

I need a faster way to update only the topics, without restarting the connector.

Do you have any idea?
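
For reference, the PUT call mentioned above looks like this in sketch form; the connector name, worker address, and topic list are hypothetical, and note that PUT replaces the entire connector config, so the full config has to be resubmitted each time:

curl -X PUT http://localhost:8083/connectors/elasticsearch-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "topics": "topic-a,topic-b,topic-new"
  }'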

ES Ingest Pipeline support

Elasticsearch supports ingest pipelines for transforming data before it is indexed. To use a pipeline, the indexing API takes a pipeline parameter:

PUT /myindex/type/1?pipeline=monthlyindex
{
  "date1" : "2016-04-25T12:02:01.789Z"
}

Would it be possible to add this as an option to the Kafka Connect Elasticsearch connector?
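
The connector does not appear to expose a pipeline option in this version. One hedged workaround, assuming a newer Elasticsearch (6.5+, where the index.default_pipeline setting exists), is to attach the pipeline to the index itself, so that every document indexed into it runs through the pipeline regardless of the client:

curl -X PUT "localhost:9200/myindex/_settings" -H 'Content-Type: application/json' -d'
{
  "index.default_pipeline": "monthlyindex"
}'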

Build?

Hi,
I'm keen to try this connector out. I cloned the repo and ran mvn clean package, but hit a failure:

Failed to execute goal on project kafka-connect-elasticsearch: 
Could not resolve dependencies for project io.confluent:kafka-connect-elasticsearch:jar:3.1.0-SNAPSHOT: 
The following artifacts could not be resolved: 
org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT, 
org.apache.kafka:connect-json:jar:0.10.1.0-SNAPSHOT: 
Could not find artifact org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT in confluent (http://packages.confluent.io/maven/) 

I'm new to building packages etc, but happy to give it a go :) - can you point me towards how I can resolve these dependencies? Thanks!

Date placeholder for specifying destination ES index

I am using Kafka Connect to write logs into an Elasticsearch index.

Is it possible to output to an index that changes daily, e.g. logs-{date}, similar to what Logstash does, to prevent the ES index from getting too big?

I have had some success with the topic.index.map setting, but I have only been able to pass in a static string as the index name.
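
One approach that may help, named plainly because it is not a feature of this connector itself: Kafka Connect's TimestampRouter single message transform (shipped with Connect 0.10.2+) rewrites the topic name per record, and since the sink derives the index name from the topic, this produces daily indices. A sketch of the sink connector properties, with the logs- prefix as an assumed naming convention:

transforms=routeByTime
transforms.routeByTime.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeByTime.topic.format=logs-${timestamp}
transforms.routeByTime.timestamp.format=yyyyMMdd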

Using Elasticsearch-generated IDs

I noticed that documents indexed in Elasticsearch have their IDs in the format topic+partition+offset.

I would prefer to use IDs generated by Elasticsearch. It seems topic+partition+offset is not always unique, so I am losing data.

How can I change that?
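
As far as I can tell, this version of the connector always supplies a document id itself (either the record key, or topic+partition+offset when the key is ignored), so truly Elasticsearch-generated ids are not available through it. The closest alternative, assuming each Kafka record carries a unique primitive key, is to stop ignoring the key so that it becomes the document id:

key.ignore=false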

JSON format exception: MalformedJsonException

Hi,

I have used the Kafka connector to write data to ES. The Kafka version is 0.10.1.1 and the ES version is 5.2.1, but when I start connect-standalone I get an error:
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task kafka-to-elasticsearch-1-0 threw an uncaught and unrecoverable exception
com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.JsonParser.parse(JsonParser.java:65)
at com.google.gson.JsonParser.parse(JsonParser.java:45)
at io.searchbox.action.AbstractAction.parseResponseBody(AbstractAction.java:94)
at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:65)
at io.searchbox.action.GenericResultAbstractAction.createNewElasticSearchResult(GenericResultAbstractAction.java:20)
at io.searchbox.client.http.JestHttpClient.deserializeResponse(JestHttpClient.java:131)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:50)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:248)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:431)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:55)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:467)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 5 path $
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1573)
at com.google.gson.stream.JsonReader.checkLenient(JsonReader.java:1423)
at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:546)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:429)
at com.google.gson.JsonParser.parse(JsonParser.java:60)
... 28 more
[pool-1-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted

and my worker config (connect-standalone.properties) is:

bootstrap.servers=es-1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

and the connector sink config is:

name=kafka-to-elasticsearch-1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test-3
topic.index.map=connect-test-3:myindex
key.ignore=true
topic.schema.ignore=ture
connection.url=http://es-2:9200,http://es-3:9200
type.name=kafka-connect

Can anyone tell me what the problem is? Thanks.
