Source code for ursa.data_interface

"""User-facing class surface for Ursa (ENG-1108 / ENG-1119 / ENG-1120).

:class:`DataInterface` packages the three M2 read verbs (query / get /
download), the four register write verbs (participant / recording /
modality / event), and the typed per-table ``list_<table>`` /
``get_<table>`` lookups behind a single object that owns its
:class:`~ursa.catalog.Catalog` handles.

Credential isolation
--------------------

A fresh :class:`DataInterface` opens against the ``assets_ro``
credential role only. Calling any ``register_*`` method before
:meth:`enable_writes` raises :class:`~ursa.catalog.WritesNotEnabled` —
production callers must explicitly request the ``assets_rw`` role.
:meth:`enable_writes` lazily builds a second :class:`Catalog` handle
against ``assets_rw``. The public read surface (``query``, ``list_*``,
``get_*``, :attr:`catalog`) always uses the RO handle. ``register_*``
runs on the RW handle — including its internal idempotency ``get_*``
lookup — so the check and the write happen under a single credential
and the read-modify-write race window stays tight.

The credential-boundary story is **R2-only**. ``profile="local"`` has
no credential boundary to enforce: the local lance dataset has a
single on-disk URI, so :meth:`enable_writes` is just a permission flag
flip — reads and writes share the same handle, which
:meth:`_write_catalog` resolves by falling back to :attr:`catalog`
when ``_rw_catalog`` is ``None``. Tests treat local as the fast
fixture for the idempotency / diff machinery; the credential split is
exercised against R2 stubs in ``tests/test_data_interface.py``.
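
A minimal sketch of the promotion flow (IDs and timestamps are
illustrative)::

    data = DataInterface(profile="r2-test")
    data.list_recordings()          # RO handle
    data.register_participant(      # raises WritesNotEnabled
        participant_id="p_001", enrolled_at=1_700_000_000,
    )
    data.enable_writes()            # builds the assets_rw handle
    data.register_participant(      # idempotency check + insert, RW handle
        participant_id="p_001", enrolled_at=1_700_000_000,
    )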

Lazy construction
-----------------

R2 profiles do not resolve 1Password credentials at construction; the
underlying :class:`Catalog` is built on first access to :attr:`catalog`
(reads) or :meth:`enable_writes` (writes). Offline / sandboxed
environments (CI, doc builds) can import and instantiate
:class:`DataInterface` without an ``op`` session. Argument validation
(``profile`` value, ``path`` constraints) runs eagerly so caller
mistakes surface immediately.
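
For example (values illustrative), both of these fail eagerly, offline::

    DataInterface(profile="s3")                  # ValueError: bad profile
    DataInterface(profile="r2", path="/tmp/x")   # ValueError: path= forbidden

while ``DataInterface(profile="r2")`` succeeds with no ``op`` session —
the underlying :class:`Catalog` is only built on first use.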

File layout
-----------

* DataInterface class + public API
* CatalogWriter Protocol (private, test seam for tests/register/conftest.py)
* Query implementation (inlined from former ursa.query)
* Get implementation (inlined from former ursa.get)
* Download implementation (inlined from former ursa.download)
* Register implementations (inlined from former ursa.register)
"""

from __future__ import annotations

import os
import warnings
from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta
from pathlib import Path
from types import MappingProxyType
from typing import Any, Literal, NamedTuple, Protocol, cast, overload, runtime_checkable
from urllib.parse import urlparse

from ursa._query_types import QueryResult, QueryResultList, QuerySpec
from ursa.catalog import (
    BenchmarkResultRow,
    BenchmarkSuiteRow,
    Catalog,
    CatalogRow,
    CatalogRowExists,
    CheckpointRow,
    EmbeddingRow,
    EventRow,
    IngestionStatus,
    ModalityRow,
    ParticipantRow,
    RecordingRow,
    StorageFormat,
    VirgoAssetRow,
    WritesNotEnabled,
)
from ursa.catalog.schemas import CatalogID, EpochNs, MetadataDict, ModalityName
from ursa.layout import active_buckets, validate_storage_uri
from ursa.raw import RawBytes
from ursa.store import ObjectStore, get_store, parse_storage_uri
from ursa.temporal import Data

__all__ = ["DataInterface"]


_Profile = Literal["r2", "r2-test", "local"]


# ENG-1082 covers all post-Virgo query features (temporal filters, streaming,
# pipeline_version joins). Referenced from the query() gate's NotImplementedError.
_BACKFILL_TICKET = "ENG-1082"

# Kwargs that require the processed store. Any non-None / non-empty value here
# combined with an ingestion_status="raw" modality trips the gate.
_PROCESSED_KWARGS = (
    "time_range",
    "pipeline_version",
    "time_filters",
    "metadata_filters",
    "derived",
)

# Forward-reference markers used in error messages so the next-step
# ticket numbers live in one place.
_PROCESSED_PATH_TICKET = "ENG-1093"

_LayoutMode = Literal["by_recording", "by_modality", "flat"]
_VALID_LAYOUTS: frozenset[str] = frozenset({"by_recording", "by_modality", "flat"})

# Chunk size for the streaming copy. 1 MiB balances syscall overhead
# against memory pressure; even a 10 GiB video walks only 10240 chunks.
_STREAM_CHUNK = 1 << 20


#: Fields shared 1:1 between :class:`QueryResult` and :class:`RecordingRow`.
#: Explicit projection so any future field rename breaks here loudly.
_RECORDING_ROW_FIELDS: tuple[str, ...] = (
    "recording_hash",
    "participant_ids",
    "start_time",
    "duration",
    "device_info",
    "metadata",
)


class DataInterface:
    """Class-based entry point for Ursa's read + register API.

    Construction validates arguments but defers credential resolution
    (R2 profiles) until first use. Call :meth:`enable_writes` once
    before invoking any ``register_*`` method; that promotion is
    idempotent.

    Parameters
    ----------
    profile
        ``"r2"`` (default) for the production catalog, ``"r2-test"``
        for the testing bucket, ``"local"`` for an on-disk lance
        directory. Typed ``str`` (not ``Literal``) so callers can pass
        values read from ``os.environ`` / config files without a
        ``cast`` or ``# type: ignore`` — invalid strings raise
        :class:`ValueError` at construction.
    path
        Required when ``profile="local"``. Forbidden for R2 profiles —
        their URI is resolved from the configured object store.
    """

    def __init__(
        self,
        profile: str = "r2",
        *,
        path: str | Path | None = None,
    ) -> None:
        if profile == "local":
            if path is None:
                raise ValueError(
                    "profile='local' requires path= (catalog root on local disk); "
                    "use DataInterface(profile='local', path=...)"
                )
        elif profile in ("r2", "r2-test"):
            if path is not None:
                raise ValueError(
                    f"profile={profile!r} resolves its lancedb URI from the object "
                    f"store; path= is only valid with profile='local'"
                )
        else:
            raise ValueError(f"profile must be one of 'r2', 'r2-test', 'local'; got {profile!r}")

        # Validation above narrows to a Literal; cast keeps the internal
        # invariant typed without forcing callers through a Literal at the
        # call site.
        self._profile: _Profile = cast(_Profile, profile)
        self._path = path
        self._ro_catalog: Catalog | None = None  # lazy
        self._rw_catalog: Catalog | None = None  # lazy, set by enable_writes()
        self._writes_enabled: bool = False

    # ------------------------------------------------------------------
    # Handles
    # ------------------------------------------------------------------

    @property
    def catalog(self) -> Catalog:
        """The read-only catalog handle.

        Constructed on first access; subsequent accesses return the
        cached handle. **Always returns the RO catalog handle for R2
        profiles**, even after :meth:`enable_writes` — credential
        isolation is enforced and ``register_*`` methods use a separate
        write handle internally. For ``profile="local"`` there is no
        credential boundary to enforce; reads and writes share this
        single handle (see :meth:`_write_catalog`).
        """
        if self._ro_catalog is None:
            self._ro_catalog = Catalog.open(profile=self._profile, path=self._path)
        return self._ro_catalog

    @property
    def writes_enabled(self) -> bool:
        return self._writes_enabled

    def enable_writes(self) -> None:
        """Open the ``assets_rw`` write handle.

        Idempotent — a second call is a no-op. For ``profile="local"``,
        local lance has no credential distinction, so the bool flip is
        the only observable effect; ``register_*`` methods fall back to
        the read handle.
        """
        if self._writes_enabled:
            return
        if self._profile == "local":
            # No credential split on local lance — register_* falls back
            # to ``self.catalog`` (the RO handle is the only handle).
            self._writes_enabled = True
            return
        rw_store = get_store("assets_rw", profile=self._profile)
        self._rw_catalog = Catalog._from_store(rw_store)
        self._writes_enabled = True

    def _write_catalog(self) -> Catalog:
        """The write-capable catalog handle.

        Raises :class:`WritesNotEnabled` if :meth:`enable_writes` hasn't
        been called. For ``profile="local"``, falls back to the same
        underlying handle as :attr:`catalog` — there is no credential
        distinction to enforce locally.

        ``register_*`` callers route both the idempotency ``get_*``
        check and the ``add_*`` write through this handle so the
        check-and-write run under one credential. Tests that bypass the
        register-layer idempotency for fixture-setup reasons may also
        use this handle for raw seeding via ``add_*`` — this is a
        deliberate test seam, not a public API. Stable across
        :class:`DataInterface` evolution only insofar as it's required
        by the in-repo register-test suite.
        """
        self._require_writes()
        if self._rw_catalog is not None:
            return self._rw_catalog
        return self.catalog

    def _require_writes(self) -> None:
        if not self._writes_enabled:
            raise WritesNotEnabled(
                "DataInterface is read-only. Call data.enable_writes() before "
                "calling register_*. Writes require the assets_rw credential role."
            )

    # ------------------------------------------------------------------
    # Read verbs (query → get → download)
    # ------------------------------------------------------------------

    def query(
        self,
        spec: QuerySpec | None = None,
        **kwargs: Any,
    ) -> QueryResultList:
        """Filter the Ursa catalog and return matching recordings.

        See :func:`_query` for parameter documentation. This method
        delegates to the inlined implementation with the RO catalog
        handle.
        """
        return _query(spec, catalog=self.catalog, **kwargs)
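
    # Usage sketch of the two query surfaces (filter values illustrative):
    #
    #   data.query(participants=["p_001"], modalities=["eeg"])
    #   data.query(QuerySpec(participants=["p_001"], modalities=["eeg"]))
    #
    # Passing both a spec and filter kwargs raises TypeError
    # (see _reject_kwargs_with_spec).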

    @overload
    def get(self, target: QueryResult, *, concat: bool = False) -> Data: ...
    @overload
    def get(self, target: Iterable[QueryResult], *, concat: bool = False) -> list[Data] | Data: ...

    def get(
        self,
        target: QueryResult | Iterable[QueryResult],
        *,
        concat: bool = False,
    ) -> Data | list[Data]:
        """Materialize ``target`` into in-memory :class:`Data` objects.

        Threads this interface's ``profile`` to the segment-store
        resolution. Overloaded so static callers get ``Data`` for a
        single :class:`QueryResult` and ``list[Data]`` for an iterable —
        preserving the typing contract from the deleted ``ursa.get``.
        """
        return _get(target, concat=concat, profile=self._profile)

    def download(
        self,
        target: QueryResult | Iterable[QueryResult],
        dest: str | os.PathLike[str],
        *,
        layout: _LayoutMode = "by_recording",
        overwrite: bool = False,
    ) -> list[Path]:
        """Stream raw-modality bytes for ``target`` to disk under ``dest``.

        Threads this interface's ``profile`` to the segment-store
        resolution.
        """
        return _download(target, dest, layout=layout, overwrite=overwrite, profile=self._profile)
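
    # Destination sketch for one segment "seg-000.bin" of modality
    # "video" on recording "rec01" (names illustrative):
    #
    #   layout="by_recording" → dest/rec01/video/seg-000.bin   (default)
    #   layout="by_modality"  → dest/video/rec01/seg-000.bin
    #   layout="flat"         → dest/rec01__video/seg-000.bin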

    # ------------------------------------------------------------------
    # Register verbs (gated on enable_writes())
    # ------------------------------------------------------------------

    def register_participant(
        self,
        *,
        participant_id: str,
        enrolled_at: int | datetime,
        metadata: dict[str, Any] | None = None,
    ) -> ParticipantRow:
        return _register_participant(
            participant_id=participant_id,
            enrolled_at=enrolled_at,
            metadata=metadata,
            catalog=self._write_catalog(),
        )

    def register_recording(
        self,
        *,
        recording_hash: str,
        participant_ids: list[str],
        start_time: int | datetime,
        duration: timedelta,
        device_info: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> RecordingRow:
        return _register_recording(
            recording_hash=recording_hash,
            participant_ids=participant_ids,
            start_time=start_time,
            duration=duration,
            device_info=device_info,
            metadata=metadata,
            catalog=self._write_catalog(),
        )

    def register_modality(
        self,
        *,
        recording_hash: str,
        modality: str,
        raw_storage_uri: str,
        storage_uri: str | None = None,
        ingestion_status: IngestionStatus = IngestionStatus.RAW,
        format: StorageFormat | None = None,
        domain_intervals: list[tuple[float, float]] | None = None,
        sampling_rate: float | None = None,
        channel_spec: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> ModalityRow:
        return _register_modality(
            recording_hash=recording_hash,
            modality=modality,
            raw_storage_uri=raw_storage_uri,
            storage_uri=storage_uri,
            ingestion_status=ingestion_status,
            format=format,
            domain_intervals=domain_intervals,
            sampling_rate=sampling_rate,
            channel_spec=channel_spec,
            metadata=metadata,
            catalog=self._write_catalog(),
        )

    def register_event(
        self,
        *,
        event_id: str,
        recording_hash: str,
        event_time: float,
        event_type: str,
        prompt: str | None = None,
        response: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> EventRow:
        return _register_event(
            event_id=event_id,
            recording_hash=recording_hash,
            event_time=event_time,
            event_type=event_type,
            prompt=prompt,
            response=response,
            metadata=metadata,
            catalog=self._write_catalog(),
        )
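
    # Write-ordering sketch (IDs illustrative; assumes the natural
    # parent-before-child order implied by the shared keys — referential
    # checks, if any, live in the catalog layer, not here):
    #
    #   data.enable_writes()
    #   data.register_participant(participant_id="p_001", enrolled_at=...)
    #   data.register_recording(recording_hash="rec01",
    #                           participant_ids=["p_001"], ...)
    #   data.register_modality(recording_hash="rec01", modality="eeg",
    #                          raw_storage_uri=...)
    #   data.register_event(event_id="ev_001", recording_hash="rec01",
    #                       event_time=12.5, event_type=...)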

    # ------------------------------------------------------------------
    # Per-table CRUD forwarders (read-only — always RO handle)
    # ------------------------------------------------------------------

    def list_participants(self, **kwargs: Any) -> list[ParticipantRow]:
        return self.catalog.list_participants(**kwargs)

    def list_recordings(self, **kwargs: Any) -> list[RecordingRow]:
        return self.catalog.list_recordings(**kwargs)

    def list_modalities(self, **kwargs: Any) -> list[ModalityRow]:
        return self.catalog.list_modalities(**kwargs)

    def list_events(self, **kwargs: Any) -> list[EventRow]:
        return self.catalog.list_events(**kwargs)

    def list_embeddings(self, **kwargs: Any) -> list[EmbeddingRow]:
        return self.catalog.list_embeddings(**kwargs)

    def list_virgo_assets(self, **kwargs: Any) -> list[VirgoAssetRow]:
        return self.catalog.list_virgo_assets(**kwargs)

    def list_checkpoints(self, **kwargs: Any) -> list[CheckpointRow]:
        return self.catalog.list_checkpoints(**kwargs)

    def list_benchmark_suites(self, **kwargs: Any) -> list[BenchmarkSuiteRow]:
        return self.catalog.list_benchmark_suites(**kwargs)

    def list_benchmark_results(self, **kwargs: Any) -> list[BenchmarkResultRow]:
        return self.catalog.list_benchmark_results(**kwargs)

    def get_participant(self, participant_id: str) -> ParticipantRow | None:
        return self.catalog.get_participant(participant_id)

    def get_recording(self, recording_hash: str) -> RecordingRow | None:
        return self.catalog.get_recording(recording_hash)

    def get_modality(self, recording_hash: str, modality: str) -> ModalityRow | None:
        return self.catalog.get_modality(recording_hash, modality)

    def get_event(self, event_id: str) -> EventRow | None:
        return self.catalog.get_event(event_id)

    def get_embedding(self, embedding_id: str) -> EmbeddingRow | None:
        return self.catalog.get_embedding(embedding_id)

    def get_virgo_asset(self, asset_id: str) -> VirgoAssetRow | None:
        return self.catalog.get_virgo_asset(asset_id)

    def get_checkpoint(self, checkpoint_id: str) -> CheckpointRow | None:
        return self.catalog.get_checkpoint(checkpoint_id)

    def get_benchmark_suite(self, suite_name: str, suite_version: int) -> BenchmarkSuiteRow | None:
        return self.catalog.get_benchmark_suite(suite_name, suite_version)

    def get_benchmark_result(self, result_id: str) -> BenchmarkResultRow | None:
        return self.catalog.get_benchmark_result(result_id)


# ===========================================================================
# CatalogWriter Protocol (private, test seam for tests/register/conftest.py)
# ===========================================================================
#
# This Protocol existed in the deleted ursa.register._writer module so test
# code could swap in an in-memory writer with no Lance dependency. Tests
# under tests/register/ continue to import this name from the new location:
# ``from ursa.data_interface import CatalogWriter``.
#
# Method names mirror :class:`ursa.catalog.Catalog`'s typed wrappers
# (``add_participant``/``get_participant`` etc.), so the real
# :class:`Catalog` structurally satisfies this Protocol without an adapter
# layer.
#
# ``add_*`` implementations MUST raise :class:`CatalogRowExists` (or any
# subclass — :class:`~ursa.catalog.CatalogPKConflict` qualifies) when the
# row's ``__primary_key__`` is already present in the target table.
# Idempotency is a register-layer policy (fetch-and-compare before insert),
# not a catalog-layer one.


@runtime_checkable
class CatalogWriter(Protocol):
    """Get + insert contract that :class:`ursa.catalog.Catalog` satisfies.

    ``@runtime_checkable`` so the structural-conformance test in
    ``tests/register/test_catalog_protocol_structural.py`` can
    ``isinstance(catalog, CatalogWriter)`` — catches any ``add_*`` /
    ``get_*`` rename in :class:`Catalog` at unit-test time.
    """

    def add_participant(self, row: ParticipantRow) -> None: ...
    def add_recording(self, row: RecordingRow) -> None: ...
    def add_modality(self, row: ModalityRow) -> None: ...
    def add_event(self, row: EventRow) -> None: ...

    def get_participant(self, participant_id: str) -> ParticipantRow | None: ...
    def get_recording(self, recording_hash: str) -> RecordingRow | None: ...
    def get_modality(self, recording_hash: str, modality: str) -> ModalityRow | None: ...
    def get_event(self, event_id: str) -> EventRow | None: ...


# ===========================================================================
# Query implementation (inlined from former ursa.query)
# ===========================================================================
#
# Phase 1a (architecture v0.4 §7) ships a *raw-store* query: callers filter
# the catalog by participant, modality, recording hash, and metadata
# equality, and get back :class:`QueryResult` objects pointing at whole-file
# raw modality URIs. No Zarr, no temporal slicing — those arrive after
# Virgo's M2 ingestion node lands and are tracked by ENG-1082.
#
# The advanced kwargs are accepted at the type-system level so the public
# signature stays stable; passing any of them while a matched modality has
# ``ingestion_status="raw"`` raises ``NotImplementedError`` with a pointer
# to ENG-1082.

_FILTER_KWARG_NAMES = (
    "participants",
    "modalities",
    "recording_hash",
    "metadata",
    "time_range",
    "pipeline_version",
    "time_filters",
    "metadata_filters",
    "derived",
)


def _query(
    spec: QuerySpec | None = None,
    *,
    catalog: Catalog,
    participants: list[CatalogID] | None = None,
    modalities: list[ModalityName] | None = None,
    recording_hash: CatalogID | None = None,
    metadata: MetadataDict | None = None,
    time_range: tuple[EpochNs, EpochNs] | None = None,
    pipeline_version: str | None = None,
    time_filters: list[Any] | None = None,
    metadata_filters: list[Any] | None = None,
    derived: list[Any] | None = None,
    limit: int | None = None,
) -> QueryResultList:
    """Filter the Ursa catalog and return matching recordings.

    Two surfaces:

    1. **Common case** — pass kwargs directly:
       ``data.query(participants=[...], modalities=[...])``. The kwargs
       auto-validate against :class:`QuerySpec`.
    2. **Complex case** — build a :class:`QuerySpec` and pass it as the
       first positional arg: ``data.query(spec)``.

    Phase 1a returns :class:`QueryResult` carrying catalog projections
    only — no array bytes. Temporal kwargs and ``pipeline_version``
    raise :class:`NotImplementedError` when a matched modality has
    ``ingestion_status="raw"``; the green path ships with ENG-1082.
    """
    if spec is not None:
        _reject_kwargs_with_spec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters,
            metadata_filters=metadata_filters,
            derived=derived,
        )
    else:
        spec = QuerySpec(
            participants=participants,
            modalities=modalities,
            recording_hash=recording_hash,
            metadata=metadata,
            time_range=time_range,
            pipeline_version=pipeline_version,
            time_filters=time_filters or [],
            metadata_filters=metadata_filters or [],
            derived=derived or [],
        )

    if spec.metadata is not None and limit is not None:
        warnings.warn(
            "DataInterface.query(metadata=..., limit=...) performs a full catalog "
            "scan in M2; metadata pushdown lands with ENG-1066, full-scan removal "
            "with ENG-1088.",
            UserWarning,
            stacklevel=2,
        )

    # Step 1: catalog pushdown for the safe scalar predicate (recording_hash).
    # participants/modalities/metadata are NOT pushed down: participant_ids is
    # a list column (ENG-1087), modalities lives on a different table, and
    # metadata pushdown is ENG-1066.
    recording_where: dict[str, Any] = {}
    if spec.recording_hash is not None:
        recording_where["recording_hash"] = spec.recording_hash
    recordings = catalog.list_recordings(where=recording_where or None)

    # Step 2: Python-side filters on recordings.
    if spec.participants is not None:
        wanted = set(spec.participants)
        recordings = [r for r in recordings if wanted.intersection(r.participant_ids)]
    if spec.metadata is not None:
        recordings = [r for r in recordings if _metadata_matches(r.metadata, spec.metadata)]
    if not recordings:
        return QueryResultList()

    # Step 3: modality scan + Phase-1a temporal/processed gate.
    by_recording = _collect_modalities(catalog, recordings, spec.modalities)
    # Drop recordings whose modality set is empty after the optional
    # ``modalities=`` filter; that's a "no match" signal, not a half-result.
    if spec.modalities is not None:
        recordings = [r for r in recordings if by_recording.get(r.recording_hash)]
    if spec.has_processed_kwarg():
        _enforce_processed_gate(spec, by_recording)

    # Step 4: hydrate and truncate.
    results = [
        QueryResult(
            recording_hash=r.recording_hash,
            participant_ids=list(r.participant_ids),
            start_time=r.start_time,
            duration=r.duration,
            device_info=dict(r.device_info),
            metadata=dict(r.metadata),
            modalities={m.modality: m for m in by_recording.get(r.recording_hash, [])},
        )
        for r in recordings
    ]
    if limit is not None:
        results = results[:limit]
    return QueryResultList(results)
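

# Pushdown sketch for the steps above (values illustrative): with
# _query(catalog=cat, recording_hash="rec01", participants=["p_001"]),
# only recording_hash reaches catalog.list_recordings() as a where
# clause; the participants intersection runs client-side because
# participant_ids is a list column (ENG-1087).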


def _reject_kwargs_with_spec(**kwargs: Any) -> None:
    """If ``spec`` is supplied, every filter kwarg must be unset.

    Mixing the two would force us to merge a spec with kwargs, and the
    merge semantics (override vs. union) are unobvious enough that being
    strict is cheaper than picking a rule.
    """
    set_kwargs = [name for name in _FILTER_KWARG_NAMES if kwargs.get(name) is not None]
    if set_kwargs:
        raise TypeError(
            f"DataInterface.query(spec=..., {set_kwargs[0]}=...): pass either a "
            f"QuerySpec or individual filter kwargs, not both. Offending kwargs: "
            f"{set_kwargs!r}."
        )


def _metadata_matches(actual: Mapping[str, Any], expected: Mapping[str, Any]) -> bool:
    """Equality on every key in ``expected``; missing keys do not match."""
    return all(actual.get(k) == v for k, v in expected.items())
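

# Matching-semantics sketch (values illustrative) — subset equality on
# ``expected``'s keys:
#
#   _metadata_matches({"site": "A", "run": 2}, {"site": "A"})  -> True
#   _metadata_matches({"site": "A"}, {"run": 2})               -> False (missing key)
#   _metadata_matches({"run": 2}, {})                          -> True  (vacuous)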


def _collect_modalities(
    catalog: Catalog,
    recordings: list[RecordingRow],
    modality_filter: list[ModalityName] | None,
) -> dict[CatalogID, list[ModalityRow]]:
    """Fetch the modality rows for ``recordings``, grouped by recording_hash.

    Single ``list_modalities`` call (no batching in Phase 1a; ENG-1089
    covers growth). Empty input short-circuits to avoid an empty-IN-list
    edge case.
    """
    if not recordings:
        return {}
    where: dict[str, Any] = {"recording_hash": [r.recording_hash for r in recordings]}
    if modality_filter is not None:
        where["modality"] = list(modality_filter)
    rows = catalog.list_modalities(where=where)
    grouped: dict[CatalogID, list[ModalityRow]] = {}
    for row in rows:
        grouped.setdefault(row.recording_hash, []).append(row)
    return grouped


def _enforce_processed_gate(
    spec: QuerySpec,
    by_recording: dict[CatalogID, list[ModalityRow]],
) -> None:
    """Raise if any matched modality is still raw and the spec asked for
    processed-only behavior.

    The first offending (modality, recording, kwarg) is reported so the
    message points at a concrete row.
    """
    offending_kwargs = _processed_kwargs_set(spec)
    for hash_, rows in by_recording.items():
        for row in rows:
            if row.ingestion_status is IngestionStatus.RAW:
                kwarg = next(iter(offending_kwargs))
                raise NotImplementedError(
                    f"DataInterface.query({kwarg}=...) requires processed "
                    f"modalities; modality {row.modality!r} on recording "
                    f"{hash_!r} has ingestion_status='raw'. Backfilled by "
                    f"{_BACKFILL_TICKET} once Virgo M2 ships the ingestion node."
                )


def _processed_kwargs_set(spec: QuerySpec) -> list[str]:
    """Names of the processed-only kwargs that are non-None / non-empty."""
    return [k for k in _PROCESSED_KWARGS if getattr(spec, k)]


# ===========================================================================
# Get implementation (inlined from former ursa.get)
# ===========================================================================
#
# Phase 1a (M2) — raw path only. For each modality with
# ``ingestion_status="raw"``, list the segment files under
# ``ModalityRow.raw_storage_uri``, fetch each segment's bytes, and surface
# them as a :class:`RawBytes` carrier under ``data.modalities[name]``. No
# parsing, no time domain.
#
# Phase 1b / M3 — processed path deferred to ENG-1093.


@overload
def _get(
    target: QueryResult,
    *,
    concat: bool = False,
    profile: str | None = None,
) -> Data: ...
@overload
def _get(
    target: Iterable[QueryResult],
    *,
    concat: bool = False,
    profile: str | None = None,
) -> list[Data] | Data: ...
def _get(
    target: QueryResult | Iterable[QueryResult],
    *,
    concat: bool = False,
    profile: str | None = None,
) -> Data | list[Data]:
    """Materialize ``target`` into in-memory :class:`Data` objects."""
    if isinstance(target, QueryResult):
        return _get_one(target, profile=profile)
    results = [_get_one(qr, profile=profile) for qr in target]
    if not concat:
        return results
    # concat path
    _validate_alignment(results)
    raise NotImplementedError(
        f"concat over raw modalities is deferred to {_PROCESSED_PATH_TICKET} "
        "(M3 processed path); cross-recording concat requires aligned time "
        "domains, which only the processed path establishes."
    )


def _get_one(qr: QueryResult, *, profile: str | None = None) -> Data:
    """Build a single :class:`Data` from one :class:`QueryResult`."""
    rec_row = _recording_row_from(qr)
    modalities: dict[str, RawBytes] = {
        name: _materialize_modality(name, mrow, profile=profile)
        for name, mrow in qr.modalities.items()
    }
    data = Data(metadata=rec_row)
    # ``temporaldata.Data.__setattr__`` only blocks time-based types
    # (Interval / IrregularTimeSeries / RegularTimeSeries / Data with
    # domain) when the parent has no domain. ``MappingProxyType`` is a
    # plain mapping and passes through.
    data.modalities = MappingProxyType(modalities)
    return data


def _recording_row_from(qr: QueryResult) -> RecordingRow:
    """Project ``qr`` onto :class:`RecordingRow`'s field set.

    Uses an explicit field tuple so a future :class:`QueryResult` schema
    change breaks here at validation time, not silently inside Pydantic.
    """
    return RecordingRow.model_validate(
        {field: getattr(qr, field) for field in _RECORDING_ROW_FIELDS}
    )


def _materialize_modality(name: str, mrow: ModalityRow, *, profile: str | None = None) -> RawBytes:
    """Read all segment files under ``mrow.raw_storage_uri`` into :class:`RawBytes`."""
    # Defensive guard — processed modalities go through ENG-1093, not here.
    if mrow.ingestion_status is not IngestionStatus.RAW:
        raise NotImplementedError(
            f"modality {name!r} on recording {mrow.recording_hash!r} has "
            f"ingestion_status={mrow.ingestion_status.value!r}; processed-path "
            f"read deferred to {_PROCESSED_PATH_TICKET}."
        )
    role, key_prefix = parse_storage_uri(mrow.raw_storage_uri)
    store: ObjectStore = get_store(role, profile=profile)
    # Explicit sort by key — do not rely on backend listing order.
    metas = sorted(store.list(prefix=key_prefix), key=lambda m: m.key)
    segments = tuple((meta.key, store.get(meta.key)) for meta in metas)
    return RawBytes(segments=segments, metadata=mrow)


def _validate_alignment(results: list[Data]) -> None:
    """Raise ``ValueError`` if recordings have mismatched modality sets."""
    sets = [frozenset(d.modalities.keys()) for d in results]
    if len(set(sets)) > 1:
        raise ValueError(
            f"concat requires aligned modalities across recordings; got "
            f"{[sorted(s) for s in sets]!r}"
        )


# ===========================================================================
# Download implementation (inlined from former ursa.download)
# ===========================================================================
#
# Phase 1a (M2) — raw path only. Streams segment files under
# ``ModalityRow.raw_storage_uri`` to disk under a per-modality directory.
# ``ObjectStore.open()`` is used (not ``get()``) so multi-GB objects never
# sit in memory. Phase 1b processed path is deferred to ENG-1093.


class _PlannedWrite(NamedTuple):
    """One (store, source key, destination path) tuple produced by the
    plan phase and consumed by the execute phase.
    """

    store: ObjectStore
    source_key: str
    dest_path: Path
    meta_size: int


def _download(
    target: QueryResult | Iterable[QueryResult],
    dest: str | os.PathLike[str],
    *,
    layout: _LayoutMode = "by_recording",
    overwrite: bool = False,
    profile: str | None = None,
) -> list[Path]:
    """Stream raw-modality bytes for ``target`` to disk under ``dest``."""
    if layout not in _VALID_LAYOUTS:
        raise ValueError(f"layout must be one of {sorted(_VALID_LAYOUTS)!r}; got {layout!r}")
    dest_root = Path(dest)
    qrs: list[QueryResult] = [target] if isinstance(target, QueryResult) else list(target)
    planned = _plan_writes(qrs, dest_root, layout=layout, overwrite=overwrite, profile=profile)
    written: list[Path] = []
    for pw in planned:
        # No pre-unlink even when overwrite=True — _stream_to_disk writes
        # to a .part sibling and uses os.replace() for atomic swap.
        _stream_to_disk(pw.store, pw.source_key, pw.dest_path)
        written.append(pw.dest_path)
    return written


def _plan_writes(
    qrs: list[QueryResult],
    dest_root: Path,
    *,
    layout: _LayoutMode,
    overwrite: bool,
    profile: str | None = None,
) -> list[_PlannedWrite]:
    """Enumerate every write before any I/O.

    One pass over the inputs produces:

    * the full ordered list of :class:`_PlannedWrite` tuples,
    * an intra-call duplicate-destination check that catches collisions
      across all layouts,
    * a pre-existing-destination check that batches every collision into
      one :class:`FileExistsError` when ``overwrite=False``.
    """
    planned: list[_PlannedWrite] = []
    seen_dests: dict[Path, _PlannedWrite] = {}
    for qr in qrs:
        recording_hash = qr.recording_hash
        for modality_name, mrow in qr.modalities.items():
            _check_modality_eligibility(modality_name, mrow, layout=layout)
            role, key_prefix = parse_storage_uri(mrow.raw_storage_uri)
            store: ObjectStore = get_store(role, profile=profile)
            metas = sorted(store.list(prefix=key_prefix), key=lambda m: m.key)
            if not metas:
                raise FileNotFoundError(
                    f"no objects at {mrow.raw_storage_uri!r} for modality "
                    f"{modality_name!r} on recording {recording_hash!r}; "
                    "registered modality with zero segments is a catalog "
                    "or upload bug."
                )
            for meta in metas:
                rel_key = meta.key.removeprefix(key_prefix).lstrip("/")
                dest_path = _dest_path(
                    dest_root,
                    layout=layout,
                    recording_hash=recording_hash,
                    modality=modality_name,
                    rel_key=rel_key,
                )
                if dest_path in seen_dests:
                    prior = seen_dests[dest_path]
                    raise ValueError(
                        f"layout={layout!r} produces duplicate destination "
                        f"{dest_path}: would be written by both "
                        f"{prior.source_key!r} and {meta.key!r}"
                    )
                pw = _PlannedWrite(
                    store=store,
                    source_key=meta.key,
                    dest_path=dest_path,
                    meta_size=meta.size,
                )
                seen_dests[dest_path] = pw
                planned.append(pw)
    if not overwrite:
        collisions = [pw.dest_path for pw in planned if pw.dest_path.exists()]
        if collisions:
            shown = ", ".join(str(p) for p in collisions[:20])
            more = f" …and {len(collisions) - 20} more" if len(collisions) > 20 else ""
            raise FileExistsError(
                f"refusing to overwrite {len(collisions)} existing file(s): "
                f"[{shown}{more}]; pass overwrite=True to replace."
            )
    return planned


def _check_modality_eligibility(
    modality_name: str,
    mrow: ModalityRow,
    *,
    layout: _LayoutMode,
) -> None:
    """Raise if ``mrow`` is not eligible for M2 raw-path download."""
    if mrow.ingestion_status is not IngestionStatus.RAW:
        raise NotImplementedError(
            f"modality {modality_name!r} on recording {mrow.recording_hash!r} "
            f"has ingestion_status={mrow.ingestion_status.value!r}; "
            f"download for the processed path (storage_uri) lands with "
            f"{_PROCESSED_PATH_TICKET} / Phase 1b. M2 supports raw "
            "modalities only."
        )
    if layout == "flat":
        # Avoid ambiguous flattened directory names. CatalogID's regex
        # permits ``_`` so ``__`` is technically valid in a hash, and
        # ModalityName is NonEmptyString with no further constraints.
        if "__" in mrow.recording_hash:
            raise ValueError(
                f"layout='flat' requires recording_hash without '__'; got {mrow.recording_hash!r}"
            )
        if "__" in modality_name:
            raise ValueError(
                f"layout='flat' requires modality names without '__'; got {modality_name!r}"
            )


def _dest_path(
    dest_root: Path,
    *,
    layout: _LayoutMode,
    recording_hash: str,
    modality: str,
    rel_key: str,
) -> Path:
    """Compute the on-disk destination for one source segment."""
    if layout == "by_recording":
        return dest_root / recording_hash / modality / rel_key
    if layout == "by_modality":
        return dest_root / modality / recording_hash / rel_key
    # layout == "flat"
    return dest_root / f"{recording_hash}__{modality}" / rel_key


def _stream_to_disk(store: ObjectStore, key: str, dest: Path) -> None:
    """Copy bytes from ``store[key]`` to ``dest`` via a temp-file rename.

    Uses :meth:`ObjectStore.open` (forward-only) so multi-GB raw
    video/audio never sits in memory. A ``.part`` rename ensures partial
    writes are not visible at the canonical destination.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    try:
        with store.open(key) as src, tmp.open("wb") as out:
            while chunk := src.read(_STREAM_CHUNK):
                out.write(chunk)
        tmp.replace(dest)  # atomic same-volume rename (os.replace)
    except BaseException:
        # Cover normal exceptions and KeyboardInterrupt / SystemExit alike.
        tmp.unlink(missing_ok=True)
        raise
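

# Failure-behavior sketch: if the read or write raises mid-copy (or the
# process is interrupted), only "<dest>.part" ever existed and the except
# path unlinks it — the canonical dest is either absent or a complete
# file, never a truncated one.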


# ===========================================================================
# Register implementations (inlined from former ursa.register)
# ===========================================================================
#
# Each function:
#
# 1. Builds the corresponding ``*Row`` from keyword args (Pydantic validates
#    types, ranges, and ``MetadataDict`` shape on construction).
# 2. For ``_register_modality``, additionally validates that ``storage_uri``
#    matches the bucket tier implied by ``format``.
# 3. Looks up any existing row by primary key. If the stored row equals the
#    candidate, returns it as an idempotent no-op. If it differs, raises
#    :class:`CatalogRowExists` with a structured ``diff``.
# 4. Otherwise inserts through the supplied :class:`CatalogWriter`.
#
# The catalog layer's strict ``add_*`` is unchanged; idempotency is a
# register-layer policy.
#
# Single-writer assumption: the ``get_*``-then-``add_*`` sequence is **not
# atomic**. Under concurrent registers, both callers could see ``existing
# is None`` and one of them would receive a
# :class:`~ursa.catalog.CatalogPKConflict` raised directly from the catalog.

_MISSING: object = object()
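

# Idempotency sketch (values illustrative; ``w`` is any CatalogWriter):
#
#   _register_participant(participant_id="p1", enrolled_at=0, catalog=w)  # inserts
#   _register_participant(participant_id="p1", enrolled_at=0, catalog=w)  # returns stored row
#   _register_participant(participant_id="p1", enrolled_at=1, catalog=w)  # CatalogRowExists(diff=...)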


def _pk_tuple_from_row(row: CatalogRow) -> tuple[tuple[str, object], ...]:
    """Build the (field, value) tuple ``CatalogRowExists`` expects from a row."""
    return tuple((f, getattr(row, f)) for f in row.__primary_key__)


def _diff_rows(existing: CatalogRow, candidate: CatalogRow) -> dict[str, tuple[object, object]]:
    """Field-by-field diff of two same-class rows. Empty dict if equal."""
    a = existing.model_dump(mode="python")
    b = candidate.model_dump(mode="python")
    return {
        k: (a.get(k, _MISSING), b.get(k, _MISSING))
        for k in a.keys() | b.keys()
        if a.get(k, _MISSING) != b.get(k, _MISSING)
    }
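

# e.g. (illustrative) two participant rows differing only in enrolled_at
# diff as:
#
#   {"enrolled_at": (0, 1)}   # (existing value, candidate value)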


def _register_participant(
    *,
    participant_id: str,
    enrolled_at: int | datetime,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ParticipantRow:
    """Register a new participant. Idempotent on re-register."""
    row = ParticipantRow(
        participant_id=participant_id,
        enrolled_at=cast(int, enrolled_at),
        metadata=metadata or {},
    )
    existing = catalog.get_participant(participant_id)
    if existing is not None:
        if existing == row:
            return existing
        raise CatalogRowExists(
            "participants",
            _pk_tuple_from_row(row),
            diff=_diff_rows(existing, row),
        )
    catalog.add_participant(row)
    return row


def _register_recording(
    *,
    recording_hash: str,
    participant_ids: list[str],
    start_time: int | datetime,
    duration: timedelta,
    device_info: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> RecordingRow:
    """Register a recording session. Idempotent on re-register."""
    row = RecordingRow(
        recording_hash=recording_hash,
        participant_ids=participant_ids,
        start_time=cast(int, start_time),
        duration=duration,
        device_info=device_info or {},
        metadata=metadata or {},
    )
    existing = catalog.get_recording(recording_hash)
    if existing is not None:
        if existing == row:
            return existing
        raise CatalogRowExists(
            "recordings",
            _pk_tuple_from_row(row),
            diff=_diff_rows(existing, row),
        )
    catalog.add_recording(row)
    return row


def _register_modality(
    *,
    recording_hash: str,
    modality: str,
    raw_storage_uri: str,
    storage_uri: str | None = None,
    ingestion_status: IngestionStatus = IngestionStatus.RAW,
    format: StorageFormat | None = None,
    domain_intervals: list[tuple[float, float]] | None = None,
    sampling_rate: float | None = None,
    channel_spec: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ModalityRow:
    """Register a data stream within a recording. Idempotent on re-register."""
    # Default storage_uri to raw_storage_uri so raw-state callers don't
    # have to pass it twice.
    effective_storage_uri = storage_uri if storage_uri is not None else raw_storage_uri
    row = ModalityRow(
        recording_hash=recording_hash,
        modality=modality,
        ingestion_status=ingestion_status,
        storage_uri=effective_storage_uri,
        raw_storage_uri=raw_storage_uri,
        format=format,
        sampling_rate=sampling_rate,
        domain_intervals=domain_intervals,
        channel_spec=channel_spec,
        metadata=metadata or {},
    )
    if row.format is not None:
        validate_storage_uri(row.storage_uri, row.format)
    # Architecture v0.4 cold-bucket invariant: ``raw_storage_uri`` is the
    # immutable cold-bucket pointer regardless of ingestion_status.
    # Validate it independently of ``validate_storage_uri`` (which keys
    # off ``StorageFormat`` and gates the warm-bucket ``storage_uri``).
    # Flagged in PR #17 review (ENG-1083) — do not collapse the two
    # checks; they validate different invariants.
    parsed_raw = urlparse(row.raw_storage_uri)
    if parsed_raw.scheme != "file" and parsed_raw.netloc != active_buckets().raw:
        raise ValueError(
            f"raw_storage_uri {row.raw_storage_uri!r} points at bucket "
            f"{parsed_raw.netloc!r}, but must live under "
            f"{active_buckets().raw!r} (architecture v0.4 cold-bucket invariant)"
        )
    existing = catalog.get_modality(recording_hash, modality)
    if existing is not None:
        if existing == row:
            return existing
        raise CatalogRowExists(
            "modalities",
            _pk_tuple_from_row(row),
            diff=_diff_rows(existing, row),
        )
    catalog.add_modality(row)
    return row


def _register_event(
    *,
    event_id: str,
    recording_hash: str,
    event_time: float,
    event_type: str,
    prompt: str | None = None,
    response: str | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> EventRow:
    """Register a time-stamped event within a recording. Idempotent on re-register."""
    row = EventRow(
        event_id=event_id,
        recording_hash=recording_hash,
        event_time=event_time,
        event_type=event_type,
        prompt=prompt,
        response=response,
        metadata=metadata or {},
    )
    existing = catalog.get_event(event_id)
    if existing is not None:
        if existing == row:
            return existing
        raise CatalogRowExists(
            "events",
            _pk_tuple_from_row(row),
            diff=_diff_rows(existing, row),
        )
    catalog.add_event(row)
    return row