Seeing "Ignoring invalid task provided offset..." in connect logs when using TimestampRouter about kafka-connect-elasticsearch HOT 9 CLOSED

confluentinc avatar confluentinc commented on July 18, 2024 7
Seeing "Ignoring invalid task provided offset..." in connect logs when using TimestampRouter

from kafka-connect-elasticsearch.

Comments (9)

rmoff commented:

Related: https://issues.apache.org/jira/browse/KAFKA-5567

ewencp commented:

Yeah, in the case of the ES connector this is really a framework issue: the ES connector relies on the framework's offset commits, and those break when a transformation modifies the topic name. This is something you should be concerned about, because offsets for any topic partitions whose names change (presumably all of them in the case of TimestampRouter) won't be committed, which means that on failure and/or rebalance, data will be replayed and re-delivered to Elasticsearch.

So at the moment I wouldn't combine these routers with sink connectors. The good news is that the fix looks straightforward for sink connectors like ES (there is already a patch).
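For context, here is a minimal sketch of the kind of sink configuration under discussion; the names, URL, and values are illustrative assumptions, not taken from this issue. TimestampRouter rewrites each record's topic (e.g. logs becomes something like logs-20170718), so the offsets the task reports back no longer match the partitions the consumer actually has assigned, which is what triggers the "Ignoring invalid task provided offset..." warning:

    name=es-sink-example
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    topics=logs
    connection.url=http://localhost:9200
    type.name=kafka-connect
    key.ignore=true
    # Route each record to a per-day topic/index name such as logs-20170718
    transforms=TsRouter
    transforms.TsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
    transforms.TsRouter.topic.format=${topic}-${timestamp}
    transforms.TsRouter.timestamp.format=yyyyMMdd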

zzbennett commented:

@ewencp thanks for the details. So to be crystal clear: when running the ES sink connector with the TimestampRouter, it can't commit its offsets to Kafka at all? If I restart the Connect cluster, the ES connectors will consume from the latest or earliest offset (depending on configs) because they have no offsets committed in Kafka?

Having data replayed to ES is not such a big deal, since the offset is encoded in the ES id itself and all the duplicate writes will be idempotent. I've been running the ES connector with the TimestampRouter for a few months now and have not had issues with lost or duplicate data yet, which is why I wasn't sure whether this warning was as bad as it sounds.

We were planning to deploy the ES connector to production in the next two to three weeks. Do you know if the patch will be merged and released before then? And will the fix be part of a bug-fix release for 3.2.x, or will it only appear in later versions? Sorry for the barrage of questions, just trying to figure out whether this will be a blocker for us or not.

ewencp commented:

@zzbennett That's currently correct due to the bug.

If your cluster is pretty stable, i.e. not growing, shrinking, or rebalancing frequently, you probably wouldn't notice since things will just keep humming along. The replay only happens when the task/consumer has to restart, which only happens during rebalance.

The patch is already filed but still needs tests. It shouldn't be hard to get committed, and unless there are unforeseen issues I'll backport it from Kafka's trunk to 0.11.0 and to 0.10.2 (the release in which SMTs were introduced). There's no fix to be done in the ES connector itself, but 0.10.2 corresponds to the 3.2.x branch, so the fix would be included in any subsequent 3.2.x release of CP (or you can, of course, always build your own Kafka from 0.10.2 to run Connect with).

In terms of being a blocker, as mentioned above it depends a bit on how stable your cluster is. The other factor to take into account is whether you care about records being rewritten. If you are using topic+partition+offset as your key, then it generally doesn't matter: the replay rewrites the same documents with the same data, so the data in ES is effectively unchanged. However, if you're using the key from the Kafka record, you'd see records go back in time temporarily and then get updated again as the replay catches up, which means the state in ES could be inconsistent with the state of the data in Kafka at any given time. Whether that is an issue depends a lot on your application.
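To illustrate why topic+partition+offset keys make the replay benign, here is a minimal, hypothetical Java sketch of deriving a document id from a sink record's coordinates; this is not the connector's actual code, and the class and method names are made up for illustration:

    import org.apache.kafka.connect.sink.SinkRecord;

    public class DocIdSketch {
        // topic+partition+offset is stable across replays, so a re-delivered record
        // overwrites the same Elasticsearch document instead of creating a duplicate.
        static String documentId(SinkRecord record) {
            return record.topic() + "+" + record.kafkaPartition() + "+" + record.kafkaOffset();
        }
    }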

zzbennett commented:

Heyo. So we just enabled our full production load on our Kafka Connect cluster and created an ES connector on a high-volume topic. We're seeing this exception on it:

[2017-10-18 16:11:56,784] ERROR WorkerSinkTask{id=aa-nginx-to-ES-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2000
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
        at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:299)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:165)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2017-10-18 16:11:56,785] ERROR Commit of WorkerSinkTask{id=aa-nginx-to-ES-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2000
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
        at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:299)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:165)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

And it stops writing data.
I know the fix is scheduled to go out in 0.11.1. I wanted to record this exception here for posterity, and also to ask when 0.11.1 will be released. Hopefully soon? This has become a blocker for us.

EDIT: the above stack trace has nothing to do with the original issue; it is related to #136.
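For anyone who hits the same flush timeout, the relevant knobs are the Elasticsearch connector's flush and batching settings; a hedged sketch follows, where the specific values are illustrative assumptions rather than recommendations from this thread:

    # Allow slow bulk requests more time to complete before preCommit gives up
    flush.timeout.ms=60000
    # Smaller batches and fewer concurrent bulk requests can keep the buffer from backing up
    batch.size=500
    max.buffered.records=20000
    max.in.flight.requests=2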

rogusdev commented:

@ewencp it looks like you reviewed a PR to fix this but then it was closed without merging:
apache/kafka#3499

And the ticket @rmoff linked to is marked as Resolved/Fixed.

Is this issue supposed to be actually fixed at this point? I am still seeing these log lines (admittedly not in the Elasticsearch connector, but in a custom one).

ewencp commented:

To clarify, as I think @rogusdev found on the upstream AK PR: this has been merged. Before we could use GitHub's squash-merge approach on AK, keeping a clean, linear, single-commit-per-PR history was only possible by rebasing and then closing the original PR, so the GitHub PR looks like it was simply closed even though it was actually merged. The relevant merge commits should be linked from the corresponding JIRA. It was backported to 0.11.0, but not to 0.10.2; I can't remember why at the moment, but it's likely just that the patch doesn't apply cleanly and the need for backporting wasn't that great.

dozturk commented:

Hello everyone,

Can somebody please update me regarding this issue? I have deployed three distributed Kafka Connect connectors with HDFS sink tasks (using TimeBasedPartitioner), and I see lots of these warning messages in the logs of two of the workers:

[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-9/OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-8/OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-7/OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-6/OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-5/OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-4/OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)

levzem commented:

@dozturk please use an updated version of CP and the connector, as this issue has been resolved.

Closing, since this was a framework issue and it has been fixed.
