"""Materialize :class:`QueryResult` rows into in-memory :class:`ursa.Data`
objects (ENG-890).

Second verb in the M2 three-verb read API (``query`` → ``get`` →
``download``): ``query()`` selects, ``get()`` reads bytes into memory,
and ``download()`` (ENG-1091) writes them to disk. All filtering lives
in ``query()`` — ``get()`` accepts a :class:`QueryResult` (or an iterable
of them) plus a single ``concat`` flag and nothing else.

Phase 1a (M2) — raw path only
-----------------------------

For each modality with ``ingestion_status="raw"``, list the segment
files under ``ModalityRow.raw_storage_uri`` (a prefix), fetch each
segment's bytes in lexicographic key order, and surface them on the
returned ``Data`` as a :class:`ursa.RawBytes` carrier under
``data.modalities[name]``. No parsing, no time domain.
``data.metadata`` is the synthesized :class:`RecordingRow` (ENG-1090
contract).

All raw formats — including ``RAW_VIDEO`` / ``RAW_AUDIO`` — materialize
in full. Multi-hour video recordings can easily exceed several GB;
memory-constrained callers should use :func:`ursa.download` instead.

Phase 1b / M3 — processed path
------------------------------

Deferred to ENG-1093. Once a modality has ``ingestion_status="processed"``,
the modality field becomes an array-bearing :class:`RegularTimeSeries` /
:class:`IrregularTimeSeries` subclass. ``get()`` currently raises
:class:`NotImplementedError` for processed inputs as a defensive guard;
the runtime path arrives with ENG-1093.

Concat
------

``concat=True`` over multiple recordings would require aligned time
domains, which only the processed path establishes — so the M2 stub
validates alignment (raises :class:`ValueError` on mismatched modality
sets) and then raises :class:`NotImplementedError` pointing at
ENG-1093. The single-input case ignores ``concat`` (ticket spec:
"single-input ergonomic case … regardless of ``concat``").
"""
from __future__ import annotations
from collections.abc import Iterable
from types import MappingProxyType
from typing import overload
from ursa.catalog.schemas import (
IngestionStatus,
ModalityRow,
RecordingRow,
)
from ursa.query import QueryResult
from ursa.raw import RawBytes
from ursa.store import ObjectStore, get_store, parse_storage_uri
from ursa.temporal import Data
__all__ = ["get"]
# Ticket that owns the deferred processed-path read. The error messages that
# point callers at the next step interpolate this value, so the ticket number
# is maintained in exactly one place.
_PROCESSED_PATH_TICKET: str = "ENG-1093"

#: Names copied one-to-one from :class:`QueryResult` onto :class:`RecordingRow`
#: by ``_recording_row_from``. Spelled out by hand (rather than derived via
#: ``model_dump(exclude={"modalities"})``) so that renaming a field fails
#: loudly at that projection instead of being absorbed silently by Pydantic.
_RECORDING_ROW_FIELDS: tuple[str, ...] = (
    "recording_hash", "participant_ids", "start_time",
    "duration", "device_info", "metadata",
)
# ---------------------------------------------------------------------------
# Public surface
# ---------------------------------------------------------------------------
@overload
def get(target: QueryResult, *, concat: bool = False) -> Data: ...


@overload
def get(target: Iterable[QueryResult], *, concat: bool = False) -> list[Data] | Data: ...


def get(
    target: QueryResult | Iterable[QueryResult],
    *,
    concat: bool = False,
) -> Data | list[Data]:
    """Materialize ``target`` into in-memory :class:`ursa.Data` objects.

    Parameters
    ----------
    target
        A single :class:`QueryResult` or any iterable of them. The
        single-input form returns a single :class:`Data`; the iterable
        form returns a ``list[Data]`` (including a one-element list when
        the iterable has one item — disambiguation is by type, not by
        length).
    concat
        When ``True`` and ``target`` is an iterable, concatenate the
        per-recording outputs along the time axis. In M2 the request is
        only validated — non-raw modalities are rejected first, then
        modality alignment is checked — and ``NotImplementedError`` is
        raised, because raw modalities carry no time domain. This path
        reads catalog metadata only; no segment bytes are fetched. When
        ``target`` is a single :class:`QueryResult`, ``concat`` is ignored.

    Returns
    -------
    Data or list[Data]
        Each returned :class:`Data` has its ``metadata`` slot populated
        with a :class:`RecordingRow` synthesized from the source
        :class:`QueryResult`. Modality payloads are accessible as
        ``data.modalities[name]`` and are :class:`ursa.RawBytes` carriers
        in M2.

    Raises
    ------
    NotImplementedError
        If any matched modality is not raw (e.g.
        ``ingestion_status="processed"``, deferred to ENG-1093), or if
        ``concat=True`` is requested over an aligned iterable. On the
        concat path the non-raw check runs before the alignment check.
    ValueError
        If ``concat=True`` is requested over recordings with mismatched
        modality sets.

    Notes
    -----
    All raw formats — including ``RAW_VIDEO`` / ``RAW_AUDIO`` — are
    materialized in full into memory. A multi-hour video recording can
    easily exceed several GB; callers running on memory-constrained
    hosts should use :func:`ursa.download` instead.
    """
    if isinstance(target, QueryResult):
        # Single-input ergonomic form: ``concat`` is ignored by contract.
        return _get_one(target)

    if not concat:
        return [_get_one(qr) for qr in target]

    # concat path. M2 always ends in NotImplementedError here, so every
    # check is metadata-only: materializing first (as ``_get_one`` does)
    # would list and download every segment, potentially GBs of raw
    # video/audio, just to compare modality names and then discard it all.
    targets = list(target)
    for qr in targets:
        _reject_processed(qr)
    # ``_validate_alignment`` reads only ``.modalities`` keys; a QueryResult
    # exposes exactly the keys that ``_get_one`` copies into Data.modalities.
    _validate_alignment(targets)
    raise NotImplementedError(
        f"concat over raw modalities is deferred to {_PROCESSED_PATH_TICKET} "
        "(M3 processed path); cross-recording concat requires aligned time "
        "domains, which only the processed path establishes."
    )


