Comments (12)
@idarlington topic+partition+offset should be unique within a Kafka cluster -- that is how you uniquely identify a message, which is why the connector uses it. You could have conflicting IDs if you're pulling data from two Kafka clusters that use the same topic name. If this is the cause of your data loss, you could add a RegexRouter transformation to your connector to prepend a cluster name to the topic name so you'll have globally unique topic names.
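To illustrate the collision, here is a quick Python sketch of the ID construction (the `topic+partition+offset` format is from this thread; the function name is mine):

```python
def doc_id(topic, partition, offset):
    # The connector's synthetic document ID: "topic+partition+offset",
    # e.g. "gritServer+0+199211" as seen in the logs later in this thread.
    return f"{topic}+{partition}+{offset}"

# The same topic name on two clusters yields the same ID for a given
# partition/offset, so one document silently overwrites the other:
assert doc_id("gritServer", 0, 42) == doc_id("gritServer", 0, 42)

# Prefixing the topic per cluster (e.g. via RegexRouter) keeps IDs distinct:
assert doc_id("clusterA-gritServer", 0, 42) != doc_id("clusterB-gritServer", 0, 42)
```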
Currently the ID is controlled by the key.ignore setting. If it is true, you get the topic+partition+offset format. If it is false, the ID will be the Kafka key, which allows you to update documents in Elasticsearch. I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.
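As a rough sketch of that selection logic (this is the behavior described above, not the connector's actual code):

```python
def choose_doc_id(topic, partition, offset, key, key_ignore):
    # key.ignore=true  -> synthetic "topic+partition+offset" ID
    # key.ignore=false -> the Kafka record key, so later records with the
    #                     same key update the same Elasticsearch document
    if key_ignore:
        return f"{topic}+{partition}+{offset}"
    return str(key)
```

For example, `choose_doc_id("gritServer", 0, 199211, None, True)` yields the `"gritServer+0+199211"` style of ID seen in the bulk requests below.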
from kafka-connect-elasticsearch.
See the note I added on issue #139 for a very minor code change I made to support allowing null document keys and thus auto-generating the ids in Elasticsearch.
#139 (comment)
I've been running this for several months now at an average rate of 2.5 billion messages per day with no issues.
Thanks @ewencp.
I am currently operating a single cluster. I noticed that the id is now topic+partition+logsize.
From the offset checker it seems the offset is not updating.
Group Topic Pid Offset logSize Lag Owner
my_group gritServer 0 81848 142559 60711 none
Also, does the offset value change if the server is restarted? I currently have my log.dir in /tmp.
Finally, can you point me to examples of the RegexRouter transformation? Thank you.
Interesting that the offset does not seem to be correct. I'm looking at the code where that ID is generated and it's definitely using the Kafka offset for the last part. That wouldn't rule out an issue in the framework, but I don't think anything has changed there. Are the logSize and Lag values from that command increasing? If the offset isn't, that would imply that offsets are not being successfully committed. Could you check your Connect logs to see if there are any errors or messages about offset commits failing?
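For reference, the Lag column in that offset-checker output is just logSize minus the committed offset; a growing lag alongside a frozen offset is the signature of commits not landing:

```python
# Figures from the offset-checker row above (topic gritServer, partition 0).
log_size, committed_offset = 142559, 81848
lag = log_size - committed_offset
assert lag == 60711  # matches the Lag column; it grows while offset stays put
```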
I think it's probably not your problem since you seem to have tracked the issue down to the offset, but although the docs don't have an example of the regex router, I can give a quick example here:
transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.Rename.regex=(.*)
transforms.Rename.replacement=prefix-$1
That will prefix the topic names with a constant prefix before the connector processes them.
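Kafka's RegexRouter applies the replacement only when the pattern matches the entire topic name, with `$1`-style references to capture groups. A rough Python equivalent of that rename, for illustration (Python's `re` uses `\1` instead of `$1`):

```python
import re

def rename_topic(topic, pattern, replacement):
    # Mimics RegexRouter: rename only when the whole topic name matches.
    m = re.fullmatch(pattern, topic)
    return m.expand(replacement) if m else topic

rename_topic("gritServer", r"(.*)", r"prefix-\1")  # -> "prefix-gritServer"
```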
Also, note that putting log.dir in /tmp is a bad idea. It works fine when you're just developing/testing locally and don't care about the data, but depending on how your /tmp is managed, files could seemingly randomly be deleted and data would disappear.
Thanks @ewencp
Yes, the logsize and lag are increasing and the offset isn't. BTW I am using Confluent 3.1.2 with ES 2.0.0.
I can't find any errors in the logs. These are mostly the contents:
[2017-07-10 18:09:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:10:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:11:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:12:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:13:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:14:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:15:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:16:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:17:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:18:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
I will update the log.dir then; I hope moving the contents of the current directory and updating its value in server.properties will suffice.
I'm also wondering if possibly you got into a bad state for the topic such that the consumer cannot make progress. If you increase the log level, you should see more messages indicating the sink task's progress. Increasing all the way to TRACE, even if only for WorkerSinkTask, would give log info about both the messages as they are consumed (including even logging the key and value) and the offsets that are actually being committed.
You might also just want to check that all the files we'd expect to be in the Kafka directory are there, i.e. that something didn't get deleted.
I have increased the log level to TRACE; this is a snippet:
[2017-07-11 07:39:54,670] TRACE Executing batch 450 of 1 records (io.confluent.connect.elasticsearch.bulk.BulkProcessor:347)
[2017-07-11 07:39:54,671] DEBUG POST method created based on client request (io.searchbox.client.http.JestHttpClient:99)
[2017-07-11 07:39:54,671] DEBUG Request method=POST url=http://localhost:9200/_bulk (io.searchbox.client.http.JestHttpClient:84)
[2017-07-11 07:39:54,671] DEBUG CookieSpec selected: default (org.apache.http.client.protocol.RequestAddCookies:122)
[2017-07-11 07:39:54,672] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
[2017-07-11 07:39:54,672] DEBUG Connection request: [route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:249)
[2017-07-11 07:39:54,672] DEBUG Connection leased: [id: 0][route: {}->http://localhost:9200][total kept alive: 1; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:282)
[2017-07-11 07:39:54,672] DEBUG http-outgoing-0: set socket timeout to 3000 (org.apache.http.impl.conn.DefaultManagedHttpClientConnection:90)
[2017-07-11 07:39:54,673] DEBUG Executing request POST /_bulk HTTP/1.1 (org.apache.http.impl.execchain.MainClientExec:255)
[2017-07-11 07:39:54,673] DEBUG Target auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:260)
[2017-07-11 07:39:54,673] DEBUG Proxy auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:266)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> POST /_bulk HTTP/1.1 (org.apache.http.headers:135)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> Content-Length: 2216 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Host: localhost:9200 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Connection: Keep-Alive (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111) (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Accept-Encoding: gzip,deflate (org.apache.http.headers:138)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "POST /_bulk HTTP/1.1[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Length: 2216[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Host: localhost:9200[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111)[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "{"index":{"_id":"gritServer+0+199211","_index":"grits","_type":"docs"}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,678] DEBUG http-outgoing-0 >> "{"data":[{"costSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","costSinceLast":3.03,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","powerSinceLast":141371.1,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","powerSinceLast":252830.4,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"energyTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","energyToday":1111338.2,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","energyToday":2078056.5,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"activeSource":[{"sourceName":"Sunhive_4","sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"timeTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","timeToday":1.3630964,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","timeToday":0.0,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"current":[{"sourceName":"Sunhive_4","value":[227.49,215.29,189.65],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[227.49,227.49,227.49,227.49,227.49],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"voltage":[{"sourceName":"Sunhive_4","value":[233.744,232.89,231.452],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[233.744,233.744,233.744,233.744,233.744],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerfactor":[{"sourceName":"Sunhive_4","value":[0.984,0.981,0.977],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[0.984,0.984,0.984,0.984,0.984],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"time":"2017-07-11T08:39:46.893+0100","Type":"1","id":"70:B3:D5:43:09:E6","costTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","costToday"
:27783.455,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}]}],"master":{"id":"70:B3:D5:43:09:E6","time":"2017-07-11T08:39:46.893+0100","configuration_IDs":["3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"]}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "{"took":7,"errors":false,"items":[{"index":{"_index":"grits","_type":"docs","_id":"gritServer+0+199211","_version":7,"_shards":{"total":2,"successful":1,"failed":0},"status":200}}]}" (org.apache.http.wire:86)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << HTTP/1.1 200 OK (org.apache.http.headers:124)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG http-outgoing-0 << Content-Length: 181 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG Connection can be kept alive indefinitely (org.apache.http.impl.execchain.MainClientExec:284)
[2017-07-11 07:39:54,687] DEBUG Connection [id: 0][route: {}->http://localhost:9200] can be kept alive indefinitely (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:314)
[2017-07-11 07:39:54,688] DEBUG Connection released: [id: 0][route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:320)
[2017-07-11 07:39:54,688] DEBUG Bulk operation was successfull (io.searchbox.core.Bulk:143)
tree output:
├── bin
│ ├── camus-config
│ ├── camus-run
│ ├── confluent-rebalancer
│ ├── connect-distributed
│ ├── connect-standalone
│ ├── control-center-3_0_0-reset
│ ├── control-center-3_0_1-reset
│ ├── control-center-console-consumer
│ ├── control-center-reset
│ ├── control-center-run-class
│ ├── control-center-set-acls
│ ├── control-center-start
│ ├── kafka-acls
│ ├── kafka-avro-console-consumer
│ ├── kafka-avro-console-producer
│ ├── kafka-configs
│ ├── kafka-console-consumer
│ ├── kafka-console-producer
│ ├── kafka-consumer-groups
│ ├── kafka-consumer-offset-checker
│ ├── kafka-consumer-perf-test
│ ├── kafka-mirror-maker
│ ├── kafka-preferred-replica-election
│ ├── kafka-producer-perf-test
│ ├── kafka-reassign-partitions
│ ├── kafka-replay-log-producer
│ ├── kafka-replica-verification
│ ├── kafka-rest-run-class
│ ├── kafka-rest-start
│ ├── kafka-rest-stop
│ ├── kafka-rest-stop-service
│ ├── kafka-run-class
│ ├── kafka-server-start
│ ├── kafka-server-stop
│ ├── kafka-simple-consumer-shell
│ ├── kafka-streams-application-reset
│ ├── kafka-topics
│ ├── kafka-verifiable-consumer
│ ├── kafka-verifiable-producer
│ ├── schema-registry-run-class
│ ├── schema-registry-start
│ ├── schema-registry-stop
│ ├── schema-registry-stop-service
│ ├── support-metrics-bundle
│ ├── windows
│ ├── zookeeper-security-migration
│ ├── zookeeper-server-start
│ ├── zookeeper-server-stop
│ └── zookeeper-shell
├── etc
│ ├── camus
│ ├── confluent-common
│ ├── confluent-control-center
│ ├── confluent-rebalancer
│ ├── kafka
│ ├── kafka-connect-elasticsearch
│ ├── kafka-connect-hdfs
│ ├── kafka-connect-jdbc
│ ├── kafka-connect-replicator
│ ├── kafka-rest
│ ├── rest-utils
│ └── schema-registry
etc/kafka
.
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-standalone.properties
├── consumer.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
└── zookeeper.properties
@idarlington Ok, so it looks like it's definitely making requests. We might want a larger snippet though because the TRACE and DEBUG output is so verbose -- most of that is from the underlying library making the request, but doesn't include much output from Connect itself. We'd probably want more lines that include io.confluent or org.apache.kafka logs.
Hi @ewencp, I apologize for the late reply.
You can find a larger log output here
Thanks.
@idarlington I'm not seeing any duplicates in that log? In fact it looks like requests are working as expected. I've grepped out a bit to help show what the connector is doing:
$ grep _id log | grep '>>' | cut -f 6 -d ' '
"{"index":{"_id":"gritServer+0+199576","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199577","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199578","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199579","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199580","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199581","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199582","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199583","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199584","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199585","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199586","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199587","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199588","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199589","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199590","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199591","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199592","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199593","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199594","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199595","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199596","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199597","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199598","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199599","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199600","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199601","_index":"grits","_type":"docs"}}[\n]"
"POST
"{"index":{"_id":"gritServer+0+199603","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199604","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199605","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199606","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199607","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199608","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199609","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199610","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199611","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199612","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199613","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199614","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199615","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199616","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199617","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199618","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199619","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199620","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199621","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199622","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199623","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199624","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199625","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199626","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199627","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199628","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199629","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199630","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199631","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199632","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199633","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199634","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199635","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199636","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199637","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199638","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199639","_index":"grits","_type":"docs"}}[\n]"
(It's missing one and has POST in there because the log is a bit mangled.) So for gritServer-0, we're seeing what we expect -- each message processed once, and in order.
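One mechanical way to verify that property is to parse the trailing offset out of each _id and check the sequence is gap-free and strictly increasing (a sketch; the sample IDs are from the grep output above):

```python
# A few of the _id values grepped from the bulk requests above.
ids = ["gritServer+0+199576", "gritServer+0+199577", "gritServer+0+199578"]

# The offset is the last "+"-separated field of each ID.
offsets = [int(i.rsplit("+", 1)[1]) for i in ids]

# Duplicate-free, gap-free, and in order:
assert offsets == list(range(offsets[0], offsets[0] + len(offsets)))
```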
Looking at the offset commits:
[2017-07-11 07:43:40,673] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} (io.confluent.connect.elasticsearch.ElasticsearchSinkT metadata=''}} (org.apache.kafka.clients.consumer.KafkaConsumer:1160)
[2017-07-11 07:43:40,675] TRACE Sending offset-commit request with {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} to coordinator 45.76.39.129:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink (org.aconnect-elasticsearch-sink committed offset 199607 for partition gritServer-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:640)
...snip...
[2017-07-11 07:43:58,194] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199640, metadata=''}} (io.confluent.connect.SinkTask:130)
[2017-07-11 07:43:58,194] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
You can see that the offset being committed for gritServer-0 is definitely increasing. What's notable is that dailyData-0 is not increasing and none of the IDs reported above have that in their name, which indicates no messages are being seen for it. Is there actually any data flowing into that topic partition? The log clearly shows that fetch requests are being sent:
[2017-07-11 07:43:29,127] TRACE Added fetch request for partition dailyData-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:639)
but it seems they are not receiving any data in response.
It looks like things are running fine, there just isn't data in one of the topic partitions.
@ewencp I want to add another input to this thread.
Using our predefined _id actually degrades indexing time and performance.
It is also one of Elastic's recommendations to use their auto-generated IDs instead of ours, so that each bulk insert does not have to check whether the _id already exists.
I can understand the design, but for some cases, like basic logs, we don't need the _id validation (and the resulting degradation); we just want to index at a lower performance cost.
I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.
The way to do that is to send null as the document ID in the indexing / bulk request, as opposed to forcing it to be a string as is done now.
In the logging use case it is usually okay to have some messages added twice (or even completely dropped), so IMO there should be an opt-in configuration which allows you to rely on ES IDs. This will result in much faster inserts to ES.
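For illustration, the difference at the bulk-API level is just whether the action line carries an _id (the index/type names below follow the bulk requests shown earlier in this thread):

```python
import json

def bulk_index_action(index, doc_id=None):
    # With "_id", Elasticsearch must treat the write as update-or-insert
    # and check for an existing document. Without it, ES auto-generates
    # the ID and can append blindly, which is why auto-IDs index faster.
    action = {"index": {"_index": index}}
    if doc_id is not None:
        action["index"]["_id"] = doc_id
    return json.dumps(action)

bulk_index_action("grits", "gritServer+0+199211")  # explicit ID, idempotent
bulk_index_action("grits")                         # ES auto-generates the ID
```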