# Source code for ursa.get

"""Materialize :class:`QueryResult` rows into in-memory :class:`ursa.Data`
objects (ENG-890).

Second verb in the M2 three-verb read API (``query`` → ``get`` →
``download``): ``query()`` selects, ``get()`` reads bytes into memory,
and ``download()`` (ENG-1091) writes them to disk. All filtering lives
in ``query()`` — ``get()`` accepts a :class:`QueryResult` (or iterable
of them) plus a single ``concat`` flag and nothing else.

Phase 1a (M2) — raw path only
-----------------------------

For each modality with ``ingestion_status="raw"``, list the segment
files under ``ModalityRow.raw_storage_uri`` (a prefix), fetch each
segment's bytes, and surface them on the returned ``Data`` as a
:class:`ursa.RawBytes` carrier under ``data.modalities[name]``. No
parsing, no time domain. ``data.metadata`` is the synthesized
:class:`RecordingRow` (ENG-1090 contract).

All raw formats — including ``RAW_VIDEO`` / ``RAW_AUDIO`` — materialize
in full. Multi-hour video recordings can easily exceed several GB;
memory-constrained callers should use :func:`ursa.download` instead.

Phase 1b / M3 — processed path
------------------------------

Deferred to ENG-1093. Once a modality has ``ingestion_status="processed"``,
the modality field becomes an array-bearing :class:`RegularTimeSeries` /
:class:`IrregularTimeSeries` subclass. ``get()`` currently raises
:class:`NotImplementedError` for processed inputs as a defensive guard;
the runtime path arrives with ENG-1093.

Concat
------

``concat=True`` over multiple recordings would require aligned time
domains, which only the processed path establishes — so the M2 stub
validates alignment (raises :class:`ValueError` on mismatched modality
sets) and then raises :class:`NotImplementedError` pointing at
ENG-1093. The single-input case ignores ``concat`` (ticket spec:
"single-input ergonomic case … regardless of ``concat``").
"""

from __future__ import annotations

from collections.abc import Iterable
from types import MappingProxyType
from typing import overload

from ursa.catalog.schemas import (
    IngestionStatus,
    ModalityRow,
    RecordingRow,
)
from ursa.query import QueryResult
from ursa.raw import RawBytes
from ursa.store import ObjectStore, get_store, parse_storage_uri
from ursa.temporal import Data

__all__ = ["get"]


# Forward-reference marker used in error messages so the next-step
# ticket number lives in one place. It is interpolated into the
# ``NotImplementedError`` messages raised by ``get`` (concat path) and
# by ``_materialize_modality`` (non-raw ingestion-status guard).
_PROCESSED_PATH_TICKET = "ENG-1093"


#: Fields shared 1:1 between :class:`QueryResult` and :class:`RecordingRow`.
#: Explicit projection (vs. ``model_dump(exclude={"modalities"})``) so any
#: future field rename breaks here loudly, not silently inside Pydantic.
#: Consumed by ``_recording_row_from``, which reads each listed attribute
#: off the ``QueryResult`` with ``getattr`` and validates the resulting
#: mapping as a :class:`RecordingRow`.
_RECORDING_ROW_FIELDS: tuple[str, ...] = (
    "recording_hash",
    "participant_ids",
    "start_time",
    "duration",
    "device_info",
    "metadata",
)


# ---------------------------------------------------------------------------
# Public surface
# ---------------------------------------------------------------------------


@overload
def get(target: QueryResult, *, concat: bool = False) -> Data: ...
@overload
def get(target: Iterable[QueryResult], *, concat: bool = False) -> list[Data] | Data: ...
def get(
    target: QueryResult | Iterable[QueryResult],
    *,
    concat: bool = False,
) -> Data | list[Data]:
    """Read the recordings selected by ``target`` into :class:`ursa.Data`.

    Parameters
    ----------
    target
        Either one :class:`QueryResult` or an iterable of them. Dispatch
        is by type, not by length: a bare :class:`QueryResult` yields a
        bare :class:`Data`, while any iterable yields a ``list[Data]``
        (a one-item iterable therefore yields a one-item list).
    concat
        Only consulted for iterable input. When ``True``, the
        per-recording results are checked for matching modality sets and
        then :class:`NotImplementedError` is raised, because raw
        modalities carry no time domain to concatenate along. Ignored
        when ``target`` is a single :class:`QueryResult`.

    Returns
    -------
    Data or list[Data]
        Every :class:`Data` carries a :class:`RecordingRow` in its
        ``metadata`` slot, synthesized from the originating
        :class:`QueryResult`. Modality payloads live under
        ``data.modalities[name]`` as :class:`ursa.RawBytes` carriers.

    Raises
    ------
    NotImplementedError
        If a modality's ``ingestion_status`` is not raw (processed path
        deferred to ENG-1093), or if ``concat=True`` is requested over
        an iterable whose recordings have matching modality sets.
    ValueError
        If ``concat=True`` is requested over recordings whose modality
        sets differ.

    Notes
    -----
    Every raw format, ``RAW_VIDEO`` / ``RAW_AUDIO`` included, is read
    into memory in full. Multi-hour video can run to several GB; on
    memory-constrained hosts prefer :func:`ursa.download`.

    With ``concat=True`` all recordings are still materialized before
    the alignment check and the final :class:`NotImplementedError`.
    """
    if not isinstance(target, QueryResult):
        # Iterable form: materialize every recording up front.
        materialized = [_get_one(item) for item in target]
        if concat:
            # Alignment problems surface as ValueError before the
            # unconditional "not implemented yet" signal below.
            _validate_alignment(materialized)
            raise NotImplementedError(
                f"concat over raw modalities is deferred to {_PROCESSED_PATH_TICKET} "
                "(M3 processed path); cross-recording concat requires aligned time "
                "domains, which only the processed path establishes."
            )
        return materialized

    # Single-input ergonomic form; ``concat`` has no effect here.
    return _get_one(target)
