"""Top-level read API for Ursa (ENG-889 + ENG-1081).
Phase 1a (architecture v0.4 §7) ships a *raw-store* query: callers filter the
catalog by participant, modality, recording hash, and metadata equality, and
get back :class:`RecordingResult` objects pointing at whole-file raw modality
URIs. No Zarr, no temporal slicing, no ``temporaldata`` integration — those
arrive after Virgo's M2 ingestion node lands and are tracked by ENG-1082.
The advanced kwargs (``time_range``, ``pipeline_version``, ``time_filters``,
``metadata_filters``, ``derived``) are part of the long-term §3.5 surface and
are accepted at the type-system level so the public signature stays stable.
Passing any of them while a matched modality has ``ingestion_status="raw"``
raises :class:`NotImplementedError` with a pointer to ENG-1082. Same precedent
as :meth:`Catalog.delete` (ENG-1069).
The return type **diverges from §3.5's "list[temporaldata.Data]" wording.**
Phase 1a forbids ``temporaldata`` integration outright, and ``ModalityRow``
on a raw recording carries no domain (``domain_intervals`` is null until
ingestion). Wrapping a half-populated ``temporaldata.Data`` would be
type-lying; instead we return :class:`RecordingResult` carrying the catalog
projection. ENG-899 (M3) replaces the ``modalities`` values with
``temporaldata`` subclasses once the processed store exists, and the
existing wrapper composes naturally with that.
"""
from __future__ import annotations
import warnings
from collections.abc import Mapping
from datetime import timedelta
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self
from ursa.catalog.schemas import (
CatalogID,
IngestionStatus,
MetadataDict,
ModalityName,
ModalityRow,
RecordingRow,
UTCDatetime,
)
if TYPE_CHECKING:
from ursa.catalog import Catalog
# Public surface of this module: the two pydantic models and the query entry
# point. Everything prefixed with ``_`` below is an implementation detail.
__all__ = ["QuerySpec", "RecordingResult", "query"]
# Ticket tracking every post-Virgo query feature (temporal filters, streaming,
# pipeline_version joins). Quoted in the processed-store gate's
# NotImplementedError so callers know where the green path lives.
_BACKFILL_TICKET: str = "ENG-1082"

# Filter kwargs that only make sense once the processed store exists. A
# non-None / non-empty value for any of them, combined with a matched modality
# whose ingestion_status is "raw", trips the gate. Order is significant: the
# gate's error message names the first entry that is set.
_PROCESSED_KWARGS: tuple[str, ...] = (
    # scalar kwargs (unset == None)
    "time_range",
    "pipeline_version",
    # list kwargs (unset == empty list)
    "time_filters",
    "metadata_filters",
    "derived",
)
# ---------------------------------------------------------------------------
# Public models
# ---------------------------------------------------------------------------
[docs]
class RecordingResult(BaseModel):
    """One recording matched by :func:`ursa.query`.

    Phase 1a carries catalog projections only — no array bytes, no
    ``temporaldata.Data``. ``ModalityRow.raw_storage_uri`` is the cold-bucket
    pointer ENG-890 (`ursa.get`) reads from. ENG-899 (M3) replaces the
    ``modalities`` values with array-bearing ``temporaldata`` subclasses once
    the processed store exists.

    Frozen for accidental-mutation safety; not hashable (the ``dict``/``list``
    fields prevent it). Use ``recording_hash`` as the cache key.

    :func:`ursa.query` builds each instance from the matching ``RecordingRow``
    and hands it shallow copies of the row's ``participant_ids``,
    ``device_info`` and ``metadata`` containers (``list(...)`` / ``dict(...)``),
    so top-level edits to those containers do not touch the row object.
    """

    # ``extra="forbid"``: unknown field names are rejected at construction.
    model_config = ConfigDict(frozen=True, extra="forbid")

    # Catalog ID of the recording; the recommended cache key (see docstring).
    recording_hash: CatalogID
    # Participant IDs copied from ``RecordingRow.participant_ids``.
    participant_ids: list[CatalogID]
    # Copied from ``RecordingRow.start_time``.
    start_time: UTCDatetime
    # Copied from ``RecordingRow.duration``.
    duration: timedelta
    # Copied from ``RecordingRow.device_info``.
    device_info: MetadataDict
    # Copied from ``RecordingRow.metadata``.
    metadata: MetadataDict
    # ``ModalityRow`` objects keyed by ``ModalityRow.modality``. When the query
    # passed ``modalities=``, only the requested modalities appear here.
    modalities: dict[ModalityName, ModalityRow]
