How to measure Spark performance and gather metrics about written data

When we want to instrument Apache Spark and gather metrics about job stages, we can use the sparkmeasure library created by Luca Canali. The library measures how many bytes we process in the instrumented job stages and how long it takes to process them. If we export the measurements as metrics (for example, to AWS CloudWatch), we can set up alerts and anomaly detection to notify us when a stage suddenly processes significantly more (or less) data than usual.

Adding the library to the runtime environment

First, we have to pass the library to the Spark runtime. There are two options: we can package it in the jar file with our Spark code (for example, using the sbt-spark-package plugin or the sbt-assembly plugin), or we can pass the package name to the spark-shell script while running the Spark job: spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17.
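
If you choose to bundle the library with sbt-assembly, a minimal build.sbt sketch could look like the one below. The Spark and Scala versions are assumptions, so match them to your cluster; Spark itself stays "provided" so that it does not end up in the fat jar:

scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  // Spark is provided by the cluster, so sbt-assembly excludes it from the fat jar
  "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided",
  // sparkmeasure gets bundled into the assembled jar
  "ch.cern.sparkmeasure" %% "spark-measure" % "0.17"
)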


Preparing the functions to gather and send metrics

To create a nice API, I suggest wrapping the library usage in a Scala class:

import org.apache.spark.sql.{Row, SparkSession}

case class SparkMetrics(sparkSession: SparkSession) {
  val metrics = ch.cern.sparkmeasure.StageMetrics(sparkSession)

  def withMetrics(
      hiveDatabase: String,
      hiveTable: String,
      partition: String
    )(func: SparkSession => Unit): Unit = {
    // begin() and end() mark the instrumented region
    metrics.begin()

    func(sparkSession)

    metrics.end()
    // createStageMetricsDF() turns the collected stage metrics into a DataFrame
    // and registers the temporary view used by aggregateStageMetrics()
    metrics.createStageMetricsDF()

    sendStats(
        hiveDatabase,
        hiveTable,
        partition,
        metrics.aggregateStageMetrics().first()
    )
  }

  private def sendStats(database: String, table: String, partition: String, stageStatistics: Row): Unit = {
    val size = stageStatistics.getAs[Long]("sum(bytesWritten)")
    val records = stageStatistics.getAs[Long]("sum(recordsWritten)")

    // Here you send the statistics.
    // For example, if you want to store them in CloudWatch, the code looks like this:

    val cw = AmazonCloudWatchClientBuilder.defaultClient

    val databaseDimension = new Dimension()
            .withName("database")
            .withValue(database)
    val tableDimension = new Dimension()
            .withName("table")
            .withValue(table)
    val partitionDimension = new Dimension()
            .withName("partition")
            .withValue(partition)

    val bytesDatum = new MetricDatum()
            .withMetricName("bytesWritten")
            .withUnit(StandardUnit.Bytes)
            .withValue(size.toDouble)
            .withDimensions(databaseDimension, tableDimension, partitionDimension)
    val recordsDatum = new MetricDatum()
            .withMetricName("recordsWritten")
            .withUnit(StandardUnit.Count)
            .withValue(records.toDouble)
            .withDimensions(databaseDimension, tableDimension, partitionDimension)

    val request = new PutMetricDataRequest()
            .withNamespace("Spark Job Name")
            .withMetricData(bytesDatum, recordsDatum)

    cw.putMetricData(request)
  }
}

To send CloudWatch metrics using the AWS Java SDK, we need the following imports:

import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.Dimension
import com.amazonaws.services.cloudwatch.model.MetricDatum
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest
import com.amazonaws.services.cloudwatch.model.StandardUnit
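
These classes come from the CloudWatch module of the AWS SDK for Java (v1). The sbt dependency could look like this; the version number is only an example, so pick the one you already use elsewhere:

libraryDependencies += "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.11.1000"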

Gathering the metrics

We have done all of that preparation to get a simple API in Scala. When we want to gather metrics about some Spark operations, all we need to do is create an instance of SparkMetrics and wrap the instrumented operations in the withMetrics function:

SparkMetrics(sparkSession).withMetrics("hive_database", "hive_table", "partition") {
    sparkSession => dataFrame.write.mode(SaveMode.Append).orc("s3_location")
}
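
The introduction mentioned alerts and anomaly detection. Once the bytesWritten metric is in CloudWatch, we can attach an anomaly detection alarm to it with the same AWS SDK. The snippet below is only a sketch built on assumptions: the alarm name, the SNS topic ARN, the one-hour period, and the two-standard-deviation band are placeholders you will want to adjust.

import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.{ComparisonOperator, Dimension, Metric, MetricDataQuery, MetricStat, PutMetricAlarmRequest}

val cw = AmazonCloudWatchClientBuilder.defaultClient

// The dimensions must match the ones used in sendStats, otherwise the alarm watches an empty metric
val dimensions = Seq(
  new Dimension().withName("database").withValue("hive_database"),
  new Dimension().withName("table").withValue("hive_table"),
  new Dimension().withName("partition").withValue("partition")
)

// m1 fetches the bytesWritten metric published by sendStats, summed over one hour
val metricQuery = new MetricDataQuery()
  .withId("m1")
  .withMetricStat(new MetricStat()
    .withMetric(new Metric()
      .withNamespace("Spark Job Name")
      .withMetricName("bytesWritten")
      .withDimensions(dimensions: _*))
    .withPeriod(3600)
    .withStat("Sum"))
  .withReturnData(true)

// ad1 is the expected band around m1; 2 is the number of standard deviations
val bandQuery = new MetricDataQuery()
  .withId("ad1")
  .withExpression("ANOMALY_DETECTION_BAND(m1, 2)")

val alarmRequest = new PutMetricAlarmRequest()
  .withAlarmName("spark-bytes-written-anomaly")                          // placeholder name
  .withMetrics(metricQuery, bandQuery)
  .withThresholdMetricId("ad1")
  .withComparisonOperator(ComparisonOperator.LessThanLowerOrGreaterThanUpperThreshold)
  .withEvaluationPeriods(1)
  .withAlarmActions("arn:aws:sns:eu-west-1:123456789012:spark-alerts")   // placeholder SNS topic

cw.putMetricAlarm(alarmRequest)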
