Check-Engine - data quality validation for PySpark 3.0.0

Last week, I tested whether we could use AWS Deequ for data quality validation. I ran into a few problems. First of all, it was using an outdated version of Spark, so I had to clone the repository, update the dependencies, modify some code, and build my own copy of the AWS Deequ jar. Second, the library does not support PySpark; it is available only for Scala. Normally, I would not consider it a problem (quite the contrary, I enjoy writing Scala code ;) ), but my team has almost all of our code in Python.

I had to not only build the library but also configure an Airflow DAG to run a Scala program. It is easy, but not as easy as reusing the code we already created to run PySpark apps.

After doing all of that and convincing the team that AWS Deequ is good enough to use for data validation, I started thinking about implementing AWS Deequ for PySpark. I concluded that it should not be too hard. After all, I can fulfill 99% of my team’s data validation needs by making simple value comparisons. Naturally, it is nice to have anomaly detection, but I can implement that later, or never.

The API I want to have

Before I started writing the code, I wanted to know what API I would like to use. It was useful not only for planning the work but also for deciding what tests I had to write. I wanted the API to look like this:

result = ValidateSparkDataFrame(spark_session, spark_data_frame) \
        .is_not_null("column_name") \
        .are_not_null(["column_name_2", "column_name_3"]) \
        .is_min("numeric_column", 10) \
        .is_max("numeric_column", 20) \
        .is_unique("column_name") \
        .are_unique(["column_name_2", "column_name_3"]) \
        .execute()

where the result variable contains a named tuple:

class ValidationResult(NamedTuple):
    correct_data: DataFrame
    erroneous_data: DataFrame
    errors: List[ValidationError]
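
The post does not show ValidationError. A minimal sketch, assuming it is another named tuple that records which column failed which constraint and how many rows were affected (the field names and the summarize helper are assumptions, not taken from the text):

```python
from typing import List, NamedTuple


class ValidationError(NamedTuple):
    # Hypothetical fields, assumed for illustration only.
    column_name: str
    constraint_name: str
    number_of_errors: int


def summarize(errors: List[ValidationError]) -> List[str]:
    """Turn a list of errors into human-readable lines for a log or an alert."""
    return [
        f"{e.column_name}: '{e.constraint_name}' failed for {e.number_of_errors} row(s)"
        for e in errors
    ]


errors = [
    ValidationError("user_id", "unique", 2),
    ValidationError("price", "min", 1),
]
print("\n".join(summarize(errors)))
```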

Of course, I need more validation rules: matching text with regex, checking whether a value is in a set of correct values, validating the distribution of data, etc. Anyway, I have to start with something smaller, so the API above is good enough.
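
To make the shape of that API concrete, here is a rough sketch of the chaining idea in plain Python. It works on a list of dictionaries instead of a Spark DataFrame, so it runs without a cluster. The ValidateRows and Result names are made up for this illustration and are not the library's code:

```python
from typing import Any, Callable, Dict, List, NamedTuple, Tuple

Row = Dict[str, Any]


class Result(NamedTuple):
    correct_data: List[Row]
    erroneous_data: List[Row]
    errors: List[Tuple[str, str, int]]  # (column, rule name, failing row count)


class ValidateRows:
    """Plain-Python stand-in for the fluent API; not the library's code."""

    def __init__(self, rows: List[Row]):
        self._rows = rows
        # Each rule is (column, rule name, predicate that is True for valid values).
        self._rules: List[Tuple[str, str, Callable[[Any], bool]]] = []

    def is_not_null(self, column: str) -> "ValidateRows":
        self._rules.append((column, "not_null", lambda v: v is not None))
        return self  # returning self is what makes chaining possible

    def is_min(self, column: str, minimum: float) -> "ValidateRows":
        # Assumption: a null value fails the minimum check.
        self._rules.append((column, "min", lambda v: v is not None and v >= minimum))
        return self

    def execute(self) -> Result:
        # Nothing is checked until execute(); the chained calls only record rules.
        failed_indexes = set()
        errors = []
        for column, name, is_valid in self._rules:
            bad = [i for i, row in enumerate(self._rows) if not is_valid(row.get(column))]
            if bad:
                errors.append((column, name, len(bad)))
                failed_indexes.update(bad)
        correct = [r for i, r in enumerate(self._rows) if i not in failed_indexes]
        erroneous = [r for i, r in enumerate(self._rows) if i in failed_indexes]
        return Result(correct, erroneous, errors)


rows = [{"id": 1, "age": 25}, {"id": None, "age": 30}, {"id": 3, "age": 5}]
result = ValidateRows(rows).is_not_null("id").is_min("age", 10).execute()
```

The important design choice is that every rule method only records what to check and returns self. The actual work happens once, in execute().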

Test-driven development

I had to start with tests not only because I think that we should write tests before the implementation, but also because I must be able to trust the library. It would be unreasonable to have a testing library that is not adequately tested.

I started the implementation by checking whether all values in a DataFrame column are unique. I wasn’t sure how I wanted to structure the code, but I concluded that it is best to trust the TDD process and not worry about it.

I needed nine tests to verify the behavior of the is_unique constraint. In the end, I had nicely separated the code that prepares the DataFrame (in this case, counts the number of occurrences of a particular value) from the code that verifies whether the DataFrame adheres to the expectations.
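
The split between preparing and verifying can be sketched in plain Python, together with the kind of tests that drive it. This is an illustration on lists of dictionaries, not the Spark code, and the function and test names are invented for the example:

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def count_occurrences(rows: List[Row], column: str) -> List[Row]:
    """Preparation step: annotate every row with how often its value occurs."""
    counts = Counter(row[column] for row in rows)
    return [{**row, "occurrences": counts[row[column]]} for row in rows]


def is_unique(prepared_rows: List[Row]) -> bool:
    """Verification step: the column is unique if no value occurs more than once."""
    return all(row["occurrences"] == 1 for row in prepared_rows)


def test_should_pass_when_all_values_are_unique():
    assert is_unique(count_occurrences([{"id": 1}, {"id": 2}], "id"))


def test_should_fail_when_a_value_is_repeated():
    assert not is_unique(count_occurrences([{"id": 1}, {"id": 1}], "id"))


def test_should_pass_for_empty_input():
    assert is_unique(count_occurrences([], "id"))
```

Because the verification step only looks at the helper value, the same predicate style can be reused for rules that prepare the data differently.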

I have to admit that I wrote too much code in this step. I broke some TDD rules and wrote code I didn’t need at that point. In fact, I generalized the code so much that adding the next validation rule was trivial. I only had to copy-paste the existing method and change the names and two lambda functions.

After that, I implemented the not-null constraint and started looking for a better abstraction. I didn’t want the main class of the library to grow to a few hundred lines, so I made an abstract _Constraint class:

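from abc import ABC, abstractmethod
from typing import List, Tuple

from pyspark.sql import DataFrame


class _Constraint(ABC):
    def __init__(self, column_name: str):
        self.column_name = column_name
        # _generate_constraint_column_name is a library helper that is not shown in this post.
        self.constraint_column_name = _generate_constraint_column_name(self.constraint_name(), column_name)

    @abstractmethod
    def constraint_name(self):
        # Short identifier of the rule, e.g. "unique".
        pass

    @abstractmethod
    def prepare_df_for_check(self, data_frame: DataFrame) -> DataFrame:
        # Adds any helper column the check needs; the default adds nothing.
        return data_frame

    @abstractmethod
    def filter_success(self, data_frame: DataFrame) -> DataFrame:
        # Returns the rows that satisfy the constraint.
        return data_frame

    @abstractmethod
    def filter_failure(self, data_frame: DataFrame) -> DataFrame:
        # Returns the rows that violate the constraint.
        return data_frame

    def validate_self(self, data_frame: DataFrame, df_columns: List[str]) -> Tuple[bool, str]:
        # Checks that the constraint can run at all: the column must exist.
        return self.column_name in df_columns, f"There is no '{self.column_name}' column"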
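
The _generate_constraint_column_name helper is not shown in the post. One plausible sketch, assuming its only job is to produce a helper-column name that is unlikely to clash with a real column in the DataFrame (the prefix and the random suffix are my assumptions):

```python
import uuid


def _generate_constraint_column_name(constraint_type: str, column_name: str) -> str:
    """Hypothetical implementation: build a helper-column name unlikely to clash with real columns."""
    random_suffix = uuid.uuid4().hex[:12]
    return f"__check__{column_name}_{constraint_type}_{random_suffix}"


first = _generate_constraint_column_name("unique", "user_id")
second = _generate_constraint_column_name("unique", "user_id")
```

A random suffix also lets two constraints of the same type on the same column coexist, because each one gets its own helper column.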

In the next step, I extracted the validation rules to separate files containing classes that extend _Constraint. For example, the _Unique constraint looks like this:

class _Unique(_Constraint):
    def __init__(self, column_name: str):
        super().__init__(column_name)

    def prepare_df_for_check(self, data_frame: DataFrame) -> DataFrame:
        count_repetitions: DataFrame = data_frame \
            .groupby(self.column_name) \
            .count() \
            .withColumnRenamed("count", self.constraint_column_name)

        return data_frame.join(count_repetitions, self.column_name, "left")

    def filter_success(self, data_frame: DataFrame) -> DataFrame:
        return data_frame.filter(f"{self.constraint_column_name} == 1")

    def filter_failure(self, data_frame: DataFrame) -> DataFrame:
        return data_frame.filter(f"{self.constraint_column_name} > 1")

    def constraint_name(self):
        return "unique"

At this point, adding more rules was straightforward. I copy-pasted the test code, tweaked the test inputs and expected values, and wrote the actual validation rules, each of which is only a few lines long.

Now I think that before I write another validation rule, I have to refactor the tests because of all of the copy-pasting. I am on the fence here. On the one hand, there are numerous repetitions in those tests, and I feel that I should generalize that code. On the other hand, I don’t mind repetition in the test code.

I think repetition is better than having the code scattered around a few files and complicating the implementation so much that you doubt whether the tests work correctly. I write tests that do not depend on the internals of the class under test, so I hope that I will not need to change the test code unless the expected behavior of the class changes. In the case of a data validation library, it will probably never happen.

GitHub Actions

Finally, I had to configure continuous integration to build a Python package. I must admit that I had no idea how to do it. I did it by copy-pasting code from the documentation, looking at errors, changing some code, and pushing another commit to the repository. In total, I needed around forty commits to make it work, and I am still not satisfied with the way it works.

Of course, in the end, I squashed all of the intermediate commits and replaced them with a single one to hide the embarrassing history of my struggle with setting up GitHub Actions.

Check-Engine

Here it is https://github.com/mikulskibartosz/check-engine - the Check-Engine library for data validation on PySpark 3.0.0.

Of course, it is the pre-pre-alpha version. Of course, it would be ridiculous to use it in production right now. The code works correctly, and I have tests to prove that, but a validation library that implements only four rules is nearly useless.

Next steps

In the near future, I am going to implement the missing data checks, such as numeric value ranges, expected values of categorical variables, regex matching (with predefined checks for URLs, emails, personal id numbers), text length validation, etc.

After that, I will add checks that depend on multiple columns, for example, verifying whether the value in column A is greater than the corresponding value in column B.
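
A multi-column rule differs from the existing ones because it compares two values within the same row. A plain-Python sketch of that idea, with an invented function name and the assumption that a missing value on either side counts as a failure:

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_by_greater_than(rows: List[Row], column_a: str, column_b: str) -> Tuple[List[Row], List[Row]]:
    """Split rows into (valid, invalid), where valid means row[column_a] > row[column_b]."""
    valid, invalid = [], []
    for row in rows:
        a, b = row.get(column_a), row.get(column_b)
        # Assumption: a missing value on either side counts as a failure.
        if a is not None and b is not None and a > b:
            valid.append(row)
        else:
            invalid.append(row)
    return valid, invalid


orders = [
    {"gross": 120, "net": 100},
    {"gross": 90, "net": 100},
    {"gross": None, "net": 100},
]
valid, invalid = split_by_greater_than(orders, "gross", "net")
```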

Furthermore, I am going to implement checks for numeric value distribution within a single column (mean, median, standard deviation, quantiles). This should allow performing rudimentary anomaly detection, at least in the case of normally distributed datasets.
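
As a rough illustration of what such rudimentary anomaly detection could mean, the sketch below flags values that lie more than a chosen number of standard deviations from the mean. It uses Python's statistics module on a plain list instead of Spark. The function name and the default threshold of 3 are my assumptions:

```python
from statistics import mean, stdev
from typing import List


def find_outliers(values: List[float], max_z_score: float = 3.0) -> List[float]:
    """Return values more than max_z_score sample standard deviations from the mean."""
    if len(values) < 2:
        return []  # stdev needs at least two data points
    center = mean(values)
    spread = stdev(values)
    if spread == 0:
        return []  # all values are identical, so nothing stands out
    return [v for v in values if abs(v - center) / spread > max_z_score]


measurements = [10.0] * 20 + [100.0]
outliers = find_outliers(measurements)
```

This approach only makes sense for roughly normally distributed data and for reasonably large samples. With the sample standard deviation and fewer than 11 values, no single point can reach a z-score above 3, so the default threshold would never fire.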

Long-term ideas

I am not going to call it a long-term plan because some people may assume that I promise to implement all of that. Right now, I don’t promise anything. I just think that the following features are good ideas.

In the subsequent versions, I want to implement code that generates error metrics and either sends them to CloudWatch or allows the user to provide a custom function to handle them.
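
The custom-function option could be as simple as accepting any callable that takes the metrics. A minimal sketch, in which all names and the metric format are hypothetical:

```python
from typing import Callable, Dict, List

Metrics = Dict[str, int]
MetricsHandler = Callable[[Metrics], None]


def log_metrics(metrics: Metrics) -> None:
    """Default handler for the sketch: print one line per metric."""
    for name, value in sorted(metrics.items()):
        print(f"{name}={value}")


def report_errors(error_counts: Metrics, handler: MetricsHandler = log_metrics) -> None:
    """Hand the metrics to whichever handler the caller supplied."""
    handler(error_counts)


# A caller could pass a function that talks to CloudWatch, a test double,
# or anything else that is callable. Here the handler just collects the metrics.
collected: List[Metrics] = []
report_errors({"user_id.unique": 2, "price.min": 1}, handler=collected.append)
```

Keeping the handler a plain callable means the library itself would not need to depend on any AWS client code.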

After that, I want to revisit the problem of anomaly detection and make it similar to the anomaly detection feature of AWS Deequ.

In the last step, I want to prepare a single Airflow operator that encapsulates setting up a transient EMR cluster, running the PySpark code on that cluster, gathering the results, and deciding whether the DAG should fail or continue running depending on the validation result.
