Comments (18)
So far, so good - working with zero errors! Thanks @shikhar.
$ curl -X GET http://localhost:9200/_cat/indices|grep soe
green open soe.card_details 1 0 4078 0 1.2mb 1.2mb
green open soe.inventories 1 0 28651 0 5mb 5mb
green open soe.addresses 1 0 4128 0 1.1mb 1.1mb
green open soe.warehouses 1 0 0 0 159b 159b
green open soe.order_items 1 0 29233 0 5.6mb 5.6mb
green open soe.logon 1 0 29735 0 7mb 7mb
green open soe.orders 1 0 23564 0 6.8mb 6.8mb
green open soe.orderentry_metadata 1 0 0 0 159b 159b
green open soe.product_information 1 0 0 0 159b 159b
green open soe.product_descriptions 1 0 0 0 159b 159b
green open soe.customers 1 0 4128 0 2mb 2mb
from kafka-connect-elasticsearch.
Hello @joseluisillana
I have exactly the same issue you mentioned: the performance of the Elasticsearch sink connector degrades over time until it hits flush timeout expired errors. Was this resolved? If so, could you post the config changes that resolved it?
I'm getting the same error too. The data is showing up in Elasticsearch though, and there are no errors in the Elasticsearch log.
Config files: https://gist.github.com/rmoff/0e228583b6b00a4d06e656c9fd7f64d2
With DEBUG enabled: (full log here)
[2016-09-01 13:47:09,247] DEBUG [exchange: 12] Connection allocated: CPoolProxy{http-outgoing-1 [ACTIVE]} (org.apache.http.impl.nio.client.InternalHttpAsyncClient:307)
[I/O dispatcher 1] ERROR io.searchbox.client.http.JestHttpClient - Exception occurred during async execution.
[2016-09-01 13:47:09,247] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][r:]: Set attribute http.nio.exchange-handler (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:182)
[2016-09-01 13:47:09,200] DEBUG [exchange: 156] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
[2016-09-01 13:47:09,200] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
java.net.SocketTimeoutException
[2016-09-01 13:47:09,200] DEBUG [exchange: 155] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:177)
[2016-09-01 13:47:09,248] DEBUG [exchange: 158] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:265)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:494)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:215)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:282)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][rw:]: Event set [w] (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:108)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
at java.lang.Thread.run(Thread.java:745)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][rw:]: Set timeout 3000 (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:154)
[I/O dispatcher 1] ERROR io.confluent.connect.elasticsearch.internals.BulkProcessor - Failed to execute the batch
java.net.SocketTimeoutException
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 [ACTIVE]: Connected (org.apache.http.impl.nio.client.InternalIODispatch:54)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:177)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:265)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:494)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:215)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:282)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
[2016-09-01 13:47:09,248] DEBUG Connection request: [route: {}->http://elastic-01-node-01:9200][total kept alive: 0; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager:249)
at java.lang.Thread.run(Thread.java:745)
[I/O dispatcher 1] ERROR io.confluent.connect.elasticsearch.internals.BulkProcessor - Failed to execute the batch, retry or fail
java.net.SocketTimeoutException
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
[...]
[2016-09-01 13:47:12,145] DEBUG http-outgoing-2 [ACTIVE] Request ready (org.apache.http.impl.nio.client.InternalIODispatch:71)
[2016-09-01 13:47:12,145] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][r:w]: Event cleared [w] (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:116)
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
... 11 more
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Rewinding offsets to last committed offsets
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
Can you try setting flush.timeout.ms to a higher value? It appears the default of 10000 is too low for your usage.
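As a sketch, this is where that setting would go in the sink connector's JSON config (the topic name and the 60000 value below are illustrative placeholders, not values from this thread):

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "my-topic",
    "flush.timeout.ms": "60000"
  }
}
```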
Since the defaults for batch.size and max.buffered.records are a lot lower now, flushing shouldn't take as long. Please reopen if this continues to be an issue.
I bumped flush.timeout.ms up to 100000 and also set batch.size=2000 and max.buffered.records=20000, but still get errors.
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink3-0} offsets failed due to exception while flushing:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 100000
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
... 11 more
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Rewinding offsets to last committed offsets
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink3-0} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 100000
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
... 11 more
[pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator confluent-01-node-01.moffatt.me:9092 (id: 2147483647 rack: null) dead for group connect-elasticsearch-sink3
[pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator confluent-01-node-01.moffatt.me:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink3.
The system load on both the Kafka and Elasticsearch servers is minimal. Let me know if there's any other useful info I can provide.
@shikhar We might want to add some trace-level logging to BulkProcessor for every completed batch (we seem to log exceptions but not successes), and maybe add some info about how much work is outstanding to the exception message here or to the log messages we add. It seems that even with logs from a failed case we may not be able to determine what's going wrong yet.
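In the meantime, extra visibility can be had from the worker side alone; a minimal sketch, assuming the stock Connect log4j properties file is in use (the logger name comes from the BulkProcessor class seen in the logs above):

```
# Turn up logging for the connector's bulk machinery only
log4j.logger.io.confluent.connect.elasticsearch.internals.BulkProcessor=TRACE
```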
@srijiths @rmoff I would appreciate it if you could report your experience since the merge of #26; I am hopeful that the issues you ran into are resolved.
@shikhar Thank you
I am facing the same SocketTimeoutException issue connecting to the AWS Elasticsearch service. Any update on what fixed this issue?
(io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute:356) - Failed to execute batch 3 of 1 records, retrying after 1000 ms
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:47)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:58)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:
My configuration for the connector is:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 1,
"topics": "prints",
"batch.size": 2000,
"max.buffered.records": 20000,
"topic.index.map": "prints:prints",
"type.name" : "main",
"linger.ms": 1,
"key.ignore": true,
"schema.ignore": true,
"flush.timeout.ms" : 100000,
"max.buffered.records" : 2000,
"max.retries" : 5000,
"retry.backoff.ms" : 1000
}
}
@tnegi7519 for me the code fix that @shikhar posted was the resolution. Have you checked that from your Kafka machine you can access the AWS Elasticsearch endpoint port successfully?
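A quick sketch of that connectivity check, as a plain TCP probe independent of the connector (the AWS hostname in the comment is a made-up placeholder, not from this thread):

```python
import socket

def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Hypothetical AWS Elasticsearch endpoint -- substitute your own:
# can_reach("vpc-mydomain-abc123.us-east-1.es.amazonaws.com", 443)
```

If this returns False from the Kafka Connect machine, the problem is network reachability (security groups, VPC access policy) rather than the connector itself.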
@tnegi7519 seems like you are running into the 3s default read timeout #27
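If the connector build in use exposes them, the relevant knobs would be the read and connection timeouts; a hedged sketch of raising them in the connector config (the values are illustrative, not recommendations from this thread):

```json
{
  "read.timeout.ms": "30000",
  "connection.timeout.ms": "5000"
}
```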
Hi, I have the same problem as @rmoff, and because of it I'm not able to keep Kafka Connect stable for long periods of time.
@rmoff Can you tell me what your solution was, please?
Hi, @rmoff.
Sorry, I asked the wrong question :).
I noticed that the code fixes written by @shikhar helped you a lot. I have the updated code too, but it seems I have some additional problems, because I'm getting the same type of exceptions you had again.
Could you tell me your input/output throughput from Kafka Connect to Elasticsearch, to compare with mine? Just to rule out that my problem is the amount of data or a misconfiguration in my connector.
I ask because, for me, Connect runs well at first, and as time passes the performance goes down and down, reducing the amount of data flushed to ES to a minimal portion, and then these error traces appear in the logs.
I'm at a dead end; if someone can give me an idea to beat this problem, it would be much appreciated.
Thanks in advance!
@joseluisillana Sounds like you might be hitting some sort of leak, maybe a heap size/GC issue. Can you share more about your setup? It will be hard to suggest changes without any logs, configs, etc.
@deeptiantony same here
@joseluisillana @deeptiantony @costa As mentioned, any logs, configs, GC logs, etc will probably be needed to help track this down.