def _reject_processed(qr: QueryResult) -> None:
    """Raise :class:`NotImplementedError` if any modality on ``qr`` is not raw.

    Metadata-only counterpart of the guard at the top of
    ``_materialize_modality`` (same message). It lets the ``concat`` path
    keep the processed-path error ahead of the alignment check without
    touching storage. Modalities are checked in ``qr.modalities`` order and
    the first non-raw one raises.
    """
    for name, mrow in qr.modalities.items():
        if mrow.ingestion_status is not IngestionStatus.RAW:
            raise NotImplementedError(
                f"modality {name!r} on recording {mrow.recording_hash!r} has "
                f"ingestion_status={mrow.ingestion_status.value!r}; processed-path "
                f"read deferred to {_PROCESSED_PATH_TICKET}."
            )
# ---------------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------------
def _get_one(qr: QueryResult) -> Data:
    """Materialize a single :class:`QueryResult` as one :class:`Data`.

    The recording row is projected before any modality is read, so if the
    projection fails no segment bytes have been fetched yet. Each modality
    is then loaded eagerly through ``_materialize_modality`` and exposed as
    a read-only mapping on ``data.modalities``.
    """
    recording = _recording_row_from(qr)

    payloads: dict[str, RawBytes] = {}
    for modality_name, modality_row in qr.modalities.items():
        payloads[modality_name] = _materialize_modality(modality_name, modality_row)

    result = Data(metadata=recording)
    # ``temporaldata.Data.__setattr__`` rejects only time-based values
    # (Interval / IrregularTimeSeries / RegularTimeSeries / a Data with a
    # domain) on a parent without a domain. A ``MappingProxyType`` is a
    # plain mapping, so the assignment goes through. ``Data.keys()`` will
    # then list ``"modalities"`` as a child, which is intended.
    result.modalities = MappingProxyType(payloads)
    return result
def _recording_row_from(qr: QueryResult) -> RecordingRow:
    """Build the :class:`RecordingRow` for ``qr`` from the fields both share.

    Only the names listed in ``_RECORDING_ROW_FIELDS`` are copied, by
    attribute access, and the resulting mapping is passed through
    ``RecordingRow.model_validate``. Because the list is explicit, a field
    missing from ``qr`` fails here with ``AttributeError``. Validation then
    gets the chance to reject any mismatch on the ``RecordingRow`` side
    (e.g. a newly required field), so schema drift surfaces here rather
    than silently.
    """
    projected: dict[str, object] = {}
    for field_name in _RECORDING_ROW_FIELDS:
        projected[field_name] = getattr(qr, field_name)
    return RecordingRow.model_validate(projected)
def _materialize_modality(name: str, mrow: ModalityRow) -> RawBytes:
    """Load every raw segment of one modality into a :class:`RawBytes`.

    Parameters
    ----------
    name
        Modality name; used only to build the error message.
    mrow
        Catalog row for the modality. ``mrow.raw_storage_uri`` is parsed
        into a store role and a key prefix; every object listed under that
        prefix is treated as one segment.

    Returns
    -------
    RawBytes
        ``segments`` is a tuple of ``(key, store.get(key))`` pairs sorted
        by key; ``metadata`` is ``mrow`` itself.

    Raises
    ------
    NotImplementedError
        If ``mrow.ingestion_status`` is anything other than
        ``IngestionStatus.RAW``.
    """
    status = mrow.ingestion_status
    if status is not IngestionStatus.RAW:
        # Defensive: processed modalities belong to the deferred
        # processed-path read, never to this raw loader.
        raise NotImplementedError(
            f"modality {name!r} on recording {mrow.recording_hash!r} has "
            f"ingestion_status={status.value!r}; processed-path "
            f"read deferred to {_PROCESSED_PATH_TICKET}."
        )

    role, key_prefix = parse_storage_uri(mrow.raw_storage_uri)
    store: ObjectStore = get_store(role)

    # Order by key ourselves instead of trusting the backend's listing
    # order. data-engine names per-modality segments with zero-padded
    # sequence numbers, so key order also tracks upload order in practice.
    # NOTE(review): if ``key_prefix`` does not end at a path separator, a
    # prefix such as ".../eeg" would also match ".../eeg_aux/..." — confirm
    # what ``parse_storage_uri`` returns for raw prefixes.
    listing = list(store.list(prefix=key_prefix))
    listing.sort(key=lambda entry: entry.key)

    pairs = []
    for entry in listing:
        pairs.append((entry.key, store.get(entry.key)))
    return RawBytes(segments=tuple(pairs), metadata=mrow)
[docs]
def _validate_alignment(results: list[Data]) -> None:
"""Raise ``ValueError`` if recordings have mismatched modality sets."""
sets = [frozenset(d.modalities.keys()) for d in results]
if len(set(sets)) > 1:
raise ValueError(
f"concat requires aligned modalities across recordings; got "
f"{[sorted(s) for s in sets]!r}"
)