# --------------------------------------------------------------------------- # Internals # ---------------------------------------------------------------------------
def _get_one(qr: QueryResult) -> Data:
    """Assemble the :class:`Data` for a single :class:`QueryResult`.

    The recording-level metadata row is synthesized first, then each
    entry of ``qr.modalities`` is materialized in iteration order, and
    finally the payloads are attached to a fresh :class:`Data` as a
    read-only mapping under ``data.modalities``.
    """
    recording = _recording_row_from(qr)

    payloads: dict[str, RawBytes] = {}
    for modality_name, modality_row in qr.modalities.items():
        payloads[modality_name] = _materialize_modality(modality_name, modality_row)

    data = Data(metadata=recording)
    # NOTE: per the original author's note, ``temporaldata.Data.__setattr__``
    # rejects only time-based children (Interval / IrregularTimeSeries /
    # RegularTimeSeries / Data with a domain) on a domain-less parent, so a
    # plain ``MappingProxyType`` is accepted. ``Data.keys()`` will list
    # ``"modalities"`` as a child, which is intended.
    data.modalities = MappingProxyType(payloads)
    return data
def _recording_row_from(qr: QueryResult) -> RecordingRow:
    """Build a :class:`RecordingRow` from the matching fields of ``qr``.

    Only the attributes named in ``_RECORDING_ROW_FIELDS`` are copied.
    The explicit list means a schema change on :class:`QueryResult`
    fails here (attribute lookup or validation) rather than being
    absorbed silently inside Pydantic.
    """
    projected = {}
    for field_name in _RECORDING_ROW_FIELDS:
        projected[field_name] = getattr(qr, field_name)
    return RecordingRow.model_validate(projected)
def _materialize_modality(name: str, mrow: ModalityRow) -> RawBytes:
    """Fetch every segment listed under ``mrow.raw_storage_uri``.

    Parameters
    ----------
    name
        Modality name; used only in the error message.
    mrow
        Catalog row for the modality. Its ``raw_storage_uri`` is parsed
        into a store role and a key prefix, and the row itself becomes
        the ``metadata`` of the returned carrier.

    Returns
    -------
    RawBytes
        ``segments`` is a tuple of ``(key, payload)`` pairs ordered by
        key, where ``payload`` is whatever ``store.get`` returns.

    Raises
    ------
    NotImplementedError
        If ``mrow.ingestion_status`` is not ``IngestionStatus.RAW``.
    """
    # Defensive guard: anything that is not raw belongs to the processed
    # path, which is not implemented in this module.
    if mrow.ingestion_status is not IngestionStatus.RAW:
        raise NotImplementedError(
            f"modality {name!r} on recording {mrow.recording_hash!r} has "
            f"ingestion_status={mrow.ingestion_status.value!r}; processed-path "
            f"read deferred to {_PROCESSED_PATH_TICKET}."
        )

    role, key_prefix = parse_storage_uri(mrow.raw_storage_uri)
    store: ObjectStore = get_store(role)

    # Order by key explicitly instead of trusting the backend's listing
    # order. Per the original author's note, data-engine names segments
    # with zero-padded sequence numbers, so lexicographic order is also
    # upload order in practice.
    listing = list(store.list(prefix=key_prefix))
    listing.sort(key=lambda entry: entry.key)

    fetched = []
    for entry in listing:
        fetched.append((entry.key, store.get(entry.key)))

    return RawBytes(segments=tuple(fetched), metadata=mrow)
def _validate_alignment(results: list[Data]) -> None:
    """Check that every recording exposes the same set of modality names.

    Returns ``None`` when ``results`` is empty, has one element, or all
    elements share one modality-name set. Otherwise raises
    :class:`ValueError` whose message lists each recording's sorted
    modality names in input order.
    """
    modality_sets = []
    for data in results:
        modality_sets.append(frozenset(data.modalities.keys()))

    # Zero or one distinct set means everything lines up.
    if len(set(modality_sets)) <= 1:
        return

    listing = [sorted(names) for names in modality_sets]
    raise ValueError(
        f"concat requires aligned modalities across recordings; got "
        f"{listing!r}"
    )