Data Transformation Design Pattern

When transforming data, I have often used a pattern like the following: a class that is responsible for the different data transformation steps. Generally speaking, structuring data transformations with classes makes sense, as classes are a neat way to group related functions together. For another developer who has to read the code, it is much easier to gain an overview because it’s immediately clear that all functions under a class share some common functionality or logic.

Naive Approach

from pyspark.sql import DataFrame


class DataTransformer:

    def __init__(self, data: DataFrame):
        self.data = data

    def drop_duplicates(self) -> None:
        # Mutates the internal state instead of returning a result.
        self.data = self.data.dropDuplicates()

    def drop_nulls(self) -> None:
        self.data = self.data.na.drop()

# Usage
data = load_data()
data_transformer = DataTransformer(data)
data_transformer.drop_duplicates()
data_transformer.drop_nulls()
transformed_data = data_transformer.data

Unfortunately, the above approach has a few flaws. First, method chaining is not possible: the methods return nothing, so every transformation has to be invoked as a separate statement and the result has to be pulled out of the data attribute afterwards. Another drawback is that the data attribute is overwritten by each method, potentially leading to confusion and making it difficult to track the transformed data, especially when the transformations are complex. Side effects are thus easily introduced by this architecture. Moreover, overwriting the data attribute means the class loses its reference to the original data. If it’s needed again (maybe outside of the class), load_data has to be called again, which is undesirable when data load times are large.
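To make the reuse problem concrete, here is a small sketch (reusing the load_data helper from the usage example above): once the methods have run, the class only holds the fully transformed data, and the raw input can only be recovered by loading it again.

transformer = DataTransformer(load_data())
transformer.drop_duplicates()
transformer.drop_nulls()

# transformer.data now holds only the transformed DataFrame; to get the
# original back, the (potentially expensive) load has to happen again.
raw_df = load_data()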

A step in the right direction is the following: instead of overwriting the data attribute in each transformation step, each method takes a DataFrame and returns the transformed result. This retains the original data and is less confusing and easier to debug.

Functional Approach

class DataTransformer:

    @staticmethod
    def drop_duplicates(data: DataFrame) -> DataFrame:
        return data.dropDuplicates()

    @staticmethod
    def drop_nulls(data: DataFrame) -> DataFrame:
        return data.na.drop()

# Usage
df_deduplicated = DataTransformer.drop_duplicates(df)
df_deduplicated_without_nulls = DataTransformer.drop_nulls(df_deduplicated)

However, this approach is still a little awkward: method chaining is still not possible, and the user of the class has to be quite creative in finding descriptive names for the intermediate variables (my naming choice is certainly not a good example).

Instead, here are a few other approaches that fill some of the gaps I’ve highlighted.

Transformation Pipeline (my favorite)

from abc import ABC, abstractmethod
from typing import List, Optional

class Transformer(ABC):
    @abstractmethod
    def transform(self, data: DataFrame) -> DataFrame:
        pass

class DuplicatesDropper(Transformer):
    def transform(self, data: DataFrame) -> DataFrame:
        return data.dropDuplicates()

class NullDropper(Transformer):
    def transform(self, data: DataFrame) -> DataFrame:
        return data.na.drop()


class Pipeline:
    def __init__(self, transformers: Optional[List[Transformer]] = None):
        self.transformers = transformers or []
    
    def add(self, transformer: Transformer) -> 'Pipeline':
        self.transformers.append(transformer)
        return self
    
    def run(self, data: DataFrame) -> DataFrame:
        result = data
        for transformer in self.transformers:
            result = transformer.transform(result)
        return result

# Usage
result_df = (
    Pipeline()
    .add(DuplicatesDropper())
    .add(NullDropper())
    .run(initial_df)
)

Strengths of the Approach

  1. Separation of Concerns: Each transformer is responsible for a specific transformation. This makes the code easier to understand, test, and maintain (see the test sketch after this list).

  2. Extensibility: The Pipeline class can easily be extended with new transformers without modifying existing code. This makes the system flexible and adaptable to new requirements (see the ColumnRenamer sketch after this list).

  3. Method Chaining: The Pipeline class supports method chaining, allowing for a clean and readable sequence of transformations.

  4. Modularity: The design is modular, with each transformer implementing a transform method. This makes it easy to add, remove, or modify transformations independently.

  5. Clear Usage Pattern: The usage pattern is clear and intuitive, demonstrating how to build a pipeline and apply transformations in a sequence.
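To make point 1 concrete, each transformer can be unit-tested in isolation. A minimal pytest-style sketch, assuming a small local SparkSession is acceptable in the test suite:

from pyspark.sql import SparkSession

def test_null_dropper():
    # Local single-core session; sufficient for testing one transformer.
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "value"])

    result = NullDropper().transform(df)

    # The row containing a null should have been dropped.
    assert result.count() == 1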
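To illustrate point 2, adding a new transformation requires nothing more than a new Transformer subclass; the Pipeline class and the existing transformers stay untouched. A sketch with a hypothetical ColumnRenamer:

class ColumnRenamer(Transformer):
    """Hypothetical transformer that renames a single column."""

    def __init__(self, old_name: str, new_name: str):
        self.old_name = old_name
        self.new_name = new_name

    def transform(self, data: DataFrame) -> DataFrame:
        return data.withColumnRenamed(self.old_name, self.new_name)

# Usage: slots into an existing pipeline without any other changes
result_df = (
    Pipeline()
    .add(DuplicatesDropper())
    .add(NullDropper())
    .add(ColumnRenamer("userName", "user_name"))
    .run(initial_df)
)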

There are also a few other approaches that are, in my opinion, solid choices:

Builder Pattern

class Transformer:
    def __init__(self, data: DataFrame):
        self.data = data
        
    def drop_duplicates(self) -> 'Transformer':
        self.data = self.data.dropDuplicates()
        return self
        
    def drop_nulls(self) -> 'Transformer':
        self.data = self.data.na.drop()
        return self
        
    def build(self) -> DataFrame:
        """Return the final transformed DataFrame."""
        return self.data

# Usage
result_df = (
    Transformer(initial_df)
    .drop_duplicates()
    .drop_nulls()
    .build()
)
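One property worth pointing out here: Spark DataFrames are immutable, so although the builder overwrites its own data attribute with each step, initial_df itself stays intact and can be fed into a fresh Transformer for a different chain of steps:

# initial_df is untouched by the run above, so it can be reused:
only_deduplicated_df = (
    Transformer(initial_df)
    .drop_duplicates()
    .build()
)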

Factory Method

class DataTransformer:
    def __init__(self, data: DataFrame):
        self.data = data
    
    def drop_duplicates(self) -> DataFrame:
        self.data = self.data.dropDuplicates()
        return self.data
    
    def drop_nulls(self) -> DataFrame:
        self.data = self.data.na.drop()
        return self.data
    
    @classmethod
    def transform(cls, data: DataFrame) -> DataFrame:
        transformer = cls(data)
        transformer.drop_duplicates()
        return transformer.drop_nulls()

# Usage
transformed_df = DataTransformer.transform(df)

Strategy Pattern

from abc import ABC, abstractmethod

class TransformationStrategy(ABC):
    @abstractmethod
    def transform(self, data: DataFrame) -> DataFrame:
        pass

class DeduplicationStrategy(TransformationStrategy):
    def transform(self, data: DataFrame) -> DataFrame:
        return data.dropDuplicates()

class DropNullsStrategy(TransformationStrategy):
    def transform(self, data: DataFrame) -> DataFrame:
        return data.na.drop()

class DataFrameProcessor:
    def __init__(self, data: DataFrame):
        self.data = data
        
    def transform(self, strategy: TransformationStrategy) -> 'DataFrameProcessor':
        # Wrap the result in a new processor so that calls can be chained.
        return DataFrameProcessor(strategy.transform(self.data))

# Usage
result_df = (
    DataFrameProcessor(df)
    .transform(DeduplicationStrategy())
    .transform(DropNullsStrategy())
    .data
)
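The real payoff of the Strategy pattern is that the concrete transformation can be selected at runtime, for example from configuration. A minimal sketch, assuming a hypothetical strategy_name value read from config:

# Hypothetical: pick the strategy based on a configuration value.
STRATEGIES = {
    "dedupe": DeduplicationStrategy(),
    "drop_nulls": DropNullsStrategy(),
}

def apply_configured_strategy(df: DataFrame, strategy_name: str) -> DataFrame:
    return DataFrameProcessor(df).transform(STRATEGIES[strategy_name]).data

cleaned_df = apply_configured_strategy(df, "drop_nulls")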