"""Public Pydantic types returned by :meth:`DataInterface.query`.
Extracted from the former ``ursa.query`` module as part of ENG-1120: the
``query()`` function and its helpers were inlined into
:mod:`ursa.data_interface`, but the result-shape types stay public so
callers can ``from ursa import QueryResult, QueryResultList, QuerySpec``.
The leading underscore on the module name is intentional — the types
themselves are public, but the module path is internal. Always import
from the top-level ``ursa`` namespace.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, SupportsIndex, overload
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self
from ursa.catalog.schemas import (
CatalogID,
EpochNs,
IngestionStatus,
MetadataDict,
ModalityName,
ModalityRow,
)
if TYPE_CHECKING:
import pandas as pd
__all__ = ["QueryResult", "QueryResultList", "QuerySpec"]
# ---------------------------------------------------------------------------
# QueryResult
# ---------------------------------------------------------------------------
class QueryResult(BaseModel):
    """A single recording selected by :meth:`DataInterface.query`.

    Named for the verb that produced it: ``query()`` is the single
    selection surface, and ``get()`` / ``download()`` (ENG-890) consume a
    ``QueryResult`` without further narrowing arguments.

    Phase 1a carries catalog projections only — no array bytes, no
    ``temporaldata.Data``. ``ModalityRow.raw_storage_uri`` is the
    cold-bucket pointer :meth:`~ursa.DataInterface.get` reads from
    (originally ENG-890, inlined in ENG-1119). ENG-899 (M3) replaces the
    ``modalities`` values with array-bearing ``temporaldata`` subclasses
    once the processed store exists.

    ``.modalities`` is narrowed to the ``modalities=[...]`` argument
    passed to :meth:`DataInterface.query`:

    * ``modalities=None`` — every modality registered for the recording
      is present; an empty dict here means the catalog has no modality
      rows for this recording (surfaced as-is rather than hidden).
    * non-``None`` filter — the keys are a non-empty subset of the
      requested list; recordings whose intersection with the filter is
      empty are dropped from the result list rather than returned with
      an empty ``.modalities`` dict.

    Frozen for accidental-mutation safety; not hashable (the
    ``dict``/``list`` fields prevent it). Use ``recording_hash`` as the
    cache key.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    recording_hash: CatalogID
    participant_ids: list[CatalogID]
    start_time: EpochNs
    duration: timedelta
    device_info: MetadataDict
    metadata: MetadataDict
    modalities: dict[ModalityName, ModalityRow]

    @property
    def start_time_dt(self) -> datetime:
        """``start_time`` viewed as an aware UTC datetime (µs-lossy)."""
        from ursa.time import ns_to_datetime

        return ns_to_datetime(self.start_time)

    def __repr__(self) -> str:
        modality_names = sorted(self.modalities)
        fields = ", ".join(
            (
                f"recording_hash={self.recording_hash!r}",
                f"participants={self.participant_ids!r}",
                f"start={self.start_time_dt.isoformat()}",
                f"duration={self.duration!s}",
                f"modalities={modality_names!r}",
            )
        )
        return f"QueryResult({fields})"

    # Pydantic v2's BaseModel.__str__ formats fields as ``k=v`` with no
    # class name, diverging from the compact ``__repr__`` above. Route
    # ``str()`` through ``__repr__`` so REPL and log output render
    # identically.
    def __str__(self) -> str:
        return repr(self)

    def _to_row_dict(self) -> dict[str, Any]:
        """Single source-of-truth row projection used by both
        ``_repr_html_`` and :meth:`QueryResultList.to_dataframe`;
        keeping it on the model guarantees the two renderers never
        drift.
        """
        return dict(
            recording_hash=self.recording_hash,
            participant_ids=list(self.participant_ids),
            start_time=self.start_time_dt,
            duration=self.duration,
            ingestion_status=_aggregate_ingestion_status(self.modalities),
            modalities=sorted(self.modalities),
        )

    def _repr_html_(self) -> str:
        import pandas as pd

        frame = pd.DataFrame([self._to_row_dict()])
        return str(frame._repr_html_())
