How to measure Spark performance and gather metrics about written data

When we want to instrument Apache Spark and gather metrics about job stages, we can use the sparkmeasure library created by Luca Canali. The library calculates how many bytes the instrumented job stages process and how long the processing takes. If we export the measurements as metrics (for example, to AWS CloudWatch), we can set up alerts and anomaly detection to notify us when a stage suddenly processes significantly more (or less) data than usual.
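As a sketch of the alerting part: assuming the metrics end up in a CloudWatch namespace called "Spark Job Name" (the namespace used later in this article), a simple static-threshold alarm could be created with the AWS CLI. The alarm name, threshold, dimension values, and SNS topic ARN below are placeholders, not values from the original setup:

```shell
# Hypothetical example: alert when the job writes fewer than ~1 MB in an hour.
# Namespace, dimensions, threshold, and the SNS topic ARN are placeholders.
aws cloudwatch put-metric-alarm \
  --alarm-name "spark-bytes-written-too-low" \
  --namespace "Spark Job Name" \
  --metric-name bytesWritten \
  --dimensions Name=database,Value=hive_database Name=table,Value=hive_table \
  --statistic Sum \
  --period 3600 \
  --evaluation-periods 1 \
  --threshold 1000000 \
  --comparison-operator LessThanThreshold \
  --treat-missing-data breaching \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:spark-alerts
```

Treating missing data as breaching makes the alarm fire when the job does not run at all, which is often the failure mode you care about most.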

Adding the library to the runtime environment

First, we have to pass the library to the Spark runtime. There are two options: we can package it in the jar file with our Spark code (for example, using the sbt-spark-package plugin or the sbt-assembly plugin), or we can pass the package name to the spark-shell script while running the Spark job: spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17.
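If you go the sbt-assembly route, the dependency goes into build.sbt. A minimal sketch, assuming Scala 2.12 and the same sparkmeasure version as in the spark-shell command above:

```scala
// build.sbt (fragment) - bundle sparkmeasure into the assembled jar
libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.17"
```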

Preparing the functions to gather and send metrics

To create a nice API, I suggest wrapping the library usage in a Scala class:

import org.apache.spark.sql.{Row, SparkSession}

case class SparkMetrics(sparkSession: SparkSession) {
  val metrics = ch.cern.sparkmeasure.StageMetrics(sparkSession)

  def withMetrics(
      hiveDatabase: String,
      hiveTable: String,
      partition: String
    )(func: (SparkSession) => Unit): Unit = {
    metrics.begin()

    func(sparkSession)

    metrics.end()
    metrics.createStageMetricsDF()

    sendStats(
        hiveDatabase,
        hiveTable,
        partition,
        metrics.aggregateStageMetrics().first()
    )
  }

  private def sendStats(database: String, table: String, partition: String, stageStatistics: Row): Unit = {
    val size = stageStatistics.getAs[Long]("sum(bytesWritten)")
    val records = stageStatistics.getAs[Long]("sum(recordsWritten)")

    // Here you send the statistics.
    // For example, if you want to store them in CloudWatch, the code looks like this:

    val cw = AmazonCloudWatchClientBuilder.defaultClient

    val databaseDimension = new Dimension()
            .withName("database")
            .withValue(database)
    val tableDimension = new Dimension()
            .withName("table")
            .withValue(table)
    val partitionDimension = new Dimension()
            .withName("partition")
            .withValue(partition)

    // MetricDatum.withValue expects a Double, so we convert the Long values
    val bytesDatum = new MetricDatum()
            .withMetricName("bytesWritten")
            .withUnit(StandardUnit.Bytes)
            .withValue(size.toDouble)
            .withDimensions(databaseDimension, tableDimension, partitionDimension)
    val recordsDatum = new MetricDatum()
            .withMetricName("recordsWritten")
            .withUnit(StandardUnit.Count)
            .withValue(records.toDouble)
            .withDimensions(databaseDimension, tableDimension, partitionDimension)

    val request = new PutMetricDataRequest()
            .withNamespace("Spark Job Name")
            .withMetricData(bytesDatum, recordsDatum)

    cw.putMetricData(request)
  }
}

To send CloudWatch metrics using the AWS Java SDK, we need the following imports:

import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.Dimension
import com.amazonaws.services.cloudwatch.model.MetricDatum
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest
import com.amazonaws.services.cloudwatch.model.StandardUnit

Gathering the metrics

We have done all of that preparation to get a simple API in Scala. When we want to gather metrics about some Spark operations, all we need to do is create an instance of SparkMetrics and wrap the instrumented operations in the withMetrics function:

SparkMetrics(sparkSession).withMetrics("hive_database", "hive_table", "partition") {
    sparkSession => dataFrame.write.mode(SaveMode.Append).orc(s"s3_location")
}
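If you only want to inspect the numbers during development instead of shipping them to CloudWatch, sparkmeasure can also collect and print an aggregated report by itself using its runAndMeasure helper. A minimal sketch (it needs a running SparkSession and an existing dataFrame, so there is no standalone output to show):

```scala
import org.apache.spark.sql.SaveMode

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(sparkSession)

// runAndMeasure executes the closure, gathers the stage metrics,
// and logs an aggregated report (including bytesWritten and recordsWritten)
stageMetrics.runAndMeasure {
  dataFrame.write.mode(SaveMode.Append).orc("s3_location")
}
```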

Did you enjoy reading this article?
Would you like to learn more about software craft in data engineering and MLOps?

Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!

Newsletter

Do you enjoy reading my articles?
Subscribe to the newsletter if you don't want to miss the new content, business offers, and free training materials.

Bartosz Mikulski


  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz