Comments (9)
Thanks for testing this. This issue is already fixed in 1.0.8. Did you see the same issue in 1.0.8 as well?
from kafka-spark-consumer.
Hi, I'm a coworker of @ChorPangChan.

> This issue is already fixed in 1.0.8. Did you see the same issue in 1.0.8 as well?
No, this is not fixed. We tested 1.0.8 and found that the issue still exists. Here are the logs.

A successful pattern:
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local : 0 there in internal buffers
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is 76
17/01/20 14:18:34 DEBUG kafka.PartitionManager: LastComitted Offset 75
17/01/20 14:18:34 DEBUG kafka.PartitionManager: New Emitted Offset 77
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Enqueued Offset 76
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 77
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Committed offset 77 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
...followed by a failing pattern:
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local : 0 there in internal buffers
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
...
# Infinite loop
The result is even worse than in 1.0.6: committing to ZooKeeper kept failing, and the consumer fell into an infinite loop.
@dibbhatt
It seems that you want to put _emittedToOffset into ZooKeeper here:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L290
If that succeeds, _lastEnquedOffset will equal _lastComittedOffset on the next loop, so the condition here will be false:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L286
...and the next offset will never be pushed to ZooKeeper.
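The stuck state can be reproduced with a minimal sketch of the commit gate (the names and method are illustrative, not the actual PartitionManager code): with a strict `>` comparison, once the enqueued offset equals the committed offset, the commit branch is never taken again, even though the emitted offset has already advanced.

```java
// Minimal sketch of the commit gate described above (hypothetical names,
// not the actual PartitionManager code).
public class CommitGateSketch {

    // Models the strict '>' comparison at PartitionManager.java#L286.
    static boolean shouldCommit(long lastEnquedOffset, long lastComittedOffset) {
        return lastEnquedOffset > lastComittedOffset;
    }

    public static void main(String[] args) {
        // Successful pattern from the logs: enqueued 76, committed 75 -> commit runs.
        System.out.println(shouldCommit(76, 75)); // true

        // Failing pattern: enqueued 77, committed 77 -> commit is skipped forever,
        // even though the emitted offset has already advanced to 78.
        System.out.println(shouldCommit(77, 77)); // false
    }
}
```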
What if I do

```java
if (_lastEnquedOffset >= _lastComittedOffset) {
...
}
```

That should work, right? Can you please raise a PR and I will merge it.
Dib
It works well for a running program that keeps consuming messages from Kafka. But when the program starts or restarts, there is a new infinite loop that keeps committing the same offset to ZooKeeper:
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Read partition information from : /consumer-path/test_test_kpi_reg_user/topic_tid_9999_dev/partition_0 --> {"partition":0,"offset":85,"topic":"topic_tid_9999_dev","broker":{"port":9092,"host":"cdh005.testdev.local"},"consumer":{"id":"test_test_kpi_reg_user"}}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Read last commit offset from zookeeper:85 ; old topology_id:test_test_kpi_reg_user - new consumer_id: test_test_kpi_reg_user
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Starting Consumer cdh005.testdev.local :0 from offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: LastComitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: New Emitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Enqueued Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committed offset 85 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
17/01/20 17:21:18 DEBUG kafka.PartitionManager: LastComitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: New Emitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Enqueued Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committed offset 85 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
...
# Infinite loop
The infinite loop is caused by this line:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L211
commit() should be executed only when the buffer is not empty. That is, it should be inside the if block at L207.
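The proposed restructuring can be sketched as follows (hypothetical class, field, and method names; the real logic lives in PartitionManager): the commit call is guarded by the non-empty-buffer check, so an idle consumer that starts at the last committed offset does not rewrite the same offset on every pass.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the control-flow fix described above (hypothetical names,
// not the actual PartitionManager code).
public class GuardedCommitSketch {
    private final Deque<Long> buffer = new ArrayDeque<>();
    long committedOffset;
    int commitCount = 0;

    GuardedCommitSketch(long startOffset) {
        this.committedOffset = startOffset;
    }

    void receive(long offset) {
        buffer.add(offset);
    }

    // One pass of the consume loop.
    void processOnce() {
        if (!buffer.isEmpty()) {
            long offset = buffer.poll();
            // ... process the message ...
            // commit() lives INSIDE this block, so an empty pass commits nothing.
            commit(offset + 1);
        }
        // Before the fix, commit() sat here and ran on every pass,
        // rewriting the same offset to ZooKeeper on an idle consumer.
    }

    void commit(long offset) {
        committedOffset = offset;
        commitCount++;
    }
}
```

With this guard, a consumer restarted at offset 85 with nothing buffered performs no commits at all, instead of logging "Wrote committed offset to ZK: 85" in a loop as in the trace above.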
Actually, we can simply fix this issue by changing > to >=. But in fact, the code saves _emittedToOffset into ZooKeeper instead of the last-processed offset, which is what README.md describes. I think you should unify them.
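The mismatch between the two offset semantics can be illustrated with a tiny sketch (variable names are illustrative, not the actual fields): _emittedToOffset points at the next offset to fetch, one ahead of the message that was last processed, which matches the trace above where processing offset 76 writes 77 to ZooKeeper.

```java
// Illustration of the two offset semantics discussed above (hypothetical names).
public class OffsetSemanticsSketch {
    public static void main(String[] args) {
        long lastProcessedOffset = 76;                  // last message actually handled
        long emittedToOffset = lastProcessedOffset + 1; // next offset to fetch

        // What the code stores in ZooKeeper today (matches "Wrote committed offset to ZK: 77"):
        System.out.println("stored: " + emittedToOffset);

        // What README.md describes as stored (the last processed offset):
        System.out.println("documented: " + lastProcessedOffset);
    }
}
```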
OK, sure. If you can make the PR with the fix and the README update, that would be great. Or I will take a look at it over the weekend and fix it.
OK. I'll make a PR next Monday.