Source code for ursa.query

"""Top-level read API for Ursa (ENG-889 + ENG-1081).

Phase 1a (architecture v0.4 §7) ships a *raw-store* query: callers filter the
catalog by participant, modality, recording hash, and metadata equality, and
get back :class:`RecordingResult` objects pointing at whole-file raw modality
URIs. No Zarr, no temporal slicing, no ``temporaldata`` integration — those
arrive after Virgo's M2 ingestion node lands and are tracked by ENG-1082.

The advanced kwargs (``time_range``, ``pipeline_version``, ``time_filters``,
``metadata_filters``, ``derived``) are part of the long-term §3.5 surface and
are accepted at the type-system level so the public signature stays stable.
Passing any of them while a matched modality has ``ingestion_status="raw"``
raises :class:`NotImplementedError` with a pointer to ENG-1082. Same precedent
as :meth:`Catalog.delete` (ENG-1069).

The return type **diverges from §3.5's "list[temporaldata.Data]" wording.**
Phase 1a forbids ``temporaldata`` integration outright, and ``ModalityRow``
on a raw recording carries no domain (``domain_intervals`` is null until
ingestion). Wrapping a half-populated ``temporaldata.Data`` would be
type-lying; instead we return :class:`RecordingResult` carrying the catalog
projection. ENG-899 (M3) replaces the ``modalities`` values with
``temporaldata`` subclasses once the processed store exists, and the
existing wrapper composes naturally with that.
"""

from __future__ import annotations

import warnings
from collections.abc import Mapping
from datetime import timedelta
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self

from ursa.catalog.schemas import (
    CatalogID,
    IngestionStatus,
    MetadataDict,
    ModalityName,
    ModalityRow,
    RecordingRow,
    UTCDatetime,
)

if TYPE_CHECKING:
    from ursa.catalog import Catalog


# Public surface of this module; everything else is an internal helper.
__all__ = ["QuerySpec", "RecordingResult", "query"]


# ENG-1082 covers all post-Virgo query features (temporal filters, streaming,
# pipeline_version joins). Referenced from the gate's NotImplementedError.
_BACKFILL_TICKET = "ENG-1082"

# Kwargs that require the processed store. Any non-None / non-empty value here
# combined with an ingestion_status="raw" modality trips the gate.
# Each entry is also a QuerySpec attribute name: _processed_kwargs_set() reads
# them off the spec with getattr(), so the spelling must match the model fields.
_PROCESSED_KWARGS = (
    "time_range",
    "pipeline_version",
    "time_filters",
    "metadata_filters",
    "derived",
)


# ---------------------------------------------------------------------------
# Public models
# ---------------------------------------------------------------------------


class RecordingResult(BaseModel):
    """One recording matched by :func:`ursa.query`.

    Phase 1a carries catalog projections only — no array bytes, no
    ``temporaldata.Data``. ``ModalityRow.raw_storage_uri`` is the cold-bucket
    pointer ENG-890 (`ursa.get`) reads from. ENG-899 (M3) replaces the
    ``modalities`` values with array-bearing ``temporaldata`` subclasses once
    the processed store exists.

    Frozen for accidental-mutation safety; not hashable (the ``dict``/``list``
    fields prevent it). Use ``recording_hash`` as the cache key.
    """

    # frozen=True blocks attribute reassignment; extra="forbid" rejects
    # unknown field names at construction time.
    model_config = ConfigDict(frozen=True, extra="forbid")

    # The fields below are filled by query() from the matching RecordingRow.
    # Identifier of the recording; also the documented cache key.
    recording_hash: CatalogID
    # query() passes a fresh list copied from the row's participant_ids.
    participant_ids: list[CatalogID]
    start_time: UTCDatetime
    duration: timedelta
    # query() passes fresh dict copies of the row's device_info / metadata.
    device_info: MetadataDict
    metadata: MetadataDict
    # Modality rows of this recording keyed by ``ModalityRow.modality``;
    # restricted to the requested modalities when the query filters on them.
    modalities: dict[ModalityName, ModalityRow]
