Testing data products: BDD for data engineers
This article consists of three parts. First, I explain what BDD is and why we should test data products. In the second part, I show how to setup Behave to verify PySpark code using a trivial example. In the last part of the text, I write about a hypothetical application, enumerate the requirements, write the specification, implement the tests, and write the production code to make them pass.
What is BDD?
Behavior Driven Development is a collaborative approach to software development, which minimizes the risk that the programmers will misunderstand the business requirements by linking the human-readable requirements files with test implementations.
As a result, we have extensive documentation and automatically verifiable specifications of the system. We write the specification in a natural language because we want all stakeholders to collaborate. Understanding the specification MUST NOT require any coding skills.
We convert the specification steps into automated tests by writing Python functions. The process preserves the mapping between the scenario step and the step implementation, so when the code fails, we see which scenarios don’t work correctly.
Why do we test data products?
When we build a data lake or a data warehouse, we are building a product. The data produced by our code is the product used by other teams or directly by the users. Every ETL/ELT pipeline, every machine learning model, every Spark job, and every SQL run in Hive or Trino outputs a part of the product.
It is tempting to ignore testing and let the users find problems. It is a terrible idea. Errors in data are more severe than errors in backend services because erroneous data produced by one pipeline propagate to subsequent pipelines and spoils the results downstream. The incorrect data records accumulate over time and lead to wrong business decisions.
If the data team lets the data rot and nobody trust the numbers anymore, we should burn the data warehouse to the ground and start from scratch. How do you feel about wasting a lot of time and tons of money building a useless (or even harmful) data product? We don’t want to be the people who destroyed the company, so we should write tons of tests. Testing a data product or a data pipeline doesn’t need to differ from testing backend services. After all, we have expectations about the produced outcome, and we can automatically verify those expectations.
Using Behave to test PySpark code
Before we start a more complicated example, let’s setup Behave and see how to use it to test PySpark code. The test doesn’t verify anything useful but checks whether we have configured everything correctly.
behave as a project dependency. I use Poetry to manage Python dependencies, so I must open the
pyproject.toml file, scroll to the
[tool.poetry.dev-dependencies] section, and add
behave = "^1.2.6".
When we use Behave, we have to follow a naming convention. All of the specification files are in the
features directory. The Python implementations of steps are in the
First, we create the
features/env_test.feature file with the following specification:
1 2 3 4 5 6 Feature: Show that Spark works Scenario: Count the number of rows in an empty Dataframe Given an empty dataframe When we count the number of rows Then the result should be zero
After defining a scenario, we have to create the
features/environment.py file. In this file, we define the functions running before/after all tests, before/after a single scenario, or before/after every step. We’ll use the file to create and stop the Spark session:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from behave import * from pyspark.sql import SparkSession def before_all(context): context.spark = SparkSession.builder \ .master("local\[*]") \ .appName("behave-with-spark") \ .getOrCreate() def after_all(context): if 'spark' in context: context.spark.sparkContext.stop()
Now, we can define the step definitions in the
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from behave import * from pyspark.sql.types import StructType, StructField, StringType single_string_column_schema = StructType([StructField("col1", StringType())]) @given('an empty dataframe') def step_impl(context): context.df = context.spark.createDataFrame(, schema=single_string_column_schema) @when('we count the number of rows') def step_impl(context): context.count = context.df.count() @then('the result should be zero') def step_impl(context): assert context.count == 0
Testing data products using BDD
In the next example, we’ll start with a business requirement, pretend that we do example mapping, write the specification, implement the tests, and, finally, write the production code. To make it simple, however, we’ll not perform an end-to-end test.
Let’s imagine that the product owner tells us: “We have to send a newsletter. Here is a template. Give me a file with recipient addresses and all of the variables.” What do we do?
How to define a BDD scenario using example mapping
We want to define a scenario. Every scenario consists of three parts: context, action, and outcome.
scenario = context + action + outcome
It is a good practice to begin with the outcome. What do we want to achieve? We may ask the product owner whether all subscribers should receive the newsletter. “Not everyone, only the people who purchased a product,” says the product owner. Great! It is our first Rule and the first Example:
1 2 3 Rule 1: Every subscriber who purchased a product receives an email Example 1: Newsletter is sent to people who purchased at least one product
At this point, we should ask for a counterexample. What happens with the subscribers who haven’t purchased anything? Do we send a generic email, or do we ignore them? In this case, we want to ignore them, so here is our second example:
1 Example 2: Subscribers with no purchases don't receive the newsletter
We, the programmers, instantly start thinking about the solution. We can’t help it. Fortunately, the effort isn’t wasted because we quickly discover a problem: “If we join the subscribers table with purchases, we may get duplicates when someone bought more than one product!” It is a good observation. After a brief discussion, we may produce another Rule and two Examples:
1 2 3 4 5 Rule 2: Use only the most recent purchase Example 3: People who purchased more than one product receive only one message Example 4: Newsletter mentions only the most recent purchase
Example 3 is weird. It doesn’t look like a business specification. It is more like a test written by programmers who worry about using the wrong type of join while merging the tables. Should we keep it? It depends. From a business perspective, it is useless. If we include only the most recent purchase in the email, we don’t need another requirement telling us to send only one email. On the other hand, if the programmers made a mistake in the past and subscribers received multiple messages, we may feel safer when we explicitly state such assumptions.
Turning examples into Gherkin specifications
Now, we can take all of the rules and examples and turn them into Gherkin specifications. I suggest having user personas and using their names in the scenarios because it helps make the specification shorter and consistent. In this example, I have two personas: Alice (a frequent buyer) and Bob (a subscriber who doesn’t buy anything). Let’s create a new file
features/mailing.feature and write the first example:
1 2 3 4 5 6 7 8 Feature: Sending a newsletter to the customers # Rule: Only people who purchased a product get the newsletter (not supported in Behave 1.2.6) Scenario: Newsletter is sent to people who purchased at least one product Given Alice purchased a product When we generate newsletter recipients Then Alice is on the list
Unfortunately, Behave 1.2.6 doesn’t support the Rule statement, but we still write them as comments. In my opinion, using rules to group scenarios makes the file easier to read. After all, when we upgrade Behave to a new version, we’ll have an even better specification.
The other scenarios look like this:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Scenario: Subscribers with no purchases don't receive the newsletter Given Bob bought no products When we generate newsletter recipients Then Bob is not on the list # Rule: Use only the most recent purchase Scenario: People who purchased more than one product receive only one message Given Alice purchased a product And Alice purchased another product When we generate newsletter recipients Then Alice is on the list only once Scenario: Newsletter mentions only the most recent purchase Given Alice purchased a product And Alice purchased another product When we generate newsletter recipients Then Alice gets a message about the second purchase
Have you noticed the most significant benefit of BDD? All of the requirements are easily readable. People who aren’t familiar with the application can use the specification file as documentation. Also, because we separate the requirement definitions from the implementation, the files containing the descriptions are short.
Automating the specification using Behave
Now, we have to automate the steps in the specification. Let’s create a
features/steps/mailing.py file with the following content:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from behave import * from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType from pyspark.sql.functions import col, row_number from pyspark.sql.window import Window subscribers_struct = StructType([ StructField("subscriber_id", IntegerType()), StructField("subscriber_name", StringType()) ]) product_struct = StructType([ StructField("subscriber_id", IntegerType()), StructField("product_name", StringType()), StructField("purchase_timestamp", LongType()), ]) ALICE_ID = 1 BOB_ID = 2 PRODUCT_NAME = 'Product A' DIFFERENT_PRODUCT = 'some_other produt' def generate_recipient_list(subscribers, products): return None # this is the function under test
As you see, we have defined a few constants and an empty
Now, we can focus on defining the steps and explaining how it works in Behave. Let’s begin with the steps in the
Given part of the scenarios. We’ll define two helper functions to add the subscribers and products to the
context is an object provided by Behave, which contains the state shared between scenario steps.
We aren’t using any Spark DataFrames yet because it is easier and faster to operate on pure Python objects:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def add_subscriber(context, subscriber_id, subscriber_name): if 'subscribers' in context: context.subscribers.append([subscriber_id, subscriber_name]) else: context.subscribers = [[subscriber_id, subscriber_name]] def add_purchase(context, subscriber_id, product_name=PRODUCT_NAME, timestamp=1): if 'products' in context: context.products.append([subscriber_id, product_name, timestamp]) else: context.products = [[subscriber_id, product_name, timestamp]] @given(u'Alice purchased a product') def add_customer_with_product(context): add_subscriber(context, ALICE_ID, 'Alice') add_purchase(context, ALICE_ID) @given(u'Alice purchased another product') def add_second_product_for_alice(context): add_purchase(context, ALICE_ID, DIFFERENT_PRODUCT, 2) @given(u'Bob bought no products') def add_subscriber_without_product(context): add_subscriber(context, BOB_ID, 'Bob')
After preparing the scenario context, we can implement the actions.
In the implementation, we’ll execute the function under test and print the result. Behave captures and prints the standard output in case of failures. When all of the tests pass, the print statement doesn’t pollute the log. However, when anything fails, we see the content of the DataFrame logged together with the failed assertion.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @when(u'we generate newsletter recipients') def prepare_newsletter_recipients(context): if 'subscribers' in context: subscribers = context.spark.createDataFrame(context.subscribers, schema=subscribers_struct) else: subscribers = context.spark.createDataFrame(, schema=subscribers_struct) if 'products' in context: products = context.spark.createDataFrame(context.products, schema=product_struct) else: products = context.spark.createDataFrame(, schema=product_struct) context.result = generate_recipient_list(subscribers, products).cache() context.result.show() # !!! Behave captures stdout and prints it when a step fails
Finally, we can implement the functions verifying the outcome:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def verify_recipient_with_product_only_once(context, subscriber_name): df = context.result if subscriber_name: only_one_person = df.filter((df['subscriber_name'] == subscriber_name) & (df['product_name'].isNotNull())) else: only_one_person = df.filter((df['subscriber_name'].isNull()) & (df['product_name'].isNotNull())) assert only_one_person.count() == 1 @then(u'Alice is on the list') def verify_alice_with_product(context): verify_recipient_with_product_only_once(context, 'Alice') @then(u'Alice is on the list only once') def verify_alice_with_product(context): verify_recipient_with_product_only_once(context, 'Alice') @then(u'Alice gets a message about the second purchase') def verify_alice_with_second_product(context): df = context.result only_one_person = df.filter((df['subscriber_name'] == 'Alice') & (df['product_name'] == DIFFERENT_PRODUCT)) assert only_one_person.count() == 1 @then(u'Bob is not on the list') def verify_no_bob(context): df = context.result only_bob = df.filter(df['subscriber_name'] == 'Bob') assert only_bob.count() == 0
Last but not least, we have to implement the function under test. In a real application, we would import it from a separate Python module. Here, we put it directly in the test file:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def generate_recipient_list(subscribers, products): window = Window \ .partitionBy(col('subscriber_id')) \ .orderBy(col("purchase_timestamp").desc()) most_recent_purchase = products \ .withColumn( 'position_in_group', row_number().over(window) ) \ .where(col('position_in_group') == 1) \ .drop('position_in_group') return subscribers.join(most_recent_purchase, subscribers.subscriber_id == most_recent_purchase.subscriber_id, 'inner')
Running Behave in Poetry
When we use Poetry as the Python dependency manager, we can run the Behave tests using the command line:
1 2 # run in the parent directory of features/ poetry run behave
In the output, we see the scenario steps, the file containing the step implementation, and the test duration:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 Feature: Show that Spark works # features/env.feature:1 Scenario: Count the number of rows in an empty Dataframe # features/env_test.feature:3 Given an empty dataframe # features/steps/env_test.py:7 3.134s When we count the number of rows # features/steps/env_test.py:12 4.196s Then the result should be zero # features/steps/env_test.py:17 0.000s Feature: Sending a newsletter to the customers # features/mailing.feature:1 Scenario: Newsletter is sent to people who purchased at least one product # features/mailing.feature:5 Given Alice purchased a product # features/steps/mailing.py:53 0.004s When we generate newsletter recipients # features/steps/mailing.py:69 4.792s Then Alice is on the list # features/steps/mailing.py:96 2.385s Scenario: Subscribers with no purchases don't receive the newsletter # features/mailing.feature:10 Given Bob bought no products # features/steps/mailing.py:64 0.000s When we generate newsletter recipients # features/steps/mailing.py:69 2.056s Then Bob is not on the list # features/steps/mailing.py:113 1.680s Scenario: People who purchased more than one product receive only one message # features/mailing.feature:17 Given Alice purchased a product # features/steps/mailing.py:53 0.001s And Alice purchased another product # features/steps/mailing.py:59 0.000s When we generate newsletter recipients # features/steps/mailing.py:69 1.613s Then Alice is on the list only once # features/steps/mailing.py:101 1.678s Scenario: Newsletter mentions only the most recent purchase # features/mailing.feature:23 Given Alice purchased a product # features/steps/mailing.py:53 0.001s And Alice purchased another product # features/steps/mailing.py:59 0.000s When we generate newsletter recipients # features/steps/mailing.py:69 1.716s Then Alice gets a message about the second purchase # features/steps/mailing.py:106 1.525s 2 features passed, 0 failed, 0 skipped 5 scenarios passed, 0 failed, 0 skipped 17 steps passed, 0 failed, 0 skipped, 0 undefined Took 0m24.782s
Is BDD sufficient to test a data product?
Of course, no. In addition to testing the code, we should also validate the input data in the pipeline, and reject the invalid values (or separate them from the correct data). It won’t hurt to be extra paranoid and also validate the output data.
BDD won’t help us with data validation, but we can use it to test the code used to validate the DataFrames.
Additionally, we should log metrics describing the datasets. The minimal set of metrics may include:
- the number of rows
- the number of null values in every column
- max/min values in all columns
- the number of duplicates (if it isn’t too expansive to calculate)
If you want to learn more about data pipeline testing, read my interview with Christopher Bergh, the author of “DataOps Cookbook”, and my article about applying functional programming and the Unix philosophy to data processing.
You may also like
- Dependencies between DAGs: How to wait until another DAG finishes in Airflow?
- What is the difference between CUBE and ROLLUP and how to use it in Apache Spark?
- Definition of done for data engineers
- Three biggest traps to avoid while setting Spark executor memory
- Speed up counting the distinct elements in a Spark DataFrame