Comments (18)
The solution that worked for me in my use case is below.
Elasticsearch supports date math in index names at index time: https://www.elastic.co/guide/en/elasticsearch/reference/2.0/date-math-index-names.html#date-math-index-names
My connector config:
{
  "name": "arras_es_sink1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "group.id": "test2",
    "topic.index.map": "junk_json:<logs-{now/d}>",
    "type.name": "mytype",
    "schema.ignore": "true",
    "topic.schema.ignore": "junk_json",
    "topic.key.ignore": "true",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "retry.backoff.ms": "10000",
    "batch.size": "20000",
    "max.buffered.records": "150000",
    "flush.timeout.ms": "60000",
    "tasks.max": "1",
    "topics": "junk_json"
  }
}
output:
http://localhost:9200/_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open logs-2018.02.22 3EI9-NuOTzSZxBu_F0vlRQ 5 1 153500 888 56.5mb 28.2mb
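As a side note, a date-math name like <logs-{now/d}> contains characters that must be percent-encoded when the name is used directly in a request path (for instance when querying the resolved index by hand with curl). A minimal sketch using Python's standard library; the index name is the one from the config above:

```python
from urllib.parse import quote

# Date-math index names such as <logs-{now/d}> contain characters that
# are not allowed verbatim in a URL path, so they must be percent-encoded
# before being used in a request like GET /<logs-{now/d}>/_search.
name = "<logs-{now/d}>"
encoded = quote(name, safe="")
print(encoded)  # -> %3Clogs-%7Bnow%2Fd%7D%3E
```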
from kafka-connect-elasticsearch.
Thanks for opening this issue @michaeltravisuk, making it possible to use a placeholder for the date when specifying destination index makes a lot of sense when sinking to ES. We'll try to get to it soon.
Depending on your environment, another possible approach to keeping index sizes in check would be to use server-side index aliases.
These can be atomically rotated according to your needs using the Indices API directly or a wrapper like curator without causing any indexing downtime.
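For illustration, the atomic rotation mentioned above can be done with a single Indices Aliases API call. A sketch of the request body (the index and alias names here are hypothetical), assuming daily indices behind a stable "logs" alias:

```python
import json

# Build an _aliases request body that removes the alias from the old
# index and adds it to the new one in a single atomic operation.
def alias_swap(alias, old_index, new_index):
    return json.dumps({
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    })

# POST this body to http://localhost:9200/_aliases
print(alias_swap("logs", "logs-2018.02.21", "logs-2018.02.22"))
```

Because both actions run atomically, readers of the alias never see a moment with no backing index.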
That's what I've done for now; it means I have some cron jobs taking care of the index rotation, which works well enough. To do away with those cron jobs, however, the date placeholder feature would be much appreciated.
KIP-66 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect) proposes a TimestampRouter which should address this use case.
If I understand the above proposal:
- transform a topic name -> new topic name including the timestamp
- in the ES sink connector, don't use the topic->index mapping but use the topic name as the index
There are several things I don't like about this approach. Primarily, we are losing the original topic name in the process, which is not nice. For instance, if we add a metrics/logging solution, it would be a headache to follow these dynamic topic names.
I like the approach of the HDFS sink connector, which allows specifying a custom partitioner.
I created a fork of the ES sink connector which I will gladly contribute; @gwenshap, I would appreciate some guidance on the contribution process: https://github.com/mikeskali/kafka-connect-elasticsearch
Some of the functionality:
- Minimal changes to original code
- Allows any custom "partitioning" logic
- Comes bundled with:
a. SimplePartitioner (default) - keeps the existing functionality
b. TimePartitioner - partitions by processing (current) time or by event time (taken from a configurable field). Pretty similar to the HDFS connector's functionality: flexible naming pattern, timezone support, and frequency (for instance, an index for every 6 hours).
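To make the TimePartitioner idea concrete, here is a rough sketch (not the fork's actual code; the function name, pattern, and defaults are illustrative) of deriving an index name from an event timestamp with a naming pattern and a bucketing frequency:

```python
from datetime import datetime, timezone

# Illustrative sketch of time-based "partitioning": map an event
# timestamp (epoch millis) to an index name, bucketed to a frequency
# such as one index per 6 hours.
def index_for(topic, epoch_ms, pattern="%Y.%m.%d.%H", frequency_hours=6):
    ts = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    bucket_hour = (ts.hour // frequency_hours) * frequency_hours
    bucketed = ts.replace(hour=bucket_hour, minute=0, second=0, microsecond=0)
    return f"{topic}-{bucketed.strftime(pattern)}"

# 2018-02-22T13:00:00Z falls into the 12:00 bucket with a 6-hour frequency
print(index_for("logs", 1519304400000))  # -> logs-2018.02.22.12
```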
@mikeskali We're trying to balance a couple of different things here:
- Ease of use/flexibility -- minimize configs required to do what you want
- Standardization/minimizing reimplementation -- we'd like to avoid common operations getting reimplemented across all connectors (Converters/serialization was the one we tackled from the get go, but common transformations is why we avoided adding too much functionality to any given connector -- note that HDFS partitioners are closely tied to how the file commit process works which is why they are baked in.)
- Avoid "programming by configuration", i.e. a small set of transformations breaks down when you need to perform more than a couple per connector. Sometimes adding more transformations is better than having people "code" in their config files by composing long pipelines of transformations.
I think it'd be helpful to figure out if there's an approach that covers 99% of use cases without requiring every connector with similar requirements to implement the same functionality. In particular, none of this "partitioning" logic in the ES connector requires extra coordination to make delivery guarantees, so it is unclear that it would need to be integrated directly into the connector.
Since the list of bundled partitioners is small, perhaps small extensions to Kafka Connect's existing transformations could address your needs while also keeping things entirely generalized for all connectors and minimizing code duplication? Single message transforms essentially allow for completely pluggable partitioning logic for any connector, and most connectors already have a default, natural system-specific table/whatever <-> topic mapping that connectors can override. Timestamps are an extremely common use case, which is why TimestampRouter exists, and it may often be the only transformation needed, so it satisfies the requirement for low configuration overhead. Would giving it a bit more flexibility be sufficient to satisfy your use cases? If so, I think we could move the discussion over to the Kafka side of things to figure out how it can be extended without making it overly complicated.
@ewencp,
The transformations functionality seems to be a great feature.
- Sure, let's make the TimestampRouter more flexible; I would be happy to take part. I believe that using event time (from some field in the data) is a must.
- I don't think that transforming the topic name and using the transformed name as the destination is a good approach. The original topic name gets lost, which is problematic (for instance when looking at logs, or if metrics are added).
If we want to use the transformations, I would suggest either:
a. using the connector-specific mapping approach, or
b. adding an additional "destination" field and using it in the different sink connectors.
For the second option, no matter what you do, you're going to need to get the topic into the value anyway or else it will be lost, right? You could use something like the following transformations to do both:
transforms=CopyTopic,Router
transforms.CopyTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.CopyTopic.topic.field=topic
transforms.Router.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.Router.timestamp.format=yyyyMMdd
transforms.Router.topic.format=index-${timestamp}
This would add a field "topic" containing the topic name to the value, and then override the topic (for the purposes of controlling how the ES connector, and most connectors, route data) to organize the data into daily indexes.
I am not talking about storing the topic name in ES; I mean that changing the original topic name doesn't sound like a perfect solution. Even if it is used by some sinks, it still sounds like a hack.
If transformations are the preferred approach, I would suggest adding an additional "destination" field that could be used to route data to an ES index/HDFS partition/database table or any other sink.
Hi, let me reuse this thread for my question. I am trying to achieve a daily-rotating index in ES with the help of TimestampRouter. I am not interested in keeping the topic name in the documents stored in ES, so I created a Kafka-Elasticsearch connector with this configuration:
curl -XPOST -H "Content-Type: application/json" localhost:8083/connectors -d '
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "tasks.max": "1",
    "topics": "topic.test",
    "type.name": "log",
    "key.ignore": true,
    "schema.ignore": true,
    "flush.timeout.ms": 20000,
    "batch.size": 1000,
    "max.buffered.records": 10000,
    "transforms": "Router",
    "transforms.Router.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.Router.timestamp.format": "yyyy-MM-dd",
    "transforms.Router.topic.format": "${topic}-${timestamp}"
  }
}'
However, this leads to unexpected results:
- Two new indices are created right after the connector starts up: topic.test and topic.test-1969-12-31. The index with the timestamp suffix is then fed with data. Why is the un-suffixed index created?
- The timestamp is far in the past. Why?
- The distributed Connect worker prints a bunch of warnings:
WARN Ignoring invalid task provided offset topic.test-1969-12-31-24/OffsetAndMetadata{offset=27768, metadata=''} -- partition not assigned (org.apache.kafka.connect.runtime.WorkerSinkTask:337)
(The source topic has 50 partitions, so this repeats 50 times each time it is printed.) I assume the connector is writing its own position in the source topic, but because the topic name changed, it fails.
My setup is
- Confluent OSS 3.2.2 on separate host that runs Distributed Connector
- Vanilla Kafka 0.10.2.1 forming Kafka cluster
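One plausible explanation for the 1969-12-31 suffix (an assumption, not confirmed in this thread): TimestampRouter formats the Kafka record timestamp, and records without a valid timestamp (e.g. written by pre-0.10 producers) carry the sentinel value -1 milliseconds, which formats to just before the Unix epoch:

```python
from datetime import datetime, timezone

# -1 is Kafka's "no timestamp" sentinel; formatting it as a date lands
# one millisecond before the Unix epoch, i.e. 1969-12-31 (UTC).
sentinel_ms = -1
d = datetime.fromtimestamp(sentinel_ms / 1000, tz=timezone.utc)
print(d.strftime("%Y-%m-%d"))  # -> 1969-12-31
```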
I see it is discussed in issue #94
Hi @ewencp @mikeskali and others, I am trying to use SMTs to create time-based ES indices with the following config:
"transforms":"routeTS",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.timestamp.format":"yyyy.MM.dd",
"transforms.routeTS.topic.format":"smttest-${topic}-${timestamp}"
Noticed that it uses the record timestamp in the newly created indices. Is there any provision to specify an event timestamp field in TimestampRouter? Thanks in advance.
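TimestampRouter only reads the Kafka record timestamp; routing on a field inside the value would require a custom transform. A rough sketch of the logic such a transform would implement (the field name, format, and function name here are hypothetical):

```python
from datetime import datetime, timezone

# Sketch (not a real SMT) of event-time routing: take the timestamp
# from a field in the record value instead of the record timestamp.
def route(value, topic, ts_field="@timestamp",
          topic_format="smttest-{topic}-{ts}", fmt="%Y.%m.%d"):
    epoch_ms = value[ts_field]
    ts = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).strftime(fmt)
    return topic_format.format(topic=topic, ts=ts)

print(route({"@timestamp": 1519257600000}, "mytopic"))  # -> smttest-mytopic-2018.02.22
```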
Hi @ewencp, @martinhynar and others, I am new to the ES connector. I tried the same config, but I don't see the time-based index created. The connector status looks good. Does anyone know what the reason could be? Thank you very much.
Closing because multiple solutions were provided.
@martinhynar
-- Why is the un-suffixed index created?
Regarding your issue, could you please try turning off the "auto.create.indices.at.start" option?
ref: https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html
It actually worked for me! Kindly check.
The "auto.create.indices.at.start" option is not available in the linked docs.
(quoting the date-math index solution from earlier in the thread)
I tried your solution; the index is created, but the fields are not the same as in the original message. WDYT?