Comments (18)

rmoff commented on July 18, 2024

So far, so good - working with zero errors! Thanks @shikhar.

$ curl -X GET http://localhost:9200/_cat/indices|grep soe

green  open soe.card_details         1 0  4078 0 1.2mb 1.2mb
green  open soe.inventories          1 0 28651 0   5mb   5mb
green  open soe.addresses            1 0  4128 0 1.1mb 1.1mb
green  open soe.warehouses           1 0     0 0  159b  159b
green  open soe.order_items          1 0 29233 0 5.6mb 5.6mb
green  open soe.logon                1 0 29735 0   7mb   7mb
green  open soe.orders               1 0 23564 0 6.8mb 6.8mb
green  open soe.orderentry_metadata  1 0     0 0  159b  159b
green  open soe.product_information  1 0     0 0  159b  159b
green  open soe.product_descriptions 1 0     0 0  159b  159b
green  open soe.customers            1 0  4128 0   2mb   2mb

deeptiantony commented on July 18, 2024

Hello @joseluisillana
I have exactly the same issue you mentioned. The performance of the Elasticsearch sink connector degrades over time until it hits flush timeout expired errors. Was this resolved? If so, could you post what config changes resolved it?

rmoff commented on July 18, 2024

I'm getting the same error too. The data is showing up in Elasticsearch though, and there are no errors in the Elasticsearch log.

Config files: https://gist.github.com/rmoff/0e228583b6b00a4d06e656c9fd7f64d2

With DEBUG enabled: (full log here)

[2016-09-01 13:47:09,247] DEBUG [exchange: 12] Connection allocated: CPoolProxy{http-outgoing-1 [ACTIVE]} (org.apache.http.impl.nio.client.InternalHttpAsyncClient:307)
[I/O dispatcher 1] ERROR io.searchbox.client.http.JestHttpClient - Exception occurred during async execution.
[2016-09-01 13:47:09,247] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][r:]: Set attribute http.nio.exchange-handler (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:182)
[2016-09-01 13:47:09,200] DEBUG [exchange: 156] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
[2016-09-01 13:47:09,200] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
java.net.SocketTimeoutException
[2016-09-01 13:47:09,200] DEBUG [exchange: 155] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
        at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:177)
[2016-09-01 13:47:09,248] DEBUG [exchange: 158] Request connection for {}->http://elastic-01-node-01:9200 (org.apache.http.impl.nio.client.InternalHttpAsyncClient:352)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:265)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:494)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:215)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:282)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][rw:]: Event set [w] (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:108)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
        at java.lang.Thread.run(Thread.java:745)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][rw:]: Set timeout 3000 (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:154)
[I/O dispatcher 1] ERROR io.confluent.connect.elasticsearch.internals.BulkProcessor - Failed to execute the batch
java.net.SocketTimeoutException
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
        at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
[2016-09-01 13:47:09,248] DEBUG http-outgoing-2 [ACTIVE]: Connected (org.apache.http.impl.nio.client.InternalIODispatch:54)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:177)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:265)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:494)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:215)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:282)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
[2016-09-01 13:47:09,248] DEBUG Connection request: [route: {}->http://elastic-01-node-01:9200][total kept alive: 0; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager:249)
        at java.lang.Thread.run(Thread.java:745)
[I/O dispatcher 1] ERROR io.confluent.connect.elasticsearch.internals.BulkProcessor - Failed to execute the batch, retry or fail
java.net.SocketTimeoutException
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:371)
        at org.apache.http.impl.nio.client.InternalRequestExecutor.timeout(InternalRequestExecutor.java:116)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
[...]
[2016-09-01 13:47:12,145] DEBUG http-outgoing-2 [ACTIVE] Request ready (org.apache.http.impl.nio.client.InternalIODispatch:71)
[2016-09-01 13:47:12,145] DEBUG http-outgoing-2 192.168.10.109:51032<->192.168.10.118:9200[ACTIVE][r:w]: Event cleared [w] (org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionImpl:116)
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets failed due to exception while flushing:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 10000
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
        ... 11 more
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Rewinding offsets to last committed offsets
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink-0} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
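
For reference, a minimal sketch of how DEBUG output like the above can be enabled, assuming the stock connect-log4j.properties shipped with the Confluent distribution (the file location is an assumption and varies by install):

# etc/kafka/connect-log4j.properties - location is an assumption
log4j.logger.io.confluent.connect.elasticsearch=DEBUG
log4j.logger.org.apache.http=DEBUG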

shikhar commented on July 18, 2024

Can you try setting flush.timeout.ms to a higher value? It appears the default of 10000 is too low for your workload.
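
As a sketch, that is one extra line in a properties-style sink config (the value is illustrative; size it to your batches):

flush.timeout.ms=60000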

shikhar commented on July 18, 2024

Since the default batch.size and max.buffered.records are a lot lower now, flushing shouldn't take as long. Please reopen if this continues to be an issue.

rmoff commented on July 18, 2024

I bumped up flush.timeout.ms to 100000 and also set batch.size=2000 and max.buffered.records=20000, but still get errors.

[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink3-0} offsets failed due to exception while flushing:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 100000
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
        ... 11 more
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Rewinding offsets to last committed offsets
[pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - Commit of WorkerSinkTask{id=elasticsearch-sink3-0} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush failed with non retriable exception.
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:319)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:137)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot finish flush messages within 100000
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:314)
        ... 11 more
[pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator confluent-01-node-01.moffatt.me:9092 (id: 2147483647 rack: null) dead for group connect-elasticsearch-sink3
[pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator confluent-01-node-01.moffatt.me:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink3.

The system load on both the Kafka and Elasticsearch servers is minimal. Let me know if there's any other useful info that I can provide.

ewencp commented on July 18, 2024

@shikhar We might want to add some trace-level logging to BulkProcessor for every completed batch (we seem to log exceptions but not successes), and maybe include how much work is outstanding in the exception message here or in the log messages we add. It seems that even with logs from a failed case we can't yet determine what's going wrong.

shikhar commented on July 18, 2024

@srijiths @rmoff I would appreciate it if you could report your experience since the merge of #26; I'm hopeful that the issues you ran into have been resolved.

srijiths commented on July 18, 2024

@shikhar Thank you

negi-tribhuwan commented on July 18, 2024

I am facing the same SocketTimeoutException issue connecting to the AWS Elasticsearch service. Any update on what fixed this issue?
(io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute:356) - Failed to execute batch 3 of 1 records, retrying after 1000 ms
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:47)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:58)

My configuration for the connector is:
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 1,
    "topics": "prints",
    "batch.size": 2000,
    "max.buffered.records": 20000,
    "topic.index.map": "prints:prints",
    "type.name": "main",
    "linger.ms": 1,
    "key.ignore": true,
    "schema.ignore": true,
    "flush.timeout.ms": 100000,
    "max.buffered.records": 2000,
    "max.retries": 5000,
    "retry.backoff.ms": 1000
  }
}
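
For context, JSON in this name/config shape is what the Kafka Connect REST API expects; it would typically be submitted along these lines (the port is the Connect default and the filename is a placeholder):

curl -X POST -H "Content-Type: application/json" \
     --data @elasticsearch-sink.json \
     http://localhost:8083/connectors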

rmoff commented on July 18, 2024

@tnegi7519 For me, the code fix that @shikhar posted was the resolution. Have you checked that you can reach the AWS Elasticsearch endpoint port from your Kafka machine?
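
A quick connectivity sanity check from the Kafka machine might look like this (the endpoint is a placeholder for your AWS domain):

curl -s https://your-domain.us-east-1.es.amazonaws.com/_cluster/health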

shikhar commented on July 18, 2024

@tnegi7519 Seems like you are running into the 3s default read timeout; see #27.
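
Assuming a connector build that includes the change from #27, the read timeout should be configurable; the config names below rest on that assumption, with illustrative values:

read.timeout.ms=30000
connection.timeout.ms=5000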

joseluisillana commented on July 18, 2024

Hi, I have the same problem as @rmoff, and because of it I'm not able to keep Kafka Connect stable over a long period of time.

@rmoff Can you tell me what your solution was, please?

rmoff commented on July 18, 2024

@joseluisillana see above ⬆️

joseluisillana commented on July 18, 2024

Hi, @rmoff.
Sorry, I asked the wrong question :).
I see that the code fixes written by @shikhar helped you a lot. I have updated the code too, but it seems I have some additional problem, because I'm getting the same type of exceptions you had.

Could you tell me your input/output throughput from Kafka Connect to Elasticsearch, so I can compare it with mine? Just to rule out that my problem is down to the amount of data or a misconfiguration in my connector.

I ask because, for me, Connect runs well at first, but as time passes the performance goes down and down, reducing the amount of data flushed to ES to a small fraction, and then these error traces appear in the logs.

I am at a dead end; if someone could give me an idea to beat this problem, it would be very much appreciated.

Thanks in advance!

ewencp commented on July 18, 2024

@joseluisillana Sounds like you might be hitting some sort of leak, maybe a heap size/GC issue. Can you share more about your setup? It will be hard to suggest changes without any logs, configs, etc.
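
A minimal sketch for capturing GC logs from a Connect worker, assuming the standard Kafka launch scripts (which honor KAFKA_HEAP_OPTS and KAFKA_OPTS) and Java 8 flags to match the stack traces above:

# script name/path varies by distribution
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/tmp/connect-gc.log"
./bin/connect-distributed ./etc/kafka/connect-distributed.properties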

costa commented on July 18, 2024

@deeptiantony same here

ewencp commented on July 18, 2024

@joseluisillana @deeptiantony @costa As mentioned, any logs, configs, GC logs, etc. will probably be needed to help track this down.
