Comments (9)
Related: https://issues.apache.org/jira/browse/KAFKA-5567
from kafka-connect-elasticsearch.
Yeah, in the case of the ES connector, this is really a framework issue: when a transformation that modifies the topic name is used, the ES connector still relies on the framework's offset commits. This is something you should be concerned about, since offsets for any topic partitions whose names change (presumably all of them in the case of TimestampRouter) won't be committed, which means that upon failure and/or rebalance, data will be replayed and re-delivered to Elasticsearch.
So at the moment, I wouldn't combine these routers with sink connectors. The good news is that the fix for sink connectors like ES looks like it won't be a problem (you can see there is already a patch).
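For reference, here is a minimal sketch of the kind of setup being discussed: an ES sink with the TimestampRouter SMT applied, expressed as the JSON body one would POST to the Connect REST API. The connector name, topic, and connection URL are hypothetical placeholders, not values from this thread.

```python
# Sketch (not from this thread) of an ES sink connector config that
# applies TimestampRouter. Connector name, topic, and ES URL are
# placeholders.
import json

config = {
    "name": "es-sink-ts",  # placeholder connector name
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "my-topic",                       # placeholder topic
        "connection.url": "http://localhost:9200",  # placeholder ES endpoint
        # TimestampRouter rewrites the topic name per record, e.g.
        # "my-topic" -> "my-topic-20171018". Before the framework fix,
        # sink offsets for these rewritten topic names were never committed.
        "transforms": "router",
        "transforms.router.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.router.topic.format": "${topic}-${timestamp}",
        "transforms.router.timestamp.format": "yyyyMMdd",
    },
}

print(json.dumps(config, indent=2))
```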
@ewencp thanks for the details. So to be crystal clear: when running the ES sink connector with the TimestampRouter, it can't commit its offsets to Kafka at all? If I restart the Connect cluster, the ES connectors will consume from the latest/earliest offset (depending on config) because they have no offsets committed in Kafka?
Having data get replayed to ES is not such a big deal, since the offset is encoded in the ES id itself and all the duplicate writes will be idempotent. I've been running the ES connector with the TimestampRouter for a few months now and have not had issues with lost or duplicated data yet, which is why I wasn't sure if this warning was as bad as it sounds.
We were planning to deploy the ES connector to production in the next two to three weeks. Do you know if the patch will be merged and released before then? And will the fix be part of a bug fix release for 3.2.x, or will it only appear in later versions? Sorry for the barrage of questions, just trying to figure out if this will be a blocker for us or not.
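The idempotency point above can be shown with a toy model: when the document id is derived from topic+partition+offset (as the ES connector does when the record key is ignored), a replay writes the same documents again. A dict stands in for the Elasticsearch index; topic and values are made up for illustration.

```python
# Toy model: replays are idempotent when the ES document id is derived
# from topic+partition+offset. A dict stands in for the index.

def doc_id(topic: str, partition: int, offset: int) -> str:
    # id format built from topic, partition, and offset
    return f"{topic}+{partition}+{offset}"

index = {}

def write(topic, partition, offset, value):
    index[doc_id(topic, partition, offset)] = value  # last write wins

# First delivery.
write("logs", 0, 41, {"msg": "a"})
write("logs", 0, 42, {"msg": "b"})
# Replay after a rebalance: same records, same ids -> same documents.
write("logs", 0, 41, {"msg": "a"})
write("logs", 0, 42, {"msg": "b"})

assert len(index) == 2  # duplicates collapsed, no extra documents
```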
@zzbennett That's currently correct due to the bug.
If your cluster is pretty stable, i.e. not growing, shrinking, or rebalancing frequently, you probably wouldn't notice since things will just keep humming along. The replay only happens when the task/consumer has to restart, which only happens during rebalance.
The patch is already filed but still needs tests. It shouldn't be hard to get committed, and unless there are unforeseen issues I'll backport it from Kafka's trunk to 0.11.0 and 0.10.2, which is when SMTs were introduced. There's no fix needed in the ES connector itself. 0.10.2 corresponds to the 3.2.x branch, so the fix would be included in any subsequent 3.2.x release of CP (or you can, of course, always build your own Kafka from 0.10.2 to run Connect with).
In terms of being a blocker, as mentioned above it depends a bit on how stable your cluster is. The other factor to take into account is whether you care about records being rewritten. If you are using topic+partition+offset as your key, then it generally doesn't matter: you'll rewrite the data, but the documents in ES end up unchanged. However, if you're using the key from the Kafka record, then you'd sort of see records go back in time temporarily, then get updated again as the replay happens, which means ES could be in a state that is inconsistent with the state of the data in Kafka at any given time. Whether that is an issue depends a lot on your application.
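The "go back in time" scenario above can be sketched with the same kind of toy model: if the document id comes from the Kafka record key, a replay temporarily regresses a document to an older value until the consumer catches back up. Keys and values here are made up for illustration.

```python
# Toy model: document id taken from the Kafka record key means a
# replay temporarily regresses documents to older values.
index = {}

def write(key, value):
    index[key] = value  # last write wins, as in ES indexing by id

# Normal processing: two updates to the same key.
write("user-1", {"status": "new"})
write("user-1", {"status": "active"})
assert index["user-1"]["status"] == "active"

# A rebalance forces a replay from the last committed offset: the older
# record is re-delivered first, and ES briefly shows stale state...
write("user-1", {"status": "new"})
assert index["user-1"]["status"] == "new"
# ...until the replay reaches the newer record again.
write("user-1", {"status": "active"})
assert index["user-1"]["status"] == "active"
```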
Heyo. So we just enabled our full production load on our Kafka Connect cluster and created an ES connector on a high-volume topic. We're seeing this exception on it:
[2017-10-18 16:11:56,784] ERROR WorkerSinkTask{id=aa-nginx-to-ES-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2000
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:299)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2017-10-18 16:11:56,785] ERROR Commit of WorkerSinkTask{id=aa-nginx-to-ES-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2000
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:299)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
And it stops writing data.
I know the fix is scheduled to go out in 0.11.1. I wanted to throw this exception in here for posterity and to also inquire about when 0.11.1 will get released. Hopefully soon? This has become a blocker for us.
EDIT: the above stack trace has nothing to do with the original issue; it's related to this issue: #136
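For anyone hitting that "Flush timeout expired" error: the relevant knobs are the ES connector's own buffering/flush settings. A hedged sketch of the settings that interact with it is below; the values are illustrative placeholders to show the knobs, not tuned recommendations.

```python
# ES sink settings that interact with the
# "Flush timeout expired with unflushed records" error.
# Values are placeholders, not tuned advice.
tuning = {
    "flush.timeout.ms": "60000",      # how long preCommit waits for the buffer to drain
    "max.buffered.records": "20000",  # cap on records buffered per task
    "batch.size": "2000",             # records per bulk request to ES
    "max.in.flight.requests": "5",    # concurrent bulk requests to ES
}

# Sanity check: flush window comfortably larger than the default 10s.
assert int(tuning["flush.timeout.ms"]) > 10000
```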
@ewencp it looks like you reviewed a PR to fix this but then it was closed without merging:
apache/kafka#3499
And the ticket @rmoff linked to is marked as Resolved/Fixed.
Is this issue supposed to actually be fixed at this point? I am still seeing these log lines (admittedly not in the Elasticsearch connector, but in a custom one).
To clarify, as I think @rogusdev found on the upstream AK PR, this has been merged. Due to the way merging worked on AK before we could use GitHub's squash-merge approach, keeping a clean, linear, single-commit-per-PR history was only possible by rebasing and closing the original PR. So the GitHub PR looks like it was just closed, but it was actually merged; the relevant merge links should be posted in the corresponding JIRA. It was backported to 0.11.0, but not 0.10.2. At the moment I can't remember why, but it's likely just that the patch doesn't apply cleanly and the need for backporting wasn't that great.
Hello everyone,
Can somebody please give me an update on this issue? I have deployed 3 distributed Kafka Connect workers with HDFS sink tasks (using TimeBasedPartitioner), and I see lots of these WARN messages in the logs of 2 of the workers:
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-9/OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-8/OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-7/OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-6/OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-5/OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2019-05-06 10:29:53,525] WARN WorkerSinkTask{id=hdfsSinkConnector-101-Daily-0} Ignoring invalid task provided offset voucher-devalue-4/OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[voucher-devalue-3, voucher-devalue-2, voucher-devalue-1, voucher-devalue-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
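Those WARNs come from the worker's offset-commit path: after a rebalance, the task's preCommit can return offsets for partitions that are no longer in the consumer's assignment, and the worker drops them instead of committing. A rough sketch of that filtering (not the actual Java implementation, just a model of the behavior the log describes):

```python
# Model of the behavior behind "Ignoring invalid task provided offset
# ... partition not assigned": offsets a task provides for partitions
# outside the current consumer assignment are dropped, not committed.

def filter_committable(task_offsets: dict, assignment: set) -> dict:
    committable = {}
    for tp, offset in task_offsets.items():
        if tp in assignment:
            committable[tp] = offset
        else:
            print(f"WARN Ignoring invalid task provided offset {tp} "
                  f"-- partition not assigned")
    return committable

assignment = {"voucher-devalue-0", "voucher-devalue-1",
              "voucher-devalue-2", "voucher-devalue-3"}
offsets = {"voucher-devalue-3": 7, "voucher-devalue-9": 3}

committed = filter_committable(offsets, assignment)
assert committed == {"voucher-devalue-3": 7}
```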
from kafka-connect-elasticsearch.
@dozturk Please use an updated version of CP and the connector, as this issue has been resolved.
Closing, since this was a framework issue and it has been fixed.