
After running for a long time, the processing time of "ProcessedOffsetManager.persists(partitonOffset_stream, props)" keeps increasing · kafka-spark-consumer (closed, 13 comments)

dibbhatt commented on June 10, 2024
After running for a long time, the processing time of "ProcessedOffsetManager.persists(partitonOffset_stream, props)" keeps increasing.

from kafka-spark-consumer.

Comments (13)

dibbhatt commented on June 10, 2024

You mean to say the offset persist is initially fast, and after running for a long time it starts taking longer?

After how long do you see this behavior? If you restart, does ProcessedOffsetManager.persists still take longer? The offsets are written to Zookeeper, so ideally this should not be an issue.

Which version of this consumer are you using?


yufuxin commented on June 10, 2024

Yes, here are logs from the Spark UI:

233 | Streaming job from [output operation 1, batch time 09:45:20] collect at ProcessedOffsetManager.java:79 | 2018/08/24 09:45:20 | 23 ms
659 | Streaming job from [output operation 1, batch time 10:04:10] collect at ProcessedOffsetManager.java:79 | 2018/08/24 10:04:10 | 48 ms | 2/2

The streaming context is created as follows:

val ssc = new StreamingContext(sc, Seconds(10))
val numberOfReceivers = 12
val kafkaProperties: Map[String, String] =
  Map("zookeeper.hosts" -> zkhosts,
      "zookeeper.port" -> zkports,
      "kafka.topic" -> topics,
      "bootstrap.servers" -> brokers,
      "group.id" -> groupId,
      "security.protocol" -> "SASL_PLAINTEXT",
      "kafka.consumer.id" -> "kafka-consumer"
  )
val props = new java.util.Properties()
kafkaProperties foreach { case (key, value) => props.put(key, value) }
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
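For context, the persist step under discussion is typically wired up after the receiver stream, roughly like this (a sketch following the library's documented usage pattern; the processing logic is elided):

```scala
// Derive the per-partition processed-offset stream from the receiver stream
val partitonOffset_stream = ProcessedOffsetManager.getPartitionOffset(stream, props)

stream.foreachRDD { rdd =>
  // business logic on the received messages goes here
}

// Persist the processed offsets to Zookeeper once per batch
ProcessedOffsetManager.persists(partitonOffset_stream, props)
```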
If I restart the job, it is initially fast and then becomes slow. I am using consumer 1.0.14; the job script is:
nohup spark-submit --class com.haloutput.main.SparkConsumerKafka \
  --master yarn --deploy-mode cluster \
  --driver-cores 8 --driver-memory 16g --executor-memory 4g --executor-cores 6 --num-executors 12 \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.speculation=false \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.streaming.kafka.maxRatePerPartition=12000 \
  --conf spark.yarn.max.executor.failures=400 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --conf "spark.streaming.kafka.consumer.cache.enabled=true" \
  --conf "spark.dynamicAllocation.enabled=false" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files /etc/spark/conf/hive-site.xml,"./kafka_client_jaas.conf,./xxxx.keytab" \
  --jars spark-streaming-kafka-0-10_2.11-2.2.1.jar,log4j-api-2.7.jar,log4j-core-2.7.jar,metrics-core-2.2.0.jar,zkclient-0.9.jar,spark-avro_2.11-3.2.0.jar,kafka-clients-0.11.0.0.jar,bijection-core_2.11-0.9.2.jar,bijection-avro_2.11-0.9.2.jar,kafka-spark-consumer-1.0.14.jar,kafka_2.11-0.11.0.2.jar,json-simple-1.1.1.jar \
  xxxx.jar xxx.xxx.xxx 2181 xxxx \
  xxx.xxx.xxx:6667 xxxx xxxxx xxxxxx xxxxx 2 4 0 >/dev/null &
Thanks,


dibbhatt commented on June 10, 2024

Can you switch to version 1.0.13 and let me know if you see the same issue?


yufuxin commented on June 10, 2024

I cannot switch to 1.0.13 because of the security connection issue.
Thanks


dibbhatt commented on June 10, 2024

OK, got it. Give me a day or two; I will try to reproduce this issue and get back to you.


dibbhatt commented on June 10, 2024

Is it possible to share the timing of each stage of ProcessedOffsetManager.persists? I want to see whether the issue is with writing the offsets to ZK or with getting the max offset for each partition.


dibbhatt commented on June 10, 2024

Also, can you try setting consumer.num_fetch_to_buffer = 5 and see if it improves your offset commit? This property buffers 5 fetches and writes a larger block to the Spark Block Manager, so you will see fewer but larger RDD partitions.
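For example, the property can be added to the same set of consumer properties that is passed to ReceiverLauncher.launch (a sketch based on the snippet earlier in the thread; the surrounding keys are unchanged):

```properties
# Buffer 5 Kafka fetches before writing one larger block to the Spark Block Manager
consumer.num_fetch_to_buffer=5
```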


yufuxin commented on June 10, 2024
111 Streaming job from [output operation 1, batch time 10:15:30] collect at ProcessedOffsetManager.java:79 2018/08/29 10:15:30 36 ms 2/2 41/41
110 Streaming job from [output operation 0, batch time 10:15:30] collect at SparkConsumerKafka.scala:127 2018/08/29 10:15:30 19 ms 1/1 40/40
109 Streaming job from [output operation 1, batch time 10:15:20] collect at ProcessedOffsetManager.java:79 2018/08/29 10:15:20 0.4 s 2/2 1242/1242
108 Streaming job from [output operation 0, batch time 10:15:20] collect at SparkConsumerKafka.scala:127 2018/08/29 10:15:20 0.3 s 1/1 1241/1241
107 Streaming job from [output operation 1, batch time 10:15:10] collect at ProcessedOffsetManager.java:79 2018/08/29 10:15:10 28 ms 2/2 17/17
106 Streaming job from [output operation 0, batch time 10:15:10] collect at SparkConsumerKafka.scala:127 2018/08/29 10:15:10 17 ms 1/1 16/16
105 Streaming job from [output operation 1, batch time 10:15:00] collect at ProcessedOffsetManager.java:79 2018/08/29 10:15:00 0.3 s 2/2 1267/1267
104 Streaming job from [output operation 0, batch time 10:15:00] collect at SparkConsumerKafka.scala:127 2018/08/29 10:15:00 0.3 s 1/1 1266/1266
103 Streaming job from [output operation 1, batch time 10:14:50] collect at ProcessedOffsetManager.java:79 2018/08/29 10:14:50 25 ms 2/2 22/22
102 Streaming job from [output operation 0, batch time 10:14:50] collect at SparkConsumerKafka.scala:127 2018/08/29 10:14:50 18 ms 1/1 21/21
101 Streaming job from [output operation 1, batch time 10:14:40] collect at ProcessedOffsetManager.java:79 2018/08/29 10:14:40 0.4 s 2/2 1260/1260
100 Streaming job from [output operation 0, batch time 10:14:40] collect at SparkConsumerKafka.scala:127 2018/08/29 10:14:40 0.3 s 1/1 1259/1259
99 Streaming job from [output operation 1, batch time 10:14:30] collect at ProcessedOffsetManager.java:79 2018/08/29 10:14:30 24 ms 2/2 14/14
98 Streaming job from [output operation 0, batch time 10:14:30] collect at SparkConsumerKafka.scala:127 2018/08/29 10:14:30 13 ms 1/1 13/13
97 Streaming job from [output operation 1, batch time 10:14:20] collect at ProcessedOffsetManager.java:79 2018/08/29 10:14:20 0.3 s 2/2 1269/1269

..........
After about 20 hours of running:
..........

14787 Streaming job from [output operation 1, batch time 09:21:20] collect at ProcessedOffsetManager.java:79 2018/08/30 09:21:20 0.3 s 2/2 816/816
14786 Streaming job from [output operation 0, batch time 09:21:20] collect at SparkConsumerKafka.scala:127 2018/08/30 09:21:20 0.2 s 1/1 815/815
14785 Streaming job from [output operation 1, batch time 09:21:10] collect at ProcessedOffsetManager.java:79 2018/08/30 09:21:11 96 ms 2/2 308/308
14784 Streaming job from [output operation 0, batch time 09:21:10] collect at SparkConsumerKafka.scala:127 2018/08/30 09:21:10 0.2 s 1/1 307/307
14783 count at AggregationHandler.scala:148 2018/08/30 09:21:10 0.2 s 3/3 (288 skipped) 601/601 (589480 skipped)
14782 Streaming job from [output operation 1, batch time 09:21:00] collect at ProcessedOffsetManager.java:79 2018/08/30 09:21:06 0.3 s 2/2 794/794
14781 Streaming job from [output operation 0, batch time 09:21:00] collect at SparkConsumerKafka.scala:127 2018/08/30 09:21:06 0.3 s 1/1 793/793
14780 save at AggregationHandler.scala:147 2018/08/30 09:21:06 0.6 s 2/2 (288 skipped) 401/401 (589480 skipped)
14779 pivot at AggregationHandler.scala:134 2018/08/30 09:21:01 13 ms 1/1 (290 skipped) 3/3 (590080 skipped)
14778 Streaming job from [output operation 1, batch time 09:20:50] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:57 5 s 2/2 300/300
14777 pivot at AggregationHandler.scala:134 2018/08/30 09:20:57 78 ms 2/2 (289 skipped) 201/201 (589880 skipped)
14776 Streaming job from [output operation 0, batch time 09:20:50] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:52 4 s 1/1 299/299
14775 pivot at AggregationHandler.scala:134 2018/08/30 09:20:52 0.2 s 2/2 (288 skipped) 600/600 (589480 skipped)
14774 Streaming job from [output operation 1, batch time 09:20:40] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:47 5 s 2/2 782/782
14773 count at AggregationHandler.scala:127 2018/08/30 09:20:47 0.2 s 3/3 (288 skipped) 601/601 (589480 skipped)
14772 Streaming job from [output operation 0, batch time 09:20:40] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:43 5 s 1/1 781/781
14771 run at ThreadPoolExecutor.java:1142 2018/08/30 09:20:43 0.2 s 2/2 (288 skipped) 600/600 (589480 skipped)
14770 save at AggregationHandler.scala:126 2018/08/30 09:20:38 0.6 s 2/2 (288 skipped) 401/401 (589480 skipped)
14769 run at ThreadPoolExecutor.java:1142 2018/08/30 09:20:33 0.2 s 2/2 (288 skipped) 600/600 (589480 skipped)
14768 Streaming job from [output operation 1, batch time 09:20:30] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:32 2 s 2/2 298/298
14767 Streaming job from [output operation 0, batch time 09:20:30] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:32 79 ms 1/1 297/297
14766 pivot at AggregationHandler.scala:107 2018/08/30 09:20:32 18 ms 1/1 (290 skipped) 2/2 (590080 skipped)
14765 pivot at AggregationHandler.scala:107 2018/08/30 09:20:27 54 ms 2/2 (289 skipped) 201/201 (589880 skipped)
14764 pivot at AggregationHandler.scala:107 2018/08/30 09:20:23 0.2 s 2/2 (288 skipped) 600/600 (589480 skipped)
14763 Streaming job from [output operation 1, batch time 09:20:20] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:21 2 s 2/2 789/789
14762 pivot at AggregationHandler.scala:97 2018/08/30 09:20:21 20 ms 1/1 (290 skipped) 7/7 (590080 skipped)
14761 Streaming job from [output operation 0, batch time 09:20:20] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:20 1 s 1/1 788/788
14760 pivot at AggregationHandler.scala:97 2018/08/30 09:20:20 18 ms 1/1 (290 skipped) 4/4 (590080 skipped)
14759 pivot at AggregationHandler.scala:97 2018/08/30 09:20:15 68 ms 2/2 (289 skipped) 201/201 (589880 skipped)
14758 Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:11 4 s 2/2 322/322
14757 Streaming job from [output operation 0, batch time 09:20:10] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:10 1 s 1/1 321/321
14756 pivot at AggregationHandler.scala:97 2018/08/30 09:20:07 4 s 7/7 (283 skipped) 10840/10840 (579240 skipped)
14755 Streaming job from [output operation 1, batch time 09:20:00] collect at ProcessedOffsetManager.java:79 2018/08/30 09:20:00 0.2 s 2/2 707/707
14754 Streaming job from [output operation 0, batch time 09:20:00] collect at SparkConsumerKafka.scala:127 2018/08/30 09:20:00 0.2 s 1/1 706/706
14753 Streaming job from [output operation 1, batch time 09:19:50] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:50 0.1 s 2/2 512/512
14752 Streaming job from [output operation 0, batch time 09:19:50] collect at SparkConsumerKafka.scala:127 2018/08/30 09:19:50 0.1 s 1/1 511/511
14751 Streaming job from [output operation 1, batch time 09:19:40] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:40 0.2 s 2/2 588/588
14750 Streaming job from [output operation 0, batch time 09:19:40] collect at SparkConsumerKafka.scala:127 2018/08/30 09:19:40 0.1 s 1/1 587/587
14749 Streaming job from [output operation 1, batch time 09:19:30] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:30 0.1 s 2/2 393/393
14748 Streaming job from [output operation 0, batch time 09:19:30] collect at SparkConsumerKafka.scala:127 2018/08/30 09:19:30 85 ms 1/1 392/392
14747 Streaming job from [output operation 1, batch time 09:19:20] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:20 0.2 s 2/2 724/724
14746 Streaming job from [output operation 0, batch time 09:19:20] collect at SparkConsumerKafka.scala:127 2018/08/30 09:19:20 0.2 s 1/1 723/723
14745 Streaming job from [output operation 1, batch time 09:19:10] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:10 0.1 s 2/2 413/413
14744 Streaming job from [output operation 0, batch time 09:19:10] collect at SparkConsumerKafka.scala:127 2018/08/30 09:19:10 0.1 s 1/1 412/412
14743 Streaming job from [output operation 1, batch time 09:19:00] collect at ProcessedOffsetManager.java:79 2018/08/30 09:19:00 0.2 s 2/2  

Thanks!


dibbhatt commented on June 10, 2024

So I see one instance of a spike to 4 seconds for ProcessedOffsetManager, and after that it is fine again:

14758 | Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:11 | 4 s | 2/2 | 322/322


dibbhatt commented on June 10, 2024

For each of these cases, the prior stage is AggregationHandler.scala. Is this your business logic?


yufuxin commented on June 10, 2024

Yes, AggregationHandler runs the business logic on the messages received in the last 20 minutes.


dibbhatt commented on June 10, 2024

Do you think this is an issue with ProcessedOffsetManager? I do not find any issue; it could be an issue with Zookeeper. How big is your ZK cluster? Can you try increasing the max connection setting (in zk.cfg) to a higher value?
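For reference, the Zookeeper connection limit in question is maxClientCnxns (the server config file is conventionally named zoo.cfg; the value 200 below is only an illustrative assumption, not a recommendation from this thread):

```
# zoo.cfg — max concurrent connections a single client IP may open to this server
# (many distributions default to 60; 0 disables the limit entirely)
maxClientCnxns=200
```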


yufuxin commented on June 10, 2024

I will try changing some parameters in zk.cfg. Thanks!