class QuerySpec(BaseModel):
    """Validated bundle of filters accepted by :func:`ursa.query`.

    Implemented in Phase 1a (evaluated against the raw catalog):
    ``participants``, ``modalities``, ``recording_hash``, ``metadata``.

    Accepted in Phase 1a so the signature stays stable, but rejected at
    execution time with :class:`NotImplementedError` (referencing ENG-1082)
    when a matched modality has ``ingestion_status="raw"``: ``time_range``,
    ``pipeline_version``, ``time_filters``, ``metadata_filters``, ``derived``.

    The list-valued advanced fields are typed ``list[Any]`` on purpose as a
    forward-compat escape hatch; ENG-1082 will swap each for a typed model
    (``AroundEvent | TimeWindow``, ``Filter``, ``DerivedSelector``).
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # Phase 1a — implemented
    participants: list[CatalogID] | None = None
    modalities: list[ModalityName] | None = None
    recording_hash: CatalogID | None = None
    metadata: MetadataDict | None = None

    # Phase 1a — type-stable, gated at execution time
    time_range: tuple[UTCDatetime, UTCDatetime] | None = None
    pipeline_version: str | None = None
    time_filters: list[Any] = Field(default_factory=list)
    metadata_filters: list[Any] = Field(default_factory=list)
    derived: list[Any] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate(self) -> Self:
        """Reject a ``time_range`` whose end does not come strictly after its start."""
        window = self.time_range
        if window is None:
            return self
        if window[1] <= window[0]:
            raise ValueError(
                f"time_range end ({self.time_range[1]}) must exceed start ({self.time_range[0]})"
            )
        return self

    def has_processed_kwarg(self) -> bool:
        """Report whether any filter that needs the processed store is set.

        The two scalar fields count as set when they are not ``None``; the
        three list fields count as set when they are non-empty.
        """
        scalars_unset = self.time_range is None and self.pipeline_version is None
        if not scalars_unset:
            return True
        return any((self.time_filters, self.metadata_filters, self.derived))
# ---------------------------------------------------------------------------
# query()
# ---------------------------------------------------------------------------
def query(
    spec: QuerySpec | None = None,
    *,
    catalog: Catalog,
    participants: list[CatalogID] | None = None,
    modalities: list[ModalityName] | None = None,
    recording_hash: CatalogID | None = None,
    metadata: MetadataDict | None = None,
    time_range: tuple[UTCDatetime, UTCDatetime] | None = None,
    pipeline_version: str | None = None,
    time_filters: list[Any] | None = None,
    metadata_filters: list[Any] | None = None,
    derived: list[Any] | None = None,
    limit: int | None = None,
) -> list[RecordingResult]:
    """Filter the Ursa catalog and return matching recordings.

    Two surfaces:

    1. **Common case** — pass kwargs directly:
       ``ursa.query(catalog=cat, participants=[...], modalities=[...])``.
       The kwargs auto-validate against :class:`QuerySpec`.
    2. **Complex case** — build a :class:`QuerySpec` and pass it as the first
       positional arg: ``ursa.query(spec, catalog=cat)``.

    Phase 1a (architecture v0.4) returns :class:`RecordingResult` carrying
    catalog projections only — no array bytes. Use :func:`ursa.get` (ENG-890)
    to read the raw modality file.

    Temporal kwargs and ``pipeline_version`` raise
    :class:`NotImplementedError` when a matched modality has
    ``ingestion_status="raw"``; the green path ships with ENG-1082 once Virgo
    M2's ingestion node populates the processed store.

    Parameters
    ----------
    spec
        Pre-built :class:`QuerySpec`. If given, all filter kwargs must be
        unset (``catalog`` and ``limit`` may still be passed); else
        :class:`TypeError`.
    catalog
        Required. Use ``Catalog.local(...)`` for tests / scripting or
        ``Catalog.from_store(get_store(...))`` for R2-backed catalogs.
    participants
        Keep recordings that share at least one participant with this list.
    modalities
        Keep only these modalities; recordings left with none are dropped.
    recording_hash
        Exact-match filter, pushed down to the catalog.
    metadata
        Equality filter on every given key of the recording metadata.
    time_range, pipeline_version, time_filters, metadata_filters, derived
        Processed-store features; accepted but gated (see above).
    limit
        Caps the returned list; must be non-negative. **Not** a scan cap —
        when ``metadata=`` is also set we scan the full recordings table and
        truncate at the end. ENG-1088 lifts this once Lance MapType pushdown
        lands (ENG-1066).

    Returns
    -------
    list[RecordingResult]
        Empty list if nothing matches. An empty query (no filters, no
        ``limit``) returns every recording in the catalog — caller's
        responsibility to gate that on production-scale catalogs.

    Raises
    ------
    ValueError
        ``limit`` is negative. Validation errors raised while building the
        :class:`QuerySpec` from kwargs also propagate.
    TypeError
        ``spec`` was combined with individual filter kwargs.
    NotImplementedError
        A processed-only kwarg is set and a matched modality is still raw.

    Warns
    -----
    UserWarning
        ``metadata`` and ``limit`` are both set (full scan despite the cap).
    """
    # Fail fast, before any catalog I/O: a negative value would otherwise be
    # read as a from-the-end slice and silently drop the *last* matches.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    if spec is not None:
        _reject_kwargs_with_spec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters,
            metadata_filters=metadata_filters,
            derived=derived,
        )
    else:
        spec = QuerySpec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters or [],
            metadata_filters=metadata_filters or [],
            derived=derived or [],
        )

    if spec.metadata is not None and limit is not None:
        warnings.warn(
            "ursa.query(metadata=..., limit=...) performs a full catalog scan in M2; "
            "metadata pushdown lands with ENG-1066, full-scan removal with ENG-1088.",
            UserWarning,
            stacklevel=2,
        )

    # Step 1: catalog pushdown for the safe scalar predicate (recording_hash).
    # participants/modalities/metadata are NOT pushed down: participant_ids is
    # a list column (ENG-1087), modalities lives on a different table, and
    # metadata pushdown is ENG-1066.
    recording_where: dict[str, Any] = {}
    if spec.recording_hash is not None:
        recording_where["recording_hash"] = spec.recording_hash
    recordings = catalog.list_recordings(where=recording_where or None)

    # Step 2: Python-side filters on recordings.
    if spec.participants is not None:
        wanted = set(spec.participants)
        recordings = [r for r in recordings if wanted.intersection(r.participant_ids)]
    if spec.metadata is not None:
        recordings = [r for r in recordings if _metadata_matches(r.metadata, spec.metadata)]

    if not recordings:
        return []

    # Step 3: modality scan + Phase-1a temporal/processed gate.
    by_recording = _collect_modalities(catalog, recordings, spec.modalities)

    # Drop recordings whose modality set is empty after the optional
    # ``modalities=`` filter; that's a "no match" signal, not a half-result.
    if spec.modalities is not None:
        recordings = [r for r in recordings if by_recording.get(r.recording_hash)]

    # The gate looks at every matched modality, not just those that survive
    # ``limit``, so it must run before truncation.
    if spec.has_processed_kwarg():
        _enforce_processed_gate(spec, by_recording)

    # Step 4: truncate, then hydrate. Truncating first yields the same list
    # as slicing the hydrated results but avoids building discarded objects.
    if limit is not None:
        recordings = recordings[:limit]

    return [
        RecordingResult(
            recording_hash=r.recording_hash,
            # Copy containers so results do not alias the catalog rows.
            participant_ids=list(r.participant_ids),
            start_time=r.start_time,
            duration=r.duration,
            device_info=dict(r.device_info),
            metadata=dict(r.metadata),
            modalities={m.modality: m for m in by_recording.get(r.recording_hash, [])},
        )
        for r in recordings
    ]
# --------------------------------------------------------------------------- # Internals # --------------------------------------------------------------------------- _FILTER_KWARG_NAMES = ( "participants", "modalities", "recording_hash", "metadata", "time_range", "pipeline_version", "time_filters", "metadata_filters", "derived", )
def _reject_kwargs_with_spec(**kwargs: Any) -> None:
    """Refuse filter kwargs when a ``spec`` was also passed.

    Combining a spec with kwargs would require merge semantics (override vs.
    union) that are not obvious, so the strict rule is used instead: any
    filter kwarg whose value is not ``None`` is an error.

    Raises
    ------
    TypeError
        At least one name in ``_FILTER_KWARG_NAMES`` has a non-``None`` value.
    """
    set_kwargs = []
    for name in _FILTER_KWARG_NAMES:
        if kwargs.get(name) is None:
            continue
        set_kwargs.append(name)

    if not set_kwargs:
        return

    raise TypeError(
        f"ursa.query(spec=..., {set_kwargs[0]}=...): pass either a QuerySpec or "
        f"individual filter kwargs, not both. Offending kwargs: {set_kwargs!r}."
    )
[docs] def _metadata_matches(actual: Mapping[str, Any], expected: Mapping[str, Any]) -> bool: """Equality on every key in ``expected``; missing keys do not match.""" return all(actual.get(k) == v for k, v in expected.items())
[docs] def _collect_modalities( catalog: Catalog, recordings: list[RecordingRow], modality_filter: list[ModalityName] | None, ) -> dict[CatalogID, list[ModalityRow]]: """Fetch the modality rows for ``recordings``, grouped by recording_hash. Single ``list_modalities`` call (no batching in Phase 1a; ENG-1089 covers growth). Empty input short-circuits to avoid an empty-IN-list edge case. """ if not recordings: return {} where: dict[str, Any] = {"recording_hash": [r.recording_hash for r in recordings]} if modality_filter is not None: where["modality"] = list(modality_filter) rows = catalog.list_modalities(where=where) grouped: dict[CatalogID, list[ModalityRow]] = {} for row in rows: grouped.setdefault(row.recording_hash, []).append(row) return grouped
def _enforce_processed_gate(
    spec: QuerySpec,
    by_recording: dict[CatalogID, list[ModalityRow]],
) -> None:
    """Reject processed-only kwargs while any matched modality is still raw.

    Only the first raw row encountered (in ``by_recording`` iteration order)
    is reported, together with the first processed-only kwarg set on ``spec``,
    so the error message names one concrete (kwarg, modality, recording).

    Raises
    ------
    NotImplementedError
        Some row in ``by_recording`` has ``ingestion_status`` equal to
        ``IngestionStatus.RAW``.
    """
    offending_kwargs = _processed_kwargs_set(spec)

    raw_hits = (
        (recording_key, modality_row)
        for recording_key, modality_rows in by_recording.items()
        for modality_row in modality_rows
        if modality_row.ingestion_status is IngestionStatus.RAW
    )
    first_hit = next(raw_hits, None)
    if first_hit is None:
        return

    hash_, row = first_hit
    # Looked up only once a raw row is found, outside the generator above.
    kwarg = next(iter(offending_kwargs))
    raise NotImplementedError(
        f"ursa.query({kwarg}=...) requires processed modalities; "
        f"modality {row.modality!r} on recording {hash_!r} has "
        f"ingestion_status='raw'. Backfilled by {_BACKFILL_TICKET} "
        "once Virgo M2 ships the ingestion node."
    )
def _processed_kwargs_set(spec: QuerySpec) -> list[str]:
    """Names of the processed-only kwargs that are set on ``spec``.

    Uses the same notion of "set" as ``QuerySpec.has_processed_kwarg``:

    * scalar fields (``time_range``, ``pipeline_version``) are set when they
      are not ``None`` — so a falsy value such as ``pipeline_version=""``
      still counts;
    * list fields (``time_filters``, ``metadata_filters``, ``derived``) are
      set when they are non-empty.

    Plain truthiness would disagree on falsy scalars and return an empty list
    even though the gate was entered, leaving the caller with no kwarg to name.

    Returns
    -------
    list[str]
        Matching names, in ``_PROCESSED_KWARGS`` order.
    """
    selected: list[str] = []
    for name in _PROCESSED_KWARGS:
        value = getattr(spec, name)
        if value is None:
            continue
        # Only list-valued fields use "empty" to mean "unset".
        if isinstance(value, list) and not value:
            continue
        selected.append(name)
    return selected