ursa.data_interface
Credential isolation
A fresh :class:`DataInterface` opens against the `assets_ro` credential role only. Calling any `register_*` method before :meth:`enable_writes` raises :class:`~ursa.catalog.WritesNotEnabled`; production callers must explicitly request the `assets_rw` role.
:meth:`enable_writes` lazily builds a second :class:`Catalog` handle against `assets_rw`. The public read surface (`query`, `list_*`, `get_*`, :attr:`catalog`) always uses the RO handle. `register_*` runs on the RW handle, including its internal idempotency `get_*` lookup, so the check and the write happen under a single credential and the read-modify-write race window stays tight.
The credential-boundary story is R2-only. `profile="local"` has no credential boundary to enforce: the local lance dataset has a single on-disk URI, so :meth:`enable_writes` is just a permission-flag flip. Reads and writes share the same handle, which :meth:`_write_catalog` resolves by falling back to :attr:`catalog` when `_rw_catalog` is `None`. Tests treat local as the fast fixture for the idempotency / diff machinery; the credential split is exercised against R2 stubs in `tests/test_data_interface.py`.
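A minimal sketch of the handle bookkeeping described in this section, using plain-Python stand-ins: `HandleSketch` replaces :class:`Catalog`, `WritesNotEnabled` is redefined locally (its `RuntimeError` base is an assumption), and credential roles are plain strings. It shows the RO handle serving reads, the RW handle created by `enable_writes`, and the local-profile fallback in `_write_catalog`. It is not the actual :class:`DataInterface` code.

```python
from __future__ import annotations

from dataclasses import dataclass


class WritesNotEnabled(RuntimeError):
    """Local stand-in for ursa.catalog.WritesNotEnabled (base class assumed)."""


@dataclass
class HandleSketch:
    """Hypothetical catalog handle tagged with the credential role it used."""
    role: str


class CredentialSplitSketch:
    """Models the RO/RW credential split; not the real DataInterface."""

    def __init__(self, profile: str = "r2") -> None:
        self._profile = profile
        self._ro_catalog: HandleSketch | None = None
        self._rw_catalog: HandleSketch | None = None
        self._writes_enabled = False

    @property
    def catalog(self) -> HandleSketch:
        # Reads always use the RO handle, built lazily on first access.
        if self._ro_catalog is None:
            role = "local" if self._profile == "local" else "assets_ro"
            self._ro_catalog = HandleSketch(role)
        return self._ro_catalog

    def enable_writes(self) -> None:
        if self._writes_enabled:
            return  # idempotent: a second call is a no-op
        if self._profile != "local":
            # R2 only: a second handle opened under the write credential.
            self._rw_catalog = HandleSketch("assets_rw")
        self._writes_enabled = True

    def _write_catalog(self) -> HandleSketch:
        if not self._writes_enabled:
            raise WritesNotEnabled("call enable_writes() before any register_* method")
        # Local profile: no RW handle exists, so fall back to the read handle.
        return self._rw_catalog if self._rw_catalog is not None else self.catalog
```

On R2 the two handles stay distinct after `enable_writes`, so reads can never pick up the write credential; on local, both paths resolve to one object.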
Lazy construction
R2 profiles do not resolve 1Password credentials at construction; the underlying :class:`Catalog` is built on first access to :attr:`catalog` (reads) or on :meth:`enable_writes` (writes). Offline or sandboxed environments (CI, doc builds) can therefore import and instantiate :class:`DataInterface` without an `op` session. Argument validation (`profile` value, `path` constraints) runs eagerly so caller mistakes surface immediately.
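The eager-validation / lazy-catalog split can be sketched with `functools.cached_property`. This is illustrative only: `LazyInterfaceSketch`, `_resolve_credentials`, and the string "handles" are hypothetical, and the real class may cache its catalog differently. The validation branch follows the `profile` / `path` rules documented for :class:`DataInterface`.

```python
from __future__ import annotations

from functools import cached_property
from pathlib import Path

_R2_PROFILES = frozenset({"r2", "r2-test"})


class LazyInterfaceSketch:
    """Eager argument checks, deferred credential lookup. Illustrative only."""

    def __init__(self, profile: str = "r2", *, path: str | Path | None = None) -> None:
        # Cheap checks run at construction so caller mistakes surface immediately.
        if profile == "local":
            if path is None:
                raise ValueError('path is required when profile="local"')
            self.path: Path | None = Path(path)
        elif profile in _R2_PROFILES:
            if path is not None:
                raise ValueError(f"path is forbidden for profile={profile!r}")
            self.path = None  # R2 URIs come from the configured object store
        else:
            raise ValueError(f"unknown profile {profile!r}")
        self.profile = profile
        self.credential_lookups = 0

    def _resolve_credentials(self) -> str:
        # Hypothetical stand-in for a 1Password (`op`) lookup; never called in __init__.
        self.credential_lookups += 1
        return "assets_ro"

    @cached_property
    def catalog(self) -> str:
        # Built on first access only; later accesses reuse the cached value.
        if self.profile == "local":
            return f"local:{self.path}"
        return f"catalog[{self._resolve_credentials()}]"
```

Constructing an R2 instance performs no credential lookup; the first `catalog` access performs exactly one, and later accesses hit the cache. A bad `profile` string (for example one read from `os.environ`) fails in `__init__`, not on first use.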
File layout
- DataInterface class + public API
- CatalogWriter Protocol (private, test seam for `tests/register/conftest.py`)
- Query implementation (inlined from former `ursa.query`)
- Get implementation (inlined from former `ursa.get`)
- Download implementation (inlined from former `ursa.download`)
- Register implementations (inlined from former `ursa.register`)
User-facing class surface for Ursa (ENG-1108 / ENG-1119 / ENG-1120).
:class:`DataInterface` packages the three M2 read verbs (query / get / download), the four register write verbs (participant / recording / modality / event), and the typed per-table `list_<table>` / `get_<table>` lookups behind a single object that owns its :class:`~ursa.catalog.Catalog` handles.
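Module Contents
Classes
| Name | Summary |
|---|---|
| `DataInterface` | Class-based entry point for Ursa’s read + register API. |
| `CatalogWriter` | Get + insert contract that :class:`ursa.catalog.Catalog` satisfies. |
| `_PlannedWrite` | One (store, source key, destination path, meta_size) record produced by the plan phase and consumed by the execute phase. |
Functions
| Name | Summary |
|---|---|
| `_query` | Filter the Ursa catalog and return matching recordings. |
| `_reject_kwargs_with_spec` | If `spec` is supplied, every filter kwarg must be unset. |
| `_metadata_matches` | Equality on every key in `expected`; missing keys do not match. |
| `_collect_modalities` | Fetch the modality rows for `recordings`, grouped by `recording_hash`. |
| `_enforce_processed_gate` | Raise if any matched modality is still raw and the spec asked for processed-only behavior. The first offending (modality, recording, kwarg) is reported so the message points at a concrete row. |
| `_processed_kwargs_set` | Names of the processed-only kwargs that are non-None / non-empty. |
| `_get` | Materialize `target` into in-memory :class:`Data` objects. |
| `_get_one` | Build a single :class:`Data` from one :class:`QueryResult`. |
| `_recording_row_from` | Project `qr` onto :class:`RecordingRow`’s field set. |
| `_materialize_modality` | Read all segment files under `mrow.raw_storage_uri` into :class:`RawBytes`. |
| `_validate_alignment` | Raise `ValueError` if recordings have mismatched modality sets. |
| `_download` | Stream raw-modality bytes for `target` to disk under `dest`. |
| `_plan_writes` | Enumerate every write before any I/O. |
| `_check_modality_eligibility` | Raise if `mrow` is not eligible for M2 raw-path download. |
| `_dest_path` | Compute the on-disk destination for one source segment. |
| `_stream_to_disk` | Copy bytes from `store[key]` to `dest` via a temp-file rename. |
| `_pk_tuple_from_row` | Build the (field, value) tuple `CatalogRowExists` expects from a row. |
| `_diff_rows` | Field-by-field diff of two same-class rows. Empty dict if equal. |
| `_register_participant` | Register a new participant. Idempotent on re-register. |
| `_register_recording` | Register a recording session. Idempotent on re-register. |
| `_register_modality` | Register a data stream within a recording. Idempotent on re-register. |
| `_register_event` | Register a time-stamped event within a recording. Idempotent on re-register. |
Data
API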
- ursa.data_interface.__all__
['DataInterface']
- ursa.data_interface._Profile
None
- ursa.data_interface._BACKFILL_TICKET
'ENG-1082'
- ursa.data_interface._PROCESSED_KWARGS
('time_range', 'pipeline_version', 'time_filters', 'metadata_filters', 'derived')
- ursa.data_interface._PROCESSED_PATH_TICKET
'ENG-1093'
- ursa.data_interface._LayoutMode
None
- ursa.data_interface._VALID_LAYOUTS: frozenset[str]
'frozenset(…)'
- ursa.data_interface._STREAM_CHUNK
None
- ursa.data_interface._RECORDING_ROW_FIELDS: tuple[str, ...]
('recording_hash', 'participant_ids', 'start_time', 'duration', 'device_info', 'metadata')
- class ursa.data_interface.DataInterface(profile: str = 'r2', *, path: str | pathlib.Path | None = None)
Class-based entry point for Ursa’s read + register API.
Construction validates arguments but defers credential resolution (R2 profiles) until first use. Call :meth:`enable_writes` once before invoking any `register_*` method; that promotion is idempotent.
Parameters
profile
`"r2"` (default) for the production catalog, `"r2-test"` for the testing bucket, or `"local"` for an on-disk lance directory. Typed `str` (not `Literal`) so callers can pass values read from `os.environ` or config files without a `cast` or `# type: ignore`; invalid strings raise :class:`ValueError` at construction.
path
Required when `profile="local"`. Forbidden for R2 profiles, whose URI is resolved from the configured object store.
- property catalog: ursa.catalog.Catalog
The read-only catalog handle.
Constructed on first access; subsequent accesses return the cached handle. For R2 profiles this is always the RO handle, even after :meth:`enable_writes`: credential isolation is enforced, and `register_*` methods use a separate write handle internally. For `profile="local"` there is no credential boundary to enforce; reads and writes share this single handle (see :meth:`_write_catalog`).
- property writes_enabled: bool
- enable_writes() → None
Open the `assets_rw` write handle.
Idempotent: a second call is a no-op. For `profile="local"`, local lance has no credential distinction, so the flag flip is the only observable effect; `register_*` methods fall back to the read handle.
- _write_catalog() → ursa.catalog.Catalog
The write-capable catalog handle.
Raises :class:`WritesNotEnabled` if :meth:`enable_writes` hasn't been called. For `profile="local"`, falls back to the same underlying handle as :attr:`catalog`; there is no credential distinction to enforce locally. `register_*` callers route both the idempotency `get_*` check and the `add_*` write through this handle so the check and the write run under one credential.
Tests that bypass the register-layer idempotency for fixture-setup reasons may also use this handle for raw seeding via `add_*`. This is a deliberate test seam, not a public API; it stays stable across :class:`DataInterface` evolution only insofar as the in-repo register-test suite requires it.
- query(spec: ursa._query_types.QuerySpec | None = None, **kwargs: Any) → ursa._query_types.QueryResultList
Filter the Ursa catalog and return matching recordings.
See :func:`_query` for parameter documentation. This method delegates to the inlined implementation with the RO catalog handle.
- get(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], *, concat: bool = False) → ursa.temporal.Data | list[ursa.temporal.Data]
Materialize `target` into in-memory :class:`Data` objects.
Threads this interface’s `profile` through to the segment-store resolution. Overloaded so static callers get `Data` for a single :class:`QueryResult` and `list[Data]` for an iterable, preserving the typing contract from the deleted `ursa.get`.
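The overload contract can be sketched with `typing.overload`: two stub signatures tell a type checker which return type goes with which input shape, and one runtime implementation dispatches on `isinstance`. `QueryResult` and `Data` below are minimal stand-ins, and the `concat` and `profile` parameters are omitted; this shows the typing pattern, not the real `get` body.

```python
from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass
from typing import overload


@dataclass
class QueryResult:  # stand-in for ursa._query_types.QueryResult
    recording_hash: str


@dataclass
class Data:  # stand-in for ursa.temporal.Data
    recording_hash: str


@overload
def get(target: QueryResult) -> Data: ...


@overload
def get(target: Iterable[QueryResult]) -> list[Data]: ...


def get(target: QueryResult | Iterable[QueryResult]) -> Data | list[Data]:
    # Runtime dispatch mirrors the overloads: one result in, one Data out.
    if isinstance(target, QueryResult):
        return Data(target.recording_hash)
    return [Data(qr.recording_hash) for qr in target]
```

A checker such as mypy then infers `Data` for `get(results[0])` and `list[Data]` for `get(results)`, so callers need no casts.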
- download(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], dest: str | os.PathLike[str], *, layout: ursa.data_interface._LayoutMode = 'by_recording', overwrite: bool = False) → list[pathlib.Path]
Stream raw-modality bytes for `target` to disk under `dest`.
Threads this interface’s `profile` through to the segment-store resolution.
- register_participant(*, participant_id: str, enrolled_at: int | datetime.datetime, metadata: dict[str, Any] | None = None) → ursa.catalog.ParticipantRow
- register_recording(*, recording_hash: str, participant_ids: list[str], start_time: int | datetime.datetime, duration: datetime.timedelta, device_info: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None) → ursa.catalog.RecordingRow
- register_modality(*, recording_hash: str, modality: str, raw_storage_uri: str, storage_uri: str | None = None, ingestion_status: ursa.catalog.IngestionStatus = IngestionStatus.RAW, format: ursa.catalog.StorageFormat | None = None, domain_intervals: list[tuple[float, float]] | None = None, sampling_rate: float | None = None, channel_spec: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None) → ursa.catalog.ModalityRow
- register_event(*, event_id: str, recording_hash: str, event_time: float, event_type: str, prompt: str | None = None, response: str | None = None, metadata: dict[str, Any] | None = None) → ursa.catalog.EventRow
- class ursa.data_interface.CatalogWriter
Bases: `typing.Protocol`
Get + insert contract that :class:`ursa.catalog.Catalog` satisfies.
`@runtime_checkable` so the structural-conformance test in `tests/register/test_catalog_protocol_structural.py` can call `isinstance(catalog, CatalogWriter)`, which catches any `add_*` / `get_*` rename in :class:`Catalog` at unit-test time.
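How the `@runtime_checkable` conformance check catches a rename, sketched with a hypothetical two-method protocol. `get_participant` / `add_participant` are illustrative names in the `get_*` / `add_*` pattern; the real :class:`CatalogWriter` member list is not reproduced here.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class WriterSketch(Protocol):
    """Hypothetical two-method slice of a get + insert contract."""

    def get_participant(self, participant_id: str) -> Any: ...

    def add_participant(self, row: Any) -> None: ...


class ConformingCatalog:
    def get_participant(self, participant_id: str) -> Any:
        return None

    def add_participant(self, row: Any) -> None:
        return None


class RenamedCatalog:
    def get_participant(self, participant_id: str) -> Any:
        return None

    # Renamed from add_participant: the structural check now fails.
    def insert_participant(self, row: Any) -> None:
        return None
```

Note that a runtime `isinstance` check against a protocol only verifies that the named members exist; it does not compare parameter lists or return types, so it catches renames and deletions but not signature drift.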
- ursa.data_interface._FILTER_KWARG_NAMES
('participants', 'modalities', 'recording_hash', 'metadata', 'time_range', 'pipeline_version', 'time…
- ursa.data_interface._query(spec: ursa._query_types.QuerySpec | None = None, *, catalog: ursa.catalog.Catalog, participants: list[ursa.catalog.schemas.CatalogID] | None = None, modalities: list[ursa.catalog.schemas.ModalityName] | None = None, recording_hash: ursa.catalog.schemas.CatalogID | None = None, metadata: ursa.catalog.schemas.MetadataDict | None = None, time_range: tuple[ursa.catalog.schemas.EpochNs, ursa.catalog.schemas.EpochNs] | None = None, pipeline_version: str | None = None, time_filters: list[Any] | None = None, metadata_filters: list[Any] | None = None, derived: list[Any] | None = None, limit: int | None = None) → ursa._query_types.QueryResultList
Filter the Ursa catalog and return matching recordings.
Two surfaces:
Common case: pass kwargs directly, e.g. `data.query(participants=[...], modalities=[...])`. The kwargs auto-validate against :class:`QuerySpec`.
Complex case: build a :class:`QuerySpec` and pass it as the first positional argument: `data.query(spec)`.
Phase 1a returns :class:`QueryResult` objects carrying catalog projections only (no array bytes). Temporal kwargs and `pipeline_version` raise :class:`NotImplementedError` when a matched modality has `ingestion_status="raw"`; the green path ships with ENG-1082.
- ursa.data_interface._reject_kwargs_with_spec(**kwargs: Any) → None
If `spec` is supplied, every filter kwarg must be unset.
Mixing the two would force us to merge a spec with kwargs, and the merge semantics (override vs. union) are unobvious enough that being strict is cheaper than picking a rule.
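The two query surfaces and the strict no-merge rule can be sketched together. `QuerySpecSketch` is a dataclass stand-in for the Pydantic :class:`QuerySpec` with only three fields, and raising `TypeError` for a mixed call is an assumption; the real exception type is not stated here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class QuerySpecSketch:
    """Stand-in for QuerySpec; the real class validates with Pydantic."""
    participants: list[str] | None = None
    modalities: list[str] | None = None
    limit: int | None = None


def resolve_spec(spec: QuerySpecSketch | None = None, **kwargs: Any) -> QuerySpecSketch:
    if spec is not None:
        # Strict: a spec plus any set filter kwarg is rejected, never merged.
        set_kwargs = sorted(name for name, value in kwargs.items() if value is not None)
        if set_kwargs:
            raise TypeError(f"pass a QuerySpec or filter kwargs, not both (got {set_kwargs})")
        return spec
    # Common case: the kwargs become the spec; unknown names raise TypeError.
    return QuerySpecSketch(**kwargs)
```

Treating an explicit `None` as "unset" lets wrappers forward optional arguments unchanged without tripping the rejection.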
- ursa.data_interface._metadata_matches(actual: collections.abc.Mapping[str, Any], expected: collections.abc.Mapping[str, Any]) → bool
Equality on every key in `expected`; missing keys do not match.
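A one-line sketch of that rule. The explicit `key in actual` test matters: without it, `actual.get(key) == value` would let a missing key match an expected value of `None`.

```python
from collections.abc import Mapping
from typing import Any


def metadata_matches(actual: Mapping[str, Any], expected: Mapping[str, Any]) -> bool:
    # Every expected key must be present in `actual` with an equal value;
    # keys that appear only in `actual` are ignored.
    return all(key in actual and actual[key] == value for key, value in expected.items())
```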
- ursa.data_interface._collect_modalities(catalog: ursa.catalog.Catalog, recordings: list[ursa.catalog.RecordingRow], modality_filter: list[ursa.catalog.schemas.ModalityName] | None) → dict[ursa.catalog.schemas.CatalogID, list[ursa.catalog.ModalityRow]]
Fetch the modality rows for `recordings`, grouped by `recording_hash`.
A single `list_modalities` call (no batching in Phase 1a; ENG-1089 covers growth). Empty input short-circuits to avoid an empty IN-list edge case.
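A sketch of the grouping step, assuming the catalog lookup can be modeled as one callable that takes all recording hashes at once. `list_modalities` here is a hypothetical parameter, not the real :class:`Catalog` method signature, and applying the modality filter in Python (rather than in the catalog query) is also an assumption.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class ModalityRowSketch:  # stand-in for ursa.catalog.ModalityRow
    recording_hash: str
    modality: str


def collect_modalities(
    recording_hashes: list[str],
    modality_filter: list[str] | None,
    list_modalities: Callable[[list[str]], list[ModalityRowSketch]],
) -> dict[str, list[ModalityRowSketch]]:
    # Empty input: skip the catalog call entirely (no empty IN-list).
    if not recording_hashes:
        return {}
    grouped: dict[str, list[ModalityRowSketch]] = defaultdict(list)
    # One unbatched call covering every recording.
    for row in list_modalities(recording_hashes):
        if modality_filter is None or row.modality in modality_filter:
            grouped[row.recording_hash].append(row)
    return dict(grouped)
```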
- ursa.data_interface._enforce_processed_gate(spec: ursa._query_types.QuerySpec, by_recording: dict[ursa.catalog.schemas.CatalogID, list[ursa.catalog.ModalityRow]]) → None
Raise if any matched modality is still raw and the spec asked for processed-only behavior. The first offending (modality, recording, kwarg) is reported so the message points at a concrete row.
- ursa.data_interface._processed_kwargs_set(spec: ursa._query_types.QuerySpec) → list[str]
Names of the processed-only kwargs that are non-None / non-empty.
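A sketch of how the two helpers could fit together, using the `_PROCESSED_KWARGS` value listed on this page. The stand-in classes, the string `"raw"` in place of the `IngestionStatus` enum, and the reading of "non-empty" as "zero-length sized value" are assumptions; the exception type follows the :class:`NotImplementedError` documented for `_query`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

# Value copied from the _PROCESSED_KWARGS entry on this page.
_PROCESSED_KWARGS = ("time_range", "pipeline_version", "time_filters", "metadata_filters", "derived")


@dataclass
class SpecSketch:  # stand-in for QuerySpec, processed-only fields only
    time_range: tuple[int, int] | None = None
    pipeline_version: str | None = None
    time_filters: list[Any] | None = None
    metadata_filters: list[Any] | None = None
    derived: list[Any] | None = None


@dataclass
class ModSketch:  # stand-in for ModalityRow
    modality: str
    ingestion_status: str  # the real field is an IngestionStatus enum


def processed_kwargs_set(spec: SpecSketch) -> list[str]:
    names = []
    for name in _PROCESSED_KWARGS:
        value = getattr(spec, name)
        # Assumed reading of "non-None / non-empty": skip None and zero-length values.
        if value is None or (hasattr(value, "__len__") and len(value) == 0):
            continue
        names.append(name)
    return names


def enforce_processed_gate(spec: SpecSketch, by_recording: dict[str, list[ModSketch]]) -> None:
    requested = processed_kwargs_set(spec)
    if not requested:
        return  # no processed-only kwarg was asked for, so raw rows are fine
    for recording_hash, rows in by_recording.items():
        for row in rows:
            if row.ingestion_status == "raw":
                # Name one concrete (modality, recording, kwarg) in the message.
                raise NotImplementedError(
                    f"{requested[0]} requires processed data, but modality "
                    f"{row.modality!r} of recording {recording_hash!r} is still raw"
                )
```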
- ursa.data_interface._get(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], *, concat: bool = False, profile: str | None = None) → ursa.temporal.Data | list[ursa.temporal.Data]
Materialize `target` into in-memory :class:`Data` objects.
- ursa.data_interface._get_one(qr: ursa._query_types.QueryResult, *, profile: str | None = None) → ursa.temporal.Data
Build a single :class:`Data` from one :class:`QueryResult`.
- ursa.data_interface._recording_row_from(qr: ursa._query_types.QueryResult) → ursa.catalog.RecordingRow
Project `qr` onto :class:`RecordingRow`’s field set.
Uses an explicit field tuple so a future :class:`QueryResult` schema change breaks here at validation time, not silently inside Pydantic.
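A sketch of projecting through an explicit field tuple, using the `_RECORDING_ROW_FIELDS` value listed on this page. The dataclass stands in for the Pydantic `RecordingRow`, so this only illustrates the fail-loudly idea: `getattr` without a default raises as soon as a listed field is missing from the source object, while extra fields on the source are simply not copied.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

# Value copied from the _RECORDING_ROW_FIELDS entry on this page.
_RECORDING_ROW_FIELDS = ("recording_hash", "participant_ids", "start_time", "duration", "device_info", "metadata")


@dataclass
class RecordingRowSketch:  # stand-in for RecordingRow (a Pydantic model in Ursa)
    recording_hash: str
    participant_ids: list[str]
    start_time: int
    duration: timedelta
    device_info: dict[str, Any] | None
    metadata: dict[str, Any] | None


def recording_row_from(qr: Any) -> RecordingRowSketch:
    # getattr without a default: if the source loses one of these fields,
    # this line raises AttributeError instead of silently dropping the value.
    return RecordingRowSketch(**{name: getattr(qr, name) for name in _RECORDING_ROW_FIELDS})
```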
- ursa.data_interface._materialize_modality(name: str, mrow: ursa.catalog.ModalityRow, *, profile: str | None = None) → ursa.raw.RawBytes
Read all segment files under `mrow.raw_storage_uri` into :class:`RawBytes`.
- ursa.data_interface._validate_alignment(results: list[ursa.temporal.Data]) → None
Raise `ValueError` if recordings have mismatched modality sets.
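A sketch of the alignment check, comparing each result's modality names against the first one. `DataSketch` and its `modalities` mapping are assumptions about how a :class:`Data` object exposes its modality names.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class DataSketch:  # stand-in for ursa.temporal.Data; assumes a name -> stream mapping
    recording_hash: str
    modalities: dict[str, Any] = field(default_factory=dict)


def validate_alignment(results: list[DataSketch]) -> None:
    if not results:
        return
    reference = set(results[0].modalities)
    for data in results[1:]:
        names = set(data.modalities)
        # Order does not matter; only the set of modality names must agree.
        if names != reference:
            raise ValueError(
                f"recording {data.recording_hash!r} has modalities {sorted(names)}, "
                f"expected {sorted(reference)} (from {results[0].recording_hash!r})"
            )
```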
- class ursa.data_interface._PlannedWrite
Bases: `typing.NamedTuple`
One (store, source key, destination path, meta_size) record produced by the plan phase and consumed by the execute phase.
- store: ursa.store.ObjectStore
None
- source_key: str
None
- dest_path: pathlib.Path
None
- meta_size: int
None
- ursa.data_interface._download(target: ursa._query_types.QueryResult | collections.abc.Iterable[ursa._query_types.QueryResult], dest: str | os.PathLike[str], *, layout: ursa.data_interface._LayoutMode = 'by_recording', overwrite: bool = False, profile: str | None = None) → list[pathlib.Path]
Stream raw-modality bytes for `target` to disk under `dest`.
- ursa.data_interface._plan_writes(qrs: list[ursa._query_types.QueryResult], dest_root: pathlib.Path, *, layout: ursa.data_interface._LayoutMode, overwrite: bool, profile: str | None = None) → list[ursa.data_interface._PlannedWrite]
Enumerate every write before any I/O.
One pass over the inputs produces:
the full ordered list of :class:`_PlannedWrite` tuples;
an intra-call duplicate-destination check that catches collisions across all layouts;
a pre-existing-destination check that batches every collision into one :class:`FileExistsError` when `overwrite=False`.
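A sketch of the plan phase's two checks, assuming the inputs have already been reduced to hypothetical `(source_key, relative_destination)` pairs so layout logic is out of scope. Raising `ValueError` for an intra-call duplicate is an assumption; the batched `FileExistsError` follows the description above. No file is written; the only I/O is the `exists()` probe.

```python
from __future__ import annotations

from pathlib import Path
from typing import NamedTuple


class PlannedWriteSketch(NamedTuple):  # trimmed stand-in for _PlannedWrite
    source_key: str
    dest_path: Path


def plan_writes(sources: list[tuple[str, str]], dest_root: Path, *, overwrite: bool) -> list[PlannedWriteSketch]:
    """`sources` holds hypothetical (source_key, relative destination) pairs."""
    plan: list[PlannedWriteSketch] = []
    claimed: dict[Path, str] = {}
    for source_key, rel_dest in sources:
        dest = dest_root / rel_dest
        # Intra-call duplicate: two sources would land on the same file.
        if dest in claimed:
            raise ValueError(f"{source_key!r} and {claimed[dest]!r} both map to {dest}")
        claimed[dest] = source_key
        plan.append(PlannedWriteSketch(source_key, dest))
    if not overwrite:
        # Batch every pre-existing destination into one error.
        existing = [str(w.dest_path) for w in plan if w.dest_path.exists()]
        if existing:
            raise FileExistsError(f"{len(existing)} destination(s) already exist: {existing}")
    return plan
```

Reporting all collisions at once lets a caller fix or clear them in one pass instead of rerunning once per file.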
- ursa.data_interface._check_modality_eligibility(modality_name: str, mrow: ursa.catalog.ModalityRow, *, layout: ursa.data_interface._LayoutMode) → None
Raise if `mrow` is not eligible for M2 raw-path download.
- ursa.data_interface._dest_path(dest_root: pathlib.Path, *, layout: ursa.data_interface._LayoutMode, recording_hash: str, modality: str, rel_key: str) → pathlib.Path
Compute the on-disk destination for one source segment.
- ursa.data_interface._stream_to_disk(store: ursa.store.ObjectStore, key: str, dest: pathlib.Path) → None
Copy bytes from `store[key]` to `dest` via a temp-file rename.
Uses :meth:`ObjectStore.open` (forward-only) so multi-GB raw video/audio never sits in memory. A `.part` rename ensures partial writes are not visible at the canonical destination.
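A sketch of the chunked copy plus `.part` rename, assuming the object-store stream behaves like a readable binary file object (the real `ObjectStore.open` return type is not shown here). The chunk size is a placeholder because the `_STREAM_CHUNK` value is not listed.

```python
from __future__ import annotations

import os
import shutil
from pathlib import Path
from typing import BinaryIO

_CHUNK = 8 * 1024 * 1024  # placeholder; the real _STREAM_CHUNK value is not shown


def stream_to_disk(src: BinaryIO, dest: Path, chunk_size: int = _CHUNK) -> None:
    dest.parent.mkdir(parents=True, exist_ok=True)
    part = dest.with_name(dest.name + ".part")
    try:
        with open(part, "wb") as out:
            # copyfileobj reads fixed-size chunks forward-only, so memory stays bounded.
            shutil.copyfileobj(src, out, chunk_size)
        # Readers see either no file or the complete file at `dest`.
        os.replace(part, dest)
    except BaseException:
        # Never leave a half-written .part behind on failure or interruption.
        part.unlink(missing_ok=True)
        raise
```

On POSIX, `os.replace` within a single filesystem is atomic, which is why the temp file sits next to its destination rather than in a system temp directory.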
- ursa.data_interface._MISSING: object
'object(…)'
- ursa.data_interface._pk_tuple_from_row(row: ursa.catalog.CatalogRow) → tuple[tuple[str, object], ...]
Build the (field, value) tuple `CatalogRowExists` expects from a row.
- ursa.data_interface._diff_rows(existing: ursa.catalog.CatalogRow, candidate: ursa.catalog.CatalogRow) → dict[str, tuple[object, object]]
Field-by-field diff of two same-class rows. Empty dict if equal.
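A sketch of both helpers over dataclass rows. Two assumptions: the rows are modeled with `dataclasses.fields` (the real rows are likely Pydantic models), and the primary-key field names are passed in explicitly, whereas the real `_pk_tuple_from_row` takes only the row.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class ParticipantRowSketch:  # stand-in for ursa.catalog.ParticipantRow
    participant_id: str
    enrolled_at: int
    metadata: dict | None = None


def pk_tuple_from_row(row, pk_fields: tuple[str, ...]) -> tuple[tuple[str, object], ...]:
    # (field, value) pairs that identify the row; pk_fields is a hypothetical parameter.
    return tuple((name, getattr(row, name)) for name in pk_fields)


def diff_rows(existing, candidate) -> dict[str, tuple[object, object]]:
    if type(existing) is not type(candidate):
        raise TypeError("diff_rows expects two rows of the same class")
    diff: dict[str, tuple[object, object]] = {}
    for f in fields(existing):
        old, new = getattr(existing, f.name), getattr(candidate, f.name)
        if old != new:
            diff[f.name] = (old, new)  # (existing value, candidate value)
    return diff
```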
- ursa.data_interface._register_participant(*, participant_id: str, enrolled_at: int | datetime.datetime, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) → ursa.catalog.ParticipantRow
Register a new participant. Idempotent on re-register.
- ursa.data_interface._register_recording(*, recording_hash: str, participant_ids: list[str], start_time: int | datetime.datetime, duration: datetime.timedelta, device_info: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) → ursa.catalog.RecordingRow
Register a recording session. Idempotent on re-register.
- ursa.data_interface._register_modality(*, recording_hash: str, modality: str, raw_storage_uri: str, storage_uri: str | None = None, ingestion_status: ursa.catalog.IngestionStatus = IngestionStatus.RAW, format: ursa.catalog.StorageFormat | None = None, domain_intervals: list[tuple[float, float]] | None = None, sampling_rate: float | None = None, channel_spec: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) → ursa.catalog.ModalityRow
Register a data stream within a recording. Idempotent on re-register.
- ursa.data_interface._register_event(*, event_id: str, recording_hash: str, event_time: float, event_type: str, prompt: str | None = None, response: str | None = None, metadata: dict[str, Any] | None = None, catalog: ursa.data_interface.CatalogWriter) → ursa.catalog.EventRow
Register a time-stamped event within a recording. Idempotent on re-register.
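A sketch of one plausible idempotent-register flow built from the pieces on this page: look up the existing row, insert if absent, return the existing row if identical, and otherwise raise with the primary-key tuple and the field diff. Raising on a conflicting re-register is an assumption inferred from `_pk_tuple_from_row` and `_diff_rows`, not documented behavior. `CatalogRowExistsSketch`, `InMemoryWriter`, and the two-field row are all hypothetical stand-ins.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


class CatalogRowExistsSketch(Exception):
    """Hypothetical stand-in for CatalogRowExists; its real constructor is not shown."""

    def __init__(self, pk, diff):
        super().__init__(f"row {pk} exists with different values: {diff}")
        self.pk = pk
        self.diff = diff


@dataclass(frozen=True)
class ParticipantRowSketch:
    participant_id: str
    enrolled_at: int


class InMemoryWriter:
    """Toy catalog exposing get_/add_ methods in the CatalogWriter style."""

    def __init__(self):
        self.rows = {}
        self.adds = 0

    def get_participant(self, participant_id):
        return self.rows.get(participant_id)

    def add_participant(self, row):
        self.adds += 1
        self.rows[row.participant_id] = row


def register_participant(*, participant_id, enrolled_at, catalog):
    candidate = ParticipantRowSketch(participant_id, enrolled_at)
    # Check and write both go through the same (write-capable) handle.
    existing = catalog.get_participant(participant_id)
    if existing is None:
        catalog.add_participant(candidate)
        return candidate
    diff = {
        f.name: (getattr(existing, f.name), getattr(candidate, f.name))
        for f in fields(candidate)
        if getattr(existing, f.name) != getattr(candidate, f.name)
    }
    if not diff:
        return existing  # identical re-register: no write
    raise CatalogRowExistsSketch((("participant_id", participant_id),), diff)
```

Routing both the lookup and the insert through one handle is what keeps the check-then-write window under a single credential, as described for `_write_catalog`.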