Comments (13)
You mean to say that initially the offset persist is fast, and after a long time it starts taking longer?
After how long do you see this behavior? If you restart, is ProcessedOffsetManager.persists still taking longer? The offset is written to Zookeeper, so ideally it should not have this issue.
Which version of this consumer are you using?
from kafka-spark-consumer.
Yes, here are the logs from the Spark UI:
233 | Streaming job from [output operation 1, batch time 09:45:20] collect at ProcessedOffsetManager.java:79 | 2018/08/24 09:45:20 | 23 ms
659 | Streaming job from [output operation 1, batch time 10:04:10] collect at ProcessedOffsetManager.java:79 | 2018/08/24 10:04:10 | 48 ms | 2/2
The Spark Streaming context is created as follows:
val ssc = new StreamingContext(sc, Seconds(10))
val numberOfReceivers = 12
val kafkaProperties: Map[String, String] =
  Map("zookeeper.hosts" -> zkhosts,
      "zookeeper.port" -> zkports,
      "kafka.topic" -> topics,
      "bootstrap.servers" -> brokers,
      "group.id" -> groupId,
      "security.protocol" -> "SASL_PLAINTEXT",
      "kafka.consumer.id" -> "kafka-consumer")
val props = new java.util.Properties()
kafkaProperties foreach { case (key, value) => props.put(key, value) }
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
If I restart the job, it is initially fast and then becomes slow. I am using consumer-1.0.14; the job submit script is:
nohup spark-submit --class com.haloutput.main.SparkConsumerKafka \
  --master yarn --deploy-mode cluster \
  --driver-cores 8 --driver-memory 16g --executor-memory 4g --executor-cores 6 --num-executors 12 \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.speculation=false \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.streaming.kafka.maxRatePerPartition=12000 \
  --conf spark.yarn.max.executor.failures=400 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --conf "spark.streaming.kafka.consumer.cache.enabled=true" \
  --conf "spark.dynamicAllocation.enabled=false" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files /etc/spark/conf/hive-site.xml,"./kafka_client_jaas.conf,./xxxx.keytab" \
  --jars spark-streaming-kafka-0-10_2.11-2.2.1.jar,log4j-api-2.7.jar,log4j-core-2.7.jar,metrics-core-2.2.0.jar,zkclient-0.9.jar,spark-avro_2.11-3.2.0.jar,kafka-clients-0.11.0.0.jar,bijection-core_2.11-0.9.2.jar,bijection-avro_2.11-0.9.2.jar,kafka-spark-consumer-1.0.14.jar,kafka_2.11-0.11.0.2.jar,json-simple-1.1.1.jar \
  xxxx.jar xxx.xxx.xxx 2181 xxxx \
  xxx.xxx.xxx:6667 xxxx xxxxx xxxxxx xxxxx 2 4 0 >/dev/null &
Thanks,
Can you switch to version 1.0.13 and let me know if you see the same issue?
I cannot switch to 1.0.13 because of the security connection issue.
Thanks
OK, got it. Give me a day or two; I will try to reproduce this issue and get back to you.
Is it possible to share the timing of each stage of ProcessedOffsetManager.persists? I want to see whether the issue is with writing offsets to ZK or with getting the max offsets for each partition.
Also, can you try setting consumer.num_fetch_to_buffer = 5 and see if this improves your offset commit? This property buffers 5 fetches and writes a larger block to the Spark Block Manager, so you will see fewer but larger RDD partitions.
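Not from the thread itself, just an illustration: assuming the property is picked up from the same java.util.Properties object that is passed to ReceiverLauncher.launch (as in the driver snippet earlier), setting it could look like the sketch below. Only the property name consumer.num_fetch_to_buffer comes from the comment above; the surrounding names are illustrative.

```scala
import java.util.Properties

// Sketch: adding the suggested fetch-buffering property to the consumer config.
// The other entries (zookeeper.hosts, kafka.topic, ...) from the earlier driver
// snippet would go in the same Properties object.
object BufferedFetchConfig {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // ... existing entries: zookeeper.hosts, zookeeper.port, kafka.topic, etc. ...
    props.put("consumer.num_fetch_to_buffer", "5") // buffer 5 fetches, write one larger block
    println(props.getProperty("consumer.num_fetch_to_buffer")) // prints "5"
  }
}
```

The trade-off is coarser granularity: fewer, larger RDD partitions per batch, which reduces per-partition scheduling and offset-tracking overhead.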
Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks: Succeeded/Total |
---|---|---|---|---|---|
111 | Streaming job from [output operation 1, batch time 10:15:30] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:30 | 36 ms | 2/2 | 41/41 |
110 | Streaming job from [output operation 0, batch time 10:15:30] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:30 | 19 ms | 1/1 | 40/40 |
109 | Streaming job from [output operation 1, batch time 10:15:20] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:20 | 0.4 s | 2/2 | 1242/1242 |
108 | Streaming job from [output operation 0, batch time 10:15:20] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:20 | 0.3 s | 1/1 | 1241/1241 |
107 | Streaming job from [output operation 1, batch time 10:15:10] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:10 | 28 ms | 2/2 | 17/17 |
106 | Streaming job from [output operation 0, batch time 10:15:10] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:10 | 17 ms | 1/1 | 16/16 |
105 | Streaming job from [output operation 1, batch time 10:15:00] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:00 | 0.3 s | 2/2 | 1267/1267 |
104 | Streaming job from [output operation 0, batch time 10:15:00] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:00 | 0.3 s | 1/1 | 1266/1266 |
103 | Streaming job from [output operation 1, batch time 10:14:50] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:50 | 25 ms | 2/2 | 22/22 |
102 | Streaming job from [output operation 0, batch time 10:14:50] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:50 | 18 ms | 1/1 | 21/21 |
101 | Streaming job from [output operation 1, batch time 10:14:40] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:40 | 0.4 s | 2/2 | 1260/1260 |
100 | Streaming job from [output operation 0, batch time 10:14:40] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:40 | 0.3 s | 1/1 | 1259/1259 |
99 | Streaming job from [output operation 1, batch time 10:14:30] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:30 | 24 ms | 2/2 | 14/14 |
98 | Streaming job from [output operation 0, batch time 10:14:30] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:30 | 13 ms | 1/1 | 13/13 |
97 | Streaming job from [output operation 1, batch time 10:14:20] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:20 | 0.3 s | 2/2 | 1269/1269 |
..........
After about 20 hours of running:
..........
Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks: Succeeded/Total |
---|---|---|---|---|---|
14787 | Streaming job from [output operation 1, batch time 09:21:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:20 | 0.3 s | 2/2 | 816/816 |
14786 | Streaming job from [output operation 0, batch time 09:21:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:20 | 0.2 s | 1/1 | 815/815 |
14785 | Streaming job from [output operation 1, batch time 09:21:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:11 | 96 ms | 2/2 | 308/308 |
14784 | Streaming job from [output operation 0, batch time 09:21:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:10 | 0.2 s | 1/1 | 307/307 |
14783 | count at AggregationHandler.scala:148 | 2018/08/30 09:21:10 | 0.2 s | 3/3 (288 skipped) | 601/601 (589480 skipped) |
14782 | Streaming job from [output operation 1, batch time 09:21:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:06 | 0.3 s | 2/2 | 794/794 |
14781 | Streaming job from [output operation 0, batch time 09:21:00] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:06 | 0.3 s | 1/1 | 793/793 |
14780 | save at AggregationHandler.scala:147 | 2018/08/30 09:21:06 | 0.6 s | 2/2 (288 skipped) | 401/401 (589480 skipped) |
14779 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:21:01 | 13 ms | 1/1 (290 skipped) | 3/3 (590080 skipped) |
14778 | Streaming job from [output operation 1, batch time 09:20:50] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:57 | 5 s | 2/2 | 300/300 |
14777 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:20:57 | 78 ms | 2/2 (289 skipped) | 201/201 (589880 skipped) |
14776 | Streaming job from [output operation 0, batch time 09:20:50] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:52 | 4 s | 1/1 | 299/299 |
14775 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:20:52 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped) |
14774 | Streaming job from [output operation 1, batch time 09:20:40] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:47 | 5 s | 2/2 | 782/782 |
14773 | count at AggregationHandler.scala:127 | 2018/08/30 09:20:47 | 0.2 s | 3/3 (288 skipped) | 601/601 (589480 skipped) |
14772 | Streaming job from [output operation 0, batch time 09:20:40] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:43 | 5 s | 1/1 | 781/781 |
14771 | run at ThreadPoolExecutor.java:1142 | 2018/08/30 09:20:43 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped) |
14770 | save at AggregationHandler.scala:126 | 2018/08/30 09:20:38 | 0.6 s | 2/2 (288 skipped) | 401/401 (589480 skipped) |
14769 | run at ThreadPoolExecutor.java:1142 | 2018/08/30 09:20:33 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped) |
14768 | Streaming job from [output operation 1, batch time 09:20:30] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:32 | 2 s | 2/2 | 298/298 |
14767 | Streaming job from [output operation 0, batch time 09:20:30] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:32 | 79 ms | 1/1 | 297/297 |
14766 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:32 | 18 ms | 1/1 (290 skipped) | 2/2 (590080 skipped) |
14765 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:27 | 54 ms | 2/2 (289 skipped) | 201/201 (589880 skipped) |
14764 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:23 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped) |
14763 | Streaming job from [output operation 1, batch time 09:20:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:21 | 2 s | 2/2 | 789/789 |
14762 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:21 | 20 ms | 1/1 (290 skipped) | 7/7 (590080 skipped) |
14761 | Streaming job from [output operation 0, batch time 09:20:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:20 | 1 s | 1/1 | 788/788 |
14760 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:20 | 18 ms | 1/1 (290 skipped) | 4/4 (590080 skipped) |
14759 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:15 | 68 ms | 2/2 (289 skipped) | 201/201 (589880 skipped) |
14758 | Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:11 | 4 s | 2/2 | 322/322 |
14757 | Streaming job from [output operation 0, batch time 09:20:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:10 | 1 s | 1/1 | 321/321 |
14756 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:07 | 4 s | 7/7 (283 skipped) | 10840/10840 (579240 skipped) |
14755 | Streaming job from [output operation 1, batch time 09:20:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:00 | 0.2 s | 2/2 | 707/707 |
14754 | Streaming job from [output operation 0, batch time 09:20:00] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:00 | 0.2 s | 1/1 | 706/706 |
14753 | Streaming job from [output operation 1, batch time 09:19:50] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:50 | 0.1 s | 2/2 | 512/512 |
14752 | Streaming job from [output operation 0, batch time 09:19:50] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:50 | 0.1 s | 1/1 | 511/511 |
14751 | Streaming job from [output operation 1, batch time 09:19:40] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:40 | 0.2 s | 2/2 | 588/588 |
14750 | Streaming job from [output operation 0, batch time 09:19:40] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:40 | 0.1 s | 1/1 | 587/587 |
14749 | Streaming job from [output operation 1, batch time 09:19:30] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:30 | 0.1 s | 2/2 | 393/393 |
14748 | Streaming job from [output operation 0, batch time 09:19:30] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:30 | 85 ms | 1/1 | 392/392 |
14747 | Streaming job from [output operation 1, batch time 09:19:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:20 | 0.2 s | 2/2 | 724/724 |
14746 | Streaming job from [output operation 0, batch time 09:19:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:20 | 0.2 s | 1/1 | 723/723 |
14745 | Streaming job from [output operation 1, batch time 09:19:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:10 | 0.1 s | 2/2 | 413/413 |
14744 | Streaming job from [output operation 0, batch time 09:19:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:10 | 0.1 s | 1/1 | 412/412 |
14743 | Streaming job from [output operation 1, batch time 09:19:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:00 | 0.2 s | 2/2 |
Thanks!
So I see one instance of a spike to 4 sec for ProcessedOffsetManager, and after that it is fine again:
14758 | Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:11 | 4 s | 2/2 | 322/322
In each of these cases, the prior stage is AggregationHandler.scala. Is this your business logic?
Yes, AggregationHandler runs the business logic on the received messages collected during the last 20 minutes.
Do you think this is an issue with ProcessedOffsetManager? I do not find any issue; it could be an issue with Zookeeper. How big is your ZK cluster? Can you try increasing the max connection setting (in zk.cfg) to a higher value?
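For reference (not from the thread): in a stock ZooKeeper install the server config file is typically named zoo.cfg, and the setting in question is maxClientCnxns, the cap on concurrent connections from a single client IP (default 60). A minimal fragment raising it might look like:

```
# zoo.cfg -- raise the per-IP client connection limit (ZooKeeper default is 60)
maxClientCnxns=200
```

Each ZooKeeper server needs a restart for the change to take effect.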
I will try changing some parameters in zk.cfg. Thanks!