Testing data products: BDD for data engineers

This article consists of three parts. First, I explain what BDD is and why we should test data products. In the second part, I show how to set up Behave to verify PySpark code using a trivial example. In the last part of the text, I write about a hypothetical application, enumerate the requirements, write the specification, implement the tests, and write the production code to make them pass.

What is BDD?

Behavior Driven Development is a collaborative approach to software development, which minimizes the risk that the programmers will misunderstand the business requirements by linking the human-readable requirements files with test implementations.

As a result, we have extensive documentation and automatically verifiable specifications of the system. We write the specification in a natural language because we want all stakeholders to collaborate. Understanding the specification MUST NOT require any coding skills.

We convert the specification steps into automated tests by writing Python functions. The process preserves the mapping between the scenario step and the step implementation, so when the code fails, we see which scenarios don’t work correctly.
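To see what that mapping means in practice, here is a toy sketch in plain Python. It only illustrates the idea and is not how Behave works internally; the STEPS dictionary, the step decorator, and the run_scenario function are names I made up for this example.

```python
# Toy illustration only: Behave's real registry and matchers are more elaborate.
STEPS = {}  # hypothetical registry: step text -> implementing function


def step(text):
    """Register a function as the implementation of one scenario step."""
    def decorator(func):
        STEPS[text] = func
        return func
    return decorator


@step("an empty list")
def create_empty_list(context):
    context["items"] = []


@step("we count the elements")
def count_elements(context):
    context["count"] = len(context["items"])


@step("the result should be zero")
def verify_zero(context):
    assert context["count"] == 0


def run_scenario(step_texts):
    """Run the steps in order and report which step text failed, if any."""
    context = {}
    for text in step_texts:
        try:
            STEPS[text](context)
        except AssertionError:
            return f"FAILED: {text}"
    return "passed"


scenario_result = run_scenario([
    "an empty list",
    "we count the elements",
    "the result should be zero",
])
```

Because every function is registered under the exact step text, a failing assertion can be reported as the human-readable step instead of a function name.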

Why do we test data products?

When we build a data lake or a data warehouse, we are building a product. The data produced by our code is the product used by other teams or directly by the users. Every ETL/ELT pipeline, every machine learning model, every Spark job, and every SQL query run in Hive or Trino outputs a part of the product.

It is tempting to ignore testing and let the users find problems. It is a terrible idea. Errors in data are more severe than errors in backend services because erroneous data produced by one pipeline propagates to subsequent pipelines and spoils the results downstream. The incorrect data records accumulate over time and lead to wrong business decisions.

If the data team lets the data rot and nobody trusts the numbers anymore, we should burn the data warehouse to the ground and start from scratch. How do you feel about wasting a lot of time and tons of money building a useless (or even harmful) data product? We don’t want to be the people who destroyed the company, so we should write tons of tests. Testing a data product or a data pipeline doesn’t need to differ from testing backend services. After all, we have expectations about the produced outcome, and we can automatically verify those expectations.

Using Behave to test PySpark code

Before we start a more complicated example, let’s set up Behave and see how to use it to test PySpark code. The test doesn’t verify anything useful but checks whether we have configured everything correctly.

Let’s add behave as a project dependency. I use Poetry to manage Python dependencies, so I must open the pyproject.toml file, scroll to the [tool.poetry.dev-dependencies] section, and add behave = "^1.2.6".

When we use Behave, we have to follow a naming convention. All of the specification files are in the features directory. The Python implementations of steps are in the features/steps directory.

First, we create the features/env_test.feature file with the following specification:

Feature: Show that Spark works

  Scenario: Count the number of rows in an empty Dataframe
    Given an empty dataframe
    When we count the number of rows
    Then the result should be zero

After defining a scenario, we have to create the features/environment.py file. In this file, we define the functions running before/after all tests, before/after a single scenario, or before/after every step. We’ll use the file to create and stop the Spark session:

from behave import *
from pyspark.sql import SparkSession


def before_all(context):
    context.spark = SparkSession.builder \
        .master("local[*]") \
        .appName("behave-with-spark") \
        .getOrCreate()


def after_all(context):
    if 'spark' in context:
        context.spark.sparkContext.stop()
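The same file can also hold the per-scenario hooks. The sketch below is my assumption about how one might use them, not a part of the setup above: it records how long every scenario takes. Behave finds before_scenario and after_scenario by name and calls them with the context and the scenario object. The scenario_durations dictionary and the scenario_started_at attribute are hypothetical names chosen for this example.

```python
import time

# Hypothetical module-level store: scenario name -> duration in seconds.
scenario_durations = {}


def before_scenario(context, scenario):
    # Remember when the scenario started.
    context.scenario_started_at = time.perf_counter()


def after_scenario(context, scenario):
    # Record how long the scenario took, keyed by its name.
    elapsed = time.perf_counter() - context.scenario_started_at
    scenario_durations[scenario.name] = elapsed
```

Since every test here starts Spark jobs, such timings may help us spot the scenarios that slow down the whole suite.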

Now, we can define the step definitions in the features/steps/env_test.py file:

from behave import *
from pyspark.sql.types import StructType, StructField, StringType

single_string_column_schema = StructType([StructField("col1", StringType())])


@given('an empty dataframe')
def step_impl(context):
    context.df = context.spark.createDataFrame([], schema=single_string_column_schema)


@when('we count the number of rows')
def step_impl(context):
    context.count = context.df.count()


@then('the result should be zero')
def step_impl(context):
    assert context.count == 0

Testing data products using BDD

In the next example, we’ll start with a business requirement, pretend that we do example mapping, write the specification, implement the tests, and, finally, write the production code. To keep it simple, however, we won’t perform an end-to-end test.

Let’s imagine that the product owner tells us: “We have to send a newsletter. Here is a template. Give me a file with recipient addresses and all of the variables.” What do we do?

How to define a BDD scenario using example mapping

We want to define a scenario. Every scenario consists of three parts: context, action, and outcome.

scenario = context + action + outcome
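If it helps to think about it as a data structure, the three parts could be sketched like this. The Scenario class and its to_gherkin method are hypothetical helpers invented for illustration; Behave doesn't need them. The example reuses the scenario from the env_test.feature file.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Scenario:
    name: str
    context: List[str]  # the Given steps
    action: str         # the When step
    outcome: List[str]  # the Then steps

    def to_gherkin(self) -> str:
        lines = [f"Scenario: {self.name}"]
        parts = (("Given", self.context), ("When", [self.action]), ("Then", self.outcome))
        for keyword, steps in parts:
            for index, step in enumerate(steps):
                # Additional steps of the same kind are joined with "And".
                lines.append(f"  {keyword if index == 0 else 'And'} {step}")
        return "\n".join(lines)


example = Scenario(
    name="Count the number of rows in an empty Dataframe",
    context=["an empty dataframe"],
    action="we count the number of rows",
    outcome=["the result should be zero"],
)
gherkin_text = example.to_gherkin()
```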

It is a good practice to begin with the outcome. What do we want to achieve? We may ask the product owner whether all subscribers should receive the newsletter. “Not everyone, only the people who purchased a product,” says the product owner. Great! It is our first Rule and the first Example:

Rule 1: Every subscriber who purchased a product receives an email

Example 1: Newsletter is sent to people who purchased at least one product

At this point, we should ask for a counterexample. What happens with the subscribers who haven’t purchased anything? Do we send a generic email, or do we ignore them? In this case, we want to ignore them, so here is our second example:

Example 2: Subscribers with no purchases don't receive the newsletter

We, the programmers, instantly start thinking about the solution. We can’t help it. Fortunately, the effort isn’t wasted because we quickly discover a problem: “If we join the subscribers table with purchases, we may get duplicates when someone bought more than one product!” It is a good observation. After a brief discussion, we may produce another Rule and two Examples:

Rule 2: Use only the most recent purchase

Example 3: People who purchased more than one product receive only one message

Example 4: Newsletter mentions only the most recent purchase

Example 3 is weird. It doesn’t look like a business specification. It is more like a test written by programmers who worry about using the wrong type of join while merging the tables. Should we keep it? It depends. From a business perspective, it is useless. If we include only the most recent purchase in the email, we don’t need another requirement telling us to send only one email. On the other hand, if the programmers made a mistake in the past and subscribers received multiple messages, we may feel safer when we explicitly state such assumptions.
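The duplicate problem is easy to reproduce without Spark. Here is a minimal sketch with made-up data, in which plain Python lists stand in for the two tables and a list comprehension stands in for the inner join:

```python
# (subscriber_id, subscriber_name)
subscribers = [(1, "Alice"), (2, "Bob")]
# (subscriber_id, product_name)
purchases = [(1, "Product A"), (1, "Product B")]

# Naive inner join: one output row per matching purchase.
naive_join = [
    (name, product)
    for subscriber_id, name in subscribers
    for buyer_id, product in purchases
    if subscriber_id == buyer_id
]

# Alice appears once per purchase; Bob has no purchases, so the join drops him.
recipient_names = [name for name, _ in naive_join]
```

Alice ends up on the list twice, which is exactly the situation Example 3 guards against.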

Turning examples into Gherkin specifications

Now, we can take all of the rules and examples and turn them into Gherkin specifications. I suggest having user personas and using their names in the scenarios because it helps make the specification shorter and more consistent. In this example, I have two personas: Alice (a frequent buyer) and Bob (a subscriber who doesn’t buy anything). Let’s create a new file features/mailing.feature and write the first example:

Feature: Sending a newsletter to the customers

  # Rule: Only people who purchased a product get the newsletter (not supported in Behave 1.2.6)

  Scenario: Newsletter is sent to people who purchased at least one product
    Given Alice purchased a product
    When we generate newsletter recipients
    Then Alice is on the list

Unfortunately, Behave 1.2.6 doesn’t support the Rule keyword, but we can still write the rules as comments. In my opinion, using rules to group scenarios makes the file easier to read. After all, when we upgrade Behave to a new version, we’ll have an even better specification.

The other scenarios look like this:

  Scenario: Subscribers with no purchases don't receive the newsletter
    Given Bob bought no products
    When we generate newsletter recipients
    Then Bob is not on the list

  # Rule: Use only the most recent purchase

  Scenario: People who purchased more than one product receive only one message
    Given Alice purchased a product
    And Alice purchased another product
    When we generate newsletter recipients
    Then Alice is on the list only once

  Scenario: Newsletter mentions only the most recent purchase
    Given Alice purchased a product
    And Alice purchased another product
    When we generate newsletter recipients
    Then Alice gets a message about the second purchase

Have you noticed the most significant benefit of BDD? All of the requirements are easily readable. People who aren’t familiar with the application can use the specification file as documentation. Also, because we separate the requirement definitions from the implementation, the files containing the descriptions are short.

Automating the specification using Behave

Now, we have to automate the steps in the specification. Let’s create a features/steps/mailing.py file with the following content:

from behave import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

subscribers_struct = StructType([
    StructField("subscriber_id", IntegerType()),
    StructField("subscriber_name", StringType())
])

product_struct = StructType([
    StructField("subscriber_id", IntegerType()),
    StructField("product_name", StringType()),
    StructField("purchase_timestamp", LongType()),
])

ALICE_ID = 1
BOB_ID = 2
PRODUCT_NAME = 'Product A'
DIFFERENT_PRODUCT = 'some other product'


def generate_recipient_list(subscribers, products):
    return None # this is the function under test

As you see, we have defined a few constants and an empty generate_recipient_list function.

Now, we can focus on defining the steps and explaining how they work in Behave. Let’s begin with the steps in the Given part of the scenarios. We’ll define two helper functions to add the subscribers and products to the context. The context is an object provided by Behave, which contains the state shared between scenario steps.

We aren’t using any Spark DataFrames yet because it is easier and faster to operate on pure Python objects:

def add_subscriber(context, subscriber_id, subscriber_name):
    if 'subscribers' in context:
        context.subscribers.append([subscriber_id, subscriber_name])
    else:
        context.subscribers = [[subscriber_id, subscriber_name]]


def add_purchase(context, subscriber_id, product_name=PRODUCT_NAME, timestamp=1):
    if 'products' in context:
        context.products.append([subscriber_id, product_name, timestamp])
    else:
        context.products = [[subscriber_id, product_name, timestamp]]


@given(u'Alice purchased a product')
def add_customer_with_product(context):
    add_subscriber(context, ALICE_ID, 'Alice')
    add_purchase(context, ALICE_ID)


@given(u'Alice purchased another product')
def add_second_product_for_alice(context):
    add_purchase(context, ALICE_ID, DIFFERENT_PRODUCT, 2)


@given(u'Bob bought no products')
def add_subscriber_without_product(context):
    add_subscriber(context, BOB_ID, 'Bob')

After preparing the scenario context, we can implement the actions.

In the implementation, we’ll execute the function under test and print the result. Behave captures and prints the standard output in case of failures. When all of the tests pass, the print statement doesn’t pollute the log. However, when anything fails, we see the content of the DataFrame logged together with the failed assertion.

@when(u'we generate newsletter recipients')
def prepare_newsletter_recipients(context):
    if 'subscribers' in context:
        subscribers = context.spark.createDataFrame(context.subscribers, schema=subscribers_struct)
    else:
        subscribers = context.spark.createDataFrame([], schema=subscribers_struct)

    if 'products' in context:
        products = context.spark.createDataFrame(context.products, schema=product_struct)
    else:
        products = context.spark.createDataFrame([], schema=product_struct)

    context.result = generate_recipient_list(subscribers, products).cache()

    context.result.show()  # !!! Behave captures stdout and prints it when a step fails
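The capture-and-replay behavior can be imitated in a few lines of plain Python. This is only a toy model of the idea, not Behave's implementation; run_step, passing_step, and failing_step are hypothetical names used for the illustration.

```python
import contextlib
import io


def run_step(step_function):
    """Run a step, discard its output on success, and return it on failure."""
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            step_function()
    except AssertionError:
        return "failed", captured.getvalue()
    return "passed", ""


def passing_step():
    print("rows: [('Alice', 'Product A')]")
    assert True


def failing_step():
    print("rows: []")
    assert False, "expected one row"


passing_result = run_step(passing_step)
failing_result = run_step(failing_step)
```

The output of the passing step never reaches the log, while the failing step hands back everything it printed, so we can inspect the data that caused the failure.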

Finally, we can implement the functions verifying the outcome:

def verify_recipient_with_product_only_once(context, subscriber_name):
    df = context.result
    if subscriber_name:
        only_one_person = df.filter((df['subscriber_name'] == subscriber_name) & (df['product_name'].isNotNull()))
    else:
        only_one_person = df.filter((df['subscriber_name'].isNull()) & (df['product_name'].isNotNull()))

    assert only_one_person.count() == 1


@then(u'Alice is on the list')
def verify_alice_with_product(context):
    verify_recipient_with_product_only_once(context, 'Alice')


@then(u'Alice is on the list only once')
def verify_alice_with_product_only_once(context):
    verify_recipient_with_product_only_once(context, 'Alice')


@then(u'Alice gets a message about the second purchase')
def verify_alice_with_second_product(context):
    df = context.result
    only_one_person = df.filter((df['subscriber_name'] == 'Alice') & (df['product_name'] == DIFFERENT_PRODUCT))
    assert only_one_person.count() == 1


@then(u'Bob is not on the list')
def verify_no_bob(context):
    df = context.result
    only_bob = df.filter(df['subscriber_name'] == 'Bob')
    assert only_bob.count() == 0

Last but not least, we have to implement the function under test. In a real application, we would import it from a separate Python module. Here, we put it directly in the test file:

def generate_recipient_list(subscribers, products):
    window = Window \
        .partitionBy(col('subscriber_id')) \
        .orderBy(col("purchase_timestamp").desc())

    most_recent_purchase = products \
        .withColumn(
            'position_in_group',
            row_number().over(window)
        ) \
        .where(col('position_in_group') == 1) \
        .drop('position_in_group')

    # joining on the column name keeps a single subscriber_id column in the result
    return subscribers.join(most_recent_purchase, 'subscriber_id', 'inner')
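If the window function looks cryptic, the same logic can be written in plain Python. The sketch below is a reference model I made up for illustration, with tuples instead of DataFrame rows; most_recent_purchases and recipients are hypothetical names, not a part of the tested code.

```python
from itertools import groupby
from operator import itemgetter


def most_recent_purchases(purchases):
    """Keep the newest purchase of every subscriber.

    Each purchase is a (subscriber_id, product_name, purchase_timestamp) tuple.
    """
    # Partition by subscriber_id, newest purchase first within each partition.
    ordered = sorted(purchases, key=lambda purchase: (purchase[0], -purchase[2]))
    newest = []
    for _, partition in groupby(ordered, key=itemgetter(0)):
        for position_in_group, purchase in enumerate(partition, start=1):
            if position_in_group == 1:  # same filter as in the Spark version
                newest.append(purchase)
    return newest


def recipients(subscribers, purchases):
    """Inner join of (subscriber_id, subscriber_name) with the newest purchases."""
    newest_by_id = {purchase[0]: purchase for purchase in most_recent_purchases(purchases)}
    return [
        (subscriber_id, name, newest_by_id[subscriber_id][1])
        for subscriber_id, name in subscribers
        if subscriber_id in newest_by_id
    ]


recipient_rows = recipients(
    [(1, "Alice"), (2, "Bob")],
    [(1, "Product A", 1), (1, "Product B", 2)],
)
```

One caveat applies to the Spark version: when two purchases of the same subscriber share a timestamp, row_number doesn't guarantee which one gets position 1, so adding a tie-breaker column to orderBy may be worth considering.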

Running Behave in Poetry

When we use Poetry as the Python dependency manager, we can run the Behave tests using the command line:

# run in the parent directory of features/
poetry run behave

In the output, we see the scenario steps, the file containing the step implementation, and the test duration:

Feature: Show that Spark works # features/env_test.feature:1

  Scenario: Count the number of rows in an empty Dataframe  # features/env_test.feature:3
    Given an empty dataframe                                # features/steps/env_test.py:7 3.134s
    When we count the number of rows                        # features/steps/env_test.py:12 4.196s
    Then the result should be zero                          # features/steps/env_test.py:17 0.000s

Feature: Sending a newsletter to the customers # features/mailing.feature:1

  Scenario: Newsletter is sent to people who purchased at least one product  # features/mailing.feature:5
    Given Alice purchased a product                                          # features/steps/mailing.py:53 0.004s
    When we generate newsletter recipients                                   # features/steps/mailing.py:69 4.792s
    Then Alice is on the list                                                # features/steps/mailing.py:96 2.385s

  Scenario: Subscribers with no purchases don't receive the newsletter  # features/mailing.feature:10
    Given Bob bought no products                                        # features/steps/mailing.py:64 0.000s
    When we generate newsletter recipients                              # features/steps/mailing.py:69 2.056s
    Then Bob is not on the list                                         # features/steps/mailing.py:113 1.680s

  Scenario: People who purchased more than one product receive only one message  # features/mailing.feature:17
    Given Alice purchased a product                                              # features/steps/mailing.py:53 0.001s
    And Alice purchased another product                                        # features/steps/mailing.py:59 0.000s
    When we generate newsletter recipients                                       # features/steps/mailing.py:69 1.613s
    Then Alice is on the list only once                                          # features/steps/mailing.py:101 1.678s

  Scenario: Newsletter mentions only the most recent purchase  # features/mailing.feature:23
    Given Alice purchased a product                            # features/steps/mailing.py:53 0.001s
    And Alice purchased another product                      # features/steps/mailing.py:59 0.000s
    When we generate newsletter recipients                     # features/steps/mailing.py:69 1.716s
    Then Alice gets a message about the second purchase        # features/steps/mailing.py:106 1.525s

2 features passed, 0 failed, 0 skipped
5 scenarios passed, 0 failed, 0 skipped
17 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m24.782s

Is BDD sufficient to test a data product?

Of course not. In addition to testing the code, we should also validate the input data in the pipeline and reject the invalid values (or separate them from the correct data). It won’t hurt to be extra paranoid and also validate the output data.

BDD won’t help us with data validation, but we can use it to test the code used to validate the DataFrames.
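For example, the validation code could be a function that separates the correct rows from the invalid ones. The sketch below uses plain dictionaries instead of DataFrames, and the validation rules (a non-null subscriber_id and a non-empty subscriber_name) are assumptions made up for this illustration.

```python
def split_valid_and_invalid(rows):
    """Separate rows that satisfy the (assumed) rules from those that don't."""
    valid, invalid = [], []
    for row in rows:
        has_id = row.get("subscriber_id") is not None
        has_name = bool(row.get("subscriber_name"))
        (valid if has_id and has_name else invalid).append(row)
    return valid, invalid


rows = [
    {"subscriber_id": 1, "subscriber_name": "Alice"},
    {"subscriber_id": None, "subscriber_name": "Bob"},
    {"subscriber_id": 3, "subscriber_name": ""},
]
valid_rows, invalid_rows = split_valid_and_invalid(rows)
```

A function like this is a good candidate for its own scenarios, such as "Given a subscriber without an id, when we validate the input, then the subscriber is rejected".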

Additionally, we should log metrics describing the datasets. The minimal set of metrics may include:

  • the number of rows
  • the number of null values in every column
  • max/min values in all columns
  • the number of duplicates (if it isn’t too expensive to calculate)
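The metrics listed above can be sketched in plain Python as follows. The describe_dataset function and the shape of the returned dictionary are hypothetical; in a real pipeline, we would compute the same numbers with Spark aggregations and send them to a logging or monitoring system.

```python
from collections import Counter


def describe_dataset(rows, columns):
    """Compute row count, null counts, min/max per column, and duplicate rows."""
    metrics = {"row_count": len(rows), "null_count": {}, "min": {}, "max": {}}
    for column in columns:
        values = [row[column] for row in rows]
        non_null = [value for value in values if value is not None]
        metrics["null_count"][column] = len(values) - len(non_null)
        metrics["min"][column] = min(non_null) if non_null else None
        metrics["max"][column] = max(non_null) if non_null else None
    # A row counts as a duplicate each time it repeats an earlier identical row.
    occurrences = Counter(tuple(row[column] for column in columns) for row in rows)
    metrics["duplicate_count"] = sum(count - 1 for count in occurrences.values())
    return metrics


rows = [
    {"subscriber_id": 1, "product_name": "Product A"},
    {"subscriber_id": 1, "product_name": "Product A"},
    {"subscriber_id": 2, "product_name": None},
]
dataset_metrics = describe_dataset(rows, ["subscriber_id", "product_name"])
```

Comparing these numbers between pipeline runs makes sudden changes, such as a jump in null values, visible before the users notice them.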

If you want to learn more about data pipeline testing, read my interview with Christopher Bergh, the author of “DataOps Cookbook”, and my article about applying functional programming and the Unix philosophy to data processing.
