dfchain documentation

dfchain

A small library for composing pandas ETL tasks as functions that operate on a DataFrame.

Overview

  • dfchain provides lightweight building blocks for implementing ETL operations as Python functions and running them as a sequential pipeline.

Requirements

  • Python 3.12 or newer

  • pandas >= 2

  • SQLAlchemy >= 2

Installation

From the project root:

  1. Install Python 3.12 or newer. With Anaconda, for example:

    conda create --name [env-name] python=3.12
    
  2. Create and activate a virtual environment (recommended):

    python -m venv .venv
    source .venv/bin/activate
    
  3. Install the package:

    python -m pip install .
    

High-level API (overview)

The library provides a few simple moving parts to compose ETL logic as reusable functions.

  • Pipeline

    • Class: dfchain.api.pipeline.Pipeline

    • Purpose: Compose and execute an ordered sequence of Transformations.

    • Usage: pipeline = Pipeline([step1, step2]); pipeline.run(executor)

    • Steps: callables following the Transformation protocol (see below).
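As a rough illustration of what "an ordered sequence of Transformations" means, the sketch below applies steps one after another to a plain DataFrame. `run_steps`, `add_one`, and `double` are hypothetical names invented for this example; the real `Pipeline.run(executor)` takes an executor rather than a DataFrame, and its internals are not shown in this document.

```python
from typing import Any, Callable, Sequence

import pandas as pd

# A step takes (df, executor, **kwargs) and returns a DataFrame.
Step = Callable[..., pd.DataFrame]


def run_steps(df: pd.DataFrame, steps: Sequence[Step], executor: Any = None) -> pd.DataFrame:
    """Apply each step to the output of the previous one, in list order."""
    for step in steps:
        df = step(df, executor)
    return df


def add_one(df: pd.DataFrame, executor: Any, **kwargs: Any) -> pd.DataFrame:
    return df.assign(x=df["x"] + 1)


def double(df: pd.DataFrame, executor: Any, **kwargs: Any) -> pd.DataFrame:
    return df.assign(x=df["x"] * 2)


result = run_steps(pd.DataFrame({"x": [1, 2]}), [add_one, double])
print(result["x"].tolist())  # [4, 6]
```

Swapping the two steps would give a different result, which is why the order of the list passed to a pipeline matters.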

  • Transformation

    • Protocol: any callable matching (df: pd.DataFrame, executor: Executor, **kwargs) -> pd.DataFrame

    • Note: The simple @task decorator adapts plain functions to this protocol.

  • task decorator

    • Function: dfchain.api.task.task

    • Purpose: Adapt a simple function that operates on a pandas.DataFrame so it can be used as a Pipeline step. Supports both single-DataFrame and iterator-of-chunks input shapes.
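To make the adapter idea concrete, here is a minimal sketch of a decorator that turns a plain DataFrame function into a callable with the `(df, executor, **kwargs)` shape. `simple_task` is a hypothetical stand-in written for this example, not dfchain's `task` implementation, and it handles only the single-DataFrame case.

```python
from functools import wraps
from typing import Any, Callable

import pandas as pd


def simple_task(fn: Callable[..., pd.DataFrame]) -> Callable[..., pd.DataFrame]:
    """Hypothetical stand-in for @task; not the library's own code."""

    @wraps(fn)
    def step(df: pd.DataFrame, executor: Any = None, **kwargs: Any) -> pd.DataFrame:
        # The executor is accepted to match the protocol but is not forwarded,
        # so the wrapped function stays a plain DataFrame function.
        return fn(df, **kwargs)

    return step


@simple_task
def add_flag(df: pd.DataFrame, threshold: float = 0.8) -> pd.DataFrame:
    out = df.copy()
    out["flag"] = out["score"] > threshold
    return out


result = add_flag(pd.DataFrame({"score": [0.5, 0.9]}), executor=None)
print(result["flag"].tolist())  # [False, True]
```

Keyword arguments such as `threshold` pass straight through the wrapper, so a step can still be parameterized when it is called.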

  • Executor (core abstraction)

    • Interface: dfchain.core.executor.Executor (abstract base)

    • Responsibilities:

      • Provide chunk iteration via iter_chunks() -> Iterator[(chunk_id, DataFrame)]

      • Optionally provide group semantics (groupkey, rebuild_groups, iter_groups)

      • Control eager vs non-eager update semantics via is_eager/is_inplace flags

      • Persist transformed chunks via write_chunk / update_group
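The chunk-related responsibilities above can be sketched with a small abstract base class and a toy in-memory implementation. Only the method names `iter_chunks` and `write_chunk` come from this document; the class names, the `write_chunk(chunk_id, df)` signature, and the constructor are assumptions made for illustration, and group semantics and the eager/in-place flags are left out.

```python
from abc import ABC, abstractmethod
from typing import Iterator

import pandas as pd


class ExecutorSketch(ABC):
    """Illustrative interface only; the real Executor may differ."""

    @abstractmethod
    def iter_chunks(self) -> Iterator[tuple[int, pd.DataFrame]]: ...

    @abstractmethod
    def write_chunk(self, chunk_id: int, df: pd.DataFrame) -> None: ...


class InMemoryExecutor(ExecutorSketch):
    def __init__(self, df: pd.DataFrame, chunksize: int) -> None:
        # Split the frame into consecutive row slices keyed by chunk id.
        self._chunks = {
            chunk_id: df.iloc[start:start + chunksize]
            for chunk_id, start in enumerate(range(0, len(df), chunksize))
        }

    def iter_chunks(self) -> Iterator[tuple[int, pd.DataFrame]]:
        # Iterate over a snapshot so write_chunk can replace entries safely.
        yield from list(self._chunks.items())

    def write_chunk(self, chunk_id: int, df: pd.DataFrame) -> None:
        self._chunks[chunk_id] = df


executor = InMemoryExecutor(pd.DataFrame({"x": range(5)}), chunksize=2)
for chunk_id, chunk in executor.iter_chunks():
    executor.write_chunk(chunk_id, chunk.assign(y=chunk["x"] * 2))

merged = pd.concat((c for _, c in executor.iter_chunks()), ignore_index=True)
print(merged["y"].tolist())  # [0, 2, 4, 6, 8]
```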

  • Provided executors

    • PandasExecutor: In-memory executor backed by a pandas.DataFrame.

    • SqliteExecutor: SQLite-backed executor that stores pickled chunk blobs and per-group aggregated blobs to enable chunked processing with a small on-disk footprint.
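The idea of storing pickled chunk blobs in SQLite can be illustrated with the standard-library `sqlite3` and `pickle` modules. This is a sketch of the general technique, not SqliteExecutor's actual schema: the `chunks` table, its column names, and the `iter_chunks` helper are assumptions made for this example.

```python
import pickle
import sqlite3
from typing import Iterator

import pandas as pd

conn = sqlite3.connect(":memory:")
# Hypothetical schema: one row per chunk, with the DataFrame pickled into a BLOB.
conn.execute("CREATE TABLE chunks (chunk_id INTEGER PRIMARY KEY, payload BLOB)")

df = pd.DataFrame({"x": range(6)})
for chunk_id, start in enumerate(range(0, len(df), 2)):
    blob = pickle.dumps(df.iloc[start:start + 2])
    conn.execute("INSERT INTO chunks VALUES (?, ?)", (chunk_id, blob))
conn.commit()


def iter_chunks(connection: sqlite3.Connection) -> Iterator[tuple[int, pd.DataFrame]]:
    """Yield (chunk_id, DataFrame) pairs, unpickling one chunk at a time."""
    query = "SELECT chunk_id, payload FROM chunks ORDER BY chunk_id"
    for chunk_id, blob in connection.execute(query):
        yield chunk_id, pickle.loads(blob)


restored = pd.concat((c for _, c in iter_chunks(conn)), ignore_index=True)
print(restored["x"].tolist())  # [0, 1, 2, 3, 4, 5]
```

Because each chunk is unpickled only when the cursor reaches its row, a consumer that handles chunks one by one never needs the whole table in memory. Pickle should only be used to load data from a trusted source.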

  • Writing a pipeline

    • Define a list of step callables (functions decorated with @task or compatible with the Transformation protocol).

    • Construct a Pipeline and run it with an executor, such as PandasExecutor or SqliteExecutor (the example uses SqliteExecutor).

    Example:

    from dfchain import SqliteExecutor, Pipeline, task
    import pandas as pd
    
    @task
    def add_dark_border_flag(
        df: pd.DataFrame,
        score_col: str = "border_darkness_score",
        threshold: float = 0.8,
        output_col: str = "has_dark_border",
    ) -> pd.DataFrame:
      """Adds a boolean column indicating whether the border is very dark.
    
      Args:
          df: DataFrame containing a border darkness score column.
          score_col: Name of the column with border darkness scores in [0, 1].
          threshold: Threshold above which the border is considered dark.
          output_col: Name of output boolean column.
    
      Returns:
          pandas.DataFrame: Copy of df with new boolean column.
      """
      df = df.copy()
      df[output_col] = df[score_col] > threshold
      return df
    
    
    executor = (
      SqliteExecutor(chunksize=1_000)
      .session()
      .textFileReader(pd.read_csv("api_rawitem.csv", chunksize=1_000))
    )
    
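    # add_border_darkness_score is assumed to be another @task step defined elsewhere (not shown here).
    pipeline = Pipeline([add_border_darkness_score, add_dark_border_flag])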
    pipeline.run(executor)
    
    # Optional: concatenate all chunks in memory.
    # This is not needed to save the result as a CSV.
    merged_df = pd.concat(
      (df for _, df in executor.iter_chunks()),
      ignore_index=True
    )
    

    The resulting DataFrame can be streamed by iterating over executor.iter_chunks(), or saved to a CSV file with SqliteCsvWriter:

    from dfchain import SqliteCsvWriter
    SqliteCsvWriter(executor).to_csv("api_rawitem_flags.csv", index=False)
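For readers who want to stream chunks to a file by hand, the sketch below appends each chunk to a CSV with plain pandas, writing the header only once. `write_chunks_to_csv` is a hypothetical helper for illustration and says nothing about how SqliteCsvWriter is implemented; it accepts any iterable of `(chunk_id, DataFrame)` pairs, such as the output of `iter_chunks()`.

```python
import os
import tempfile
from typing import Iterable

import pandas as pd


def write_chunks_to_csv(chunks: Iterable[tuple[int, pd.DataFrame]], path: str) -> None:
    """Write chunks to one CSV file without concatenating them in memory."""
    first = True
    for _, chunk in chunks:
        # Overwrite on the first chunk, append afterwards; emit the header once.
        chunk.to_csv(path, mode="w" if first else "a", header=first, index=False)
        first = False


chunks = [(0, pd.DataFrame({"x": [1, 2]})), (1, pd.DataFrame({"x": [3]}))]
path = os.path.join(tempfile.mkdtemp(), "out.csv")
write_chunks_to_csv(chunks, path)

round_trip = pd.read_csv(path)
print(round_trip["x"].tolist())  # [1, 2, 3]
```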
    

Examples

Operating on a CSV file that contains base64-encoded images is memory-intensive. Chunked execution, as provided by dfchain, reduces peak RAM usage by processing one chunk at a time. Here is the before and after of an image manipulation task that removes dark borders from base64 images in a CSV file, using the OpenCV library.

RAM Usage

By distributing pandas operations across chunks, only one chunk of the CSV needs to be held in memory at a time. The graph below shows the maximum RAM usage of dfchain on a 4 GB CSV file, depending on chunk_size. The orange line shows the usage when chunk_size is 100.
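The one-chunk-at-a-time idea can be seen with plain pandas, independent of dfchain. In this sketch, `pd.read_csv(..., chunksize=...)` yields DataFrames of at most four rows, and only running totals are kept between iterations; the tiny inline CSV and the `score` column are made up for the example.

```python
import io

import pandas as pd

# A small stand-in for a large CSV file: scores 0.0, 0.1, ..., 0.9.
csv_text = "score\n" + "\n".join(str(i / 10) for i in range(10))

total_rows = 0
dark_rows = 0
largest_chunk = 0
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    # Only this chunk is materialized as a DataFrame at this point.
    total_rows += len(chunk)
    dark_rows += int((chunk["score"] > 0.8).sum())
    largest_chunk = max(largest_chunk, len(chunk))

print(total_rows, dark_rows, largest_chunk)  # 10 1 4
```

A smaller `chunksize` lowers the number of rows held at once at the cost of more iterations, which is the trade-off the RAM measurements in this section explore.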