lucacanali / sparkmeasure

This is the development repository for sparkMeasure, a tool and library designed for efficient analysis and troubleshooting of Apache Spark jobs. It focuses on easing the collection and examination of Spark metrics, making it a practical choice for both developers and data engineers.

License: Apache License 2.0

Scala 91.25% Python 8.75%
spark apache-spark performance-troubleshooting performance-metrics python scala

sparkmeasure's Introduction

SparkMeasure

SparkMeasure is a tool for performance troubleshooting of Apache Spark jobs

SparkMeasure is a tool and library designed for efficient analysis and troubleshooting of Apache Spark jobs. It focuses on easing the collection and examination of Spark metrics, making it a practical choice for both developers and data engineers. With sparkMeasure, users can obtain a clearer understanding of their Spark job performance, facilitating smoother and more reliable data processing operations.

Key Features

  • Interactive Troubleshooting: Ideal for real-time analysis of Spark workloads in notebooks and spark-shell/pyspark environments.
  • Development & CI/CD Integration: Facilitates testing, measuring, and comparing execution metrics of Spark jobs under various configurations or code changes.
  • Batch Job Analysis: With Flight Recorder mode sparkMeasure records and analyzes batch job metrics for thorough inspection.
  • Monitoring Capabilities: Seamlessly integrates with external systems like InfluxDB, Apache Kafka, and Prometheus Push Gateway for extensive monitoring.
  • Educational Tool: Serves as a practical example of implementing Spark Listeners for the collection of detailed Spark task metrics.
  • Language Compatibility: Fully supports Scala, Java, and Python, making it versatile for a wide range of Spark applications.

Main author and contact: [email protected]


Getting started with sparkMeasure

Demo

Watch sparkMeasure's getting started demo tutorial

Examples of sparkMeasure on notebooks

Examples of sparkMeasure on the CLI

  • Run locally or on hosted resources
# Scala CLI
spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())

# Python CLI
# pip install pyspark
pip install sparkmeasure
pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')

The output should look like this:

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+

Time taken: 3833 ms

Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 1112 (1 s)
stageDuration => 864 (0.9 s)
executorRunTime => 3358 (3 s)
executorCpuTime => 2168 (2 s)
executorDeserializeTime => 892 (0.9 s)
executorDeserializeCpuTime => 251 (0.3 s)
resultSerializationTime => 72 (72 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 36 (36 ms)
resultSize => 16295 (15.9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8

Average number of active tasks => 3.0

Stages and their duration:
Stage 0 duration => 355 (0.4 s)
Stage 1 duration => 411 (0.4 s)
Stage 3 duration => 98 (98 ms)
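
The text report uses a simple "name => value (pretty value)" layout, so it can be post-processed with a few lines of Python. The sketch below is only an illustration: parse_report is a hypothetical helper, not part of sparkMeasure, and the excerpt is copied from the output above. The ratio executorRunTime / elapsedTime computed at the end is consistent with the "Average number of active tasks" value printed in the report, although how the tool derives that value internally is an assumption here.

```python
# Excerpt copied from the example report above (values in milliseconds).
REPORT_EXCERPT = """\
elapsedTime => 1112 (1 s)
executorRunTime => 3358 (3 s)
executorCpuTime => 2168 (2 s)
"""

def parse_report(text):
    """Hypothetical helper: return {metric_name: number} for each 'name => value' line."""
    metrics = {}
    for line in text.splitlines():
        if " => " not in line:
            continue  # skip blank lines and headings
        name, _, rest = line.partition(" => ")
        raw = rest.split()[0]  # drop the human-readable part in parentheses
        metrics[name.strip()] = int(raw) if raw.isdigit() else float(raw)
    return metrics

metrics = parse_report(REPORT_EXCERPT)
avg_active_tasks = metrics["executorRunTime"] / metrics["elapsedTime"]
print(f"average active tasks ~ {avg_active_tasks:.1f}")
```

In a PySpark session the same numbers are available without parsing text, because aggregated metrics can also be returned as a Python dictionary.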
  • Stage metrics collection mode has an optional memory report command
    • this is new in sparkMeasure since version 0.21, it requires Spark versions 3.1 or higher
    • note: this report uses per-stage memory (executor metrics) data, which the executors send to the driver at each heartbeat; there can be a delay of the order of a few seconds between the end of the job and the time the last metrics value is received.
    • If you receive the error message java.util.NoSuchElementException: key not found, retry running the report after waiting for a few seconds.
(scala)> stageMetrics.printMemoryReport
(python)> stagemetrics.print_memory_report()

Additional stage-level executor metrics (memory usasge info):

Stage 0 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 0 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 1 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 1 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 3 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
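
Since the memory report may fail until the last heartbeat has arrived, one option is to wrap the call in a small retry loop. This is a minimal sketch, not part of sparkMeasure: retry and flaky_report are hypothetical names, and flaky_report merely simulates a report that fails twice before succeeding.

```python
import time

def retry(action, attempts=3, delay_seconds=2.0, sleep=time.sleep):
    """Call action() until it succeeds or attempts run out; re-raise the last error."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception:
            if attempt == attempts:
                raise
            sleep(delay_seconds)  # wait for the next executor heartbeat

# Stand-in for the real report call: fails twice, then succeeds.
calls = {"n": 0}

def flaky_report():
    calls["n"] += 1
    if calls["n"] < 3:
        raise KeyError("key not found")
    return "memory report"

result = retry(flaky_report, attempts=5, delay_seconds=0, sleep=lambda s: None)
print(result, calls["n"])
```

In a PySpark session, the assumed usage would be retry(stagemetrics.print_memory_report), with the default delay of a few seconds between attempts.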

Command line example for Task Metrics:

This is similar to the example above, except that it collects metrics at the Task level rather than the Stage level.

# Scala CLI
spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())

# Python CLI
# pip install pyspark
pip install sparkmeasure
pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)
taskmetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')

Spark configuration

  • Choose the sparkMeasure version suitable for your environment:

    • For Spark 3.x, please use the latest version
    • For Spark 2.4 and 2.3, use version 0.19
    • For Spark 2.1 and 2.2, use version 0.16
  • Where to get sparkMeasure:

  • Choose your preferred method to include sparkMeasure in your Spark environment:

    • --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
    • --jars /path/to/spark-measure_2.12-0.24.jar
    • --jars https://github.com/LucaCanali/sparkMeasure/releases/download/v0.24/spark-measure_2.12-0.24.jar
    • --conf spark.driver.extraClassPath=/path/to/spark-measure_2.12-0.24.jar

Examples:

  • Spark with Scala 2.12:

    • Scala: spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
    • Python: pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
  • Spark with Scala 2.13:

    • Scala: spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.13:0.24
    • Python: pyspark --packages ch.cern.sparkmeasure:spark-measure_2.13:0.24
      • note: pip install sparkmeasure to get the Python wrapper API
  • Spark 2.4 and 2.3 with Scala 2.11:

    • Scala: spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.19
    • Python: pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.19
      • note: pip install sparkmeasure==0.19 to get the Python wrapper API
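
The version and artifact choices listed above can be summarized in a short helper. This is a sketch under stated assumptions: both function names are hypothetical, "0.24" stands in for "the latest version" only because it is the version used throughout this page, and Spark versions not listed above are rejected rather than guessed.

```python
def sparkmeasure_version(spark_version, latest="0.24"):
    """Pick the sparkMeasure version listed above for a Spark version string."""
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    if major == 3:
        return latest  # assumption: 0.24 is the latest release
    if (major, minor) in {(2, 3), (2, 4)}:
        return "0.19"
    if (major, minor) in {(2, 1), (2, 2)}:
        return "0.16"
    raise ValueError(f"no sparkMeasure version listed for Spark {spark_version}")

def packages_coordinate(scala_binary_version, version):
    """Build the Maven coordinate passed to --packages."""
    return f"ch.cern.sparkmeasure:spark-measure_{scala_binary_version}:{version}"

print(packages_coordinate("2.12", sparkmeasure_version("3.5.1")))
print(packages_coordinate("2.11", sparkmeasure_version("2.4.8")))
```

The Scala binary version in the coordinate has to match the Scala version your Spark distribution was built with.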

Notes on Spark Metrics

Spark is instrumented with several metrics, collected at task execution; they are described in the documentation:

Some of the key metrics when looking at a sparkMeasure report are:

  • elapsedTime: the time taken by the stage or task to complete (in millisec)
  • executorRunTime: the time the executors spent running the task (in millisec). Note this time is cumulative across all tasks executed by the executor.
  • executorCpuTime: the CPU time the executors spent running the task (in millisec). Note this time is cumulative across all tasks executed by the executor.
  • jvmGCTime: the time the executors spent in garbage collection (in millisec).
  • shuffle metrics: several metrics with details on the I/O and time spent on shuffle
  • I/O metrics: details on the I/O (reads and writes). Note, currently there are no time-based metrics for I/O operations.
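
One way to read these metrics together is as fractions of executorRunTime. The sketch below uses the values from the example stage metrics report earlier on this page; the fraction helper and the interpretation (time not spent on CPU or GC is some form of waiting) are illustrative assumptions, not something sparkMeasure computes for you.

```python
# Values copied from the example stage metrics report (milliseconds).
report = {
    "executorRunTime": 3358,
    "executorCpuTime": 2168,
    "jvmGCTime": 0,
}

def fraction(part, whole):
    """Return part/whole, or 0.0 when whole is zero (no tasks ran)."""
    return part / whole if whole else 0.0

cpu_fraction = fraction(report["executorCpuTime"], report["executorRunTime"])
gc_fraction = fraction(report["jvmGCTime"], report["executorRunTime"])
print(f"CPU: {cpu_fraction:.0%}, GC: {gc_fraction:.0%}")
```

A low CPU fraction suggests the tasks spend much of their run time on something other than computation, but since there are no time-based I/O metrics the report alone cannot say what that is.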

To learn more about the metrics, I advise you to set up a lab environment and run some tests to see the metrics in action. A good place to start is TPCDS PySpark, a tool you can use to run TPCDS with PySpark, instrumented with sparkMeasure.


Documentation, API, and examples

SparkMeasure is one tool for many different use cases, languages, and environments:


Architecture diagram

sparkMeasure architecture diagram


Main concepts underlying sparkMeasure implementation

  • The tool is based on the Spark Listener interface. Listeners transport Spark executor Task Metrics data from the executor to the driver. They are a standard part of Spark instrumentation, used by the Spark Web UI and History Server for example.
  • The tool is built on multiple modules implemented as classes
    • metrics collection and processing can be at the Stage-level or Task-level. The user chooses which mode to use with the API.
    • metrics can be buffered into memory for real-time reporting, or they can be dumped to an external system in the "flight recorder mode".
    • supported external systems are File Systems supported by the Hadoop API, InfluxDB, Apache Kafka, Prometheus Pushgateway.
  • Metrics are flattened and collected into local memory structures in the driver (ListBuffer of a custom case class).
    • sparkMeasure in flight recorder mode, when using one of the InfluxDB, Apache Kafka, or Prometheus Pushgateway sinks, does not buffer but rather writes the collected metrics directly
  • Metrics processing:
    • metrics can be aggregated into a report showing the cumulative values for each metric
    • aggregated metrics can also be returned as a Scala Map or Python dictionary
    • metrics can be converted into a Spark DataFrame for custom querying
  • Metrics data and reports can be saved for offline analysis.
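
The buffering and aggregation ideas above can be illustrated with a toy model in plain Python. This is a conceptual sketch only: StageRecord and ToyStageListener are hypothetical stand-ins (the real implementation is a Scala Spark Listener with a ListBuffer of a custom case class), and the sample values are made up for illustration.

```python
from dataclasses import dataclass, fields

@dataclass
class StageRecord:
    """Hypothetical, simplified stand-in for the flattened per-stage record."""
    stage_id: int
    executor_run_time: int  # ms
    executor_cpu_time: int  # ms
    records_read: int

class ToyStageListener:
    """Buffers one flat record per completed stage, then aggregates on demand."""

    def __init__(self):
        self.buffer = []  # plays the role of the driver-side buffer

    def on_stage_completed(self, record):
        self.buffer.append(record)

    def aggregate(self):
        # Sum every metric column across the buffered stages.
        names = [f.name for f in fields(StageRecord) if f.name != "stage_id"]
        totals = {name: sum(getattr(r, name) for r in self.buffer) for name in names}
        totals["num_stages"] = len(self.buffer)
        return totals

listener = ToyStageListener()
listener.on_stage_completed(StageRecord(0, 1200, 800, 1000))  # made-up values
listener.on_stage_completed(StageRecord(1, 900, 600, 1000))   # made-up values
totals = listener.aggregate()
print(totals)
```

Because each record is flat, the same buffer could just as easily be turned into rows of a table, which is the idea behind converting the metrics into a Spark DataFrame for custom querying.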

FAQ:

  • Why measure performance with workload metrics instrumentation rather than just using execution time measurements?

    • When measuring just the jobs' elapsed time, you treat your workload as "a black box" and most often this does not allow you to understand the root causes of performance regression.
      With workload metrics you can (attempt to) go further in understanding and perform root cause analysis, bottleneck identification, and resource usage measurement.
  • What are Apache Spark task metrics and what can I use them for?

    • Apache Spark measures several details of each task execution, including run time, CPU time, information on garbage collection time, shuffle metrics, and task I/O. See also Spark documentation for a description of the Spark Task Metrics
  • How is sparkMeasure different from Web UI/Spark History Server and EventLog?

    • sparkMeasure uses the same ListenerBus infrastructure used to collect data for the Web UI and Spark EventLog.
      • Spark collects metrics and other execution details and exposes them via the Web UI.
      • Notably, Task execution metrics are also available through the REST API
      • In addition, Spark writes all details of the task execution in the EventLog file (see config of spark.eventLog.enabled and spark.eventLog.dir)
      • The EventLog is used by the Spark History server + other tools and programs that can read and parse the EventLog file(s) for workload analysis and performance troubleshooting, see a proof-of-concept example of reading the EventLog with Spark SQL
    • There are key differences that motivate this development:
      • sparkMeasure can collect data at the stage completion-level, which is more lightweight than measuring all the tasks, in case you only need to compute aggregated performance metrics. When needed, sparkMeasure can also collect data at the task granularity level.
      • sparkMeasure has an API that makes it simple to add instrumentation/performance measurements in notebooks and in application code for Scala, Java, and Python.
      • sparkMeasure collects data in a flat structure, which makes it natural to use Spark SQL for workload data analysis.
      • sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka, Prometheus Pushgateway)
  • What are known limitations and gotchas?

    • sparkMeasure does not collect all the data available in the EventLog
    • See also the TODO and issues doc
    • The currently available Spark task metrics can give you precious quantitative information on resources used by the executors; however, they do not allow you to fully perform time-based analysis of the workload performance, notably they do not expose the time spent doing I/O or network traffic.
    • Metrics are collected on the driver, which could become a bottleneck. This is an issue affecting tools based on Spark ListenerBus instrumentation, such as the Spark WebUI. In addition, note that sparkMeasure in the current version buffers all data in the driver memory. The notable exception is when using the Flight recorder mode with the InfluxDB, Apache Kafka, or Prometheus Pushgateway sink; in this case metrics are sent directly to InfluxDB/Kafka/Prometheus Pushgateway.
    • Task metrics values are collected by sparkMeasure only for successfully executed tasks. Note that resources used by failed tasks are not collected in the current version. The notable exception is with the Flight recorder mode with InfluxDB or Apache Kafka or Prometheus Pushgateway sink.
    • sparkMeasure collects and processes data in order of stage and/or task completion. This means that the metrics data is not available in real-time, but rather with a delay that depends on the workload and the size of the data. Moreover, performance data of jobs executing at the same time can be mixed. This can be a noticeable issue if you run workloads with many concurrent jobs.
    • Task metrics are collected by Spark executors running on the JVM, resources utilized outside the JVM are currently not directly accounted for (notably the resources used when running Python code inside the python.daemon in the case of Python UDFs with PySpark).
  • When should I use Stage-level metrics and when should I use Task-level metrics?

    • Use stage metrics whenever possible as they are much more lightweight. Collect metrics at the task granularity if you need the extra information, for example if you want to study effects of skew, long tails and task stragglers.
  • How can I save/sink the collected metrics?

    • You can print metrics data and reports to standard output or save them to files, using a locally mounted filesystem or a Hadoop compliant filesystem (including HDFS). Additionally, you can sink metrics to external systems (such as Prometheus Pushgateway). The Flight Recorder mode can sink to InfluxDB, Apache Kafka or Prometheus Pushgateway.
  • How can I process metrics data?

    • You can use Spark to read the saved metrics data and perform further post-processing and analysis. See also the Notes on metrics analysis.
  • How can I contribute to sparkMeasure?

    • SparkMeasure has already profited from users submitting PR contributions. Additional contributions are welcome. See the TODO_and_issues list for a list of known issues and ideas on what you can contribute.
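
As an illustration of the kind of analysis that task-level data makes possible (skew, long tails, stragglers), the sketch below flags tasks whose duration is far above the median. It is an assumption-laden example: straggler_report is a hypothetical helper, the 2x-median threshold is an arbitrary choice, and the durations are made-up sample values rather than measurements.

```python
from statistics import median

def straggler_report(durations_ms, threshold=2.0):
    """Flag tasks whose duration exceeds threshold times the median duration."""
    med = median(durations_ms)
    longest = max(durations_ms)
    return {
        "median_ms": med,
        "max_ms": longest,
        "skew_ratio": longest / med,
        "stragglers": [d for d in durations_ms if d > threshold * med],
    }

# Made-up per-task durations in milliseconds, with one obvious straggler.
task_durations_ms = [210, 190, 205, 220, 198, 1450, 215, 202]
report = straggler_report(task_durations_ms)
print(report)
```

Stage-level aggregates would show only the total run time for this stage, which is why the per-task granularity is needed for this type of question.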

sparkmeasure's People

Contributors

alitet01, arnovsky, brian-tecton-ai, cheneyyin, cruizen, dwurry, franciscofsoares, hoaihuongbk, jirapong, kudhru, lucacanali, pulakk, turtlemonvh

sparkmeasure's Issues

jupyter notebook example missing

I am wondering if you still have a sample of jupyter notebook with sparkmeasure
The following SparkMeasure_Jupyer_Python_getting_started.ipynb is not found as of today
Thanks in advance

Support for SparkR

Are there any plans to support bindings for R to measure performance metrics via sparkR?

New logo/icon proposal

Good day sir. I am a graphic designer and i am interested in designing a logo for your good project. I will be doing it as a gift for free. I just need your permission first before I begin my design. Hoping for your positive feedback. Thanks

TaskMetrics and StageMetrics do not extend a common trait

The TaskMetrics and StageMetrics classes do not extend a common trait, which can cause trouble when using those metrics in a generic way.

An example of code that does not compile because of this:

val someExternalConfiguration = ExternalConfiguration.read()
val dataFrame = spark.sql("SELECT * FROM SOME_WHERE")

someExternalConfiguration match {
    case "stages" => 
        val stagesMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
        Writer.doWrite(dataFrame, stagesMetrics)
    case "tasks" => 
        val tasksMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
        Writer.doWrite(dataFrame, tasksMetrics)
}

object Writer {
    def doWrite(dataFrame: DataFrame, metrics: <Here should be the common trait>) {
        metrics.runAndMeasure(dataFrame.write.format("parquet").save("/tmp/any_where"))
    }
}

Uncaught throwable from user code: scala.MatchError: (elapsedTime,null) (of class scala.Tuple2) (taskmetrics.scala:206)

I am seeing an occasional exception when using the .report method on a TaskMetrics to render the metrics data as a string.

The problem seems to be associated with this line:
https://github.com/LucaCanali/sparkMeasure/blob/master/src/main/scala/ch/cern/sparkmeasure/taskmetrics.scala#L206

Here is the top of the stacktrace.

18/12/17 20:53:14 ERROR Uncaught throwable from user code: scala.MatchError: (elapsedTime,null) (of class scala.Tuple2)
	at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
	at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at ch.cern.sparkmeasure.TaskMetrics.report(taskmetrics.scala:206)
	at com.ionic.helperfunctions.SparkMeasureHelpers$.save(SparkMeasure.scala:116)

It looks like this is probably caused by calling .report when there are no records in listenerTask.taskMetricsData (I'll try to confirm this).

If this is the case, there are a few options for fixing. The most obvious one is to change the match statement to something like:

      .map {
        case ((n: String, v: Long)) => Utils.prettyPrintValues(n, v)
        case ((n: String, null)) => n + " => null"
      }
     ).mkString("\n")

But there are, of course, other options.

printReport error on the complex query

I got an error after running the following commands:

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark) 

stageMetrics.begin()

spark.sql("SELECT 1=1")

stageMetrics.end()
stageMetrics.printReport()

I got the following error:

at ch.cern.sparkmeasure.StageMetrics$$anonfun$report$1.apply(stagemetrics.scala:184)
	at ch.cern.sparkmeasure.StageMetrics$$anonfun$report$1.apply(stagemetrics.scala:184)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at ch.cern.sparkmeasure.StageMetrics.report(stagemetrics.scala:184)
	at ch.cern.sparkmeasure.StageMetrics.printReport(stagemetrics.scala:193)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-652119476452024:3)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-652119476452024:48)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw$$iw$$iw$$iw.<init>(command-652119476452024:50)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw$$iw$$iw.<init>(command-652119476452024:52)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw$$iw.<init>(command-652119476452024:54)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$$iw.<init>(command-652119476452024:56)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read.<init>(command-652119476452024:58)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$.<init>(command-652119476452024:62)
	at linea8b963cf2ca141a2ac1e991add9914c629.$read$.<clinit>(command-652119476452024)
	at linea8b963cf2ca141a2ac1e991add9914c629.$eval$.$print$lzycompute(<notebook>:7)
	at linea8b963cf2ca141a2ac1e991add9914c629.$eval$.$print(<notebook>:6)

Better to have the IO metrics for non-hdfs type such as S3 Storage

We are using S3-compatible object storage with Spark, but the current default I/O metrics support HDFS only.

SELECT non_negative_derivative("value", 1s) FROM "filesystem.hdfs.read_bytes" WHERE "applicationid" = '$ApplicationId' AND $timeFilter GROUP BY process

Is there any way to fetch I/O metrics for other distributed file systems?

Thanks !

Flight Recorder Mode when Driver crashed from OOM

Hi,

I am wondering if you have any workaround or recommendation for using flight recorder mode when the driver can crash from OOM. When the driver crashes from OOM, the listener never receives onApplicationEnd and no metrics are written to the sink. Ideally, we would still want to know all the metrics of the jobs accumulated right before the crash.

spark 3.2.3 problems

Hi. I'm trying to use this library.
Code samples:

build.sbt

val hdpMinorVersion = "3.1.0.0-78"
val hadoopVersion = "3.1.1" + "." + hdpMinorVersion
val sparkVersion = "2.3.2" + "." + hdpMinorVersion


lazy val localResolvers = Seq(
  "mvnrepository" at "https://mvnrepository.com/artifact/",
  "Hortonworks HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
  "Hortonworks Other Dependencies" at "http://repo.hortonworks.com/content/groups/public"
)

val projectResolvers: Seq[Resolver] = Seq(Resolver.defaultLocal, Resolver.mavenLocal) ++ localResolvers

resolvers := projectResolvers

lazy val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)

lazy val hdpDependencies = Seq(
  "com.hortonworks.hive" %% "hive-warehouse-connector" % "1.0.0.3.0.1.0-187" % Provided intransitive()
)

lazy val staticAnalyzer = Seq(
  compilerPlugin(dependency = "org.wartremover" %% "wartremover" % "2.3.4")
)

libraryDependencies ++= sparkDependencies ++ hdpDependencies ++ staticAnalyzer ++ Seq(
  "io.monix" %% "monix" % "2.3.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  //  "io.monix" %% "monix-eval" % "2.3.3",
  "ch.cern.sparkmeasure" %% "spark-measure" % "0.13",
  "io.monix" %% "monix-cats" % "2.3.3",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

spark-shell

scala> val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics: ch.cern.sparkmeasure.TaskMetrics = TaskMetrics(org.apache.spark.sql.SparkSession@69c0bae6,false)

scala> taskMetrics.runAndMeasure
   def runAndMeasure[T](f: => T): T

scala> taskMetrics.runAndMeasure(spark.sql("select * from test.test"))
Hive Session ID = 04e7280b-0a45-4fad-867f-f1447faf6bf4
Time taken: 4895 ms
19/02/20 09:01:58 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
19/02/20 09:01:58 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics
scala.MatchError: (elapsedTime,null) (of class scala.Tuple2)                    
  at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
  at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at ch.cern.sparkmeasure.TaskMetrics.report(taskmetrics.scala:206)
  at ch.cern.sparkmeasure.TaskMetrics.printReport(taskmetrics.scala:215)
  at ch.cern.sparkmeasure.TaskMetrics.runAndMeasure(taskmetrics.scala:282)
  ... 49 elided

What am I doing wrong?

jobId field only comes as "0" or "1" in stage and task Metrics.

Hi,

I am evaluating sparkMeasure for my use case, but I always get the jobId field as "0" or a series of "1". Is this the expected behavior, or am I missing something here?

Below are the outputs of a spark2-submit pi job with sparkMeasure and of a spark-shell job, respectively.

+-----+--------+-------+--------------------+
|jobId|jobGroup|stageId|                name|
+-----+--------+-------+--------------------+
|    0|    null|      0|reduce at SparkPi...|
+-----+--------+-------+--------------------+

+-----+--------+
|jobId|jobGroup|
+-----+--------+
|    0|    null|
|    1|    null|
|    1|    null|
|    1|    null|
|    1|    null|
|    1|    null|
|    1|    null|
Also, how can we figure out which task a particular metric belongs to?

Thanks
Amit

Why is sum(bytesWritten) => 0 (0 Bytes) always 0?

I used Spark to read a local Parquet file and store it under a different name.
I used file:// because I accessed the file locally.

sum(bytesWritten) => 0 (0 Bytes) was reported as 0. Why is this?

Thanks

switch to slf4j for logging

SLF4J is a nice logging abstraction that allows developers to include logging while letting users choose their preferred logging framework.
Could you switch to having a compile dependency on slf4j-api jar and a test dependency on log4j and the log4j-slf4j-impl?

Send to Prometheus not available from Python

It is not possible to call:

  def sendReportPrometheus(serverIPnPort: String,
                 metricsJob: String,
                 labelName: String = sparkSession.sparkContext.appName,
                 labelValue: String = sparkSession.sparkContext.applicationId): Unit

From Python.

Question: InfluxDB 2.x Support

Is there support for InfluxDB 2.0 using Flight Recorder mode? The current credentials seem to only support username/password
authentication versus token/org credentials needed for InfluxDB 2.0

 val url = Utils.parseInfluxDBURL(conf, logger)
  val (username, password) = Utils.parseInfluxDBCredentials(conf, logger)

  // Tries to connect to InfluxDB, using the given URL and credentials
  val influxDB =  username match {
    case username if username.isEmpty =>
      // no username and password, InfluxDB must be running with auth-enabled=false
      InfluxDBFactory.connect(url)
    case _ => InfluxDBFactory.connect(url, username, password)
  }

How to get the execution time for EACH operator in Spark SQL?

Excuse me, I'm having a problem getting the execution time of each operator in Spark SQL. For example, I'm using the SQL below to test:
"select count(*) from range(1000) cross join range(1000) cross join range(1000)"
and I found some details in the Spark Web UI (screenshot not preserved).
I wish to get the execution time of every operator, but operators like "RANGE, PROJECT etc" don't have the corresponding metrics.
I looked into the log and found that those operators don't have any metrics, so maybe this is the reason.
But I want to get the execution time for each operator. How can I do that?

throwing error when trying to make work locally

  File "C:\Users\talathar\Miniconda3\envs\XXXXX\lib\site-packages\sparkmeasure\stagemetrics.py", line 15, in __init__
    self.stagemetrics = self.sc._jvm.ch.cern.sparkmeasure.StageMetrics(self.sparksession._jsparkSession)
  File "C:\spark-3.2.1-bin-hadoop3.2\python\lib\py4j-0.10.9.3-src.zip\py4j\java_gateway.py", line 1586, in __call__
  File "C:\spark-3.2.1-bin-hadoop3.2\python\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
  File "C:\spark-3.2.1-bin-hadoop3.2\python\lib\py4j-0.10.9.3-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.ch.cern.sparkmeasure.StageMetrics.
: java.lang.NoClassDefFoundError: scala/Product$class

Dropping SparkListenerEvent because no remaining room in event queue

I launched sparkMeasure in a large job. Immediately I got:
18/02/07 08:21:56 ERROR org.apache.spark.scheduler.LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
18/02/07 08:21:56 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 1 SparkListenerEvents since Thu Jan 01 00:00:00 UTC 1970
18/02/07 08:22:56 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 13971 SparkListenerEvents since Wed Feb 07 08:21:56 UTC 2018
18/02/07 08:23:51 ERROR org.apache.spark.network.server.TransportRequestHandler: Error opening block StreamChunkId{streamId=1999850815777, chunkIndex=0} for request from /10.205.151.192:37514 org.apache.spark.storage.BlockNotFoundException: Block broadcast_32_piece0 not found

The job does continue, but it seems to be overloading the listenerbus.
I'll try --conf spark.scheduler.listenerbus.eventqueue.size=100000.

Did you already encounter this somewhere?

bytesRead not populated in some cases

I'm hoping to use sparkmeasure to compare performance of reading data from S3 vs reading from an optimised database.

CSV on S3 works as expected:

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

stagemetrics.begin()
spark.read.csv(CUSTOMERS_S3_URL, header=False, inferSchema=True).filter("_c3 = 'FEMALE'").show()
stagemetrics.end()
# stagemetrics.print_report()

metrics = stagemetrics.aggregate_stagemetrics()
print(f"""
{metrics['recordsRead'] = }
{metrics['bytesRead'] = }
""")

This reports:

metrics['recordsRead'] = 19022
metrics['bytesRead'] = 1107894

The Database query:

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

stagemetrics.begin()
spark.sql(f"""
    SELECT * 
    FROM ndb.`{DATABASE_NAME}`.`{DATABASE_SCHEMA}`.`{CUSTOMERS_TABLENAME}`
    WHERE cust_gender = 'FEMALE'
""").show()
stagemetrics.end()
# stagemetrics.print_report()

metrics = stagemetrics.aggregate_stagemetrics()
print(f"""
{metrics['recordsRead'] = }
{metrics['bytesRead'] = }
""")

For some reason this is reporting zero bytes read.

metrics['recordsRead'] = 9334
metrics['bytesRead'] = 0

Should I be looking into the DB plugin code to debug this? https://github.com/vast-data/vast-db-connectors
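
As a first diagnostic, the two results above can be compared programmatically. The sketch below is not part of sparkMeasure; the helper name `input_metric_gaps` is hypothetical, and it only assumes that the aggregated metrics behave like a dictionary, as in the snippets above. A byte counter that stays at zero while records are counted may indicate that the data source does not report that metric to Spark, which is an assumption to confirm in the connector code.

```python
def input_metric_gaps(metrics):
    """Return the byte counters that are zero although records were counted."""
    pairs = [("recordsRead", "bytesRead"), ("recordsWritten", "bytesWritten")]
    return [
        bytes_key
        for records_key, bytes_key in pairs
        if metrics.get(records_key, 0) > 0 and metrics.get(bytes_key, 0) == 0
    ]


# Values copied from the two runs reported above.
csv_run = {"recordsRead": 19022, "bytesRead": 1107894}
db_run = {"recordsRead": 9334, "bytesRead": 0}

print(input_metric_gaps(csv_run))  # []
print(input_metric_gaps(db_run))   # ['bytesRead']
```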

Error creating a TaskMetrics dataframe

Hello!

I'm trying to create a dataframe to save my task metrics in Hive. However, when I call the create_taskmetrics_DF method, as below, I get the error shown in the screenshot.

tm = taskmetrics.create_taskmetrics_DF("PerfTaskMetrics")

[Screenshot of the error message]

Judging only by the error message, it looks like an error in the class.

testSparkMeasureScala sbt problem

Hi,

While trying to run the following example code:
https://github.com/LucaCanali/sparkMeasure/tree/master/examples/testSparkMeasureScala

I get the following error when running sbt package.

What am I doing wrong?

C:\repos\sparkMeasure\examples\testSparkMeasureScala> sbt package
[info] welcome to sbt 1.7.1 (Oracle Corporation Java 19.0.1)
[info] loading global plugins from C:\Users\DanielAronovich\.sbt\1.0\plugins
[info] loading project definition from C:\repos\sparkMeasure\examples\testSparkMeasureScala\project
[info] loading settings for project testsparkmeasurescala from build.sbt ...
[info] set current project to testSparkMeasureScala (in build file:/C:/repos/sparkMeasure/examples/testSparkMeasureScala/)
[warn]
[warn] Note: Unresolved dependencies path:
[error] sbt.librarymanagement.ResolveException: Error downloading ch.cern.sparkmeasure:spark-measure_2.12:0.21
[error] Not found
[error] Not found
[error] not found: C:\Users\DanielAronovich\.ivy2\localch.cern.sparkmeasure\spark-measure_2.12\0.21\ivys\ivy.xml
[error] not found: https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.12/0.21/spark-measure_2.12-0.21.pom
[error] at lmcoursier.CoursierDependencyResolution.unresolvedWarningOrThrow(CoursierDependencyResolution.scala:345)
[error] at lmcoursier.CoursierDependencyResolution.$anonfun$update$38(CoursierDependencyResolution.scala:314)
[error] at scala.util.Either$LeftProjection.map(Either.scala:573)
[error] at lmcoursier.CoursierDependencyResolution.update(CoursierDependencyResolution.scala:314)
[error] at sbt.librarymanagement.DependencyResolution.update(DependencyResolution.scala:60)
[error] at sbt.internal.LibraryManagement$.resolve$1(LibraryManagement.scala:59)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$12(LibraryManagement.scala:133)
[error] at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$20(LibraryManagement.scala:146)
[error] at scala.util.control.Exception$Catch.apply(Exception.scala:228)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11(LibraryManagement.scala:146)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11$adapted(LibraryManagement.scala:127)
[error] at sbt.util.Tracked$.$anonfun$inputChangedW$1(Tracked.scala:219)
[error] at sbt.internal.LibraryManagement$.cachedUpdate(LibraryManagement.scala:160)
[error] at sbt.Classpaths$.$anonfun$updateTask0$1(Defaults.scala:3688)
[error] at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] at sbt.Execute.work(Execute.scala:291)
[error] at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[error] at java.base/java.lang.Thread.run(Thread.java:1589)
[error] (update) sbt.librarymanagement.ResolveException: Error downloading ch.cern.sparkmeasure:spark-measure_2.12:0.21
[error] Not found
[error] Not found
[error] not found: C:\Users\DanielAronovich\.ivy2\localch.cern.sparkmeasure\spark-measure_2.12\0.21\ivys\ivy.xml
[error] not found: https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.12/0.21/spark-measure_2.12-0.21.pom
[error] Total time: 4 s, completed Nov 15, 2022, 3:11:11 PM

NoSuchMethodError (ScalaObjectMapper)

Issue

When trying to use the Flight Recorder functionality, I get the following error:

java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper.$init$(Lcom/fasterxml/jackson/module/scala/experimental/ScalaObjectMapper;)V
        at ch.cern.sparkmeasure.IOUtils$$anon$1.<init>(ioutils.scala:22)
        at ch.cern.sparkmeasure.IOUtils$.<init>(ioutils.scala:22)
        at ch.cern.sparkmeasure.IOUtils$.<clinit>(ioutils.scala)
        at ch.cern.sparkmeasure.FlightRecorderStageMetrics.onApplicationEnd(flightrecorder.scala:56)
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57)
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
        at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
        at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
        at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

Reproduce

To reproduce, check out the latest version of the code and run:

spark-submit --master local[*] --jars ./target/scala-2.12/spark-measure_2.12-0.18-SNAPSHOT.jar --conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics ~/pyspark_helloworld.py

where pyspark_helloworld.py is just:

import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python-2.7.5/README')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

I am using the following versions:

  • Scala 2.12
  • PySpark 3.2.0
  • Python 3.7.11

Fix

Bumping the version of jackson-module-scala to 2.13.0 and rebuilding seems to fix the problem.

Notable difference to REST API

Hi Luca and community,

first of all - thanks for the great work - very much appreciate it!

This is not a real issue but rather a user question asking for clarification. I've been wondering whether sparkMeasure provides any metrics beyond those available from the default REST API.

I would like to collect spark job metrics while keeping any dependencies as minimal as possible. Using the default REST API to collect the metrics seems simple without needing to rely on an additional package. Of course sparkMeasure provides additional abstractions to aggregate on stage/task level and to compute many relevant metrics. That is of great use. We are likely to be interested in only a few core metrics and we don't need all of them.

Memory usage

Can we get the memory usage of a spark-submit job? stagemetrics.print_report() doesn't include it.

use json for the serialized data

I think it would be easier for users to check the serialized metrics if they were saved in a text format like JSON. I understand that the processing is done by code, but it is useful if the formats are human readable.
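
To illustrate the request, here is a minimal sketch of a human-readable round trip using Python's standard `json` module. It does not reflect how sparkMeasure serializes metrics; the function names and the sample row are hypothetical and only show the kind of output being asked for.

```python
import json


def metrics_to_json_text(rows):
    """Serialize a list of per-stage metric dictionaries as readable JSON."""
    return json.dumps(rows, indent=2, sort_keys=True)


def metrics_from_json_text(text):
    """Parse the JSON text back into a list of dictionaries."""
    return json.loads(text)


# Illustrative values only, not taken from a real run.
stage_rows = [{"stageId": 0, "executorRunTime": 1200, "recordsRead": 50}]

text = metrics_to_json_text(stage_rows)
print(text)
restored = metrics_from_json_text(text)
```

The text can be written to a file and inspected with any editor, which is the main advantage over a binary serialized format.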

taskVals.toDF java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals

I'm trying to use the deserializer. However the toDF that follows is throwing an exception:

Details:
val taskVals = ch.cern.sparkmeasure.Utils.readSerializedTaskMetrics("/tmp/stageMetrics.serialized")
scala> val taskVals = ch.cern.sparkmeasure.Utils.readSerializedTaskMetrics("/tmp/stageMetrics.serialized")
taskVals: scala.collection.mutable.ListBuffer[ch.cern.sparkmeasure.TaskVals] = ListBuffer(StageVals(0,0,parquet at NativeMethodAccessorImpl.java:0,1531905390534,1531905392912,2378,1,1878,123,368,259,5,78,4802,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0), StageVals(1,1,count at NativeMethodAccessorImpl.java:0,1531905395225,1531905405201,9976,1003,1028843,104666,103972,21334,218,35221,1714153,144,0,0,0,100818815,11697322,0,0,0,0,0,0,0,1123,56844,1003), StageVals(1,2,count at NativeMethodAccessorImpl.java:0,1531905405221,1531905405591,370,1,336,176,26,19,1,0,2382,2,0,0,0,0,0,0,0,55,56844,1003,28,975,0,0,0), StageVals(2,3,collect at /root/SparCle/workload/sqlquery/data-layout-read.py:20,1531905405925,1531905413411,7486,1003,813482,25942,6350,1765,58,9238,1707192,144,0,0,0,100818815,156304569,0,0,0,0,0,...
scala> val taskMetricsDF = taskVals.toDF()
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).jobId AS jobId#444
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).stageId AS stageId#445
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).index AS index#446L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).launchTime AS launchTime#447L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).finishTime AS finishTime#448L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).duration AS duration#449L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).schedulerDelay AS schedulerDelay#450L
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorId, true) AS executorId#451
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).host, true) AS host#452
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).taskLocality AS taskLocality#453
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).speculative AS speculative#454
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).gettingResultTime AS gettingResultTime#455L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).successful AS successful#456
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorRunTime AS executorRunTime#457L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorCpuTime AS executorCpuTime#458L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorDeserializeTime AS executorDeserializeTime#459L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorDeserializeCpuTime AS executorDeserializeCpuTime#460L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).resultSerializationTime AS resultSerializationTime#461L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).jvmGCTime AS jvmGCTime#462L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).resultSize AS resultSize#463L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).numUpdatedBlockStatuses AS numUpdatedBlockStatuses#464
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).diskBytesSpilled AS diskBytesSpilled#465L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).memoryBytesSpilled AS memoryBytesSpilled#466L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).peakExecutionMemory AS peakExecutionMemory#467L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).recordsRead AS recordsRead#468L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).bytesRead AS bytesRead#469L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).recordsWritten AS recordsWritten#470L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).bytesWritten AS bytesWritten#471L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleFetchWaitTime AS shuffleFetchWaitTime#472L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleTotalBytesRead AS shuffleTotalBytesRead#473L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleTotalBlocksFetched AS shuffleTotalBlocksFetched#474L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleLocalBlocksFetched AS shuffleLocalBlocksFetched#475L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleRemoteBlocksFetched AS shuffleRemoteBlocksFetched#476L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleWriteTime AS shuffleWriteTime#477L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleBytesWritten AS shuffleBytesWritten#478L
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleRecordsWritten AS shuffleRecordsWritten#479L
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:464)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:377)
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:213)
... 48 elided
Caused by: java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 60 more

scala>

Peak Memory usage - PySpark 3 on Azure Synapse

Found an odd issue. We recently started running our jobs through Azure Synapse. While using Azure HDI, we were able to record the peakExecutionMemory, but for some reason with Azure Synapse, all the values are 0.

We are using TaskMetrics to get the most information out of the run. In the generated CSV, all other columns are populated, but the peakExecutionMemory values are all 0.

Is this a known issue?

We are running with Python 3.7, PySpark 3.2.1, and Scala 2.12, using spark-measure_2.12:0.18.jar.

Guide for metrics interpretation

Hi there,

thanks for all the resources you've provided to get a better understanding of spark metrics. Especially, I found your blog post very useful.

When using sparkMeasure, I still found it hard to come up with a proper interpretation for all the resulting task metrics. There are some easy ones like CPU utilization (as described in your blog post). However, many others do not seem so trivial. For example:

  • How to interpret sum(memoryBytesSpilled) and sum(diskBytesSpilled)? From my understanding, spilling occurs if data size exceeds available memory of an executor. memoryBytesSpilled refers to the deserialized size in memory whereas diskBytesSpilled refers to serialized size on disk for the data that is required to be spilled (see here and here). Temporarily storing data on disk is a performance penalty and should be avoided.
  • How to interpret shuffleBytesWritten? Does this encompass the entire data which was required to be shuffled between all executors?
  • How to interpret bytesRead and bytesWritten? Does this relate to the total amount of bytes read as input and bytes written as output (also from multiple sources/targets like HDFS, Kudu, Cassandra within the same job)?

What I'm basically aiming at is a beginners tutorial to guide new users through all the metrics with an example of what they mean and how they might be relevant for performance (e.g. spilling = bad).
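
As a starting point for such a guide, the spill metrics from the first question could be summarized as in the sketch below. It follows the interpretation given above (memoryBytesSpilled as deserialized in-memory size, diskBytesSpilled as serialized on-disk size) and is not an official definition. The helper name `spill_summary` and the sample values are hypothetical.

```python
def spill_summary(metrics):
    """Flag spilling and compare in-memory size with on-disk size of spilled data."""
    memory_bytes = metrics.get("memoryBytesSpilled", 0)
    disk_bytes = metrics.get("diskBytesSpilled", 0)
    return {
        # Any non-zero value means some data did not fit in execution memory.
        "spilled": memory_bytes > 0 or disk_bytes > 0,
        # Ratio of deserialized size to serialized size; None when nothing hit disk.
        "memory_to_disk_ratio": (memory_bytes / disk_bytes) if disk_bytes else None,
    }


# Illustrative values only, not taken from a real run.
example = {"memoryBytesSpilled": 400_000_000, "diskBytesSpilled": 100_000_000}
print(spill_summary(example))
```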

For now, it would be great if you could help me better understand the 3 questions raised above. In the longer term, it would be awesome to start a small guide on how to interpret Spark metrics for the entire Spark community (I would be in for that). Perhaps one already exists, but I couldn't find anything appropriate either in the official docs or in personal projects.

TypeError: 'JavaPackage' object is not callable

I hit the following error when I spark-submit the .py script to EMR:

self.stagemetrics = self.sc._jvm.ch.cern.sparkmeasure.StageMetrics(self.sparksession._jsparkSession)
TypeError: 'JavaPackage' object is not callable
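
With py4j, this TypeError usually means the JVM could not find the class, so a common cause is that the spark-measure jar is not on the driver classpath. The sketch below checks the Spark configuration for a reference to the jar. It assumes the configuration is available as a list of (key, value) pairs, for example from `spark.sparkContext.getConf().getAll()`; the helper name is hypothetical, and a jar installed directly on the cluster would not show up in this check.

```python
JAR_RELATED_KEYS = {"spark.jars", "spark.jars.packages", "spark.driver.extraClassPath"}


def sparkmeasure_jar_configured(conf_pairs):
    """Return True if any jar-related setting mentions the spark-measure artifact."""
    return any(
        key in JAR_RELATED_KEYS and "spark-measure" in value
        for key, value in conf_pairs
    )


# Stand-in for spark.sparkContext.getConf().getAll() in a job submitted without the jar.
without_jar = [("spark.app.name", "demo"), ("spark.master", "yarn")]
with_jar = without_jar + [("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.12:0.22")]

print(sparkmeasure_jar_configured(without_jar))  # False
print(sparkmeasure_jar_configured(with_jar))     # True
```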

Getting Spark Task related metrics using Prometheus Push Gateway

Hi,

I can see the stage-related metrics exposed to Prometheus through the Push Gateway sink, but I am not seeing any task-related metrics.

  1. Are task-related metrics supported by the Prometheus Push Gateway sink?
  2. If so, is there a separate configuration to expose task metrics?

enable travis-ci

@LucaCanali would it be possible for you to login to Travis CI with your github credentials and enable Travis builds for sparkMeasure?

You could then add these badges at the top of your README.md.
These are copied from https://github.com/swagger-akka-http/swagger-akka-http/blob/master/README.md

[![Build Status](https://travis-ci.org/LucaCanali/sparkMeasure.svg?branch=master)](https://travis-ci.org/LucaCanali/sparkMeasure)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/LucaCanali/sparkMeasure/badge.svg)](https://maven-badges.herokuapp.com/maven-central/LucaCanali/sparkMeasure)

How to find time spent in I/O for a task

Thanks for this work @LucaCanali! I have one question about I/O metrics. I know you mention in the limitations section of the README that Spark does not expose I/O and network related metrics. However, I was wondering if there is any way to deduce approximately the time spent in I/O for a job from the current metrics. For instance, what does the difference between ExecutorRunTime and ExecutorCpuTime entail?
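
One rough way to frame the question is to subtract CPU time and JVM garbage collection time from the executor run time. The remainder is only an upper bound on time spent waiting, which can include I/O but also other uninstrumented activity, so treat it as an estimate. The sketch assumes all three metrics are expressed in milliseconds; the helper name and sample values are hypothetical.

```python
def non_cpu_time_ms(metrics):
    """Estimate time not spent on CPU or in GC, as a rough upper bound for I/O waits."""
    run_time = metrics["executorRunTime"]
    cpu_time = metrics["executorCpuTime"]
    gc_time = metrics.get("jvmGCTime", 0)
    # Clamp at zero: measurement noise can make the difference slightly negative.
    return max(run_time - cpu_time - gc_time, 0)


# Illustrative values only, not taken from a real run.
sample = {"executorRunTime": 1000, "executorCpuTime": 600, "jvmGCTime": 100}
print(non_cpu_time_ms(sample))  # 300
```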

Issue when using sbt dependency - retrofit not resolved

I get the following error when I try to import sparkMeasure into my project:

[SUCCESSFUL ] ch.cern.sparkmeasure#spark-measure_2.12;0.22!spark-measure_2.12.jar (190ms)
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	::          UNRESOLVED DEPENDENCIES         ::
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	:: com.squareup.retrofit2#retrofit;2.4.0: Resolution failed several times for dependency: com.squareup.retrofit2#retrofit;2.4.0 {compile=[compile(*), master(*)], runtime=[runtime(*)]}::
[warn] 	com.squareup.retrofit2#retrofit;2.4.0!retrofit.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit/2.4.0/retrofit-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit;2.4.0!retrofit.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit/2.4.0/retrofit-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit;2.4.0!retrofit.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit/2.4.0/retrofit-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit;2.4.0!retrofit.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit/2.4.0/retrofit-2.4.0.pom
[warn] 	:: com.squareup.retrofit2#converter-moshi;2.4.0: Resolution failed several times for dependency: com.squareup.retrofit2#converter-moshi;2.4.0 {compile=[compile(*), master(*)], runtime=[runtime(*)]}::
[warn] 	Resolution failed several times for dependency: com.squareup.retrofit2#retrofit-converters;2.4.0 {}::
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn]
[warn] 	Resolution failed several times for dependency: com.squareup.retrofit2#retrofit-converters;2.4.0 {}::
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn]
[warn] 	Resolution failed several times for dependency: com.squareup.retrofit2#retrofit-converters;2.4.0 {}::
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn]
[warn] 	Resolution failed several times for dependency: com.squareup.retrofit2#retrofit-converters;2.4.0 {}::
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn] 	com.squareup.retrofit2#retrofit-converters;2.4.0!retrofit-converters.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/retrofit2/retrofit-converters/2.4.0/retrofit-converters-2.4.0.pom
[warn]
[warn] 	:: com.squareup.okhttp3#okhttp;3.11.0: Resolution failed several times for dependency: com.squareup.okhttp3#okhttp;3.11.0 {compile=[compile(*), master(*)], runtime=[runtime(*)]}::
[warn] 	com.squareup.okhttp3#okhttp;3.11.0!okhttp.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/okhttp/3.11.0/okhttp-3.11.0.pom
[warn] 	com.squareup.okhttp3#okhttp;3.11.0!okhttp.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/okhttp/3.11.0/okhttp-3.11.0.pom
[warn] 	com.squareup.okhttp3#okhttp;3.11.0!okhttp.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/okhttp/3.11.0/okhttp-3.11.0.pom
[warn] 	com.squareup.okhttp3#okhttp;3.11.0!okhttp.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/okhttp/3.11.0/okhttp-3.11.0.pom
[warn] 	:: com.squareup.okhttp3#logging-interceptor;3.11.0: Resolution failed several times for dependency: com.squareup.okhttp3#logging-interceptor;3.11.0 {compile=[compile(*), master(*)], runtime=[runtime(*)]}::
[warn] 	com.squareup.okhttp3#logging-interceptor;3.11.0!logging-interceptor.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/logging-interceptor/3.11.0/logging-interceptor-3.11.0.pom
[warn] 	com.squareup.okhttp3#logging-interceptor;3.11.0!logging-interceptor.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/logging-interceptor/3.11.0/logging-interceptor-3.11.0.pom
[warn] 	com.squareup.okhttp3#logging-interceptor;3.11.0!logging-interceptor.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/logging-interceptor/3.11.0/logging-interceptor-3.11.0.pom
[warn] 	com.squareup.okhttp3#logging-interceptor;3.11.0!logging-interceptor.pom(pom.original) origin location must be absolute: file:/Users/anirudh.vyas/.m2/repository/com/squareup/okhttp3/logging-interceptor/3.11.0/logging-interceptor-3.11.0.pom
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] 	Note: Unresolved dependencies path:
[warn] 		com.squareup.retrofit2:retrofit:2.4.0
[warn] 		  +- org.influxdb:influxdb-java:2.14
[warn] 		  +- ch.cern.sparkmeasure:spark-measure_2.12:0.22 (Defaults.scala#L4318)
[warn] 		  +- com.workday.perftool:spark-commons_2.12:0.0.34-SNAPSHOT
[warn] 		com.squareup.retrofit2:converter-moshi:2.4.0
[warn] 		  +- org.influxdb:influxdb-java:2.14
[warn] 		  +- ch.cern.sparkmeasure:spark-measure_2.12:0.22 (Defaults.scala#L4318)
[warn] 		  +- com.workday.perftool:spark-commons_2.12:0.0.34-SNAPSHOT
[warn] 		com.squareup.okhttp3:okhttp:3.11.0
[warn] 		  +- org.influxdb:influxdb-java:2.14
[warn] 		  +- ch.cern.sparkmeasure:spark-measure_2.12:0.22 (Defaults.scala#L4318)
[warn] 		  +- com.workday.perftool:spark-commons_2.12:0.0.34-SNAPSHOT
[warn] 		com.squareup.okhttp3:logging-interceptor:3.11.0
[warn] 		  +- org.influxdb:influxdb-java:2.14
[warn] 		  +- ch.cern.sparkmeasure:spark-measure_2.12:0.22 (Defaults.scala#L4318)
[warn] 		  +- com.workday.perftool:spark-commons_2.12:0.0.34-SNAPSHOT

What should I do differently?

SparkMeasure isn't working with Databricks Unity Catalog and when using Spark Connect in general

I am trying to use SparkMeasure on Databricks, but unfortunately, it is not working when the Cluster is on Unity Catalog (Runtime 14.3 LTS).

When running the following code:

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

It returns this error:

[JVM_ATTRIBUTE_NOT_SUPPORTED] Attribute `sparkContext` is not supported in Spark Connect as it depends on the JVM. If you need to use this attribute, do not use Spark Connect when creating your session. Visit https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession for creating regular Spark Session in detail.
File <command-4176627973341233>, line 1
----> 1 stagemetrics = StageMetrics(spark)

I have already tried some configurations in the session, but nothing worked. When creating a cluster in the same runtime (14.3 LTS) but without Unity Catalog, the code works normally.

Is there any way to solve this? Thank you a lot!
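
Until there is a solution, code that must run on both kinds of clusters could detect a Spark Connect session and skip the instrumentation. The sketch below assumes that Spark Connect sessions are instances of a class defined under the `pyspark.sql.connect` package; the helper name and the stand-in class are hypothetical and exist only to make the example runnable without a cluster.

```python
def is_spark_connect_session(spark):
    """Return True if the session object comes from the Spark Connect client."""
    return type(spark).__module__.startswith("pyspark.sql.connect")


class FakeConnectSession:
    """Stand-in for a Spark Connect session, used only for this demonstration."""


# Mimic the module path of the real Spark Connect session class.
FakeConnectSession.__module__ = "pyspark.sql.connect.session"

print(is_spark_connect_session(FakeConnectSession()))  # True
print(is_spark_connect_session(object()))              # False
```

In a notebook, the check could guard the `StageMetrics(spark)` call so that the same code does not raise on a Unity Catalog cluster.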

Feature Request: Support Prometheus Gateway Sink in Flight Recorder Mode

Implement a Prometheus sink in Flight Recorder Mode that formats metrics into a Prometheus-friendly format and pushes them to a configured Prometheus Pushgateway endpoint.

sparkmeasure currently offers Prometheus sink capabilities, facilitating the monitoring of Spark performance metrics within Prometheus ecosystems. Extending this functionality to include support for Flight Recorder Mode represents a significant enhancement. This feature would enable users to push metrics directly to a Prometheus Pushgateway endpoint.
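
To make the request concrete, the sketch below formats a metrics dictionary as simple `name value` lines in the Prometheus text exposition format and builds a Pushgateway URL of the form `/metrics/job/<job>`. It is not how sparkMeasure's existing sink works; the function names, the `sparkmeasure_` prefix, and the host are hypothetical, and the actual HTTP push is left out.

```python
def to_prometheus_text(metrics, prefix="sparkmeasure_"):
    """Render numeric metrics as Prometheus text exposition lines."""
    lines = [f"{prefix}{name} {value}" for name, value in sorted(metrics.items())]
    return "\n".join(lines) + "\n"


def pushgateway_url(base_url, job_name):
    """Build the Pushgateway endpoint for a given job name."""
    return f"{base_url.rstrip('/')}/metrics/job/{job_name}"


# Illustrative values and endpoint only.
body = to_prometheus_text({"numStages": 3, "executorRunTime": 1878})
url = pushgateway_url("http://pushgateway.example:9091/", "flight_recorder")
print(url)
print(body)
```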

can't find spark-measure 0.21

When I try to compile testsparkmeasurescala_2.12, I get:

not found: https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.12/0.21/spark-measure_2.12-0.21.pom
