dfchain package

Subpackages

Module contents

Top-level package for dfchain.

Example

Constructing a simple task, creating a SqliteExecutor that reads CSV chunks into a temporary SQLite-backed store, running a pipeline and then re-assembling the stored chunks

from dfchain import SqliteExecutor, Pipeline, task
import pandas as pd

@task
def add_dark_border_flag(
    df: pd.DataFrame,
    score_col: str = "border_darkness_score",
    threshold: float = 0.8,
    output_col: str = "has_dark_border",
) -> pd.DataFrame:
    df = df.copy()
    df[output_col] = df[score_col] > threshold
    return df


# Build an executor and stream CSV into it using chunksize
executor = (
    SqliteExecutor(chunksize=1_000)
    .session()
    .textFileReader(pd.read_csv("api_rawitem.csv", chunksize=1_000))
    .build()
)

# Construct a pipeline and run it
pipeline = Pipeline([add_border_darkness_score, add_dark_border_flag])
pipeline.run(executor)

# Re-assemble all stored chunks into a single DataFrame
merged_df = pd.concat(
    (df for _, df in executor.iter_chunks()),
    ignore_index=True
)