Comments (12)

ewencp avatar ewencp commented on July 18, 2024 1

@idarlington topic+partition+offset should be unique within a Kafka cluster -- that is how you uniquely identify a message, which is why the connector uses it. You could have conflicting IDs if you're pulling data from two Kafka clusters where the same topic name is used. If this is the cause of your data loss, you could add a RegexRouter transformation to your connector so that the topic name is prefixed with a cluster name, which gives you globally unique topic names.

Currently the ID is controlled by the key.ignore setting. If it is true then you get the topic+partition+offset format. If it is false, the ID will be the Kafka key, which allows you to update documents in Elasticsearch. I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.
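
As a rough illustration of the rule described above, here is a minimal Python sketch of how the document ID would be chosen. The function name `document_id` and its parameters are hypothetical and are not the connector's actual code. What happens when `key.ignore` is false and the record has no key is not settled in this thread, so the sketch simply refuses that case.

```python
from typing import Optional


def document_id(topic: str, partition: int, offset: int,
                key: Optional[str], key_ignore: bool) -> str:
    """Hypothetical helper mirroring the ID rule described in this thread."""
    if key_ignore:
        # key.ignore=true: the ID is built from the record's coordinates.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        # Assumption: the thread does not confirm the real behavior here.
        raise ValueError("key.ignore=false but the record has no key")
    # key.ignore=false: the Kafka key becomes the document ID, so a later
    # record with the same key overwrites (updates) the same document.
    return key


print(document_id("gritServer", 0, 199211, None, key_ignore=True))
print(document_id("gritServer", 0, 199211, "70:B3:D5:43:09:E6", key_ignore=False))
```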

from kafka-connect-elasticsearch.

kelbyloden avatar kelbyloden commented on July 18, 2024 1

See the note I added on issue #139 for a very minor code change I made to support allowing null document keys and thus auto-generating the ids in Elasticsearch.
#139 (comment)
I've been running this for several months now at an average rate of 2.5 billion messages per day with no issues.

idarlington avatar idarlington commented on July 18, 2024

Thanks @ewencp.

I am currently operating a single cluster. I noticed that the id is now topic+partition+logsize.

From the offsetchecker it seems the offset is not updating.

Group           Topic                          Pid Offset          logSize         Lag             Owner
my_group        gritServer                     0   81848           142559          60711           none

Also, if the server is restarted, does the offset value change? I currently have my log.dir in tmp.

Finally, can you point me to examples of the RegexRouter transformation? Thank you.

ewencp avatar ewencp commented on July 18, 2024

Interesting that the offset does not seem to be correct. I'm looking at the code where that ID is generated and it's definitely using the Kafka offset for the last part. That wouldn't rule out an issue in the framework, but I don't think anything has changed there. Are the logSize and Lag from that command increasing? If the offset isn't, that would imply that offsets are not being successfully committed. Could you check your Connect logs to see if there are any errors or messages about offset commits failing?
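
For reference, the relationship between the three columns in the offset checker output can be sketched in a few lines of Python. The helper names `consumer_lag` and `looks_stalled` are hypothetical; the first sample is the row posted earlier in this thread, and the second sample is invented purely to show the "logSize grows, offset does not" pattern.

```python
def consumer_lag(log_size: int, committed_offset: int) -> int:
    """Lag is how far the committed offset trails the end of the log."""
    return log_size - committed_offset


def looks_stalled(earlier: tuple, later: tuple) -> bool:
    """Each sample is (committed_offset, log_size).

    True when new messages arrived but the committed offset did not move.
    """
    earlier_offset, earlier_size = earlier
    later_offset, later_size = later
    return later_size > earlier_size and later_offset == earlier_offset


# Row from the offset checker output above: Offset=81848, logSize=142559.
print(consumer_lag(142559, 81848))

# Hypothetical second sample taken a little later, for illustration only.
print(looks_stalled((81848, 142559), (81848, 142900)))
```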

I think it's probably not your problem since you seem to have tracked the issue down to the offset, but although the docs don't have an example of the regex router, I can give a quick example here:

transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.Rename.regex=(.*)
transforms.Rename.replacement=prefix-$1

That will prefix the topic names with a constant prefix before the connector processes them.
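
To make the effect of that configuration concrete, here is a small Python sketch of the renaming it performs. This is not the transform's implementation: `route_topic` is a hypothetical name, and the sketch assumes the regex has to match the whole topic name before the replacement is applied. Note that Java's `$1` group reference is written `\1` in Python.

```python
import re


def route_topic(topic: str, regex: str = r"(.*)",
                replacement: str = r"prefix-\1") -> str:
    """Hypothetical stand-in for the Rename transform configured above."""
    match = re.fullmatch(regex, topic)
    if match is None:
        # Assumption: topics that do not match are passed through unchanged.
        return topic
    # Substitute the captured groups into the replacement template.
    return match.expand(replacement)


print(route_topic("gritServer"))
print(route_topic("dailyData", replacement=r"clusterA-\1"))
```

With a per-cluster prefix such as the illustrative `clusterA-`, the same topic name coming from two clusters no longer produces the same topic+partition+offset ID.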

Also, note that log.dir in tmp is a bad idea. It works fine when you're just developing/testing locally and don't care about the data, but depending on how your /tmp is managed, files could seemingly randomly be deleted and data would disappear.

idarlington avatar idarlington commented on July 18, 2024

Thanks @ewencp

Yes, the logsize and lag are increasing and the offset isn't increasing. BTW I am using Confluent 3.1.2 with ES 2.0.0

I can't find any errors in the logs. These are mostly the contents:

[2017-07-10 18:09:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:10:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:11:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:12:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:13:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:14:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:15:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:16:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:17:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:18:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

I will update log.dir then. I hope that moving the contents of the current directory and updating its value in server.properties will suffice.

ewencp avatar ewencp commented on July 18, 2024

I'm also wondering if possibly you got into a bad state for the topic such that the consumer cannot make progress. If you increase the log level, you should see more messages indicating the sink task's progress. Increasing all the way to TRACE, if even only for WorkerSinkTask, would give log info about both the messages as they are consumed (including even logging the key and value) and the offsets that are actually being committed.

You might also just want to check that all the files we'd expect to be in the Kafka directory are there, i.e. that something didn't get deleted.

idarlington avatar idarlington commented on July 18, 2024

@ewencp

I have increased the log level to TRACE; this is a snippet:

[2017-07-11 07:39:54,670] TRACE Executing batch 450 of 1 records (io.confluent.connect.elasticsearch.bulk.BulkProcessor:347)
[2017-07-11 07:39:54,671] DEBUG POST method created based on client request (io.searchbox.client.http.JestHttpClient:99)
[2017-07-11 07:39:54,671] DEBUG Request method=POST url=http://localhost:9200/_bulk (io.searchbox.client.http.JestHttpClient:84)
[2017-07-11 07:39:54,671] DEBUG CookieSpec selected: default (org.apache.http.client.protocol.RequestAddCookies:122)
[2017-07-11 07:39:54,672] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
[2017-07-11 07:39:54,672] DEBUG Connection request: [route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:249)
[2017-07-11 07:39:54,672] DEBUG Connection leased: [id: 0][route: {}->http://localhost:9200][total kept alive: 1; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:282)
[2017-07-11 07:39:54,672] DEBUG http-outgoing-0: set socket timeout to 3000 (org.apache.http.impl.conn.DefaultManagedHttpClientConnection:90)
[2017-07-11 07:39:54,673] DEBUG Executing request POST /_bulk HTTP/1.1 (org.apache.http.impl.execchain.MainClientExec:255)
[2017-07-11 07:39:54,673] DEBUG Target auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:260)
[2017-07-11 07:39:54,673] DEBUG Proxy auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:266)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> POST /_bulk HTTP/1.1 (org.apache.http.headers:135)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> Content-Length: 2216 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Host: localhost:9200 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Connection: Keep-Alive (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111) (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Accept-Encoding: gzip,deflate (org.apache.http.headers:138)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "POST /_bulk HTTP/1.1[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Length: 2216[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Host: localhost:9200[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111)[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "{"index":{"_id":"gritServer+0+199211","_index":"grits","_type":"docs"}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,678] DEBUG http-outgoing-0 >> "{"data":[{"costSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","costSinceLast":3.03,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","powerSinceLast":141371.1,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","powerSinceLast":252830.4,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"energyTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","energyToday":1111338.2,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","energyToday":2078056.5,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"activeSource":[{"sourceName":"Sunhive_4","sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"timeTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","timeToday":1.3630964,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","timeToday":0.0,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"current":[{"sourceName":"Sunhive_4","value":[227.49,215.29,189.65],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[227.49,227.49,227.49,227.49,227.49],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"voltage":[{"sourceName":"Sunhive_4","value":[233.744,232.89,231.452],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[233.744,233.744,233.744,233.744,233.744],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerfactor":[{"sourceName":"Sunhive_4","value":[0.984,0.981,0.977],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[0.984,0.984,0.984,0.984,0.984],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"time":"2017-07-11T08:39:46.893+0100","Type":"1","id":"70:B3:D5:43:09:E6","costTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","costToday":27783.455,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}]}],"master":{"id":"70:B3:D5:43:09:E6","time":"2017-07-11T08:39:46.893+0100","configuration_IDs":["3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"]}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "{"took":7,"errors":false,"items":[{"index":{"_index":"grits","_type":"docs","_id":"gritServer+0+199211","_version":7,"_shards":{"total":2,"successful":1,"failed":0},"status":200}}]}" (org.apache.http.wire:86)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << HTTP/1.1 200 OK (org.apache.http.headers:124)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG http-outgoing-0 << Content-Length: 181 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG Connection can be kept alive indefinitely (org.apache.http.impl.execchain.MainClientExec:284)
[2017-07-11 07:39:54,687] DEBUG Connection [id: 0][route: {}->http://localhost:9200] can be kept alive indefinitely (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:314)
[2017-07-11 07:39:54,688] DEBUG Connection released: [id: 0][route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:320)
[2017-07-11 07:39:54,688] DEBUG Bulk operation was successfull (io.searchbox.core.Bulk:143)

tree output:

├── bin
│   ├── camus-config
│   ├── camus-run
│   ├── confluent-rebalancer
│   ├── connect-distributed
│   ├── connect-standalone
│   ├── control-center-3_0_0-reset
│   ├── control-center-3_0_1-reset
│   ├── control-center-console-consumer
│   ├── control-center-reset
│   ├── control-center-run-class
│   ├── control-center-set-acls
│   ├── control-center-start
│   ├── kafka-acls
│   ├── kafka-avro-console-consumer
│   ├── kafka-avro-console-producer
│   ├── kafka-configs
│   ├── kafka-console-consumer
│   ├── kafka-console-producer
│   ├── kafka-consumer-groups
│   ├── kafka-consumer-offset-checker
│   ├── kafka-consumer-perf-test
│   ├── kafka-mirror-maker
│   ├── kafka-preferred-replica-election
│   ├── kafka-producer-perf-test
│   ├── kafka-reassign-partitions
│   ├── kafka-replay-log-producer
│   ├── kafka-replica-verification
│   ├── kafka-rest-run-class
│   ├── kafka-rest-start
│   ├── kafka-rest-stop
│   ├── kafka-rest-stop-service
│   ├── kafka-run-class
│   ├── kafka-server-start
│   ├── kafka-server-stop
│   ├── kafka-simple-consumer-shell
│   ├── kafka-streams-application-reset
│   ├── kafka-topics
│   ├── kafka-verifiable-consumer
│   ├── kafka-verifiable-producer
│   ├── schema-registry-run-class
│   ├── schema-registry-start
│   ├── schema-registry-stop
│   ├── schema-registry-stop-service
│   ├── support-metrics-bundle
│   ├── windows
│   ├── zookeeper-security-migration
│   ├── zookeeper-server-start
│   ├── zookeeper-server-stop
│   └── zookeeper-shell
├── etc
│   ├── camus
│   ├── confluent-common
│   ├── confluent-control-center
│   ├── confluent-rebalancer
│   ├── kafka
│   ├── kafka-connect-elasticsearch
│   ├── kafka-connect-hdfs
│   ├── kafka-connect-jdbc
│   ├── kafka-connect-replicator
│   ├── kafka-rest
│   ├── rest-utils
│   └── schema-registry

etc/kafka

.
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-standalone.properties
├── consumer.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
└── zookeeper.properties

ewencp avatar ewencp commented on July 18, 2024

@idarlington Ok, so it looks like it's definitely making requests. We might want a larger snippet though because the TRACE and DEBUG output is so verbose -- most of that is from the underlying library making the request, but doesn't include much output from Connect itself. We'd probably want more lines that include io.confluent or org.apache.kafka logs.

idarlington avatar idarlington commented on July 18, 2024

Hi @ewencp, I apologize for the late reply.

You can find a larger log output here

Thanks.

ewencp avatar ewencp commented on July 18, 2024

@idarlington I'm not seeing any duplicates in that log? In fact it looks like requests are working as expected. I've grepped out a bit to help show what the connector is doing:

 $ grep _id log | grep '>>' | cut -f 6 -d ' '
"{"index":{"_id":"gritServer+0+199576","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199577","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199578","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199579","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199580","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199581","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199582","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199583","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199584","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199585","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199586","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199587","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199588","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199589","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199590","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199591","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199592","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199593","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199594","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199595","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199596","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199597","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199598","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199599","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199600","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199601","_index":"grits","_type":"docs"}}[\n]"
"POST
"{"index":{"_id":"gritServer+0+199603","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199604","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199605","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199606","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199607","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199608","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199609","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199610","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199611","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199612","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199613","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199614","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199615","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199616","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199617","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199618","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199619","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199620","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199621","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199622","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199623","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199624","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199625","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199626","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199627","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199628","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199629","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199630","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199631","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199632","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199633","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199634","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199635","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199636","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199637","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199638","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199639","_index":"grits","_type":"docs"}}[\n]"

(It's missing one and has POST in there because the log is a bit mangled.) So for gritServer-0, we're seeing what we expect -- each message processed once, and in order.
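
The same check can be automated rather than read by eye. The following is a minimal Python sketch, assuming the IDs have the topic+partition+offset shape seen in the grep output; `find_repeats_and_gaps` and `ID_PATTERN` are hypothetical names, and the sample lines are three entries copied from the output above.

```python
import re

# Matches the "_id":"<topic>+<partition>+<offset>" fragment of a bulk action line.
ID_PATTERN = re.compile(r'"_id":"([^"+]+)\+(\d+)\+(\d+)"')


def find_repeats_and_gaps(lines):
    """Return (repeats, gaps) per topic partition, in the order lines appear.

    repeats: offsets seen again or out of order.
    gaps: (topic_partition, first_missing, last_missing) ranges.
    """
    last_seen = {}
    repeats, gaps = [], []
    for line in lines:
        match = ID_PATTERN.search(line)
        if match is None:
            continue  # e.g. the mangled "POST line
        tp = (match.group(1), int(match.group(2)))
        offset = int(match.group(3))
        previous = last_seen.get(tp)
        if previous is not None:
            if offset <= previous:
                repeats.append((tp, offset))
            elif offset > previous + 1:
                gaps.append((tp, previous + 1, offset - 1))
        last_seen[tp] = offset if previous is None else max(previous, offset)
    return repeats, gaps


sample = [
    '"{"index":{"_id":"gritServer+0+199601","_index":"grits","_type":"docs"}}[\\n]"',
    '"POST',
    '"{"index":{"_id":"gritServer+0+199603","_index":"grits","_type":"docs"}}[\\n]"',
]
print(find_repeats_and_gaps(sample))
```

On these three lines the sketch reports no repeats and a single gap at offset 199602, which corresponds to the mangled line.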

Looking at the offset commits:

[2017-07-11 07:43:40,673] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} (io.confluent.connect.elasticsearch.ElasticsearchSinkT metadata=''}}  (org.apache.kafka.clients.consumer.KafkaConsumer:1160)
[2017-07-11 07:43:40,675] TRACE Sending offset-commit request with {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} to coordinator 45.76.39.129:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink (org.aconnect-elasticsearch-sink committed offset 199607 for partition gritServer-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:640)
...snip...
[2017-07-11 07:43:58,194] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199640, metadata=''}} (io.confluent.connect.SinkTask:130)
[2017-07-11 07:43:58,194] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

You can see that the offset being committed for gritServer-0 is definitely increasing. What's notable is that dailyData-0 is not increasing and none of the IDs reported above have that in their name, which indicates no messages are being seen for it. Is there actually any data flowing into that topic partition? The log clearly shows that fetch requests are being sent:

[2017-07-11 07:43:29,127] TRACE Added fetch request for partition dailyData-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:639)

but it seems they are not receiving any data in response.

It looks like things are running fine, there just isn't data in one of the topic partitions.

panda87 avatar panda87 commented on July 18, 2024

@ewencp I want to add another data point to this thread.
Using our own predefined _id actually degrades indexing time and indexing performance.
Elastic also recommends using their auto-generated IDs instead of ours, so that each bulk request does not have to check whether the _id already exists.

I can understand the design, but for some cases, like basic logs, we don't need the _id check and the degradation that comes with it; we just want to be able to index at a lower performance cost.

synhershko avatar synhershko commented on July 18, 2024

I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.

The way to do that is to send null as the document ID in the indexing / bulk request, as opposed to forcing it to be a string like you do now.

In the logging use case it is usually okay to have some messages added twice (or even completely dropped), so IMO there should be an opt-in configuration which allows you to rely on ES IDs. This will result in much faster inserts to ES.
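
To show what this looks like on the wire, here is a minimal Python sketch that builds the action line of a bulk request with and without an explicit `_id`. It only formats JSON and does not talk to Elasticsearch; `bulk_action_line` and `bulk_body` are hypothetical helpers, not connector code, and the index and type names are the ones from the logs in this thread.

```python
import json
from typing import Optional


def bulk_action_line(index: str, doc_type: str,
                     doc_id: Optional[str] = None) -> str:
    """Build the metadata line for one "index" action in a bulk request."""
    meta = {"_index": index, "_type": doc_type}
    if doc_id is not None:
        # Explicit ID, as the connector sends today.
        meta["_id"] = doc_id
    # With no "_id" field, Elasticsearch generates the ID itself.
    return json.dumps({"index": meta})


def bulk_body(actions) -> str:
    """Join (action_line, source_dict) pairs into a newline-delimited body."""
    lines = []
    for action_line, source in actions:
        lines.append(action_line)
        lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"


print(bulk_action_line("grits", "docs", "gritServer+0+199211"))
print(bulk_action_line("grits", "docs"))
```

The trade-off is the one described above: without a fixed ID, a redelivered record is indexed as a second document instead of overwriting the first.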
