Source code for ursa._query_types

"""Public Pydantic types returned by :meth:`DataInterface.query`.

Extracted from the former ``ursa.query`` module as part of ENG-1120: the
``query()`` function and its helpers were inlined into
:mod:`ursa.data_interface`, but the result-shape types stay public so
callers can ``from ursa import QueryResult, QueryResultList, QuerySpec``.

The leading underscore on the module name is intentional — the types
themselves are public, but the module path is internal. Always import
from the top-level ``ursa`` namespace.
"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, SupportsIndex, overload

from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self

from ursa.catalog.schemas import (
    CatalogID,
    EpochNs,
    IngestionStatus,
    MetadataDict,
    ModalityName,
    ModalityRow,
)

if TYPE_CHECKING:
    import pandas as pd


__all__ = ["QueryResult", "QueryResultList", "QuerySpec"]


# ---------------------------------------------------------------------------
# QueryResult
# ---------------------------------------------------------------------------


class QueryResult(BaseModel):
    """A single recording matched by :meth:`DataInterface.query`.

    The name mirrors the verb that produced it: ``query()`` is the single
    selection surface, while ``get()`` / ``download()`` (ENG-890) consume a
    ``QueryResult`` without further narrowing arguments.

    Phase 1a carries catalog projections only — no array bytes and no
    ``temporaldata.Data``. ``ModalityRow.raw_storage_uri`` is the
    cold-bucket pointer :meth:`~ursa.DataInterface.get` reads from
    (originally ENG-890, inlined in ENG-1119). ENG-899 (M3) replaces the
    ``modalities`` values with array-bearing ``temporaldata`` subclasses
    once the processed store exists.

    ``.modalities`` is narrowed to the ``modalities=[...]`` argument passed
    to :meth:`DataInterface.query`. With ``modalities=None`` every modality
    registered for the recording is present (an empty ``.modalities`` dict
    here means the catalog has no modality rows for this recording —
    surfaced as-is rather than hidden). With a non-``None`` filter, the
    dict keys are a non-empty subset of the requested list: recordings
    whose intersection with the filter is empty are dropped from the result
    list rather than returned with an empty ``.modalities`` dict.

    Frozen for accidental-mutation safety; not hashable (the
    ``dict``/``list`` fields prevent it). Use ``recording_hash`` as the
    cache key.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    recording_hash: CatalogID
    participant_ids: list[CatalogID]
    start_time: EpochNs
    duration: timedelta
    device_info: MetadataDict
    metadata: MetadataDict
    modalities: dict[ModalityName, ModalityRow]

    @property
    def start_time_dt(self) -> datetime:
        """Aware UTC datetime view of ``start_time`` (µs-lossy)."""
        # Local import keeps ursa.time out of the module import graph.
        from ursa.time import ns_to_datetime

        return ns_to_datetime(self.start_time)

    def __repr__(self) -> str:
        # Compact discovery repr; modality names sorted for deterministic
        # output across runs.
        modality_names = sorted(self.modalities)
        fields = [
            f"recording_hash={self.recording_hash!r}",
            f"participants={self.participant_ids!r}",
            f"start={self.start_time_dt.isoformat()}",
            f"duration={self.duration!s}",
            f"modalities={modality_names!r}",
        ]
        return "QueryResult(" + ", ".join(fields) + ")"

    # Pydantic v2's BaseModel.__str__ formats fields as ``k=v`` without a
    # class name, which would diverge from our compact ``__repr__``. Route
    # ``str()`` through ``__repr__`` so the discovery surface renders
    # identically in REPL / log output.
    def __str__(self) -> str:
        return repr(self)

    def _to_row_dict(self) -> dict[str, Any]:
        """Single source-of-truth row projection used by both
        ``_repr_html_`` and :meth:`QueryResultList.to_dataframe`.

        Keeping it on the model guarantees the two renderers never drift.
        """
        return dict(
            recording_hash=self.recording_hash,
            participant_ids=list(self.participant_ids),
            start_time=self.start_time_dt,
            duration=self.duration,
            ingestion_status=_aggregate_ingestion_status(self.modalities),
            modalities=sorted(self.modalities),
        )

    def _repr_html_(self) -> str:
        import pandas as pd

        frame = pd.DataFrame([self._to_row_dict()])
        return str(frame._repr_html_())
[docs] def _aggregate_ingestion_status( modalities: dict[ModalityName, ModalityRow], ) -> str | None: """Aggregate ``ingestion_status`` across a recording's modalities for the DataFrame projection. ``None`` (rendered as ``NaN`` in pandas) when the modality set is empty — preserves the distinction between "no modality rows" and "all processed" that :class:`QueryResult`'s docstring calls out. """ if not modalities: return None if any(m.ingestion_status is IngestionStatus.RAW for m in modalities.values()): return "raw" return "processed"
# Column order for :meth:`QueryResultList.to_dataframe`; entries mirror the
# keys produced by :meth:`QueryResult._to_row_dict` so the list-level
# DataFrame projection and the per-result HTML repr stay aligned.
_DATAFRAME_COLUMNS: tuple[str, ...] = (
    "recording_hash",
    "participant_ids",
    "start_time",
    "duration",
    "ingestion_status",
    "modalities",
)


