Building trustworthy data pipelines

This article is a text version of my talk, "Scale-up your job satisfaction, not your software," which I presented during the BerlinBuzzwords conference (June 14-17, 2021).


When was the last time you spent four hours debugging a data pipeline? It happened to me one day. To be precise, it happened to me one night.

I woke up at 2 am for no apparent reason. I had a feeling I should check our Slack channel delivering alerts when something is wrong. In retrospect, it was mistake number one.

I saw a terrible message in the Slack channel: one of the data pipelines was failing — the most important one. If that one ETL doesn’t work, everything else will fail too. I realized we would spend the whole day restarting ETLs one by one and manually checking the data.

But it was 2 am, I could still fix it. Sure, there would be one hour delay, but at least we wouldn’t waste a whole day fixing the problems. I didn’t have to fix it. I wasn’t the “on-call” engineer. However, I was the first person to see the problem, and I could mitigate the issues. I decided I must fix it. In retrospect, it was mistake number two.

I saw that someone had merged a code change before all the tests were executed. The author wanted to stop working at 5 pm and didn’t wait for the test. I couldn’t be mad. It took a lot of patience to wait for the tests. Unfortunately, a test failed 20 minutes later, and nobody noticed the failure.

I could revert the change and assume that the previous version still worked (because it worked the day before). Or, I could write a fix, wait until the tests pass, deploy it, and run the fixed version in production. Of course, reverting the change means that I would have to fix it twice, so I decided to “save some time.” In retrospect, I decided to make mistake number three. Sometime later, I heard my alarm clock ringing. It was already 6 am.

The fix took four hours, and I had to rework it anyway. At 3 am, my code looked like a brilliant idea, but it wasn’t even close to the most optimal solution.

Have you been there? Do you read this story wondering whether I work at your company?

Trustworthy data pipelines

Before switching to data engineering, I was a backend developer for eight years. I have seen projects with no tests and projects with 100% coverage of useless tests without assertions. Those tests existed only to raise the test coverage metric.

On the other hand, I had seen projects with useful tests with decent coverage, but people complained about test execution time. In those projects, it was unacceptable to wait for the test result longer than two minutes. People complained if it took more than 30 seconds.

Data engineering is somewhere in the middle between no tests and tests running for hours.

If I have to wait three minutes for Apache Spark to start in a test environment, I must be happy if tests run in less than 5 minutes. At some point, we need half an hour to run the tests. In such situations allowing merging before tests pass seems like an obvious and reasonable practice. This is the reality in data engineering. This is how we work. We cannot change that. Can we?

It may be a shock, but data engineering is software engineering. Let’s start at the beginning. What makes software useful? It is all about trust. If we trust the output, we use the software. If we don’t trust it, nobody wants to use it. Similarly, useful data pipelines are trustworthy data pipelines.

How do we build a trustworthy data pipeline?

Let me show you something:

A -> B -> C

This is the universal software architecture diagram ;) It fits almost all software projects, seriously. However, today it is a data pipeline.

What part of the pipeline can break? Every part. We may get invalid input data. We can fail to load it correctly. We may make a mistake in the processing code. We may write the data in the wrong location. The output format may no longer be correct and break another pipeline downstream. Finally, the scheduling mechanism may break, and the entire pipeline will not run at all.

According to Christopher Bergh - a co-author of the “DataOps Manifesto” and the “DataOps Cookbook” - we have two kinds of moving parts in the data pipelines. In a Value Pipeline, the code runs in production and doesn’t change, but the input data changes. In the case of an Innovation Pipeline, the code is still in development. We have consistent test data, but we change the implementation.

Therefore, we will focus on two things: checking whether the code works as expected and validating the data. However, before we even start, we have to simplify the pipeline and get the code under control.

Functional programming

Do you remember what a buzzword it was a few years ago? Every conference was full of talks about monads, monoids, Scala, and Haskell. We can ignore all those topics. We want deterministic output, so all we need are pure functions and immutability.

How do we achieve immutability? After all, we can change everything in a database or an S3 bucket.

First of all, whatever enters the data pipeline is immutable, and we must store it separately. I prefer to write the input data to a raw data bucket as fast as possible. This gives us two benefits. We can audit the input by checking what data we received from external systems. We can also rerun the pipeline.

Of course, keeping a copy of input data increases the cost, so most likely, we will have to remove it at some point. Will it be after 30 days or 30 minutes? I don’t know. It depends on how much we want to spend on storage and whether the data contains sensitive information.

This is the first part. What about pure functions? What is a pure function?

A pure function is a function whose output depends only on the input, and the function does not access or modify the global state. The definition does not resemble anything we do in data engineering.

What is a pure function in data engineering?

Let’s imagine a Spark data pipeline. What are the building blocks? We have the part loading the data, the processing part, and the code writing the output. We can extract the processing part to a separate function and assume that it is a pure function. Its output depends only on the input. The code doesn’t access or modify any external state. It fits the definition of pure function.

What are the benefits of pure functions? It is trivial to test the code. If the function’s output depends only on the input, we can quickly write many very simple tests.

The incorrect way to test code

Speaking of tests. Please take a look at the test below.

TodoService todoService = mock(TodoService.class);
List<String> allTodos = Arrays.asList("Task1", "Task2", "Task3");
when(todoService.retrieveTodos("User1")).thenReturn(allTodos);

NotificationService notificationService = mock(NotificationService.class);
PaymentService paymentService = mock(PaymentService.class);
TimeUtil timeUtil = mock(TimeUtil.class);

TodoBusinessImpl todoBusinessImpl = new TodoBusinessImpl(todoService, notificationService, paymentService, timeUtil);
List<TodoEntity> todos = todoBusinessImpl
        .retrieveTodos("User1");
assertEquals(3, todos.size());
assertFalse(todos.get(0).isDone());
assertTrue(todos.get(1).isDone());
assertFalse(todos.get(2).isDone());

Is this readable? What does it do? We would need to spend some time reading the code to figure it out. We see some mocks. We pass a lot of arguments to the object. If we looked at the implementation of the function we test, we would see many unused constructor parameters. Still, the constructor requires them, so we have to pass them anyway.

Also, look at the verification part. What do we expect? We may need to search for the Jira ticket and read the description. It is not apparent. The test method has no obvious business meaning.

Also, I am sure the programmer wrote the test after the implementation. The author wrote the code, tested it manually, and was told to write the test. As a result, a test has been written or rather copy-pasted from another test and modified. Such tests are easy to spot. If the author started with a test, there would be no needless parameters.

We can quite easily make a similar mess while testing a data pipeline. We need an extensive setup. We create multiple DataFrames containing at least a few rows. The assertions are never simple. The tests are long, and it is hard to tell what caused the failure when they fail. How do we improve that? The setup and assertions are technical details. If we keep them like in the example, we hide the business meaning in tons of noise.

Behavior Driven Development

Behaviour Driven Development is a testing method separating a description of the expected behavior from the technical details of the test implementation.

We separate the human-readable description of the specification from the test code and the production code. The specification is easy to understand. At least, it should be easy to understand. We can quite easily spot a mistake in the specification.

If the specification is easy to understand, we can easily spot a mistake in the test implementation. We no longer have to wonder whether a strange-looking line of code was intentional or not.

Testing the data

