Using AWS Deequ in Python with Python-Deequ

A year ago, I wrote a [blog post](https://www.mikulskibartosz.name/measuring-data-quality-using-aws-deequ/) about using AWS Deequ to validate a dataset at the beginning of a data pipeline. I had to write the code in Scala because it was the only supported language at the time. When I was running it in production, I had to rewrite our custom EMR-controlling script to support a Scala job. Now, I wouldn’t have to do it because Deequ is available in Python.

Let’s take a look at the Python version of the library.

Importing Deequ

First, we have to import the libraries and create a Spark session. Note that we pass the Deequ Maven coordinates provided by PyDeequ to Spark because the Python version is a wrapper around the Scala library.

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

import pydeequ
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.verification import *

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
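
Depending on the PyDeequ release, resolving pydeequ.deequ_maven_coord may also require a SPARK_VERSION environment variable that matches your Spark installation. Setting it before importing pydeequ is enough; the version below is only an example:

import os

# Newer PyDeequ releases pick the Deequ jar based on this variable.
# "3.1" is an assumption - use the Spark version of your cluster.
os.environ.setdefault("SPARK_VERSION", "3.1")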

Using the analyzer

Before we start writing validation rules, we can automatically run analyzers to figure out what may be wrong with the dataset. To do it, we will have to load the file into a Spark DataFrame:

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("pieces_sold", IntegerType(), False),
    StructField("price_per_item", DoubleType(), True),
    StructField("order_date", DateType(), True),
    StructField("shop_id", StringType(), False)
])

df = spark.read.csv("MOCK_DATA.csv", header=True, schema=schema)
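
If you don’t have a MOCK_DATA.csv file at hand, a small in-memory DataFrame built with the same schema is enough to follow along; the rows below are made up, so the outputs shown later (which come from the 1000-row mock file) will differ:

from datetime import date

# Hypothetical sample rows in the same column order as the schema:
# order_id, product_name, pieces_sold, price_per_item, order_date, shop_id
df = spark.createDataFrame([
    (1, "Widget", 3, 9.99, date(2021, 1, 5), "shop_1"),
    (2, "Gadget", 1, 6.57, date(2021, 1, 6), "shop_2"),
], schema=schema)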

In the next step, we specify the analyses, run them, and display the results:

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("product_name")) \
                    .addAnalyzer(Completeness("pieces_sold")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

In the console, we are going to see something like this:

+-------+------------+------------+------+
| entity|    instance|        name| value|
+-------+------------+------------+------+
|Dataset|           *|        Size|1000.0|
| Column|product_name|Completeness|   1.0|
| Column| pieces_sold|Completeness| 0.958|
+-------+------------+------------+------+

We have 1000 data rows in the file, and all of them have a non-null product_name. However, 4.2% of pieces_sold values are missing.
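
The output of successMetricsAsDataFrame is a regular Spark DataFrame, so we can also act on the metrics programmatically, for example, by flagging columns whose completeness drops below an arbitrarily chosen threshold:

from pyspark.sql import functions as F

# Columns with more missing values than we are willing to accept
# (the 0.99 threshold is an arbitrary choice)
incomplete_columns = analysisResult_df \
    .filter((F.col("name") == "Completeness") & (F.col("value") < 0.99)) \
    .select("instance", "value")

incomplete_columns.show()

In this dataset, only pieces_sold (with completeness 0.958) would be listed.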

Running the validation

Analyzing the dataset is handy, but we use Deequ primarily to validate the data. Let’s do it now.

In the code below, I verify the following conditions:

  • there are at least 3 data rows in the dataset
  • the minimal value of price_per_item is zero
  • pieces_sold doesn’t have null values
  • product_name contains unique values
  • shop_id has one of the three values: shop_1, shop_2, shop_3
  • price_per_item does not have negative values (which is a duplicate of the second rule, but I wanted to show it anyway)

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("price_per_item", lambda x: x == 0) \
        .isComplete("pieces_sold")  \
        .isUnique("product_name")  \
        .isContainedIn("shop_id", ["shop_1", "shop_2", "shop_3"]) \
        .isNonNegative("price_per_item")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

In the case of my test data, the checks produce the following results:

+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
|Review Check|    Warning|     Warning|SizeConstraint(Si...|          Success|                    |
|Review Check|    Warning|     Warning|MinimumConstraint...|          Failure|Value: 6.57 does ...|
|Review Check|    Warning|     Warning|CompletenessConst...|          Failure|Value: 0.958 does...|
|Review Check|    Warning|     Warning|UniquenessConstra...|          Failure|Value: 0.673 does...|
|Review Check|    Warning|     Warning|ComplianceConstra...|          Failure|Value: 0.754 does...|
|Review Check|    Warning|     Warning|ComplianceConstra...|          Success|                    |
+------------+-----------+------------+--------------------+-----------------+--------------------+

We see which checks failed and, in the constraint message, the metric value (for example, the fraction of valid rows) that violated each constraint.

What can we do with invalid values?

In one of my projects, we decided that an invalid value detected by Deequ was not a big deal, but we wanted to know about it. Because of that, we didn’t stop the pipeline; instead, a message was sent to a Slack channel. This didn’t work as intended: we quickly started ignoring the messages. That’s why I now suggest running such checks only for the most critical data and stopping processing (with an error) when Deequ detects a problem.
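
A minimal sketch of that stricter setup, based on the verification result DataFrame from the previous section: filter out the failed constraints and raise an error so the job stops instead of silently continuing.

from pyspark.sql import functions as F

# Keep only the constraints that did not pass
failed_constraints = checkResult_df \
    .filter(F.col("constraint_status") != "Success") \
    .select("constraint", "constraint_message")

if failed_constraints.count() > 0:
    failed_constraints.show(truncate=False)
    raise RuntimeError("Data quality checks failed. Stopping the pipeline.")

For critical datasets, it may also make sense to create the check with CheckLevel.Error instead of CheckLevel.Warning.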
