Measuring data quality using AWS Deequ

Isn’t it annoying when a user is the one who informs you about a data-related problem? You thought everything worked flawlessly, the code was perfect, the data was clean, but suddenly someone reports a mishap. It turns out that you had overlooked something. You can fix the issue, but the old, incorrect data stays forever. Isn’t it annoying?

I was looking for a tool for measuring data quality because I hoped it would help me detect such problems sooner. I know that it is impossible to avoid them entirely, but if I can find them before we release new code into production, we can turn huge mistakes into mild annoyances.

I started looking for ready-to-use solutions. After a few minutes of googling, I had two ideas: I could use either Apache Griffin or AWS Deequ. I quickly ruled Apache Griffin out after looking at its incomplete and superficial documentation. I moved on and never looked back at the Apache project.

AWS Deequ seemed to be a better idea. Not only did it have extensive documentation, but I also hoped that AWS offered a managed service to run those data quality checks. Maybe I wouldn’t need to write any code at all.

I asked AWS Support about Deequ and their recommendations. Unfortunately, the response disappointed me. There is no managed service. The best they could do was to send me a tutorial about setting up Deequ on EMR. Deequ is built on top of Apache Spark, so their suggestion made perfect sense.

The first problem appeared right after adding Deequ to my Apache Spark project. I had not written any code yet because I wanted to take a step-by-step approach. After all, it usually makes debugging easier.
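
For context, pulling Deequ into a project is a one-line change in the build definition. Assuming an sbt build, it looks roughly like this (the version number is only an example):

// build.sbt - an illustrative sketch; the version shown here is just an example
libraryDependencies += "com.amazon.deequ" % "deequ" % "1.0.1"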

For some reason, after adding Deequ to the dependencies, reading data from Athena tables (via Hive) stopped working. I looked at the error message and saw that my code was trying to call a function that did not exist. In the past, I had wasted a tremendous amount of time chasing bugs caused by incompatible versions of Java libraries, so I immediately knew that a mismatch between the Apache Spark versions used by my project and by the Deequ library caused the missing function error.

I looked at the Deequ source code and saw that the authors use Apache Spark version 2.2.2. My EMR cluster was running Spark 2.4.0. There was a pull request in the Deequ repository that added support for the more recent Spark version, but the authors had not merged it yet because of an ongoing discussion about backward compatibility.

I wasn’t interested in backward compatibility, so I cloned the repository, applied the pull request to my version, and built the jar file on my laptop.

I had overlooked one more thing, though. Deequ declares the Apache Spark dependency in the “compile” scope. I could not overwrite the Apache Spark library available on EMR because that breaks the integration with AWS services, so I changed the scope to “provided” and recompiled the library.
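
To make the scope change concrete: “provided” means the dependency is available at compile time but is not packaged with the application, because the runtime environment (EMR in this case) already ships it. Deequ itself builds with Maven, but the same idea expressed in sbt looks like this:

// the sbt equivalent of Maven's "provided" scope: compile against Spark,
// but rely on the Spark distribution that EMR already provides at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"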

After that change, my Apache Spark project was working correctly again, and I could start implementing data quality checks. I picked an Athena table I am familiar with and started writing rules for that dataset.

val checks = Check(CheckLevel.Error, "table_name")
    .areComplete(
        Seq("column_1", "column_2", "column_3", "column_4", "column_5", "column_6", "column_7")
    )
    .containsURL("some_url_column")
    .hasUniqueness(
        Seq("column_1", "column_2", "column_3", "column_4", "column_5"),
        (fraction: Double) => fraction == 1
    )
    .isContainedIn("column_6", Array("value_1", "value_2"))

val verificationResult = VerificationSuite()
    .onData(sparkDataFrame)
    .addChecks(Seq(checks))
    .useSparkSession(sparkSession)
    .run()

I didn’t know what kind of results to expect, so my first idea was to write the output to S3, use AWS Lambda to read the file, look for errors, and send a message to an SQS queue (we have a Lambda function that forwards all messages published to that queue to our Slack channel).
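
Deequ can turn the verification result into a Spark DataFrame, so the S3 part boils down to a few lines. This is only a sketch of the idea; the bucket name and prefix are placeholders:

import com.amazon.deequ.VerificationResult

// turn the check results into a DataFrame and write them to S3 as JSON
VerificationResult
    .checkResultsAsDataFrame(sparkSession, verificationResult)
    .coalesce(1)
    .write
    .mode("overwrite")
    .json("s3://some-bucket/data-quality/table_name/")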

It turned out that the output contains only coarse information about the errors. It shows which rules were violated, but it does not include the rows that break the constraints. Because of that, my code was writing three-line-long files into S3. Therefore, I decided to skip the intermediate step entirely and send messages to SQS directly from my Spark job.

if (verificationResult.status != CheckStatus.Success) {
    val resultsForAllConstraints = verificationResult.checkResults
        .flatMap { case (_, checkResult) => checkResult.constraintResults }

    val errorsToReport = resultsForAllConstraints
        .filter { _.status != ConstraintStatus.Success }

    errorsToReport
        .zipWithIndex
        .map { case (result, index) =>
            val violatedConstraint = result.constraint.toString
            val errorMessage = result.message.getOrElse("")

            ... // here we create a JSON object that contains all of the data we want to see in the Slack channel.

            new SendMessageBatchRequestEntry(index.toString, json)
        }.foreach {
            //sending to SQS
        }
}
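
The sending part itself is left out above. A rough sketch with the AWS SDK for Java v1 (the SDK that SendMessageBatchRequestEntry comes from) could look like this; the queue URL is a placeholder, and sendToSqs is a hypothetical helper introduced only for this example:

import scala.collection.JavaConverters._
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry}

// a sketch only: send the prepared entries to SQS in batches
def sendToSqs(entries: Seq[SendMessageBatchRequestEntry]): Unit = {
    val sqs = AmazonSQSClientBuilder.defaultClient()
    val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/data-quality-alerts" // placeholder

    entries
        .grouped(10) // SQS accepts at most 10 entries per SendMessageBatch call
        .foreach { batch =>
            sqs.sendMessageBatch(new SendMessageBatchRequest(queueUrl, batch.asJava))
        }
}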

I still wasn’t sure what I should do about the garrulous error messages. When a rule gets violated, Deequ logs the SQL-like code that describes the constraint as the message and tells us the percentage of rows that comply with the data restriction. How can I turn that into a useful error message?

First, I decided to split my constraints into individual rules. Seven columns should not contain nulls, but I have to list them one by one to know which one fails the expectation.

val checks = Check(CheckLevel.Error, "table_name")
    .isComplete("column_1")
    .isComplete("column_2")
    .isComplete("column_3")
    .isComplete("column_4")
    .isComplete("column_5")
    .isComplete("column_6")
    .isComplete("column_7")

//and all of the other checks listed one by one

The code above makes the error messages a little more informative, even though they remain ugly and strictly technical. Perhaps that is okay, because those messages will be seen only by data engineers.

In the next step, I added a metrics repository to my Deequ code and defined anomaly detection rules. I hope that those alerts will notify me when we fail to download all of the items from the external data source.

def metricsRepository(): MetricsRepository = ???
def resultKey(): ResultKey = ???

VerificationSuite()
    .onData(sparkDataFrame)
    .addCheck(checks)
    .useRepository(metricsRepository())
    .saveOrAppendResult(resultKey())
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(0.1)), Size())
    .run()
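
The two placeholder methods can be implemented with classes that ship with Deequ. Here is a minimal sketch, assuming the metrics are kept as a JSON file on S3 (the path and the tag are placeholders):

import com.amazon.deequ.repository.{MetricsRepository, ResultKey}
import com.amazon.deequ.repository.fs.FileSystemMetricsRepository

// store the computed metrics as a JSON file on S3
def metricsRepository(): MetricsRepository =
    FileSystemMetricsRepository(sparkSession, "s3://some-bucket/deequ/metrics/table_name.json")

// tag every run with a timestamp so the anomaly check can compare consecutive runs
def resultKey(): ResultKey =
    ResultKey(System.currentTimeMillis(), Map("table" -> "table_name"))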

All things considered, I think that the Deequ-based approach to data quality tracking was reasonably successful. I wonder how it is going to work in the long run. After all, we need to define rules for a few hundred tables, react to alerts, tune the constraints to avoid false positives, update them when business requirements change, etc.

I am sure about one thing: this is going to be a time-consuming effort. In some cases, figuring out what values constitute correct data may take a few weeks, because the people who wrote the code that produces the results now work somewhere else and left scarce documentation.

Anyway, I firmly believe in tracking data quality because wrong data is way worse than not having data at all. We cannot be a data-driven organization if people don’t trust the data. That is why extensive testing and instrumentation are a must for every data engineering team.

Nevertheless, I am not sure about using AWS Deequ. In my opinion, the library isn’t production-ready yet. The error messages it produces are weird, it does not support the most recent Apache Spark out of the box, and it seems that I cannot define my data constraints without extending the library.

Because of those concerns, we may keep looking for other data quality tools and evaluate them as well. On the other hand, given the need to define custom rules, building internal tooling on top of the existing Deequ code is also a possibility we may consider.