[docs]
class QuerySpec(BaseModel):
    """Validated filter specification consumed by :func:`ursa.query`.

    In Phase 1a the fields fall into two groups:

    * Implemented over the raw catalog: ``participants``, ``modalities``,
      ``recording_hash``, ``metadata``.
    * Accepted so the signature stays stable, but gated at execution time:
      ``time_range``, ``pipeline_version``, ``time_filters``,
      ``metadata_filters``, ``derived``. Setting any of these while a matched
      modality has ``ingestion_status="raw"`` makes the query raise
      :class:`NotImplementedError` (referencing ENG-1082).

    ``time_filters``, ``metadata_filters`` and ``derived`` are typed
    ``list[Any]`` on purpose, as a forward-compatible escape hatch; ENG-1082
    swaps each for a typed model (``AroundEvent | TimeWindow``, ``Filter``,
    ``DerivedSelector``).
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # -- Phase 1a: implemented over the raw catalog --------------------------
    participants: list[CatalogID] | None = None
    modalities: list[ModalityName] | None = None
    recording_hash: CatalogID | None = None
    metadata: MetadataDict | None = None

    # -- Phase 1a: type-stable, gated at execution time ----------------------
    time_range: tuple[UTCDatetime, UTCDatetime] | None = None
    pipeline_version: str | None = None
    time_filters: list[Any] = Field(default_factory=list)
    metadata_filters: list[Any] = Field(default_factory=list)
    derived: list[Any] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate(self) -> Self:
        """Reject a ``time_range`` whose end is not strictly after its start."""
        if self.time_range is None:
            return self
        start, end = self.time_range
        if end <= start:
            raise ValueError(f"time_range end ({end}) must exceed start ({start})")
        return self

    def has_processed_kwarg(self) -> bool:
        """Return True iff any field requiring the processed store is set.

        Scalar fields count as set when they are not ``None``; list fields
        count as set when they are non-empty.
        """
        scalar_set = self.time_range is not None or self.pipeline_version is not None
        return scalar_set or any((self.time_filters, self.metadata_filters, self.derived))
# ---------------------------------------------------------------------------
# query()
# ---------------------------------------------------------------------------
[docs]
def query(
    spec: QuerySpec | None = None,
    *,
    catalog: Catalog,
    participants: list[CatalogID] | None = None,
    modalities: list[ModalityName] | None = None,
    recording_hash: CatalogID | None = None,
    metadata: MetadataDict | None = None,
    time_range: tuple[UTCDatetime, UTCDatetime] | None = None,
    pipeline_version: str | None = None,
    time_filters: list[Any] | None = None,
    metadata_filters: list[Any] | None = None,
    derived: list[Any] | None = None,
    limit: int | None = None,
) -> list[RecordingResult]:
    """Filter the Ursa catalog and return matching recordings.

    Two surfaces:

    1. **Common case** — pass kwargs directly: ``ursa.query(catalog=cat,
       participants=[...], modalities=[...])``. The kwargs auto-validate
       against :class:`QuerySpec`.
    2. **Complex case** — build a :class:`QuerySpec` and pass it as the first
       positional arg: ``ursa.query(spec, catalog=cat)``.

    Phase 1a (architecture v0.4) returns :class:`RecordingResult` carrying
    catalog projections only — no array bytes. Use :func:`ursa.get` (ENG-890)
    to read the raw modality file. Temporal kwargs and ``pipeline_version``
    raise :class:`NotImplementedError` when a matched modality has
    ``ingestion_status="raw"``; the green path ships with ENG-1082 once Virgo
    M2's ingestion node populates the processed store.

    Parameters
    ----------
    spec
        Pre-built :class:`QuerySpec`. If given, all filter kwargs must be
        unset (``catalog`` and ``limit`` may still be passed); else
        :class:`TypeError`.
    catalog
        Required. Use ``Catalog.local(...)`` for tests / scripting or
        ``Catalog.from_store(get_store(...))`` for R2-backed catalogs.
    participants
        Keep recordings sharing at least one participant ID with this list
        (any-of match, evaluated in Python). An empty list matches nothing.
    modalities
        Restrict each result's ``modalities`` to these names and drop
        recordings that have none of them.
    recording_hash
        Exact match; the only predicate pushed down to the catalog.
    metadata
        Metadata filter, evaluated in Python after the catalog scan.
    time_range, pipeline_version, time_filters, metadata_filters, derived
        Processed-store-only filters; see :class:`QuerySpec`.
    limit
        Caps the returned list; must be ``>= 0``. **Not** a scan cap — when
        ``metadata=`` is also set we scan the full recordings table and
        truncate at the end. ENG-1088 lifts this once Lance MapType pushdown
        lands (ENG-1066).

    Returns
    -------
    list[RecordingResult]
        Empty list if nothing matches. An empty query (no filters, no
        ``limit``) returns every recording in the catalog — caller's
        responsibility to gate that on production-scale catalogs.

    Raises
    ------
    ValueError
        ``limit`` is negative. Invalid filter kwargs (e.g. an inverted
        ``time_range``) also surface as pydantic validation errors from
        :class:`QuerySpec`.
    TypeError
        ``spec`` was combined with filter kwargs.
    NotImplementedError
        A processed-only kwarg is set and a matched modality is still raw.
    """
    if limit is not None and limit < 0:
        # A negative slice bound would silently drop matches from the end
        # (``[:-1]`` loses the last one); reject it before touching the catalog.
        raise ValueError(f"ursa.query(limit=...) must be >= 0, got {limit!r}")

    if spec is not None:
        _reject_kwargs_with_spec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters,
            metadata_filters=metadata_filters,
            derived=derived,
        )
    else:
        spec = QuerySpec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters or [],
            metadata_filters=metadata_filters or [],
            derived=derived or [],
        )

    if spec.metadata is not None and limit is not None:
        warnings.warn(
            "ursa.query(metadata=..., limit=...) performs a full catalog scan in M2; "
            "metadata pushdown lands with ENG-1066, full-scan removal with ENG-1088.",
            UserWarning,
            stacklevel=2,
        )

    # Step 1: catalog pushdown for the safe scalar predicate (recording_hash).
    # participants/modalities/metadata are NOT pushed down: participant_ids is
    # a list column (ENG-1087), modalities lives on a different table, and
    # metadata pushdown is ENG-1066.
    recording_where: dict[str, Any] = {}
    if spec.recording_hash is not None:
        recording_where["recording_hash"] = spec.recording_hash
    recordings = catalog.list_recordings(where=recording_where or None)

    # Step 2: Python-side filters on recordings.
    if spec.participants is not None:
        wanted = set(spec.participants)
        recordings = [r for r in recordings if wanted.intersection(r.participant_ids)]
    if spec.metadata is not None:
        recordings = [r for r in recordings if _metadata_matches(r.metadata, spec.metadata)]
    if not recordings:
        return []

    # Step 3: modality scan + Phase-1a temporal/processed gate.
    by_recording = _collect_modalities(catalog, recordings, spec.modalities)
    # Drop recordings whose modality set is empty after the optional
    # ``modalities=`` filter; that's a "no match" signal, not a half-result.
    if spec.modalities is not None:
        recordings = [r for r in recordings if by_recording.get(r.recording_hash)]
    # The gate runs over every match *before* ``limit`` truncates, so a raw
    # modality beyond the limit still trips it.
    if spec.has_processed_kwarg():
        _enforce_processed_gate(spec, by_recording)

    # Step 4: truncate first, then hydrate only the rows actually returned.
    if limit is not None:
        recordings = recordings[:limit]
    return [
        RecordingResult(
            recording_hash=r.recording_hash,
            participant_ids=list(r.participant_ids),
            start_time=r.start_time,
            duration=r.duration,
            device_info=dict(r.device_info),
            metadata=dict(r.metadata),
            modalities={m.modality: m for m in by_recording.get(r.recording_hash, [])},
        )
        for r in recordings
    ]
# ---------------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------------
# Every filter kwarg ``query()`` accepts, in the order
# ``_reject_kwargs_with_spec`` checks them (the first one found set is the
# name shown in its TypeError message).
_FILTER_KWARG_NAMES: tuple[str, ...] = (
    # implemented over the raw catalog in Phase 1a
    "participants",
    "modalities",
    "recording_hash",
    "metadata",
    # processed-store-only; same names as _PROCESSED_KWARGS
    "time_range",
    "pipeline_version",
    "time_filters",
    "metadata_filters",
    "derived",
)
[docs]
def _reject_kwargs_with_spec(**kwargs: Any) -> None:
    """Raise :class:`TypeError` if any filter kwarg accompanies a ``spec``.

    A filter kwarg counts as supplied whenever its value is not ``None`` (an
    explicit empty list is therefore still rejected). Merging a spec with
    kwargs would force a choice between "kwargs override" and "union"
    semantics; neither is obviously right, so being strict is cheaper.
    """
    supplied: list[str] = []
    for name in _FILTER_KWARG_NAMES:
        if kwargs.get(name) is not None:
            supplied.append(name)
    if not supplied:
        return
    first = supplied[0]
    raise TypeError(
        f"ursa.query(spec=..., {first}=...): pass either a QuerySpec or "
        f"individual filter kwargs, not both. Offending kwargs: {supplied!r}."
    )
[docs]
def _collect_modalities(
    catalog: Catalog,
    recordings: list[RecordingRow],
    modality_filter: list[ModalityName] | None,
) -> dict[CatalogID, list[ModalityRow]]:
    """Fetch the modality rows for ``recordings``, grouped by recording_hash.

    Single ``list_modalities`` call (no batching in Phase 1a; ENG-1089 covers
    growth).

    Both IN-list inputs short-circuit to ``{}`` without a catalog call when
    they are empty: with no recordings there is nothing to look up, and an
    empty ``modality_filter`` (``modalities=[]``) can match no rows. That
    mirrors ``participants=[]`` matching nothing and keeps an empty IN list
    from ever reaching ``list_modalities(where=...)``.

    Returns
    -------
    dict[CatalogID, list[ModalityRow]]
        ``recording_hash -> rows`` in the order ``list_modalities`` yields
        them. Recordings with no matching row are absent from the mapping.
    """
    if not recordings:
        return {}
    if modality_filter is not None and not modality_filter:
        return {}
    where: dict[str, Any] = {"recording_hash": [r.recording_hash for r in recordings]}
    if modality_filter is not None:
        where["modality"] = list(modality_filter)
    grouped: dict[CatalogID, list[ModalityRow]] = {}
    for row in catalog.list_modalities(where=where):
        grouped.setdefault(row.recording_hash, []).append(row)
    return grouped
[docs]
def _enforce_processed_gate(
    spec: QuerySpec,
    by_recording: dict[CatalogID, list[ModalityRow]],
) -> None:
    """Raise if the spec asks for processed-only behavior on a raw modality.

    The first raw modality found is reported, together with the first
    processed-only kwarg from ``_processed_kwargs_set``, so the message points
    at a concrete (modality, recording, kwarg) triple.

    Raises
    ------
    NotImplementedError
        If a processed-only kwarg is set and any row in ``by_recording`` has
        ``ingestion_status == IngestionStatus.RAW``.
    """
    offending_kwargs = _processed_kwargs_set(spec)
    if not offending_kwargs:
        # Guard against an empty list: if ``_processed_kwargs_set`` treats a
        # falsy scalar (e.g. ``pipeline_version=""``) as unset while
        # ``has_processed_kwarg()`` does not, there is no kwarg to name, and
        # taking the first element would fail with an unrelated error
        # (StopIteration / IndexError) instead of a useful one.
        return
    kwarg = offending_kwargs[0]
    for hash_, rows in by_recording.items():
        for row in rows:
            # ``==`` behaves like ``is`` for plain Enum members and also
            # tolerates a plain value if IngestionStatus is a str-mixin enum.
            # NOTE(review): confirm IngestionStatus's base class.
            if row.ingestion_status == IngestionStatus.RAW:
                raise NotImplementedError(
                    f"ursa.query({kwarg}=...) requires processed modalities; "
                    f"modality {row.modality!r} on recording {hash_!r} has "
                    f"ingestion_status='raw'. Backfilled by {_BACKFILL_TICKET} "
                    "once Virgo M2 ships the ingestion node."
                )
[docs]
def _processed_kwargs_set(spec: QuerySpec) -> list[str]:
    """Names of the processed-only kwargs set on ``spec``, in ``_PROCESSED_KWARGS`` order.

    "Set" follows the same rule as :meth:`QuerySpec.has_processed_kwarg`:

    * Scalar fields (``time_range``, ``pipeline_version``) count when they are
      not ``None``.
    * List fields (``time_filters``, ``metadata_filters``, ``derived``) count
      when they are non-empty.

    Plain truthiness would disagree with ``has_processed_kwarg`` on a falsy
    scalar such as ``pipeline_version=""``. The gate would then be invoked
    with no kwarg to name.
    """
    names: list[str] = []
    for name in _PROCESSED_KWARGS:
        value = getattr(spec, name)
        if value is None:
            continue  # scalar kwarg left at its default
        if isinstance(value, list) and not value:
            continue  # list kwarg left at its empty default
        names.append(name)
    return names