Comments (15)
Hi,
I will take a look at what is going on.
Just for your information, the kafka.partitions.number property may be misleading. It is meant to be set to the number of partitions in your topic, but even if you do not provide it, the consumer still calculates the partition count itself, so this value will not make any difference.
How many partitions does your topic have?
If you want as many receivers as you have partitions, you can launch that many receivers. Say your topic has 50 partitions: 50 receivers will distribute the load.
int numberOfReceivers = 50;
JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
Just try this option and let me know if it helps.
from kafka-spark-consumer.
Hi,
I will eventually have data on 50 partitions, and more receivers, but right now the data is pushed to only a single partition.
It's really not much (4 MB/s), and I expect a single receiver to handle it without requiring that much CPU; even the Kafka client for PHP is far faster.
Increasing the number of receivers wouldn't help here, as the data is going to a single partition.
In the first setup, I had kafka.partitions.number set to 1. In the second setup, I still push the same data, and only to a single partition, but I had kafka.partitions.number set to 50, as I am planning to eventually use 50 partitions, with far more data than 4 MB/s.
But right now 49 partitions are inactive, so CPU usage shouldn't go through the roof.
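For reference, the consumer is driven by a Properties object. The sketch below shows the shape of such a configuration, assuming the property names listed in the kafka-spark-consumer README (zookeeper.hosts, zookeeper.port, kafka.topic, kafka.consumer.id); the values are placeholders, so double-check the keys against the version you are actually running:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds a Properties object in the shape kafka-spark-consumer expects.
    // Property names are taken from the project's README; values are placeholders.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("zookeeper.hosts", "localhost");     // ZooKeeper host(s) backing the Kafka cluster
        props.put("zookeeper.port", "2181");           // ZooKeeper client port
        props.put("kafka.topic", "my-topic");          // topic to consume from
        props.put("kafka.consumer.id", "my-consumer"); // consumer id used for offset tracking
        props.put("kafka.partitions.number", "50");    // the property discussed in this thread
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("kafka.partitions.number"));
    }
}
```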
Hi,
For the receiver-based consumer, the Spark Receiver task always runs on one executor core.
For your second setup, you specified kafka.partitions.number as 50. How many receivers did you specify? Is that also set to 50, or is it still 1?
The issue here is that even if the other 49 partitions do not have any data, the Receiver task is a long-running process that checks every Kafka partition for new data every FillFreqMs, and this leads to CPU usage.
There is a workaround when you want to consume from ONLY one partition: you can directly use the API below. Let's assume 0 is Partition_0, the one you want to consume.
JavaDStream<MessageAndMetadata> oneStream = jsc.receiverStream(new KafkaReceiver(props, 0, StorageLevel.MEMORY_ONLY()));
Or, if you want to consume ONLY a subset of partitions (ignoring the others), where partitionSet is a Set of Integer partition ids you want to consume with a single receiver:
JavaDStream<MessageAndMetadata> myStream = jsc.receiverStream(new KafkaRangeReceiver(props, partitionSet, StorageLevel.MEMORY_ONLY()));
Once all your partitions are getting data and you have enough CPU cores to test all your receivers, you can use the ReceiverLauncher.launch API, which internally distributes the partitions to receivers.
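The partition-to-receiver distribution that ReceiverLauncher.launch performs can be pictured as a simple round-robin assignment. This is a sketch of the idea only, not the library's actual implementation; the distribute helper is a name invented here for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionDistributionSketch {
    // Round-robin assignment of partition ids to receiver ids.
    // Illustrates the idea behind ReceiverLauncher.launch, nothing more.
    static Map<Integer, List<Integer>> distribute(int numPartitions, int numReceivers) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int r = 0; r < numReceivers; r++) {
            assignment.put(r, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numReceivers).add(p); // partition p goes to receiver p mod N
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 50 partitions over 5 receivers: each receiver handles 10 partitions
        Map<Integer, List<Integer>> a = distribute(50, 5);
        System.out.println(a.get(0)); // receiver 0 gets partitions 0, 5, 10, ..., 45
    }
}
```

With a single receiver (as in this thread), the same scheme simply assigns all 50 partitions to receiver 0, which is why one receiver ends up polling all of them.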
Dibyendu
Hi Dibyendu,
I am using a single receiver in both cases. So in the second case, there is only one receiver for the 0-49 partition range.
Gotcha. Even when partitions don't have any data, the receiver keeps checking them for new data every FillFreqMs. Still, it's quite surprising that checking 50 partitions every 200ms requires 100% of the available CPU cycles.
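Back-of-the-envelope, the idle polling load works out like this (a sketch; 200 ms is the FillFreqMs value mentioned above, and the pollsPerSecond helper is invented here for illustration):

```java
public class PollRateSketch {
    // Estimates how many fetch requests per second an idle receiver issues
    // when it polls every partition once per FillFreqMs interval.
    static long pollsPerSecond(int partitions, long fillFreqMs) {
        return partitions * (1000L / fillFreqMs);
    }

    public static void main(String[] args) {
        // 50 partitions polled every 200 ms -> 250 small fetch requests per second
        System.out.println(pollsPerSecond(50, 200));
    }
}
```

250 mostly-empty fetches per second is noticeable but should not by itself saturate a core, which is why the 100% CPU figure looked suspicious.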
Yes, it is surprising. I will try to create an environment similar to yours, with 50 partitions, one partition active, and a single Receiver task. Can you please let me know which Spark version you are running?
Dibyendu
Sorry, I mistakenly pressed the close button.
Hi Dibyendu,
I am using Spark 1.2.0.
Hi,
I made some changes to fix the high CPU usage when receiving from topics with a large number of partitions. Kindly get the latest code and let me know if it improves your CPU consumption.
regards,
Dibyendu
Awesome, thanks! Gonna give it a spin right away.
Hi Dibyendu,
CPU usage remains at 100% :(
I hope you took the latest code from GitHub, not from the Spark-Packages repo.
In my setup, running on AWS with a 4-node Spark 1.2 cluster, I created a Kafka topic with 50 partitions and am receiving with 1 receiver. With the latest fix, CPU usage stays around 8-9% for the driver and around 17% for the worker node where the Receiver task is running.
I used the same consumer code that is in GitHub.
Is it possible to share the code you are using?
And also your cluster configuration?
Hi Dibyendu,
My bad, the changes you made yesterday actually make a huge difference. I had overlooked that you had bumped the version number, and my code was still loading the previous version.
This, along with disabling the WAL, significantly reduced the CPU usage.
Kudos and thanks for addressing this issue so quickly. You rock.
@jedisct1 glad to hear that it worked for you.
Hi @jedisct1
I have created a JIRA to track the progress of contributing this project back to Apache Spark.
https://issues.apache.org/jira/browse/SPARK-11045
This project is presently in spark-packages, and I believe this is the right time to contribute it to the Apache Spark project and give the larger community better options for Kafka connectivity in Spark Streaming.
Kindly vote for this JIRA.