Using AWS Deequ in Python with Python-Deequ

A year ago, I wrote a [blog post](https://www.mikulskibartosz.name/measuring-data-quality-using-aws-deequ/) about using AWS Deequ to validate a dataset at the beginning of a data pipeline. I had to write the code in Scala because it was the only supported language at the time. When I deployed it to production, I had to rewrite our custom EMR-controlling script to support a Scala job. Now I wouldn't have to do that, because Deequ is available in Python.

Let’s take a look at the Python version of the library.

Importing Deequ

First, we have to import the libraries and create a Spark session. Note that we pass Maven libraries specified by Deequ to Spark. We do it because the Python version is a wrapper around the Scala code.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

import pydeequ
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.verification import *

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Using the analyzer

Before we start writing validation rules, we can automatically run analyzers to figure out what may be wrong with the dataset. To do it, we will have to load the file into a Spark Dataset:

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("pieces_sold", IntegerType(), False),
    StructField("price_per_item", DoubleType(), True),
    StructField("order_date", DateType(), True),
    StructField("shop_id", StringType(), False)
  ])

df = spark.read.csv("MOCK_DATA.csv",  header=True, schema=schema)

In the next step, we specify the analyses, run them, and display the results:

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("product_name")) \
                    .addAnalyzer(Completeness("pieces_sold")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

In the console, we are going to see something like this:

+-------+------------+------------+------+
| entity|    instance|        name| value|
+-------+------------+------------+------+
|Dataset|           *|        Size|1000.0|
| Column|product_name|Completeness|   1.0|
| Column| pieces_sold|Completeness| 0.958|
+-------+------------+------------+------+

We have 1000 data rows in the file, and all of them have a non-null product_name. However, 4.2% of pieces_sold values are missing.
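It helps to know what Completeness actually measures: the fraction of non-null values in a column. A pure-Python sketch (with hypothetical sample values, not the real MOCK_DATA) shows how a number like 0.958 comes about:

```python
def completeness(values):
    """Fraction of non-null entries, mirroring what Deequ's Completeness metric reports."""
    non_null = sum(1 for v in values if v is not None)
    return non_null / len(values)

# Hypothetical pieces_sold sample: 2 nulls out of 10 values.
pieces_sold = [3, None, 7, 12, None, 5, 9, 1, 4, 2]
print(completeness(pieces_sold))  # 0.8, i.e. 20% missing
```

On the real dataset, Deequ computes the same ratio over all 1000 rows, which is where 0.958 (4.2% missing) comes from.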

Running the validation

Analyzing the dataset is handy, but we use Deequ primarily to validate the data. Let’s do it now.

In the code below, I verify the following conditions:

  • there are at least 3 data rows in the dataset
  • the minimal value of price_per_item is zero
  • pieces_sold doesn’t contain null values
  • product_name contains unique values
  • shop_id has one of the three values: shop_1, shop_2, shop_3
  • price_per_item does not have negative values (which is a duplicate of the second rule, but I wanted to show it anyway)
check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("price_per_item", lambda x: x == 0) \
        .isComplete("pieces_sold")  \
        .isUnique("product_name")  \
        .isContainedIn("shop_id", ["shop_1", "shop_2", "shop_3"]) \
        .isNonNegative("price_per_item")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

In the case of my test data, the checks produce these error messages:

+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
|Review Check|    Warning|     Warning|SizeConstraint(Si...|          Success|                    |
|Review Check|    Warning|     Warning|MinimumConstraint...|          Failure|Value: 6.57 does ...|
|Review Check|    Warning|     Warning|CompletenessConst...|          Failure|Value: 0.958 does...|
|Review Check|    Warning|     Warning|UniquenessConstra...|          Failure|Value: 0.673 does...|
|Review Check|    Warning|     Warning|ComplianceConstra...|          Failure|Value: 0.754 does...|
|Review Check|    Warning|     Warning|ComplianceConstra...|          Success|                    |
+------------+-----------+------------+--------------------+-----------------+--------------------+

We see which checks failed and, in the constraint_message column, what fraction of rows violates each constraint.
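The Value: 0.673 in the uniqueness failure is worth decoding: Deequ's Uniqueness is the fraction of values that occur exactly once in the column. A pure-Python sketch (with hypothetical values) of that computation:

```python
from collections import Counter

def uniqueness(values):
    """Fraction of values occurring exactly once, mirroring Deequ's Uniqueness metric."""
    counts = Counter(values)
    unique_count = sum(1 for c in counts.values() if c == 1)
    return unique_count / len(values)

# 'b' appears twice, so only 'a' and 'c' count as unique: 2 out of 4 values.
print(uniqueness(["a", "b", "b", "c"]))  # 0.5
```

So a value of 0.673 means roughly a third of the product_name rows share their name with at least one other row.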

What can we do with invalid values?

In one of my projects, we decided that an invalid value detected by Deequ was not a big deal, but we wanted to know about it. Because of that, we didn’t stop the pipeline; instead, a message was sent to a Slack channel. This didn’t work as intended. We quickly started ignoring the messages. That’s why I now suggest running such checks only for the most critical data and stopping processing (with an error) if Deequ detects a problem.
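Stopping the pipeline can be as simple as collecting the check results and raising when any constraint failed. A minimal sketch, assuming the rows have the same columns as checkResult_df above (the helper names are mine, not part of pydeequ):

```python
def failed_constraints(check_rows):
    """Return the messages of failed constraints.

    check_rows: a list of dicts shaped like the rows of checkResult_df,
    e.g. the result of [row.asDict() for row in checkResult_df.collect()].
    """
    return [row["constraint_message"] for row in check_rows
            if row["constraint_status"] == "Failure"]

def fail_pipeline_on_errors(check_rows):
    """Raise an error (and thereby stop the pipeline) if any check failed."""
    failures = failed_constraints(check_rows)
    if failures:
        raise RuntimeError("Data quality checks failed:\n" + "\n".join(failures))
```

With this gate in place, a scheduler such as Airflow or an EMR step would mark the job as failed instead of silently continuing with bad data.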

Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz