How to build maintainable software by abstracting the business rules in data engineering

We tend to abstract API and database dependencies and hide their complexity behind a DAO library or an API SDK. It is the correct approach. However, I think it is not enough. We forget about many other parts of the code that could benefit from an abstraction layer.

After all, how many times have you actually seen a team switch to a new database?

Abstracting the business rules

Abstracting the external applications gives us no benefit if the business code consists of multi-level nested conditional statements sharing a mutable state. In addition to encapsulating the database access, we should also encapsulate the business rules.

Which design patterns can we use to avoid having such a mess? We will need a few of them. First, we turn the body of every conditional branch into a separate strategy/policy object.

After that, we can extract the conditions to a factory method. It lets us hide the if statements in one place. In the case of a deeply nested conditional structure, I recommend having multiple factory methods.

Let’s take a look at a simplified example.

if some_condition == 'value' and some_other_condition == 123:
    ...  # here is the code for usecase 1
elif some_condition == 'value_23' and some_other_condition == 987:
    if something == 3:
        ...  # here is the code for usecase 2
    else:
        ...  # here is the code for usecase 3
elif some_condition == 'value_88' and some_other_condition == 555:
    ...  # here is the code for usecase 4
else:
    ...  # here is the code for usecase 5

Let’s fix it step by step. First, I extract the body of every branch into a strategy object by creating a class for each use case.

class Usecase1:
    def __init__(self, dependencies):
        # keep a reference to the dependencies this use case needs
        self.dependencies = dependencies

    def apply(self, parameters):
        # usecase body
        ...

Now, I move the if statements to a factory method and return the strategy objects. The factory receives the values it inspects as arguments instead of reading them from a shared state.

def pick_the_strategy(some_condition, some_other_condition, something, dependencies):
    # in real code, pass every use case only the dependencies it needs
    if some_condition == 'value' and some_other_condition == 123:
        return Usecase1(dependencies)  # imagine this has a meaningful name, not some generic Usecase1
    elif some_condition == 'value_23' and some_other_condition == 987:
        if something == 3:
            return Usecase2(dependencies)
        else:
            return Usecase3(dependencies)
    elif some_condition == 'value_88' and some_other_condition == 555:
        return Usecase4(dependencies)
    else:
        return Usecase5(dependencies)

To execute a use case, we need two lines of code. First, we get a strategy from the factory. Then we run the strategy.

usecase = pick_the_strategy(some_condition, some_other_condition, something, dependencies)
usecase.apply(parameters)
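
To make the shape of the pattern concrete, here is a minimal runnable sketch. The pricing domain and every name in it (`Order`, `VipDiscount`, `FlatDiscount`, `RegularPrice`) are hypothetical and invented only for illustration. Treat it as one possible arrangement, not the only correct one.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Order:
    # Hypothetical input record, invented for this illustration.
    customer_type: str
    total: float


class PricingStrategy(Protocol):
    def apply(self, order: Order) -> float: ...


class RegularPrice:
    def apply(self, order: Order) -> float:
        return order.total


class VipDiscount:
    def __init__(self, discount_rate: float):
        # The only configuration this strategy needs.
        self.discount_rate = discount_rate

    def apply(self, order: Order) -> float:
        return order.total * (1 - self.discount_rate)


class FlatDiscount:
    def __init__(self, amount: float):
        self.amount = amount

    def apply(self, order: Order) -> float:
        return order.total - self.amount


def pick_the_strategy(order: Order) -> PricingStrategy:
    # All branching lives here; the strategies themselves stay branch-free.
    if order.customer_type == "vip":
        return VipDiscount(discount_rate=0.5)
    elif order.customer_type == "wholesale" and order.total >= 1000:
        return FlatDiscount(amount=100.0)
    else:
        return RegularPrice()


def final_price(order: Order) -> float:
    # The two lines from the text: pick a strategy, then run it.
    usecase = pick_the_strategy(order)
    return usecase.apply(order)
```

Calling `final_price(Order("vip", 200.0))` picks `VipDiscount` and applies it. The caller never sees the conditions that led to that choice.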

Benefits of code abstractions

Why do we do it? We have simplified the code a bit but created multiple new classes. Previously, every change required modifying the code only in one place - somewhere in the vast nested if. It was error-prone, but we worked in a familiar setup. Have we improved anything?

We have at least three benefits of the new code structure.

First, we can easily test the code. Instead of one massive code block, we have a bunch of strategy objects. Most of them have no conditional branches, so we need only one test to check their behavior. In the case of more complex strategies, we have many tests, but all of those tests are easy to understand.
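
As an illustration of how small such a test can be, here is a sketch using the standard `unittest` module. The `UppercaseCountryCode` strategy is a hypothetical example with a single code path, so one test is enough to describe its behavior.

```python
import unittest


class UppercaseCountryCode:
    """Hypothetical strategy with a single code path, invented for this example."""

    def apply(self, record: dict) -> dict:
        # Return a new dict instead of mutating the input.
        return {**record, "country": record["country"].upper()}


class UppercaseCountryCodeTest(unittest.TestCase):
    def test_uppercases_the_country_code(self):
        result = UppercaseCountryCode().apply({"id": 1, "country": "pl"})
        self.assertEqual(result, {"id": 1, "country": "PL"})
```

Saved in a test module, this runs with `python -m unittest`.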

Second, code dependencies became clear.

Previously, we had to pass all dependencies to the object containing the massive logic even if only one code branch required those dependencies.

Is this a big problem? It is terrible for the test code. Imagine that the object has twelve dependencies, but the tested branch uses only one of them. We see that the test’s author configured only one test dummy, but we don’t know why they did it. Is this the only dependency, or did they forget about the other dependencies? Are we testing the code, or do we have a useless test? You won’t know unless you carefully read the test code and the tested branch of the code.

After extracting strategy objects, we simplified the constructor. Now, we pass only the required dependencies. We immediately see what is required to run the code — no more useless mocks in the tests.
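
A small sketch of what that looks like in a test. The `SendInvoice` use case and its `email_client` collaborator are hypothetical. The constructor takes one dependency, so the test needs exactly one test double, and nobody has to wonder whether something was forgotten.

```python
from unittest.mock import Mock


class SendInvoice:
    """Hypothetical use case that needs exactly one collaborator."""

    def __init__(self, email_client):
        self.email_client = email_client

    def apply(self, invoice: dict) -> None:
        self.email_client.send(
            to=invoice["email"],
            subject=f"Invoice {invoice['id']}",
        )


def test_send_invoice_emails_the_customer():
    # One dependency in the constructor, one test double in the test.
    email_client = Mock()

    SendInvoice(email_client).apply({"id": 7, "email": "a@example.com"})

    email_client.send.assert_called_once_with(
        to="a@example.com", subject="Invoice 7"
    )
```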

Last but not least, multiple programmers can work on the business logic without stepping on each other’s toes. Previously, we had one code block which required changes in various places to introduce new behavior or change the business logic. If many people worked on different tasks related to the same part of the code, they had to resolve conflicts between pull requests.

Now, the only shared part is the factory method. The conditions may be quite complex, but the code structure is simple. We can quickly resolve a merge conflict there. On the other hand, the strategy objects implement independent behaviors, so it is unlikely that many programmers modify the same class simultaneously.

Code abstractions in data engineering

Can you imagine creating a factory method or strategy objects in the code using Apache Spark? I can’t do that either. It would be easy to cause a needless reshuffle or break Spark’s optimizations.

In data engineering, we need to implement strategies slightly differently. We do not use the factory pattern as such, but we can still get all the benefits of code abstractions.

Instead, we extract a part of the pipeline (without breaking the data flow!) into a separate function that plays the role of the factory method by using the case notation. Furthermore, we can extract the business logic of every case into another function, which gives us the equivalent of the strategy pattern.

There is no easy, clean, and elegant way to do it. We could try chaining when expressions. Applying a UDF is another option, but it isn’t perfect because we want to use built-in transformations as much as possible.

It is easy to turn such a solution into a performance nightmare. We could turn every case into a separate operation filtering the data first, applying the transformation, and joining it with all of the other cases in the end. However, we must be careful here. Remember to look at the execution plan to check whether your solution creates needless reshuffling.

Object-oriented programming isn’t helpful in the case of data pipelines. We are better off focusing on creating an uninterrupted data flow and using functional programming principles. It pays off to remember that whatever you do, it will be executed as a series of map, reduce, and reshuffle operations.

Similar to the object-oriented version, the extracted data flow will be easier to test. However, this time, we test a part of the factory method together with the corresponding strategy implementation. Technically, we could use a mock implementation in the factory method test to check whether we interact with the correct strategy. Don’t do it.

Of course, we have an arbitrary rule saying that every test should verify one behavior. We intentionally break it by testing both selecting the behavior and executing it. Nevertheless, separating those actions into multiple tests would introduce needless complexity without giving us any benefits.
