dfchain.backends.sqlite package

Subpackages

Submodules

dfchain.backends.sqlite.executor_impl module

class dfchain.backends.sqlite.executor_impl.SqliteExecutor(_groupkey: Hashable | None = None, _df: DataFrameLike | None = None, is_eager: bool = False, is_inplace: bool = False, chunksize: int | None = None, _session: Session | None = None)

Bases: Executor

SQLite backed executor storing pickled DataFrame chunks and groups.

This class exposes a small builder API for configuration: session(...), groupkey(...) and eager() all return self so they can be chained.

_session

SQLAlchemy session used to persist chunks and groups. If not set, build() will create a temporary engine and session.

Type:

Session | None

_groupby_cache

Internal cache mapping groupby call signatures to DistributedGroupBy instances.

Type:

dict

Example:

import pandas as pd
from dfchain.backends.sqlite.executor_impl import SqliteExecutor

executor = (
    SqliteExecutor(chunksize=1_000)
    .session()
    .textFileReader(pd.read_csv("input_file.csv", chunksize=1_000))
    .build()
)

clear_cache() → None

Clear internal groupby cache.

clear_groups() → None

Clear stored groups in the database and clear caches.

This deletes all DfGroup rows from the configured session and resets the in-memory groupby cache.

df(df: DataFrame)

Set an in-memory DataFrame to be persisted when build() is run.

Parameters:

df (pandas.DataFrame) – DataFrame to partition and store as chunks.

Raises:

TypeError – if df is not a pandas DataFrame

Returns:

self for chaining.

Return type:

SqliteExecutor

eager() → SqliteExecutor

Enable eager execution and return self (builder-style).

Eager execution changes internal flags and can affect how groupby operations are represented. See class documentation for details.

Returns:

self for chaining.

Return type:

SqliteExecutor

get_session() → Session

Return the configured SQLAlchemy session.

Returns:

SQLAlchemy session used by the executor.

Return type:

Session

groupkey(groupkey: str | None) → SqliteExecutor

Set the grouping key used by group operations and return self.

This is a builder-style setter; read the configured value using Executor.get_groupkey().

Parameters:

groupkey (str | None) – Column name (or None) to use for grouping.

Returns:

self for chaining.

Return type:

SqliteExecutor

iter_chunks()

Yield stored chunk id and unpickled DataFrame for each stored chunk.

Yields:

Iterator[tuple[int, pandas.DataFrame]] – (chunk_id, DataFrame)

iter_groups()

Yield each stored group key together with its unpickled DataFrame.

Yields:

Iterator[tuple[str | None, pandas.DataFrame]] – (group_key, DataFrame)

read_csv(*ac, **av)

Read CSV via the sqlite-backed CSV reader.

This returns the reader instance’s read_csv callable, which mirrors pandas.read_csv semantics but stores partitions directly in the sqlite-backed executor.

rebuild_groups(flush_every: int = 1) → None

Rebuild DfGroup rows from stored chunks.

This reads all stored chunks, groups rows by the configured group key, and writes combined group blobs into the DfGroup table. flush_every controls how often buffered per-group frames are flushed to the DB to limit memory usage.

Parameters:

flush_every (int) – How many chunks to process before flushing per-group buffers to the database.

Raises:

RuntimeError – if the session is not set on the executor.
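The buffering controlled by flush_every can be pictured with the following minimal sketch. It is not the library's implementation: the function name rebuild_groups_sketch is hypothetical, a plain dict stands in for the DfGroup table, and combining a group is assumed to mean concatenating its rows.

```python
from collections import defaultdict

import pandas as pd


def rebuild_groups_sketch(chunks, groupkey, flush_every=1):
    """Group rows from all chunks by `groupkey`, flushing buffers periodically."""
    store = {}  # stand-in for the DfGroup table: group key -> combined DataFrame
    buffers = defaultdict(list)  # group key -> frames not yet flushed

    def flush():
        # Append buffered frames to whatever is already stored for each group.
        for key, frames in buffers.items():
            parts = ([store[key]] if key in store else []) + frames
            store[key] = pd.concat(parts, ignore_index=True)
        buffers.clear()

    for n, chunk in enumerate(chunks, start=1):
        for key, frame in chunk.groupby(groupkey):
            buffers[str(key)].append(frame)
        if n % flush_every == 0:
            flush()
    flush()  # write whatever remains after the last full batch
    return store


chunks = [
    pd.DataFrame({"k": ["a", "b"], "v": [1, 2]}),
    pd.DataFrame({"k": ["a", "a"], "v": [3, 4]}),
    pd.DataFrame({"k": ["b"], "v": [5]}),
]
groups = rebuild_groups_sketch(chunks, "k", flush_every=2)
```

In this sketch a larger flush_every means fewer writes to the store but more frames held in memory between flushes.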

session(session: Session | None = None) → SqliteExecutor

Set the SQLAlchemy session and return self (builder-style). If no session is configured, a temporary on-disk SQLite engine and session will be created.

Parameters:

session (Session | None) – SQLAlchemy session to use.

Returns:

self for chaining.

Return type:

SqliteExecutor

textFileReader(textFileReader: Iterator[DataFrame])

Consume an iterator of DataFrame chunks and persist them.

Parameters:

textFileReader (Iterator[pandas.DataFrame]) – Iterator yielding DataFrame partitions (e.g. pandas read_csv with chunksize).

Returns:

self for chaining.

Return type:

SqliteExecutor

update_group(df: DataFrame)

Update stored per-group aggregates using df.

If the executor has no configured group key this is a no-op.

Parameters:

df (pandas.DataFrame) – DataFrame whose groups should be merged into the stored group rows.

write_chunk(key: int, val: DataFrame) → None

Write (or update) a chunk row in the DB.

Parameters:
  • key (int) – Chunk identifier.

  • val (pandas.DataFrame) – DataFrame chunk to store.
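To illustrate the write-or-update behavior, here is a minimal sketch that stores pickled DataFrames in SQLite using only the standard-library sqlite3 module. The table name df_chunk, its column layout, and the helper names write_chunk_sketch and iter_chunks_sketch are assumptions made for this example; the executor itself goes through SQLAlchemy and the DfChunk model.

```python
import pickle
import sqlite3

import pandas as pd

# Hypothetical table layout; the real DfChunk model may differ.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE df_chunk (id INTEGER PRIMARY KEY, data BLOB NOT NULL)")


def write_chunk_sketch(conn: sqlite3.Connection, key: int, val: pd.DataFrame) -> None:
    """Insert the chunk, or overwrite the row if the id already exists."""
    conn.execute(
        "INSERT OR REPLACE INTO df_chunk (id, data) VALUES (?, ?)",
        (key, pickle.dumps(val)),
    )
    conn.commit()


def iter_chunks_sketch(conn: sqlite3.Connection):
    """Yield (chunk_id, DataFrame) pairs in id order."""
    for chunk_id, blob in conn.execute("SELECT id, data FROM df_chunk ORDER BY id"):
        yield chunk_id, pickle.loads(blob)


write_chunk_sketch(conn, 0, pd.DataFrame({"a": [1, 2]}))
write_chunk_sketch(conn, 0, pd.DataFrame({"a": [3, 4]}))  # same key: row is replaced
chunks = list(iter_chunks_sketch(conn))
```

Writing the same key twice leaves a single row holding the most recent DataFrame.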

dfchain.backends.sqlite.executor_impl.logger = <Logger dfchain.backends.sqlite.executor_impl (WARNING)>

SQLite-backed Executor implementation.

This module implements SqliteExecutor, an Executor that stores pandas DataFrame chunks and grouped DataFrames as pickled blobs inside a SQLite database via SQLAlchemy. The implementation supports a small fluent builder API for configuration and exposes iteration and groupby semantics that are compatible with pandas-like workflows.

Key classes and functions:
  • SqliteExecutor – main Executor implementation used by the sqlite backend.

  • update_group/_update_group_inner – helpers used to update stored group aggregates from incoming chunk data.

The docstrings in this module follow the Google style so they can be rendered nicely by Sphinx (napoleon extension).

dfchain.backends.sqlite.executor_impl.update_group(session, groupkey, chunk)

Update stored groups using an incoming chunk DataFrame.

The function will group the chunk by groupkey and merge each group into the DfGroup table using _update_group_inner.

Parameters:
  • session (Session) – SQLAlchemy session to use.

  • groupkey (str | None) – Column to group by. If None, the function is a no-op.

  • chunk (pandas.DataFrame) – Incoming chunk to merge.
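The group-then-merge flow described above might look roughly like the sketch below. The name update_group_sketch is hypothetical, a dict of pickled blobs stands in for the DfGroup table, and merging is assumed to mean appending the new rows to the stored ones; the behavior of _update_group_inner is not documented here.

```python
import pickle

import pandas as pd


def update_group_sketch(store: dict, groupkey, chunk: pd.DataFrame) -> None:
    """Merge each group found in `chunk` into `store` (group key -> pickled DataFrame)."""
    if groupkey is None:
        return  # no grouping configured: nothing to do
    for key, frame in chunk.groupby(groupkey):
        name = str(key)
        if name in store:
            # Assumed merge strategy: append new rows to the stored rows.
            previous = pickle.loads(store[name])
            frame = pd.concat([previous, frame], ignore_index=True)
        store[name] = pickle.dumps(frame)


store = {}
update_group_sketch(store, "k", pd.DataFrame({"k": ["a", "b"], "v": [1, 2]}))
update_group_sketch(store, "k", pd.DataFrame({"k": ["a"], "v": [3]}))
update_group_sketch(store, None, pd.DataFrame({"k": ["z"], "v": [9]}))  # no-op
merged_a = pickle.loads(store["a"])
```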

dfchain.backends.sqlite.models module

class dfchain.backends.sqlite.models.Base(**kwargs: Any)

Bases: DeclarativeBase

metadata: ClassVar[MetaData] = MetaData()

Refers to the sqlalchemy.MetaData collection that will be used for new sqlalchemy.Table objects.

See also

orm_declarative_metadata

registry: ClassVar[_RegistryType] = <sqlalchemy.orm.decl_api.registry object>

Refers to the sqlalchemy.orm.registry in use where new sqlalchemy.orm.Mapper objects will be associated.

class dfchain.backends.sqlite.models.DfChunk(**kwargs)

Bases: Base

data: Mapped[bytes]
groupbykeys: Mapped[str | None]
id: Mapped[int]

class dfchain.backends.sqlite.models.DfGroup(**kwargs)

Bases: Base

data: Mapped[bytes]
groupbykeys: Mapped[str]
id: Mapped[int]

dfchain.backends.sqlite.schema module

Docstring for dfchain.backends.sqlite.schema

dfchain.backends.sqlite.schema.session_factory()

dfchain.backends.sqlite.util module

class dfchain.backends.sqlite.util.DistributedGroupBy(groupby_iter_factory: Callable[[], Iterator[DataFrameGroupBy]], _selection: str | list[str] | None = None)

Bases: object

Docstring for DistributedGroupBy

agg(func: str | Callable | dict[str, Any] | list[Any] | tuple[Any, ...] | None, *args: Any, **kwargs: Any) → Any

Aggregate each partition locally, then combine the partial results globally.

Works like pandas: returns a Series or a DataFrame depending on the selection and on func.
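The two-phase pattern can be sketched with plain pandas as follows. This illustrates the general technique for a sum and is not the code used by DistributedGroupBy; the column names k and v are made up for the example.

```python
import pandas as pd

partitions = [
    pd.DataFrame({"k": ["a", "b", "a"], "v": [1, 2, 3]}),
    pd.DataFrame({"k": ["b", "c"], "v": [4, 5]}),
]

# Step 1: aggregate within each partition independently.
partials = [part.groupby("k")["v"].sum() for part in partitions]

# Step 2: combine the partial results; summing partial sums gives the global sum.
combined = pd.concat(partials).groupby(level=0).sum()

# Reference: the same aggregation computed on the undivided data.
expected = pd.concat(partitions).groupby("k")["v"].sum()
```

This direct recombination only works for aggregations that decompose cleanly, such as sum, min, max and count. A mean, for example, would need per-partition sums and counts that are combined before dividing.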

clear_cache() → None

groupby_iter_factory: Callable[[], Iterator[DataFrameGroupBy]]

dfchain.backends.sqlite.util.ensure_df_iter(obj: Any) → Iterator[DataFrame]

dfchain.backends.sqlite.util.freeze_call(args: tuple[Any, ...], kwargs: dict[str, Any]) → Hashable

Return a hashable representation of the call arguments (args and kwargs), suitable as a key for caching (memoizing) calls to groupby methods such as agg.
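One way to turn call arguments into a hashable cache key is sketched below. The helpers freeze_sketch and freeze_call_sketch are hypothetical and written for illustration only; the actual freeze_call may handle different types or produce a different key layout.

```python
from typing import Any, Hashable


def freeze_sketch(obj: Any) -> Hashable:
    """Recursively convert common containers into hashable equivalents."""
    if isinstance(obj, dict):
        # Sort by the key's repr so that insertion order does not affect the result.
        items = sorted(obj.items(), key=lambda kv: repr(kv[0]))
        return tuple((k, freeze_sketch(v)) for k, v in items)
    if isinstance(obj, (list, tuple)):
        return tuple(freeze_sketch(v) for v in obj)
    if isinstance(obj, set):
        return frozenset(freeze_sketch(v) for v in obj)
    return obj  # assumed to be hashable already (str, int, function, ...)


def freeze_call_sketch(args: tuple, kwargs: dict) -> Hashable:
    """Build a cache key from positional and keyword arguments."""
    return (freeze_sketch(args), freeze_sketch(kwargs))


key1 = freeze_call_sketch(({"v": ["sum", "max"]},), {"numeric_only": True})
key2 = freeze_call_sketch(({"v": ["sum", "max"]},), {"numeric_only": True})
cache = {key1: "cached result"}
```

Because equal arguments produce equal keys, a second call with the same arguments finds the cached entry instead of recomputing it.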

dfchain.backends.sqlite.util.init_chunk(session, groupkey, chunk_id, chunk)

dfchain.backends.sqlite.util.init_group(session, groupkey, chunk)

Internal helper: merge chunk groups into stored DfGroup rows.

dfchain.backends.sqlite.util.norm_key(k) → str

Module contents