Source code for dfchain.backends.pandas.executor_impl

"""Pandas backend executor implementation.

This module provides :class:`PandasExecutor`, an in‑memory implementation of
:class:`dfchain.core.executor.Executor` that wraps a single
:pandas:`pandas.DataFrame` instance and exposes grouping and chunking hooks
used by higher‑level APIs.
"""

from __future__ import annotations

from collections.abc import Hashable
from dataclasses import dataclass
from typing import Any, Iterable

import pandas
from pandas.core.groupby.generic import DataFrameGroupBy

from dfchain.core.executor._abc import Executor


[docs] @dataclass class PandasExecutor(Executor): """Executor implementation backed by an in‑memory :class:`pandas.DataFrame`. ``PandasExecutor`` is a lightweight in‑memory executor that wraps a :class:`pandas.DataFrame` and implements the grouping and chunking hooks defined by :class:`dfchain.core.executor.PartitionAble`. Attributes: _df (:obj:`pandas.DataFrame` or `None`, default `None`): The wrapped dataframe. It can be provided at construction time or later via :meth:`df`. is_eager (bool, default `False`): Hint for task execution mode. When ``True``, tasks may execute eagerly rather than building a deferred plan. The exact semantics are defined by higher‑level APIs. is_inplace (bool, default `False`): When ``True``, task functions are expected to mutate ``_df`` in place. When ``False``, tasks should treat ``_df`` as immutable and reassign a new dataframe instead. chunksize (int or `None`, default `None`) Optional hint used by higher‑level code to determine how many rows to process per chunk when streaming or partitioning the data. Note: The pandas backend is designed for in‑memory use and does not maintain an index by group key. As a result, methods that would write changes back to specific groups (``update_group``, ``clear_groups``, ``rebuild_groups``) raise :class:`NotImplementedError`. """
[docs] def df(self, df): """Set the wrapped dataframe and return self for fluent construction. Parameters: df (:class:`pandas.DataFrame`): Dataframe to wrap. Returns: `self` (:obj:`PandasExecutor`): The executor instance. """ self._df = df return self
# --- grouping ------------------------------------------------- def _groupby(self, *args: Any, **kwargs: Any) -> DataFrameGroupBy: """Low‑level groupby implementation. This simply delegates to :meth:`pandas.DataFrame.groupby` with the provided arguments and returns a pandas :class:`DataFrameGroupBy`. """ return self._df.groupby(*args, **kwargs)
[docs] def iter_groups(self) -> Iterable[tuple[Hashable, pandas.DataFrame]]: """Iterate grouped data as ``(key, group_df)`` pairs. If :attr:`_groupkey` is ``None``, yield a single pair ``(None, self._df)`` containing the whole dataframe. Otherwise, perform ``self._df.groupby(self._groupkey)`` and yield the resulting ``(key, group)`` pairs produced by pandas. """ if self._groupkey is None: # No grouping defined; yield the whole dataframe as a single group yield (None, self._df) return for key, group in self._df.groupby(self._groupkey): yield key, group
[docs] def update_group(self, df: pandas.DataFrame) -> None: """Update the current group with the provided dataframe. The pandas backend does not maintain an index by group key, so there is no safe default way to update a single group in place. This method therefore raises :class:`NotImplementedError`. Backends that support indexed group updates (for example, a database backend) should provide an implementation. """ # Without a defined grouping key or current group context, we cannot # implement a safe default. This method is therefore a no‑op by # default and should be overridden where group updates are needed. raise NotImplementedError("PandasExecutor does not index by groupkeys")
[docs] def clear_groups(self) -> None: """Clear any cached grouping state. The pandas executor does not cache grouped state keyed by a group index, so this method raises :class:`NotImplementedError`. """ raise NotImplementedError("PandasExecutor does not index by groupkeys")
[docs] def rebuild_groups(self, flush_every: int = 1): """Rebuild or re‑materialize groups. Parameters: flush_every (`int`, optional): Hint controlling how often to flush intermediate state. Not implemented for the in‑memory pandas backend. """ raise NotImplementedError("PandasExecutor does not index by groupkeys")
# --- chunking -------------------------------------------------
[docs] def iter_chunks(self) -> Iterable[pandas.DataFrame]: """Iterate dataframe chunks. The default in‑memory strategy yields a single chunk containing the entire dataframe. Callers that require more advanced chunking behaviour should subclass :class:`PandasExecutor` or use a different :class:`~dfchain.core.executor.Executor` implementation. Note: The default implementation yields a ``(key, chunk)`` pair where the ``key`` is ``0`` and ``chunk`` is the full dataframe. Higher‑level code should account for this convention when consuming the iterator. """ # Simple default: a single chunk containing the whole dataframe. # Callers that need more advanced behavior can override this in a # subclass or provide a different Executor implementation. yield (0, self._df)
[docs] def write_chunk(self, key, val): """Write a processed chunk back to the executor. The pandas backend treats the entire dataframe as one chunk; this method therefore replaces ``self._df`` with ``val`` and ignores the provided ``key``. """ self._df = val