# ---------------------------------------------------------------------------
# QueryResultList
# ---------------------------------------------------------------------------
class QueryResultList(list[QueryResult]):
    """List of :class:`QueryResult` with notebook/DataFrame rendering.

    Returned by :meth:`DataInterface.query`. Subclasses :class:`list` so
    existing ``list[QueryResult]`` annotations and ``isinstance(x, list)``
    checks keep working unchanged. Slicing (``results[:5]``), concatenation
    (``results + more``), :py:meth:`copy`, and multiplication (either
    operand order) preserve the subtype so the render methods stay
    reachable.

    ``sorted(results, ...)`` is the one common path that drops back to a
    plain :class:`list` (CPython implementation detail). Wrap with
    ``QueryResultList(sorted(results, ...))`` if you need
    ``.to_dataframe()`` on a sorted view.
    """

    def to_dataframe(self) -> pd.DataFrame:
        """Return a :class:`pandas.DataFrame` with one row per result.

        Columns mirror :class:`QueryResult` attribute names so
        ``df.modalities`` and ``qr.modalities`` read the same. The
        ``ingestion_status`` column is aggregated across each recording's
        modalities — ``"raw"`` if any are raw, else ``"processed"``,
        ``NaN`` if the recording has no modality rows.
        """
        import pandas as pd

        # M2 catalogs are small (<10k recordings); the full materialization
        # per ``_repr_html_`` call is fine. When M3 catalog growth crosses
        # ~10k rows, bound construction at N rows here — likely as part of
        # the ENG-1066 / ENG-1088 metadata-pushdown work.
        rows = [r._to_row_dict() for r in self]
        return pd.DataFrame(rows, columns=list(_DATAFRAME_COLUMNS))

    def _repr_html_(self) -> str:
        return str(self.to_dataframe()._repr_html_())

    def __repr__(self) -> str:
        n = len(self)
        word = "recording" if n == 1 else "recordings"
        if n == 0:
            return "QueryResultList(0 recordings)"
        union: set[str] = set()
        for r in self:
            union.update(r.modalities.keys())
        names = sorted(union)
        # Cap the preview at five modality names to keep the repr one-line.
        if len(names) > 5:
            inner = ", ".join(names[:5]) + ", ..."
        else:
            inner = ", ".join(names)
        return f"QueryResultList({n} {word}, modalities={{{inner}}})"

    # --- list-subclass type preservation -----------------------------------
    # CPython's built-in slicing / concatenation / copy return plain ``list``,
    # which would break ``results[:5].to_dataframe()``. Re-wrap explicitly.

    @overload
    def __getitem__(self, key: SupportsIndex) -> QueryResult: ...

    @overload
    def __getitem__(self, key: slice) -> QueryResultList: ...

    def __getitem__(self, key: SupportsIndex | slice) -> QueryResult | QueryResultList:
        if isinstance(key, slice):
            return QueryResultList(list.__getitem__(self, key))
        return list.__getitem__(self, key)

    def __add__(self, other: list[QueryResult]) -> QueryResultList:  # type: ignore[override]
        return QueryResultList(list.__add__(self, list(other)))

    def __radd__(self, other: list[QueryResult]) -> QueryResultList:
        return QueryResultList(list(other) + list(self))

    def __mul__(self, n: SupportsIndex) -> QueryResultList:
        return QueryResultList(list.__mul__(self, n))

    def __rmul__(self, n: SupportsIndex) -> QueryResultList:
        # Fix: ``3 * results`` previously fell through to ``list.__rmul__``,
        # which returns a plain ``list`` for subclasses — silently dropping
        # the render methods and contradicting the subtype-preservation
        # contract documented above. Re-wrap like ``__mul__``.
        return QueryResultList(list.__mul__(self, n))

    def copy(self) -> QueryResultList:
        return QueryResultList(self)
# --------------------------------------------------------------------------- # QuerySpec # ---------------------------------------------------------------------------
class QuerySpec(BaseModel):
    """Pydantic spec for :meth:`DataInterface.query`.

    Phase 1a — implemented over the raw catalog: ``participants``,
    ``modalities``, ``recording_hash``, ``metadata``.

    Phase 1a — accepted at the signature level, raising
    :class:`NotImplementedError` (referencing ENG-1082) when a matched
    modality has ``ingestion_status="raw"``: ``time_range``,
    ``pipeline_version``, ``time_filters``, ``metadata_filters``,
    ``derived``.

    The complex-list fields use ``list[Any]`` as a deliberate
    forward-compat escape hatch; ENG-1082 will replace each with a typed
    model (``AroundEvent | TimeWindow``, ``Filter``, ``DerivedSelector``).
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # Phase 1a — implemented
    participants: list[CatalogID] | None = None
    modalities: list[ModalityName] | None = None
    recording_hash: CatalogID | None = None
    metadata: MetadataDict | None = None

    # Phase 1a — type-stable, gated at execution time
    time_range: tuple[EpochNs, EpochNs] | None = None
    pipeline_version: str | None = None
    time_filters: list[Any] = Field(default_factory=list)
    metadata_filters: list[Any] = Field(default_factory=list)
    derived: list[Any] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate(self) -> Self:
        """Reject inverted or zero-length ``time_range`` windows."""
        if self.time_range is not None:
            start, end = self.time_range
            if end <= start:
                raise ValueError(
                    f"time_range end ({end}) must exceed start ({start})"
                )
        return self

    def has_processed_kwarg(self) -> bool:
        """True iff any kwarg requiring the processed store is set."""
        wants_processed_store = (
            self.time_range is not None
            or self.pipeline_version is not None
            or bool(self.time_filters or self.metadata_filters or self.derived)
        )
        return wants_processed_store