This is an Integration Project of Apache Kafka with Spark
In this project, you will be provided with a real-world dataset, extracted from Kaggle, on San Francisco crime incidents, and you will provide statistical analyses of the data using Apache Spark Structured Streaming. You will draw on the skills and knowledge you've learned in this course to create a Kafka server that produces data, and to ingest that data through Spark Structured Streaming.
You can try to answer the following questions with the dataset:
- What are the top types of crimes in San Francisco?
- What is the crime density by location?
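Before streaming anything, a quick batch pass over the JSON file can give a baseline answer to both questions. The sketch below assumes the file is a single JSON array of records and that the crime type and location live in fields named original_crime_type_name and address; check both field names against the actual dataset before relying on the output. The function names summarize_calls and load_records are hypothetical.

```python
import json
from collections import Counter
from typing import Iterable, List, Tuple


def summarize_calls(
    records: Iterable[dict],
    type_field: str = "original_crime_type_name",  # assumed field name
    location_field: str = "address",  # assumed field name
    top_n: int = 10,
) -> Tuple[List[Tuple[str, int]], List[Tuple[str, int]]]:
    """Return the top crime types and the busiest locations by call count."""
    types: Counter = Counter()
    locations: Counter = Counter()
    for record in records:
        # Skip records that lack a field instead of counting them under None.
        if record.get(type_field):
            types[record[type_field]] += 1
        if record.get(location_field):
            locations[record[location_field]] += 1
    return types.most_common(top_n), locations.most_common(top_n)


def load_records(path: str) -> list:
    """Load the dataset, assuming it is one JSON array of objects."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)
```

In a notebook, summarize_calls(load_records("police-department-calls-for-service.json")) returns both rankings; the streaming job can then be checked against these batch counts.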
You may choose to create your project in the workspace we provide here, or if you wish to develop your project locally, you will need to set up your environment properly as described below:
- Spark 2.4.3
- Scala 2.11.x
- Java 1.8.x
- Kafka built with Scala 2.11.x
- Python 3.6.x or 3.7.x
- Download Spark from https://spark.apache.org/downloads.html. Choose "Prebuilt for Apache Hadoop 2.7 and later."
- Unpack Spark in one of your folders (I usually put all my dev requirements in /home/users/user/dev).
- Download the Kafka binary from https://kafka.apache.org/downloads, with Scala 2.11, version 2.3.0. Unzip it in the same local directory where you unzipped your Spark binary. Exploring the Kafka folder, you’ll see the scripts to execute in the bin folder and the config files in the config folder. You’ll need to modify zookeeper.properties and server.properties.
- Download Scala from the official site, or, for Mac users, you can also use brew install scala, but make sure you install version 2.11.x.
- Run the following to verify the correct versions:
java -version
scala -version
- Make sure your ~/.bash_profile looks like the following (paths may differ depending on where you installed each tool):
export SPARK_HOME=/Users/dev/spark-2.4.3-bin-hadoop2.7
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home
export SCALA_HOME=/usr/local/scala/
export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$SCALA_HOME/bin:$PATH
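To double-check the profile after reloading it (for example with source ~/.bash_profile), a small Python helper can report which of the variables above are unset or point at a directory that does not exist. The helper name check_env is hypothetical, and it only inspects environment variables; it does not verify the installed versions, which is what java -version and scala -version are for.

```python
import os
from typing import Dict, List, Mapping, Optional

# The variables exported in ~/.bash_profile above.
REQUIRED_VARS = ("SPARK_HOME", "JAVA_HOME", "SCALA_HOME")


def check_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, List[str]]:
    """Return the required variables that are unset or point at missing directories."""
    env = os.environ if env is None else env
    report: Dict[str, List[str]] = {"unset": [], "missing_dir": []}
    for name in REQUIRED_VARS:
        value = env.get(name)
        if not value:
            report["unset"].append(name)
        elif not os.path.isdir(value):
            report["missing_dir"].append(name)
    return report
```

Calling check_env() with no argument reads the current process environment, so run it from a shell that has already loaded the profile.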
On Windows, please follow the directions found in this helpful StackOverflow post: https://stackoverflow.com/questions/25481325/how-to-set-up-spark-on-windows
SF Crime Data
You can find three Python files that are starter code, the project dataset, and some other necessary resources in a zip file called "SF Crime Data Project Files" in the Resources tab in the left sidebar of your classroom:
producer_server.py
kafka_server.py
data_stream.py
police-department-calls-for-service.json
radio_code.json
start.sh
requirements.txt
These files are also included in the Project Workspace.
These starter code files should be edited:
producer_server.py
data_stream.py
kafka_server.py
You should also create the following file separately, to check whether your kafka_server.py is working properly:
consumer_server.py
Create a new repo that will contain all these files for your project. You will submit a link to this repo as a key part of your project submission. If you complete the project in the classroom workspace here, just download the files you worked on and add them to your repo.
This project requires creating topics, starting the Zookeeper and Kafka servers, and starting your Kafka bootstrap server. You’ll need to choose a port number (e.g., 9092, 9093, ...) for your Kafka server, come up with a Kafka topic name, and modify zookeeper.properties and server.properties accordingly.
- Install requirements using ./start.sh if you use conda for Python. If you use pip rather than conda, use pip install -r requirements.txt.
- Use the commands below to start the Zookeeper and Kafka servers. You can find the bin and config folders in the Kafka binary that you downloaded and unzipped.
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- You can start the bootstrap server using this Python command: python producer_server.py.
- Modify the zookeeper.properties and producer.properties provided to suit the topic and port number of your choice. Start up these servers in the terminal using the commands:
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties
kafka-server-start /etc/kafka/server.properties
- You’ll need to open two terminal tabs, one for each command.
- Install requirements using the provided ./start.sh script. This needs to be done every time you re-open the workspace, or any time after you've refreshed, woken up, or reset data, or used the "Get New Content" button in this workspace.
- In the terminal, to install any other packages you think are necessary to complete the project, use conda install <package_name>. You may need to reinstall these packages every time you re-open the workspace, or any time after you've refreshed, woken up, or reset data, or used the "Get New Content" button in this workspace.
- The first step is to build a simple Kafka server.
- Complete the code for the server in producer_server.py and kafka_server.py.
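As a rough guide to what the producer has to do, here is a minimal sketch using the kafka-python client (an assumption: check requirements.txt for the client the starter code actually uses). It reads the JSON array, serializes each record to UTF-8 JSON bytes and sends it to the topic. The function names, the topic sf_crime and the broker address localhost:9092 are illustrative and should match your own choices.

```python
import json
import time
from typing import Iterator


def dict_to_binary(record: dict) -> bytes:
    """Serialize one record as UTF-8 encoded JSON, the payload sent to Kafka."""
    return json.dumps(record).encode("utf-8")


def iter_records(input_file: str) -> Iterator[dict]:
    """Yield records from a file containing a single JSON array."""
    with open(input_file, encoding="utf-8") as handle:
        for record in json.load(handle):
            yield record


def run_producer(
    input_file: str = "police-department-calls-for-service.json",
    topic: str = "sf_crime",  # assumed topic name; use your own
    bootstrap_servers: str = "localhost:9092",  # assumed broker address
    delay_seconds: float = 1.0,  # pause between messages to simulate a live feed
) -> None:
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers, client_id="sf-crime-producer"
    )
    try:
        for record in iter_records(input_file):
            producer.send(topic, value=dict_to_binary(record))
            time.sleep(delay_seconds)
    finally:
        # Deliver anything still buffered before closing the connection.
        producer.flush()
        producer.close()
```

kafka_server.py could then simply call run_producer(). With a one-second delay the full file takes a long time to replay, so lowering delay_seconds makes testing faster.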
Local Environment
To check that you implemented the server correctly, use the command below to see your output:
bin/kafka-console-consumer.sh --bootstrap-server localhost:<your-port-number> --topic <your-topic-name> --from-beginning
- Start the Zookeeper server
$ zookeeper-server-start config/zookeeper.properties
- Start the Kafka server
$ kafka-server-start config/server.properties
- Create the topic sf_crime
$ kafka-topics --zookeeper localhost:2181 --create --topic sf_crime --replication-factor 1 --partitions 1
- Verify that the topic sf_crime exists
$ kafka-topics --zookeeper localhost:2181 --list
- Start the Kafka producer server
$ python kafka_server.py
- Run kafka-console-consumer
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic sf_crime --from-beginning
Kafka Consumer Console Output (Screenshot)
- Apache Spark already has an integration with Kafka brokers, so we would not normally need a separate Kafka consumer. However, we are going to ask you to create one anyway. Why? We'd like you to create the consumer to demonstrate your understanding of creating a complete Kafka Module (producer and consumer) from scratch. In production, you might have to create a dummy producer or consumer to just test out your theory and this will be great practice for that.
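A minimal consumer_server.py could look like the sketch below. It assumes the kafka-python client, the topic sf_crime and a broker on localhost:9092; the group id and function names are hypothetical. It reads from the earliest offset, decodes each JSON payload and prints it, which is enough to confirm that kafka_server.py is publishing data.

```python
import json
from typing import Optional


def decode_message(raw: Optional[bytes]) -> Optional[dict]:
    """Turn a message value produced as UTF-8 JSON back into a dict."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


def run_consumer(
    topic: str = "sf_crime",  # assumed topic name
    bootstrap_servers: str = "localhost:9092",  # assumed broker address
    timeout_ms: int = 10_000,  # stop after 10 s without new messages
) -> int:
    """Print every message on the topic and return how many were read."""
    # Imported here so decode_message works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id="sf-crime-console",  # hypothetical consumer group
        auto_offset_reset="earliest",  # like --from-beginning for a new group
        consumer_timeout_ms=timeout_ms,
    )
    count = 0
    try:
        for message in consumer:
            print(decode_message(message.value))
            count += 1
    finally:
        consumer.close()
    return count
```

Setting consumer_timeout_ms makes the loop end on its own once the topic goes quiet, which is convenient for a test script; drop it to keep consuming indefinitely.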
- Implement all the TODO items in data_stream.py. You may need to explore the dataset beforehand using a Jupyter Notebook.
- Do a spark-submit using this command (the spark-sql-kafka connector version should match your Spark version, e.g. 2.4.3 for the local setup described above):
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 --master local[*] data_stream.py
- Take a screenshot of your progress reporter after executing a Spark job. You will need to include this screenshot as part of your project submission.
Progress Reporter Output (Screenshot)
- Take a screenshot of the Spark Streaming UI while the stream is running. You will need to include this screenshot as part of your project submission. Run the following to view the Spark UI:
wget "http://localhost:3000"
Or just click the Preview button in the online environment.
Spark Console UI (Screenshot)
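The sketch below shows one possible shape for data_stream.py; it is not the starter code. The schema covers only two fields, assumed to be original_crime_type_name and call_date_time, and the topic, broker address, app name and option values are assumptions to replace with your own. The Kafka source options are kept in a plain dict so they are easy to change between tuning runs. Note that maxOffsetsPerTrigger is a documented option of the Structured Streaming Kafka source, whereas maxRatePerPartition appears to correspond to the DStream setting spark.streaming.kafka.maxRatePerPartition, so passing it as a source option may have no effect. spark.ui.port is set to 3000 on the assumption that the workspace preview expects the UI on the port used in the wget command above.

```python
from typing import Dict


def kafka_source_options(
    bootstrap_servers: str = "localhost:9092",  # assumed broker address
    topic: str = "sf_crime",  # assumed topic name
    max_offsets_per_trigger: int = 200,
) -> Dict[str, str]:
    """Options for spark.readStream.format("kafka"); values passed as strings."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "maxOffsetsPerTrigger": str(max_offsets_per_trigger),
        "failOnDataLoss": "false",
    }


def run_job(master: str = "local[*]") -> None:
    # Imported lazily so kafka_source_options can be inspected without PySpark.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructField, StructType

    # Only the fields this sketch aggregates on; the real dataset has more.
    schema = StructType([
        StructField("original_crime_type_name", StringType(), True),
        StructField("call_date_time", StringType(), True),
    ])

    spark = (
        SparkSession.builder
        .master(master)
        .appName("SFCrimeStatistics")  # hypothetical app name
        .config("spark.ui.port", "3000")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")

    raw = spark.readStream.format("kafka").options(**kafka_source_options()).load()

    # The Kafka value column is binary: cast to string, then parse the JSON.
    calls = (
        raw.select(F.from_json(F.col("value").cast("string"), schema).alias("call"))
        .select("call.*")
    )

    # Running count per crime type, sorted so the top types come first.
    counts = (
        calls.groupBy("original_crime_type_name")
        .count()
        .orderBy(F.desc("count"))
    )

    query = (
        counts.writeStream
        .outputMode("complete")  # sorting a streaming aggregate needs complete mode
        .format("console")
        .option("truncate", "false")
        .start()
    )
    query.awaitTermination()
```

Calling run_job("local[1]") or run_job("local[*]") at the bottom of the file before running spark-submit reproduces the core-count comparison; a master set in code takes precedence over the --master flag.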
- Did changing values on the SparkSession property parameters affect the throughput and latency of the data?
- Yes, changing to different values affected the time it took to process and complete jobs/tasks.
- The number of cores assigned through master("local[*]") had the most visible impact. I compared runs with 1 core against runs with all the cores the host allows (*), and it seems that with 1 core more tasks were completed in the same amount of time than with more cores, even though the actual processing time was longer. With 1 core, shuffle reads and writes also decreased.
- Other configs I tuned were maxRatePerPartition and maxOffsetsPerTrigger; these options affect throughput and latency.
- What were the 2-3 most efficient SparkSession property key/value pairs? Through testing multiple variations on values, how can you tell these were the most optimal?
I ran several comparisons, and I think the best results were obtained with the following values:

| Property | Value |
|---|---|
| maxRatePerPartition | 100 |
| maxOffsetsPerTrigger | 200 |
| master | local[1] |
maxRatePerPartition = 20 & maxOffsetsPerTrigger = 20

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 79 KB | 51.3 KB |
| Shuffle Write | 79.2 KB | 51.5 KB |
| Complete Tasks | 1529 | 3172 |
| Storage/Memory | 647.4 KB/384.1 KB | 1.3 KB/384.1 MB |
maxRatePerPartition = 100 & maxOffsetsPerTrigger = 100

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 120.8 KB | 47.4 KB |
| Shuffle Write | 121.3 KB | 47.7 KB |
| Complete Tasks | 1490 | 3494 |
| Storage/Memory | 647.5 KB/384.1 KB | 1.5 MB/384.1 MB |
maxRatePerPartition = 100 & maxOffsetsPerTrigger = 200

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 6.6 KB | 42.8 KB |
| Shuffle Write | 6.9 KB | 43 KB |
| Complete Tasks | 1696 | 3371 |
| Storage/Memory | 728.1 KB/384.1 KB | 1.4 MB/384.1 MB |
maxRatePerPartition = 200 & maxOffsetsPerTrigger = 100

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 14.3 KB | 32.4 KB |
| Shuffle Write | 14.5 KB | 32.8 KB |
| Complete Tasks | 1689 | 3371 |
| Storage/Memory | 728.1 KB/384.1 KB | 1.4 MB/384.1 MB |
maxRatePerPartition = 200 & maxOffsetsPerTrigger = 200

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 19.9 KB | 24.6 KB |
| Shuffle Write | 20.0 KB | 24.9 KB |
| Complete Tasks | 1743 | 3372 |
| Storage/Memory | 728.4 KB/384.1 KB | 1.4 MB/384.1 MB |
maxRatePerPartition = 400 & maxOffsetsPerTrigger = 400

| Metric | Run 1 | Run 2 |
|---|---|---|
| Task Time | 2.0 minutes | 2.0 minutes |
| Shuffle Read | 25.2 KB | 11.8 KB |
| Shuffle Write | 25.2 KB | 11.9 KB |
| Complete Tasks | 1588 | 3366 |
| Storage/Memory | 647.4 KB/384.1 KB | 1.4 MB/384.1 KB |
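To compare these runs at a glance, the measurements can be tabulated in a few lines of Python. The numbers are copied from the measurements above, taking the first and second value reported for each configuration; since every run lasted 2.0 minutes, completed tasks divided by 2 gives tasks per minute, a rough throughput proxy. The data structure and function names are illustrative.

```python
from typing import Dict, Tuple

Measurement = Tuple[int, float]  # (completed tasks, shuffle read in KB)

# (maxRatePerPartition, maxOffsetsPerTrigger) -> (first run, second run)
RUNS: Dict[Tuple[int, int], Tuple[Measurement, Measurement]] = {
    (20, 20): ((1529, 79.0), (3172, 51.3)),
    (100, 100): ((1490, 120.8), (3494, 47.4)),
    (100, 200): ((1696, 6.6), (3371, 42.8)),
    (200, 100): ((1689, 14.3), (3371, 32.4)),
    (200, 200): ((1743, 19.9), (3372, 24.6)),
    (400, 400): ((1588, 25.2), (3366, 11.8)),
}
TASK_TIME_MINUTES = 2.0  # every run above lasted 2.0 minutes


def tasks_per_minute(run_index: int) -> Dict[Tuple[int, int], float]:
    """Completed tasks per minute for the first (0) or second (1) run."""
    return {cfg: runs[run_index][0] / TASK_TIME_MINUTES for cfg, runs in RUNS.items()}


def lowest_shuffle_read(run_index: int) -> Tuple[int, int]:
    """Configuration with the smallest shuffle read for the given run."""
    return min(RUNS, key=lambda cfg: RUNS[cfg][run_index][1])


for index in (0, 1):
    ranked = sorted(tasks_per_minute(index).items(), key=lambda item: -item[1])
    print(f"run {index + 1}:", ranked)
```

In the first run, the lowest shuffle read comes from maxRatePerPartition = 100 and maxOffsetsPerTrigger = 200, the pair listed as best above.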