Testing the code is the easy part. Software engineers have been doing it for decades (at least, they should be doing it. We have functional tests, integration tests, end-to-end tests, unit tests, and lengthy discussions about differences between those kinds of tests and proper naming. We can find a way to test the code. What about testing the data?

In production, the data changes all the time. Other teams release new features and start sending different kinds of events. We should be prepared for such changes. There are three ways:

  • We can write validation rules rejecting everything unusual, but in this case, the data engineering team becomes a colossal bottleneck preventing everyone else from achieving their goals.

  • We can accept almost everything and hope nobody makes a mistake. Good luck with that.

  • Or we can separate the correct data from unexpected values, but keep both, so we can update the pipeline and reprocess everything when we need to do it.

Why do we do it? What is the point? First of all, we can never let incorrect data into the production pipeline. It is too complicated, too time-consuming to fix the problems when they happen.

But that is not enough. We have tested our data and the code, but our implementation is not 100% bulletproof. We should test the data once again before we write it to the output location. The additional tests prevent us from propagating problems downstream.

How do we write tests for data? We can use Great Expectations or AWS Deequ, which lets us define validation rules for data. In the case of AWS Deequ, we can even run anomaly detection.

Even the most straightforward implementation will help us improve the pipeline. For example, we may consider adding a column with the validation results and splitting data into the correct and incorrect buckets. It is a good starting point.

Data branches

We can do one more thing. What if we didn’t allow incorrect data into the data lake at all? What if we had a data version control system with branches and pull requests and rejected branches if they fail validation?

I cannot promise you pull requests for data, but we can have branches. If we use tools like Lake.fs, we can create a new data branch for every data pipeline run and merge the changes into the main branch after testing.

Let’s imagine an ETL pipeline. We create a new branch. Ingest the data into the separate branch, run all of the tests there, do the processing, run some more tests and merge the data into the main branch when everything is fine. What is the benefit? The production branch contains only complete and valid data.

What is even more important, such data versioning tools handle merges as an atomic action. Either all files get merges, or none of them is merged. It means we no longer have a situation in which one process writes files to S3, and another starts reading them before the writer finishes. We no longer need marker files, status databases, or other ugly hacks to indicate which files are ready to use. If the file exists in the main branch, we can use it.

Testing the additional code

Still, all of those tests are not enough. In the end, we have the infrastructure building code and scripts we use occasionally.

Do you know what happened to me recently? I was working on deploying a Tensorflow machine learning model. I used my deployment script a few times in the past, so I knew it worked. I had the new model, and I wanted to deploy a Sagemaker Endpoint with the model. The deployment failed. I did not change the script, so I assumed it must work. I needed to test it. The only difference was the new model version, so I tried to deploy an older version. It failed. The script was the same, the model was the same, the deployment pipeline was defined in CloudFormation, and nobody made any manual changes.

Here is the log message I saw:

INFO:__main__:tensorflow serving model config:
model_config_list: {
 config: {
   name: 'saved_model'
   base_path: '/opt/ml/model/tensorflow/saved_model'
   model_platform: 'tensorflow'
   model_version_policy: {
     specific: {
       versions:
     }
   }
 }
}

It seems the version was missing. I started googling to find the Sagemaker code. Fortunately, I found a Github repository with some of the code used in Sagemaker Endpoints. A few hours later, I found the problematic line of code:

def find_model_versions(model_path):
 return [version.lstrip("0") for version in os.listdir(model_path) if version.isnumeric()]

# https://github.com/aws/deep-learning-containers

It removes leading zeros from the model version. All of them. Even if zero is the only thing in the version id. Do you know what the problem is? I have one model version per file, so all of them are version zero. I repackaged the model files changing version zero to version one, and tried rerunning the script. It worked.

I wasn’t happy. It was a machine learning deployment script. The data scientists need a few weeks or months to create an improved model version that we may want to deploy. I will run this script several times per year. Do I want to find out that the script does not work anymore on the day when I am supposed to deploy a new model? Of course not.

I want you to test one more thing — your scripts. I run my deployment scripts in an AWS Code Pipeline scheduled to run Monday to Friday at 9 am. It deploys a Sagemaker Endpoint, sends a few test requests, compares the predictions with the expected values, and removes the endpoints. If anything breaks, I will get an early morning email with the error message. It is not the best email I can see in the morning, but it is way better than learning that the deployment does not work anymore when we urgently need to deploy a new model.

This does not apply only to scripts. Do you have a pipeline running only once a month because it generates the monthly report? Do yourself a favor and schedule additional runs at least once a week. If anything breaks, you will have time to fix it before the CEO starts asking where the monthly report is.

Enough. I can write long blog posts about what you should be doing, but eventually, you have to do something. I have homework for you. Here are two software engineering books offering tons of valuable ideas for data engineering.

The first one is “The effective engineer” by Edmond Lau. It is a book about focusing on high-leverage tasks, optimizing the feedback loop, shortening cycle time, and quickly validating your ideas to reduce wasted time. It is also a book about pragmatic automation and choosing the right metrics.

The second book: “The Pragmatic Programmer” by Andrew Hunt and David Thomas. It is a book about writing good enough software, building prototypes, defining practical tests without overengineering it, and making a minimal but working version of the application as soon as possible.

When you finish reading the software books, I’ll give you a bonus homework. The last book does not even contain any code. It is a book about writing. A book about writing well. “On Writing Well” by William Zinsser. Why is it here? First of all, because I would like to see more well-written documentation. Second, because “On writing well” helps you clarify your ideas, express them precisely, and avoid needlessly over-complicated language.

Older post

Theory of constraints in data engineering

Are you busy, but nothing ever gets done? Perhaps, theory of constraints will help you

Newer post

How to add a new dataset to the Feast feature store

How to use Feast feature store in a local environment