[docs]
def _aggregate_ingestion_status(
modalities: dict[ModalityName, ModalityRow],
) -> str | None:
"""Aggregate ``ingestion_status`` across a recording's modalities for the
DataFrame projection. ``None`` (rendered as ``NaN`` in pandas) when the
modality set is empty — preserves the distinction between "no modality
rows" and "all processed" that :class:`QueryResult`'s docstring calls out.
"""
if not modalities:
return None
if any(m.ingestion_status is IngestionStatus.RAW for m in modalities.values()):
return "raw"
return "processed"
# Column order for :meth:`QueryResultList.to_dataframe`. Mirrors the keys
# produced by ``QueryResult._to_row_dict`` so the DataFrame projection and
# the per-row dicts stay aligned; keep the two in sync when adding fields.
_DATAFRAME_COLUMNS: tuple[str, ...] = (
    "recording_hash",
    "participant_ids",
    "start_time",
    "duration",
    "ingestion_status",
    "modalities",
)
# ---------------------------------------------------------------------------
# QueryResultList
# ---------------------------------------------------------------------------
class QueryResultList(list[QueryResult]):
    """List of :class:`QueryResult` with notebook/DataFrame rendering.

    Returned by :meth:`DataInterface.query`. Subclasses :class:`list` so
    existing ``list[QueryResult]`` annotations and ``isinstance(x, list)``
    checks keep working unchanged. Slicing (``results[:5]``), concatenation
    (``results + more``), :py:meth:`copy`, and multiplication (either
    operand order) preserve the subtype so the render methods stay
    reachable.

    ``sorted(results, ...)`` is the one common path that drops back to a
    plain :class:`list` (CPython implementation detail). Wrap with
    ``QueryResultList(sorted(results, ...))`` if you need
    ``.to_dataframe()`` on a sorted view.
    """

    def to_dataframe(self) -> pd.DataFrame:
        """Return a :class:`pandas.DataFrame` with one row per result.

        Columns mirror :class:`QueryResult` attribute names so
        ``df.modalities`` and ``qr.modalities`` read the same. The
        ``ingestion_status`` column is aggregated across each recording's
        modalities — ``"raw"`` if any are raw, else ``"processed"``,
        ``NaN`` if the recording has no modality rows.
        """
        import pandas as pd

        # M2 catalogs are small (<10k recordings); the full materialization
        # per ``_repr_html_`` call is fine. When M3 catalog growth crosses
        # ~10k rows, bound construction at N rows here — likely as part of
        # the ENG-1066 / ENG-1088 metadata-pushdown work.
        rows = [r._to_row_dict() for r in self]
        return pd.DataFrame(rows, columns=list(_DATAFRAME_COLUMNS))

    def _repr_html_(self) -> str:
        return str(self.to_dataframe()._repr_html_())

    def __repr__(self) -> str:
        n = len(self)
        if n == 0:
            return "QueryResultList(0 recordings)"
        word = "recording" if n == 1 else "recordings"
        # Union of modality names across all results; cap display at 5.
        union: set[str] = set()
        for r in self:
            union.update(r.modalities.keys())
        names = sorted(union)
        if len(names) > 5:
            inner = ", ".join(names[:5]) + ", ..."
        else:
            inner = ", ".join(names)
        return f"QueryResultList({n} {word}, modalities={{{inner}}})"

    # --- list-subclass type preservation -----------------------------------
    # CPython's built-in slicing / concatenation / copy / multiplication
    # return plain ``list``, which would break
    # ``results[:5].to_dataframe()``. Re-wrap explicitly.

    @overload
    def __getitem__(self, key: SupportsIndex) -> QueryResult: ...

    @overload
    def __getitem__(self, key: slice) -> QueryResultList: ...

    def __getitem__(self, key: SupportsIndex | slice) -> QueryResult | QueryResultList:
        if isinstance(key, slice):
            return QueryResultList(list.__getitem__(self, key))
        return list.__getitem__(self, key)

    def __add__(self, other: list[QueryResult]) -> QueryResultList:  # type: ignore[override]
        return QueryResultList(list.__add__(self, list(other)))

    def __radd__(self, other: list[QueryResult]) -> QueryResultList:
        return QueryResultList(list(other) + list(self))

    def __mul__(self, n: SupportsIndex) -> QueryResultList:
        return QueryResultList(list.__mul__(self, n))

    def __rmul__(self, n: SupportsIndex) -> QueryResultList:
        # Bug fix: ``n * results`` previously fell through to
        # ``list.__rmul__`` and returned a plain ``list``, silently
        # dropping the render methods — while ``results * n`` kept the
        # subtype via ``__mul__``. Mirror ``__mul__`` and re-wrap.
        return QueryResultList(list.__rmul__(self, n))

    def copy(self) -> QueryResultList:
        return QueryResultList(self)
# ---------------------------------------------------------------------------
# QuerySpec
# ---------------------------------------------------------------------------
class QuerySpec(BaseModel):
    """Pydantic spec for :meth:`DataInterface.query`.

    Implemented over the raw catalog in Phase 1a: ``participants``,
    ``modalities``, ``recording_hash``, ``metadata``.

    Accepted at the signature level in Phase 1a but gated at execution
    time — raising :class:`NotImplementedError` (referencing ENG-1082)
    when a matched modality has ``ingestion_status="raw"``:
    ``time_range``, ``pipeline_version``, ``time_filters``,
    ``metadata_filters``, ``derived``.

    The complex-list fields use ``list[Any]`` as a deliberate
    forward-compat escape hatch; ENG-1082 will replace each with a typed
    model (``AroundEvent | TimeWindow``, ``Filter``, ``DerivedSelector``).
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # Phase 1a — implemented
    participants: list[CatalogID] | None = None
    modalities: list[ModalityName] | None = None
    recording_hash: CatalogID | None = None
    metadata: MetadataDict | None = None

    # Phase 1a — type-stable, gated at execution time
    time_range: tuple[EpochNs, EpochNs] | None = None
    pipeline_version: str | None = None
    time_filters: list[Any] = Field(default_factory=list)
    metadata_filters: list[Any] = Field(default_factory=list)
    derived: list[Any] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate(self) -> Self:
        # ``end`` must strictly exceed ``start`` — equal endpoints are
        # rejected along with inverted ranges.
        if self.time_range is not None:
            start, end = self.time_range
            if end <= start:
                raise ValueError(
                    f"time_range end ({end}) must exceed start ({start})"
                )
        return self

    def has_processed_kwarg(self) -> bool:
        """True iff any kwarg requiring the processed store is set."""
        return (
            self.time_range is not None
            or self.pipeline_version is not None
            or bool(self.time_filters or self.metadata_filters or self.derived)
        )