ursa.data_interface

Credential isolation

A fresh :class:DataInterface opens against the assets_ro credential role only. Calling any register_* method before :meth:enable_writes raises :class:~ursa.catalog.WritesNotEnabled; production callers must explicitly request the assets_rw role.

:meth:enable_writes lazily builds a second :class:Catalog handle against assets_rw. The public read surface (query, list_*, get_*, :attr:catalog) always uses the RO handle. register_* runs on the RW handle, including its internal idempotency get_* lookup, so the check and the write happen under a single credential and the read-modify-write race window stays tight.

The credential-boundary story is R2-only. profile="local" has no credential boundary to enforce: the local lance dataset has a single on-disk URI, so :meth:enable_writes is just a permission flag flip. Reads and writes share the same handle, which :meth:_write_catalog resolves by falling back to :attr:catalog when _rw_catalog is None. Tests treat local as the fast fixture for the idempotency / diff machinery; the credential split is exercised against R2 stubs in tests/test_data_interface.py.
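The handle split described above can be modelled in a few lines. This is a minimal sketch, not the real class: SketchInterface, the string placeholders standing in for catalog handles, and the local WritesNotEnabled stand-in are all assumptions made for illustration.

```python
from typing import Optional


class WritesNotEnabled(RuntimeError):
    """Stand-in for ursa.catalog.WritesNotEnabled (hypothetical base class)."""


class SketchInterface:
    """Toy model of the RO/RW handle split; not the real DataInterface."""

    def __init__(self, profile: str = "r2") -> None:
        self.profile = profile
        self._writes_enabled = False
        # Placeholder strings stand in for real catalog handles.
        self._ro_catalog = f"catalog[{profile}:assets_ro]"
        self._rw_catalog: Optional[str] = None

    @property
    def catalog(self) -> str:
        # Reads always use the RO handle, even after enable_writes().
        return self._ro_catalog

    def enable_writes(self) -> None:
        if self._writes_enabled:
            return  # idempotent: a second call is a no-op
        if self.profile != "local":
            # R2 profiles get a second handle against the RW role.
            self._rw_catalog = f"catalog[{self.profile}:assets_rw]"
        self._writes_enabled = True

    def _write_catalog(self) -> str:
        if not self._writes_enabled:
            raise WritesNotEnabled("call enable_writes() before register_*")
        # Local profile: _rw_catalog stays None, so fall back to the read handle.
        return self._rw_catalog if self._rw_catalog is not None else self.catalog
```

In this model an R2 interface ends up with two distinct handles after enable_writes(), while a local interface returns the same handle from both catalog and _write_catalog().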

Lazy construction

R2 profiles do not resolve 1Password credentials at construction; the underlying :class:Catalog is built on first access to :attr:catalog (reads) or :meth:enable_writes (writes). Offline / sandboxed environments (CI, doc builds) can import and instantiate :class:DataInterface without an op session. Argument validation (profile value, path constraints) runs eagerly so caller mistakes surface immediately.
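The combination of eager validation and deferred handle construction might look roughly like the sketch below. LazyInterface and the catalog_factory hook are hypothetical names introduced only so the example runs without credentials; the real class resolves its handle internally.

```python
from pathlib import Path
from typing import Any, Callable, Optional, Union

VALID_PROFILES = ("r2", "r2-test", "local")


class LazyInterface:
    """Toy model: validate eagerly, build the catalog handle on first access."""

    def __init__(
        self,
        profile: str = "r2",
        *,
        path: Union[str, Path, None] = None,
        # Hypothetical injection point; stands in for credential resolution.
        catalog_factory: Callable[[str], Any] = lambda profile: object(),
    ) -> None:
        # Eager checks: caller mistakes surface at construction time.
        if profile not in VALID_PROFILES:
            raise ValueError(f"unknown profile: {profile!r}")
        if profile == "local" and path is None:
            raise ValueError("path is required when profile='local'")
        if profile != "local" and path is not None:
            raise ValueError("path is forbidden for R2 profiles")
        self._profile = profile
        self._factory = catalog_factory
        self._catalog: Optional[Any] = None  # nothing resolved yet

    @property
    def catalog(self) -> Any:
        # Built on first access, then cached for later accesses.
        if self._catalog is None:
            self._catalog = self._factory(self._profile)
        return self._catalog
```

Constructing the object never calls the factory, which is what lets CI and doc builds instantiate it offline in this model.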

File layout

  • DataInterface class + public API

  • CatalogWriter Protocol (private, test seam for tests/register/conftest.py)

  • Query implementation (inlined from former ursa.query)

  • Get implementation (inlined from former ursa.get)

  • Download implementation (inlined from former ursa.download)

  • Register implementations (inlined from former ursa.register)

User-facing class surface for Ursa (ENG-1108 / ENG-1119 / ENG-1120).

:class:DataInterface packages the three M2 read verbs (query / get / download), the four register write verbs (participant / recording / modality / event), and the typed per-table list_<table> / get_<table> lookups behind a single object that owns its :class:~ursa.catalog.Catalog handles.

Module Contents

Classes

DataInterface

Class-based entry point for Ursa’s read + register API.

CatalogWriter

Get + insert contract that :class:ursa.catalog.Catalog satisfies.

_PlannedWrite

One (store, source key, destination path) tuple produced by the plan phase and consumed by the execute phase.

Functions

_query

Filter the Ursa catalog and return matching recordings.

_reject_kwargs_with_spec

If spec is supplied, every filter kwarg must be unset.

_metadata_matches

Equality on every key in expected; missing keys do not match.

_collect_modalities

Fetch the modality rows for recordings, grouped by recording_hash.

_enforce_processed_gate

Raise if any matched modality is still raw and the spec asked for processed-only behavior. The first offending (modality, recording, kwarg) is reported so the message points at a concrete row.

_processed_kwargs_set

Names of the processed-only kwargs that are non-None / non-empty.

_get

Materialize target into in-memory :class:Data objects.

_get_one

Build a single :class:Data from one :class:QueryResult.

_recording_row_from

Project qr onto :class:RecordingRow’s field set.

_materialize_modality

Read all segment files under mrow.raw_storage_uri into :class:RawBytes.

_validate_alignment

Raise ValueError if recordings have mismatched modality sets.

_download

Stream raw-modality bytes for target to disk under dest.

_plan_writes

Enumerate every write before any I/O.

_check_modality_eligibility

Raise if mrow is not eligible for M2 raw-path download.

_dest_path

Compute the on-disk destination for one source segment.

_stream_to_disk

Copy bytes from store[key] to dest via a temp-file rename.

_pk_tuple_from_row

Build the (field, value) tuple CatalogRowExists expects from a row.

_diff_rows

Field-by-field diff of two same-class rows. Empty dict if equal.

_register_participant

Register a new participant. Idempotent on re-register.

_register_recording

Register a recording session. Idempotent on re-register.

_register_modality

Register a data stream within a recording. Idempotent on re-register.

_register_event

Register a time-stamped event within a recording. Idempotent on re-register.

Data

API

ursa.data_interface.__all__

[‘DataInterface’]

ursa.data_interface._Profile

None

ursa.data_interface._BACKFILL_TICKET

‘ENG-1082’

ursa.data_interface._PROCESSED_KWARGS

(‘time_range’, ‘pipeline_version’, ‘time_filters’, ‘metadata_filters’, ‘derived’)

ursa.data_interface._PROCESSED_PATH_TICKET

‘ENG-1093’

ursa.data_interface._LayoutMode

None

ursa.data_interface._VALID_LAYOUTS: frozenset[str]

‘frozenset(…)’

ursa.data_interface._STREAM_CHUNK

None

ursa.data_interface._RECORDING_ROW_FIELDS: tuple[str, ...]

(‘recording_hash’, ‘participant_ids’, ‘start_time’, ‘duration’, ‘device_info’, ‘metadata’)

class ursa.data_interface.DataInterface(profile: str = 'r2', *, path: str | pathlib.Path | None = None)[source]

Class-based entry point for Ursa’s read + register API.

Construction validates arguments but defers credential resolution (R2 profiles) until first use. Call :meth:enable_writes once before invoking any register_* method; that promotion is idempotent.

Parameters

profile: "r2" (default) for the production catalog, "r2-test" for the testing bucket, "local" for an on-disk lance directory. Typed str (not Literal) so callers can pass values read from os.environ or config files without a cast or # type: ignore; invalid strings raise :class:ValueError at construction.

path: Required when profile="local". Forbidden for R2 profiles, whose URI is resolved from the configured object store.

Initialization

property catalog: ursa.catalog.Catalog

The read-only catalog handle.

Constructed on first access; subsequent accesses return the cached handle. Always returns the RO catalog handle for R2 profiles, even after :meth:enable_writes — credential isolation is enforced and register_* methods use a separate write handle internally. For profile="local" there is no credential boundary to enforce; reads and writes share this single handle (see :meth:_write_catalog).

property writes_enabled: bool
enable_writes() None[source]

Open the assets_rw write handle.

Idempotent — a second call is a no-op. For profile="local", local lance has no credential distinction, so the bool flip is the only observable effect; register_* methods fall back to the read handle.

_write_catalog() ursa.catalog.Catalog[source]

The write-capable catalog handle.

Raises :class:WritesNotEnabled if :meth:enable_writes hasn’t been called. For profile="local", falls back to the same underlying handle as :attr:catalog — there is no credential distinction to enforce locally.

register_* callers route both the idempotency get_* check and the add_* write through this handle so the check-and-write run under one credential.

Tests that bypass the register-layer idempotency for fixture-setup reasons may also use this handle for raw seeding via add_*. This is a deliberate test seam, not a public API; it is stable across :class:DataInterface evolution only insofar as the in-repo register-test suite requires it.

_require_writes() None[source]
query(spec: ursa._query_types.QuerySpec | None = None, **kwargs: Any) ursa._query_types.QueryResultList[source]

Filter the Ursa catalog and return matching recordings.

See :func:_query for parameter documentation. This method delegates to the inlined implementation with the RO catalog handle.

get(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], *, concat: bool = False) ursa.temporal.Data | list[ursa.temporal.Data][source]

Materialize target into in-memory :class:Data objects.

Threads this interface’s profile to the segment-store resolution. Overloaded so static callers get Data for a single :class:QueryResult and list[Data] for an iterable — preserving the typing contract from the deleted ursa.get.

download(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], dest: str | os.PathLike[str], *, layout: ursa.data_interface._LayoutMode = 'by_recording', overwrite: bool = False) list[pathlib.Path][source]

Stream raw-modality bytes for target to disk under dest.

Threads this interface’s profile to the segment-store resolution.

register_participant(*, participant_id: str, enrolled_at: int | datetime.datetime, metadata: dict[str, Any] | None = None) ursa.catalog.ParticipantRow[source]
register_recording(*, recording_hash: str, participant_ids: list[str], start_time: int | datetime.datetime, duration: datetime.timedelta, device_info: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None) ursa.catalog.RecordingRow[source]
register_modality(*, recording_hash: str, modality: str, raw_storage_uri: str, storage_uri: str | None = None, ingestion_status: ursa.catalog.IngestionStatus = IngestionStatus.RAW, format: ursa.catalog.StorageFormat | None = None, domain_intervals: list[tuple[float, float]] | None = None, sampling_rate: float | None = None, channel_spec: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None) ursa.catalog.ModalityRow[source]
register_event(*, event_id: str, recording_hash: str, event_time: float, event_type: str, prompt: str | None = None, response: str | None = None, metadata: dict[str, Any] | None = None) ursa.catalog.EventRow[source]
list_participants(**kwargs: Any) list[ursa.catalog.ParticipantRow][source]
list_recordings(**kwargs: Any) list[ursa.catalog.RecordingRow][source]
list_modalities(**kwargs: Any) list[ursa.catalog.ModalityRow][source]
list_events(**kwargs: Any) list[ursa.catalog.EventRow][source]
list_embeddings(**kwargs: Any) list[ursa.catalog.EmbeddingRow][source]
list_virgo_assets(**kwargs: Any) list[ursa.catalog.VirgoAssetRow][source]
list_checkpoints(**kwargs: Any) list[ursa.catalog.CheckpointRow][source]
list_benchmark_suites(**kwargs: Any) list[ursa.catalog.BenchmarkSuiteRow][source]
list_benchmark_results(**kwargs: Any) list[ursa.catalog.BenchmarkResultRow][source]
get_participant(participant_id: str) ursa.catalog.ParticipantRow | None[source]
get_recording(recording_hash: str) ursa.catalog.RecordingRow | None[source]
get_modality(recording_hash: str, modality: str) ursa.catalog.ModalityRow | None[source]
get_event(event_id: str) ursa.catalog.EventRow | None[source]
get_embedding(embedding_id: str) ursa.catalog.EmbeddingRow | None[source]
get_virgo_asset(asset_id: str) ursa.catalog.VirgoAssetRow | None[source]
get_checkpoint(checkpoint_id: str) ursa.catalog.CheckpointRow | None[source]
get_benchmark_suite(suite_name: str, suite_version: int) ursa.catalog.BenchmarkSuiteRow | None[source]
get_benchmark_result(result_id: str) ursa.catalog.BenchmarkResultRow | None[source]
class ursa.data_interface.CatalogWriter[source]

Bases: typing.Protocol

Get + insert contract that :class:ursa.catalog.Catalog satisfies.

@runtime_checkable so the structural-conformance test in tests/register/test_catalog_protocol_structural.py can isinstance(catalog, CatalogWriter) — catches any add_* / get_* rename in :class:Catalog at unit-test time.

add_participant(row: ursa.catalog.ParticipantRow) None[source]
add_recording(row: ursa.catalog.RecordingRow) None[source]
add_modality(row: ursa.catalog.ModalityRow) None[source]
add_event(row: ursa.catalog.EventRow) None[source]
get_participant(participant_id: str) ursa.catalog.ParticipantRow | None[source]
get_recording(recording_hash: str) ursa.catalog.RecordingRow | None[source]
get_modality(recording_hash: str, modality: str) ursa.catalog.ModalityRow | None[source]
get_event(event_id: str) ursa.catalog.EventRow | None[source]
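The structural-conformance idea behind @runtime_checkable can be illustrated with a reduced protocol. WriterSketch, GoodCatalog and RenamedCatalog are hypothetical stand-ins with only two of the eight methods; note that isinstance against a runtime-checkable Protocol checks method presence only, not signatures.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class WriterSketch(Protocol):
    """Reduced stand-in for CatalogWriter with two of its methods."""

    def add_participant(self, row: dict) -> None: ...
    def get_participant(self, participant_id: str) -> Optional[dict]: ...


class GoodCatalog:
    """Has both method names, so it conforms structurally."""

    def add_participant(self, row: dict) -> None:
        pass

    def get_participant(self, participant_id: str) -> Optional[dict]:
        return None


class RenamedCatalog:
    """add_participant was renamed, so the isinstance check fails."""

    def insert_participant(self, row: dict) -> None:
        pass

    def get_participant(self, participant_id: str) -> Optional[dict]:
        return None
```

A unit test that asserts isinstance(catalog, WriterSketch) would therefore fail as soon as one of the protocol's method names disappears from the concrete class.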
ursa.data_interface._FILTER_KWARG_NAMES

(‘participants’, ‘modalities’, ‘recording_hash’, ‘metadata’, ‘time_range’, ‘pipeline_version’, ‘time…

ursa.data_interface._query(spec: ursa._query_types.QuerySpec | None = None, *, catalog: ursa.catalog.Catalog, participants: list[ursa.catalog.schemas.CatalogID] | None = None, modalities: list[ursa.catalog.schemas.ModalityName] | None = None, recording_hash: ursa.catalog.schemas.CatalogID | None = None, metadata: ursa.catalog.schemas.MetadataDict | None = None, time_range: tuple[ursa.catalog.schemas.EpochNs, ursa.catalog.schemas.EpochNs] | None = None, pipeline_version: str | None = None, time_filters: list[Any] | None = None, metadata_filters: list[Any] | None = None, derived: list[Any] | None = None, limit: int | None = None) ursa._query_types.QueryResultList[source]

Filter the Ursa catalog and return matching recordings.

Two surfaces:

  1. Common case — pass kwargs directly: data.query(participants=[...], modalities=[...]). The kwargs auto-validate against :class:QuerySpec.

  2. Complex case — build a :class:QuerySpec and pass it as the first positional arg: data.query(spec).

Phase 1a returns :class:QueryResult carrying catalog projections only — no array bytes. Temporal kwargs and pipeline_version raise :class:NotImplementedError when a matched modality has ingestion_status="raw"; the green path ships with ENG-1082.

ursa.data_interface._reject_kwargs_with_spec(**kwargs: Any) None[source]

If spec is supplied, every filter kwarg must be unset.

Mixing the two would force us to merge a spec with kwargs, and the merge semantics (override vs. union) are unobvious enough that being strict is cheaper than picking a rule.
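A strict spec-or-kwargs check along these lines could be sketched as follows. It assumes that "unset" means None and that the error is a ValueError; neither detail is confirmed by this page.

```python
from typing import Any


def reject_kwargs_with_spec(**kwargs: Any) -> None:
    """Raise if any filter kwarg is set alongside a spec (sketch only)."""
    # Assumption: a kwarg counts as "unset" when its value is None.
    supplied = sorted(name for name, value in kwargs.items() if value is not None)
    if supplied:
        # Assumption: the real exception type may differ.
        raise ValueError(
            f"pass either a spec or filter kwargs, not both; got {supplied}"
        )
```

Refusing the mix outright avoids having to choose between override and union semantics for a merged spec.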

ursa.data_interface._metadata_matches(actual: collections.abc.Mapping[str, Any], expected: collections.abc.Mapping[str, Any]) bool[source]

Equality on every key in expected; missing keys do not match.
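One way to get "missing keys do not match" right, even when the expected value is None, is a private sentinel. The sketch below is an illustration under that assumption, not the module's actual code; the sentinel name is local to the example.

```python
from collections.abc import Mapping
from typing import Any

_ABSENT = object()  # sentinel local to this sketch


def metadata_matches(actual: Mapping[str, Any], expected: Mapping[str, Any]) -> bool:
    """True when every key in expected is present in actual with an equal value."""
    # Using a sentinel default means a missing key never equals any expected
    # value, including None.
    return all(actual.get(key, _ABSENT) == value for key, value in expected.items())
```

With this definition an empty expected mapping matches everything, and extra keys in actual are ignored.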

ursa.data_interface._collect_modalities(catalog: ursa.catalog.Catalog, recordings: list[ursa.catalog.RecordingRow], modality_filter: list[ursa.catalog.schemas.ModalityName] | None) dict[ursa.catalog.schemas.CatalogID, list[ursa.catalog.ModalityRow]][source]

Fetch the modality rows for recordings, grouped by recording_hash.

Single list_modalities call (no batching in Phase 1a; ENG-1089 covers growth). Empty input short-circuits to avoid an empty-IN-list edge case.
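The single-call grouping with an empty-input short-circuit might be sketched like this. The list_modalities callable, the dict-shaped rows, and the client-side modality filter are assumptions; the real function takes a Catalog and returns ModalityRow objects.

```python
from collections import defaultdict
from typing import Any, Callable, Optional


def group_modalities(
    list_modalities: Callable[[list], list],
    recording_hashes: list,
    modality_filter: Optional[list] = None,
) -> dict:
    """Group modality rows by recording_hash using one lookup call (sketch)."""
    if not recording_hashes:
        # Short-circuit: never issue a lookup with an empty IN-list.
        return {}
    rows = list_modalities(recording_hashes)  # exactly one call, no batching
    grouped: dict = defaultdict(list)
    for row in rows:
        # Assumption: filtering happens here rather than inside the catalog.
        if modality_filter is None or row["modality"] in modality_filter:
            grouped[row["recording_hash"]].append(row)
    return dict(grouped)
```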

ursa.data_interface._enforce_processed_gate(spec: ursa._query_types.QuerySpec, by_recording: dict[ursa.catalog.schemas.CatalogID, list[ursa.catalog.ModalityRow]]) None[source]

Raise if any matched modality is still raw and the spec asked for processed-only behavior. The first offending (modality, recording, kwarg) is reported so the message points at a concrete row.

ursa.data_interface._processed_kwargs_set(spec: ursa._query_types.QuerySpec) list[str][source]

Names of the processed-only kwargs that are non-None / non-empty.
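The processed gate and its helper can be read together as in the sketch below. SpecSketch and the dict-shaped rows are hypothetical stand-ins for QuerySpec and ModalityRow, and the message wording is invented; only the kwarg names, the "raw" status value and the NotImplementedError come from this page.

```python
from dataclasses import dataclass
from typing import Optional

PROCESSED_KWARGS = (
    "time_range", "pipeline_version", "time_filters", "metadata_filters", "derived",
)


@dataclass
class SpecSketch:
    """Hypothetical stand-in for QuerySpec with only the processed-only fields."""

    time_range: Optional[tuple] = None
    pipeline_version: Optional[str] = None
    time_filters: Optional[list] = None
    metadata_filters: Optional[list] = None
    derived: Optional[list] = None


def processed_kwargs_set(spec: SpecSketch) -> list:
    """Names of processed-only kwargs that are neither None nor empty."""
    names = []
    for name in PROCESSED_KWARGS:
        value = getattr(spec, name)
        if value is None or (hasattr(value, "__len__") and len(value) == 0):
            continue
        names.append(name)
    return names


def enforce_processed_gate(spec: SpecSketch, by_recording: dict) -> None:
    """Raise on the first raw modality when a processed-only kwarg is set."""
    requested = processed_kwargs_set(spec)
    if not requested:
        return
    for recording_hash, rows in by_recording.items():
        for row in rows:
            if row["ingestion_status"] == "raw":
                # Report one concrete (modality, recording, kwarg) triple.
                raise NotImplementedError(
                    f"modality {row['modality']!r} of recording {recording_hash!r} "
                    f"is still raw; {requested[0]!r} needs processed data"
                )
```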

ursa.data_interface._get(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], *, concat: bool = False, profile: str | None = None) ursa.temporal.Data | list[ursa.temporal.Data][source]

Materialize target into in-memory :class:Data objects.

ursa.data_interface._get_one(qr: ursa._query_types.QueryResult, *, profile: str | None = None) ursa.temporal.Data[source]

Build a single :class:Data from one :class:QueryResult.

ursa.data_interface._recording_row_from(qr: ursa._query_types.QueryResult) ursa.catalog.RecordingRow[source]

Project qr onto :class:RecordingRow’s field set.

Uses an explicit field tuple so a future :class:QueryResult schema change breaks here at validation time, not silently inside Pydantic.

ursa.data_interface._materialize_modality(name: str, mrow: ursa.catalog.ModalityRow, *, profile: str | None = None) ursa.raw.RawBytes[source]

Read all segment files under mrow.raw_storage_uri into :class:RawBytes.

ursa.data_interface._validate_alignment(results: list[ursa.temporal.Data]) None[source]

Raise ValueError if recordings have mismatched modality sets.
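An alignment check of this kind could be sketched as below. For simplicity it takes one collection of modality names per recording rather than Data objects, which is an assumption of the example; the error message is likewise invented.

```python
from typing import Iterable, Sequence


def validate_alignment(modality_sets: Sequence[Iterable[str]]) -> None:
    """Raise ValueError unless every recording has the same modality names."""
    if not modality_sets:
        return
    reference = set(modality_sets[0])  # first recording defines the expected set
    for index, names in enumerate(modality_sets[1:], start=1):
        current = set(names)
        if current != reference:
            raise ValueError(
                f"recording {index} has modalities {sorted(current)}, "
                f"expected {sorted(reference)}"
            )
```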

class ursa.data_interface._PlannedWrite[source]

Bases: typing.NamedTuple

One (store, source key, destination path) tuple produced by the plan phase and consumed by the execute phase.

store: ursa.store.ObjectStore

None

source_key: str

None

dest_path: pathlib.Path

None

meta_size: int

None

ursa.data_interface._download(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], dest: str | os.PathLike[str], *, layout: ursa.data_interface._LayoutMode = 'by_recording', overwrite: bool = False, profile: str | None = None) list[pathlib.Path][source]

Stream raw-modality bytes for target to disk under dest.

ursa.data_interface._plan_writes(qrs: list[ursa._query_types.QueryResult], dest_root: pathlib.Path, *, layout: ursa.data_interface._LayoutMode, overwrite: bool, profile: str | None = None) list[ursa.data_interface._PlannedWrite][source]

Enumerate every write before any I/O.

One pass over the inputs produces:

  • the full ordered list of :class:_PlannedWrite tuples,

  • an intra-call duplicate-destination check that catches collisions across all layouts,

  • a pre-existing-destination check that batches every collision into one :class:FileExistsError when overwrite=False.
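The plan-before-I/O pass listed above can be sketched as follows. PlannedWrite here is a reduced two-field stand-in for _PlannedWrite, the input is simplified to (source key, destination) pairs, and raising ValueError for intra-call duplicates is an assumption; only the batched FileExistsError is stated in the text.

```python
from pathlib import Path
from typing import Iterable, NamedTuple


class PlannedWrite(NamedTuple):
    """Reduced stand-in for _PlannedWrite (store and meta_size omitted)."""

    source_key: str
    dest_path: Path


def plan_writes(pairs: Iterable[tuple], *, overwrite: bool) -> list:
    """Enumerate all writes and validate destinations before copying anything."""
    planned = []
    seen = set()
    for source_key, dest_path in pairs:
        # Intra-call check: two sources must never map to one destination.
        if dest_path in seen:
            raise ValueError(f"duplicate destination in one call: {dest_path}")
        seen.add(dest_path)
        planned.append(PlannedWrite(source_key, dest_path))
    if not overwrite:
        # Batch every pre-existing destination into a single error.
        existing = [w.dest_path for w in planned if w.dest_path.exists()]
        if existing:
            raise FileExistsError(
                "destinations already exist: " + ", ".join(str(p) for p in existing)
            )
    return planned
```

Because nothing is written until the whole plan validates, a failed call leaves the destination directory untouched in this model.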

ursa.data_interface._check_modality_eligibility(modality_name: str, mrow: ursa.catalog.ModalityRow, *, layout: ursa.data_interface._LayoutMode) None[source]

Raise if mrow is not eligible for M2 raw-path download.

ursa.data_interface._dest_path(dest_root: pathlib.Path, *, layout: ursa.data_interface._LayoutMode, recording_hash: str, modality: str, rel_key: str) pathlib.Path[source]

Compute the on-disk destination for one source segment.

ursa.data_interface._stream_to_disk(store: ursa.store.ObjectStore, key: str, dest: pathlib.Path) None[source]

Copy bytes from store[key] to dest via a temp-file rename.

Uses :meth:ObjectStore.open (forward-only) so multi-GB raw video/audio never sits in memory. A .part rename ensures partial writes are not visible at the canonical destination.
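The chunked copy with a .part rename can be sketched with standard-library calls only. The example reads from any binary file object instead of an ObjectStore, and the chunk size is a made-up value; the real _STREAM_CHUNK is not shown on this page.

```python
import os
from pathlib import Path
from typing import BinaryIO

CHUNK = 1024 * 1024  # hypothetical chunk size for this sketch


def stream_to_disk(source: BinaryIO, dest: Path, chunk_size: int = CHUNK) -> None:
    """Copy source to dest in chunks, publishing the file only when complete."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    part = dest.with_name(dest.name + ".part")
    try:
        with open(part, "wb") as out:
            while True:
                chunk = source.read(chunk_size)  # forward-only, bounded memory
                if not chunk:
                    break
                out.write(chunk)
        # Same-directory rename: readers never see a half-written dest.
        os.replace(part, dest)
    except BaseException:
        part.unlink(missing_ok=True)  # clean up the partial file on failure
        raise
```

Only one chunk is held in memory at a time, and the canonical path appears in a single rename step.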

ursa.data_interface._MISSING: object

‘object(…)’

ursa.data_interface._pk_tuple_from_row(row: ursa.catalog.CatalogRow) tuple[tuple[str, object], ...][source]

Build the (field, value) tuple CatalogRowExists expects from a row.

ursa.data_interface._diff_rows(existing: ursa.catalog.CatalogRow, candidate: ursa.catalog.CatalogRow) dict[str, tuple[object, object]][source]

Field-by-field diff of two same-class rows. Empty dict if equal.
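A field-by-field diff with this return shape could look like the sketch below. It uses a dataclass as a stand-in for the catalog row classes, so the field enumeration via dataclasses.fields is an assumption of the example rather than the module's method.

```python
from dataclasses import dataclass, field, fields


@dataclass
class ParticipantSketch:
    """Hypothetical stand-in for a catalog row class."""

    participant_id: str
    enrolled_at: int
    metadata: dict = field(default_factory=dict)


def diff_rows(existing, candidate) -> dict:
    """Map each differing field name to (existing value, candidate value)."""
    if type(existing) is not type(candidate):
        raise TypeError("rows must be of the same class")
    diff = {}
    for f in fields(existing):
        old, new = getattr(existing, f.name), getattr(candidate, f.name)
        if old != new:
            diff[f.name] = (old, new)
    return diff  # empty dict means the rows are equal
```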

ursa.data_interface._register_participant(*, participant_id: str, enrolled_at: int | datetime.datetime, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) ursa.catalog.ParticipantRow[source]

Register a new participant. Idempotent on re-register.

ursa.data_interface._register_recording(*, recording_hash: str, participant_ids: list[str], start_time: int | datetime.datetime, duration: datetime.timedelta, device_info: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) ursa.catalog.RecordingRow[source]

Register a recording session. Idempotent on re-register.

ursa.data_interface._register_modality(*, recording_hash: str, modality: str, raw_storage_uri: str, storage_uri: str | None = None, ingestion_status: ursa.catalog.IngestionStatus = IngestionStatus.RAW, format: ursa.catalog.StorageFormat | None = None, domain_intervals: list[tuple[float, float]] | None = None, sampling_rate: float | None = None, channel_spec: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) ursa.catalog.ModalityRow[source]

Register a data stream within a recording. Idempotent on re-register.

ursa.data_interface._register_event(*, event_id: str, recording_hash: str, event_time: float, event_type: str, prompt: str | None = None, response: str | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) ursa.catalog.EventRow[source]

Register a time-stamped event within a recording. Idempotent on re-register.
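The idempotent register flow shared by the four functions above might be sketched as a get-then-add sequence on one handle. MemoryCatalog and RowExists are hypothetical stand-ins, rows are plain dicts, and raising on a conflicting re-register is an assumption inferred from the CatalogRowExists and _diff_rows helpers rather than something this page states.

```python
from typing import Any, Optional


class RowExists(Exception):
    """Stand-in for CatalogRowExists (assumed to signal a conflicting row)."""


class MemoryCatalog:
    """In-memory stand-in exposing the get/add pair the register flow needs."""

    def __init__(self) -> None:
        self.rows: dict = {}

    def get_participant(self, participant_id: str) -> Optional[dict]:
        return self.rows.get(participant_id)

    def add_participant(self, row: dict) -> None:
        self.rows[row["participant_id"]] = row


def register_participant(
    *, participant_id: str, enrolled_at: int,
    metadata: Optional[dict] = None, catalog: Any,
) -> dict:
    """Insert once; return the stored row on an identical re-register."""
    candidate = {
        "participant_id": participant_id,
        "enrolled_at": enrolled_at,
        "metadata": metadata or {},
    }
    # Check and write go through the same handle, i.e. one credential.
    existing = catalog.get_participant(participant_id)
    if existing is None:
        catalog.add_participant(candidate)
        return candidate
    if existing == candidate:
        return existing  # idempotent: nothing to write
    # Assumption: a differing payload for an existing key is an error.
    raise RowExists(f"participant {participant_id!r} exists with different fields")
```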