
Comments (18)

gopikrishna967 commented on July 18, 2024

The solution that worked for me in my use case is as below:
Elasticsearch allows date math during index time: https://www.elastic.co/guide/en/elasticsearch/reference/2.0/date-math-index-names.html#date-math-index-names

My connector config:
{
  "name": "arras_es_sink1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "group.id": "test2",
    "topic.index.map": "junk_json:<logs-{now/d}>",
    "type.name": "mytype",
    "schema.ignore": "true",
    "topic.schema.ignore": "junk_json",
    "topic.key.ignore": "true",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "retry.backoff.ms": "10000",
    "batch.size": "20000",
    "max.buffered.records": "150000",
    "flush.timeout.ms": "60000",
    "max_tasks": "1",
    "topics": "junk_json"
  }
}
Output:

http://localhost:9200/_cat/indices?v

health status index           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   logs-2018.02.22 3EI9-NuOTzSZxBu_F0vlRQ   5   1     153500          888     56.5mb         28.2mb
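To make the date math name concrete, here is a minimal sketch of how an expression like <logs-{now/d}> resolves to the logs-2018.02.22 index shown above. It assumes the Elasticsearch defaults for date math index names (year.month.day format, UTC time zone). The helper resolve_daily_index is hypothetical and only mimics the {now/d} case; Elasticsearch does the real resolution on the server.

```python
from datetime import datetime, timezone


def resolve_daily_index(prefix: str, now: datetime) -> str:
    """Approximate how Elasticsearch resolves `<prefix{now/d}>` (hypothetical helper)."""
    # `now/d` rounds the current time down to the start of the day, in UTC by default.
    day = now.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    # With no explicit format in the expression, the name uses year.month.day.
    return f"{prefix}{day:%Y.%m.%d}"


print(resolve_daily_index("logs-", datetime(2018, 2, 22, 15, 30, tzinfo=timezone.utc)))
```

Because the name is resolved at index time on the Elasticsearch side, the date reflects when the document is indexed, not a timestamp carried by the Kafka record.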

from kafka-connect-elasticsearch.

shikhar commented on July 18, 2024

Thanks for opening this issue, @michaeltravisuk. Making it possible to use a placeholder for the date when specifying the destination index makes a lot of sense when sinking to ES. We'll try to get to it soon.


ioc32 commented on July 18, 2024

Depending on your environment, another possible approach to keeping index sizes in check would be to use server-side index aliases.

These can be atomically rotated according to your needs using the Indices API directly or a wrapper like curator without causing any indexing downtime.
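As a rough illustration of the alias approach, the sketch below builds the request body for the Elasticsearch aliases endpoint (POST /_aliases), which applies all listed actions in one atomic step. The alias and index names, and the helper rotate_alias_actions, are assumptions made up for this example; sending the request is left out.

```python
import json


def rotate_alias_actions(alias: str, old_index: str, new_index: str) -> dict:
    """Build a body for POST /_aliases that moves `alias` to a new index (hypothetical helper)."""
    return {
        "actions": [
            # Both actions are applied together, so the alias never points at nothing.
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }


# Illustrative names only: the connector would keep writing to the "logs-write" alias.
body = rotate_alias_actions("logs-write", "logs-2018.02.21", "logs-2018.02.22")
print(json.dumps(body, indent=2))
```

With this setup the connector writes to a fixed alias, and a scheduled job (or Curator) decides when the alias moves to a fresh index.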


michaeltravisuk commented on July 18, 2024

That's what I've done for now, which means I have some cron jobs taking care of rotating the indices. That works well enough for the moment. To do away with those cron jobs, however, the date placeholder feature would be much appreciated.


shikhar commented on July 18, 2024

https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect proposes a TimestampRouter which should address this use-case.


mikeskali commented on July 18, 2024

If I understand the above proposal:

  1. Make a transformation of topic name -> new topic name that includes the timestamp.
  2. In the ES sink connector, don't use the topic -> index mapping; use the topic name as the index.

There are many things I don't like about this approach. Primarily, we lose the original topic name in the process, which is not nice. For instance, if we add a metrics or logging solution, it would be a headache to follow these dynamic topic names.

I like the approach of the HDFS sink connector, which allows specifying a custom partitioner.

I created a fork of the ES sink connector, which I will gladly contribute. @gwenshap, I would appreciate some guidance on the contribution process: https://github.com/mikeskali/kafka-connect-elasticsearch

Some of the functionality:

  1. Minimal changes to original code
  2. Allows any custom "partitioning" logic
  3. Comes bundled with:
    a. SimplePartitioner (default) - keeps the existing functionality
    b. TimePartitioner - partitions by processing (current) time or by event time (taken from a field in the record). Pretty similar to the HDFS connector functionality: flexible naming pattern, time zone support, and frequency (for instance, an index for every 6 hours).
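The time-based partitioning idea described above can be sketched roughly as follows. This is not the code from the fork; time_bucketed_index, its parameters, and the naming pattern are hypothetical, and the sketch assumes the frequency divides 24 hours evenly.

```python
from datetime import datetime, timedelta, timezone


def time_bucketed_index(
    base: str,
    event_time: datetime,
    hours: int = 6,
    tz: timezone = timezone.utc,
    pattern: str = "%Y.%m.%d-%H",
) -> str:
    """Derive an index name from a timestamp, one index per `hours`-long bucket."""
    # Convert to the configured time zone before bucketing.
    local = event_time.astimezone(tz)
    # Round the hour down to the start of its bucket (0, 6, 12, 18 for hours=6).
    bucket_hour = (local.hour // hours) * hours
    start = local.replace(hour=bucket_hour, minute=0, second=0, microsecond=0)
    return f"{base}-{start.strftime(pattern)}"


event = datetime(2018, 2, 22, 15, 30, tzinfo=timezone.utc)
print(time_bucketed_index("logs", event))
```

Passing the current time gives processing-time partitioning, while passing a timestamp parsed from a record field gives event-time partitioning.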


ewencp commented on July 18, 2024

@mikeskali We're trying to balance a couple of different things here:

  1. Ease of use/flexibility -- minimize configs required to do what you want
  2. Standardization/minimizing reimplementation -- we'd like to avoid common operations getting reimplemented across all connectors (Converters/serialization was the one we tackled from the get go, but common transformations is why we avoided adding too much functionality to any given connector -- note that HDFS partitioners are closely tied to how the file commit process works which is why they are baked in.)
  3. Avoid "programming by configuration", i.e. the small # of transformations breaks down when you need to perform > 2 per connector. Sometimes adding more transformations is better than having people "coding" in their config files by composing long pipelines of transformations.

I think it'd be helpful to figure out if there's an approach that covers 99% of use cases without requiring every connector with similar requirements to implement the same functionality. In particular, none of this "partitioning" logic in the ES connector requires extra coordination to make delivery guarantees, so it is unclear that it would need to be integrated directly into the connector.

Since the list of bundled partitioners is small, perhaps small extensions to Kafka Connect's existing transformations could address your needs while also keeping things entirely generalized for all connectors and minimizing code duplication? Single message transforms essentially allow for completely pluggable partitioning logic for any connector, and most connectors already have a default, natural system-specific table/whatever <-> topic mapping that connectors can override. Timestamps are an extremely common use case for this which is why TimestampRouter exists, and it often may be the only transformation needed so it satisfies the requirement for low configuration overhead. Would giving it a bit more flexibility be sufficient to satisfy your use cases? If so, I think we could move the discussion over to the Kafka side of things to figure out how it can be extended without making it overly complicated.


mikeskali commented on July 18, 2024

@ewencp,
The transformations functionality seems to be a great feature.

  1. Sure, let's make the TimestampRouter more flexible; I would be happy to take part. I believe that using event time (from some field in the data) is a must.
  2. I don't think that transforming the topic name and using the transformed topic name as the target partition is a good approach. The original topic name gets lost, which is problematic (for instance when looking at logs, or if metrics are added later).
    If we want to use the transformations, I would suggest either:
    a. Using the connector-specific mapping approach
    or
    b. Adding an additional field "destination" and using that field in the different sink connectors.


ewencp commented on July 18, 2024

For 2, no matter what you do you're going to need to get the topic into the value anyway or else it'll be lost, right? You could use something like the following transformations to do both:

transforms=CopyTopic, Router
transforms.CopyTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.CopyTopic.topic.field=topic
transforms.Router.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.Router.timestamp.format=yyyyMMdd
transforms.Router.topic.format=index-${timestamp}

which would add a field topic containing the topic name to the value and then override the topic (for the purposes of controlling how the ES connector, and most connectors, route data) to organize the data based on daily indexes.
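To illustrate the effect of that two-step chain on a single record, here is a small simulation in plain Python. It does not call Kafka Connect; the record layout, the function names, and the sample values are assumptions for the example, the strftime pattern %Y%m%d stands in for yyyyMMdd, and the timestamp is formatted in UTC.

```python
from datetime import datetime, timezone


def insert_topic_field(record: dict, field: str = "topic") -> dict:
    """Mimic the CopyTopic step: copy the topic name into the record value."""
    value = dict(record["value"], **{field: record["topic"]})
    return {**record, "value": value}


def timestamp_router(
    record: dict,
    topic_format: str = "index-${timestamp}",
    strftime_format: str = "%Y%m%d",
) -> dict:
    """Mimic the Router step: rewrite the topic from the record timestamp."""
    ts = datetime.fromtimestamp(record["timestamp_ms"] / 1000, tz=timezone.utc)
    topic = topic_format.replace("${topic}", record["topic"])
    topic = topic.replace("${timestamp}", ts.strftime(strftime_format))
    return {**record, "topic": topic}


# Hypothetical record: produced to junk_json on 2018-02-22 at 15:30 UTC.
record = {"topic": "junk_json", "timestamp_ms": 1519313400000, "value": {"msg": "hello"}}
routed = timestamp_router(insert_topic_field(record))
print(routed["topic"], routed["value"])
```

The order matters: the topic name has to be copied into the value before the router overwrites it, which is why CopyTopic is listed first in the transforms chain.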


mikeskali commented on July 18, 2024

I am not talking about storing the topic name in ES. I mean that changing the original topic name doesn't sound like a perfect solution. Even if it is used by some sinks, it still sounds like a hack.
If transformations are the preferred approach, I would suggest adding an additional field "destination" that could be used to send data to an ES index, HDFS partition, database table, or any other sink.


martinhynar commented on July 18, 2024

Hi, let me reuse this thread for my question. I am trying to achieve a daily-rotating index in ES with the help of TimestampRouter. I am not interested in keeping the topic name in the documents stored in ES, so I created a kafka-elasticsearch connector with this configuration:

curl -XPOST -H"Content-Type: application/json" localhost:8083/connectors -d '
{"name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://localhost:9200",
        "tasks.max": "1",
        "topics": "topic.test",
        "type.name": "log",
        "key.ignore": true,
        "schema.ignore": true,
        "flush.timeout.ms": 20000,
        "batch.size": 1000,
        "max.buffered.records": 10000,
        "transforms": "Router",
        "transforms.Router.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.Router.timestamp.format": "yyyy-MM-dd",
        "transforms.Router.topic.format": "${topic}-${timestamp}"
    }
}'

However, this leads to unexpected results:

  • Two new indices are created right after the connector starts up: topic.test and topic.test-1969-12-31.
    • The index with the timestamp suffix is then fed with data. Why is the un-suffixed index created?
    • The timestamp is far in the past. Why?
  • The Distributed Connector server prints a bunch of warnings:
    WARN Ignoring invalid task provided offset topic.test-1969-12-31-24/OffsetAndMetadata{offset=27768, metadata=''} -- partition not assigned (org.apache.kafka.connect.runtime.WorkerSinkTask:337)
    (The source topic from which the data is taken has 50 partitions, so the warning repeats 50 times every time it is printed.)
    • I assume the connector is writing its own position in the source topic, but since the topic name changed, this fails.

My setup is:

  • Confluent OSS 3.2.2 on a separate host that runs the Distributed Connector
  • Vanilla Kafka 0.10.2.1 forming the Kafka cluster
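A possible explanation for the 1969-12-31 suffix, which nobody in this thread confirms: Kafka marks a record that carries no timestamp with the value -1, and -1 milliseconds relative to the Unix epoch falls on the last day of 1969 in UTC. Whether the records in topic.test actually lacked timestamps is an assumption. The helper utc_day_for is hypothetical, and the strftime pattern stands in for the yyyy-MM-dd format from the config.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def utc_day_for(timestamp_ms: int) -> str:
    """Format an epoch-millisecond timestamp as a UTC day (hypothetical helper)."""
    # Adding a timedelta to the epoch also handles negative timestamps portably.
    return (EPOCH + timedelta(milliseconds=timestamp_ms)).strftime("%Y-%m-%d")


# -1 is the value Kafka uses when a record has no timestamp.
print("topic.test-" + utc_day_for(-1))
```

If that is the cause, checking the timestamps of the records in the source topic would confirm it.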


martinhynar commented on July 18, 2024

I see it is discussed in issue #94


deeptiantony commented on July 18, 2024

Hi @ewencp, @mikeskali and others, I am trying to use an SMT to create time-based ES indices with the following config:
"transforms":"routeTS",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.timestamp.format":"yyyy.MM.dd",
"transforms.routeTS.topic.format":"smttest-${topic}-${timestamp}"

I noticed that it uses the record timestamp in the names of the newly created indices. Is there any way to specify an event timestamp field in TimestampRouter? Thanks in advance.


iamhg commented on July 18, 2024

Hi @ewencp, @martinhynar and others, I am new to the ES connector. I tried the same config, but I don't see the time-based index being created. The connector status looks good. Does anyone know what the reason could be? Thank you very much.


levzem commented on July 18, 2024

Closing because multiple solutions have been provided.


anzersy commented on July 18, 2024

@martinhynar
-- Why the un-suffixed index is created?

Regarding your issue, could you try turning off the "auto.create.indices.at.start" option?

Ref: https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html

It actually worked for me, so please give it a try.


naviat commented on July 18, 2024

> @martinhynar
> -- Why the un-suffixed index is created?
>
> Regarding to your issue, could you please turn off the "auto.create.indices.at.start" option?
>
> ref:https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html
>
> Actually it worked for me! kindly check plz

This option is not available in those docs.


naviat commented on July 18, 2024

> The solution that worked for me in my use case is as below:
> Elasticsearch allows date math during index time: https://www.elastic.co/guide/en/elasticsearch/reference/2.0/date-math-index-names.html#date-math-index-names
>
> My connector config:
> {
> "name": "arras_es_sink1",
> "config": {
> "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
> "connection.url" : "http://localhost:9200",
> "key.ignore" :"true",
> "group.id" : "test2",
> "topic.index.map" : "junk_json:<logs-{now/d}>",
> "type.name" : "mytype",
> "schema.ignore":"true",
> "topic.schema.ignore" : "junk_json",
> "topic.key.ignore" : "true",
> "key.converter.schemas.enable":"false",
> "value.converter.schemas.enable":"false",
> "key.converter":"org.apache.kafka.connect.json.JsonConverter",
> "value.converter":"org.apache.kafka.connect.json.JsonConverter",
> "retry.backoff.ms" : "10000",
> "batch.size":"20000",
> "max.buffered.records":"150000",
> "flush.timeout.ms":"60000",
> "max_tasks" : "1",
> "topics":"junk_json"
> }
> }
> output:
>
> http://localhost:9200/_cat/indices?v
>
> health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
>
> green open logs-2018.02.22 3EI9-NuOTzSZxBu_F0vlRQ 5 1 153500 888 56.5mb 28.2mb

I tried your solution. The index is created, but the field is not the same as in the original message. What do you think?

