ursa.catalog.catalog¶
Verb taxonomy¶
- Catalog.scan — raw pyarrow.Table. Zero-copy escape hatch for callers that want to handle conversion themselves.
- Catalog.query — generic typed rows. Use when the table name is dynamic.
- list_<table> / get_<table> — sugar over query/get with row_cls baked in. Default for new code.
- add_<table> / add_<table>s — typed write entry. Raises CatalogPKConflict on duplicate primary key.
- upsert_<table> — overwrite by primary key. The only path that mutates an existing row.
- Catalog.append_unchecked — explicit opt-in for ingestion fast-paths that have already deduped. Use only if you know what you’re doing.
- Catalog.delete — raises NotImplementedError in M2; driven by ENG-1069 (lifecycle GC + ursa.query deletion semantics).
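The write-verb contract above can be sketched with a toy in-memory table. Everything here (ToyTable, a single "id" primary key, the redeclared CatalogPKConflict) is illustrative stand-in code, not the Lance-backed implementation:

```python
class CatalogPKConflict(Exception):
    """Stand-in for the real CatalogPKConflict error class."""

class ToyTable:
    """In-memory stand-in illustrating add / upsert / append_unchecked semantics."""

    def __init__(self):
        self._rows: list[dict] = []  # rows keyed by a single hypothetical "id" PK

    def add(self, row: dict) -> None:
        # add: typed write entry -- raises on duplicate primary key.
        if any(r["id"] == row["id"] for r in self._rows):
            raise CatalogPKConflict(row["id"])
        self._rows.append(row)

    def upsert(self, row: dict) -> None:
        # upsert: overwrite by primary key -- the only path that mutates a row.
        self._rows = [r for r in self._rows if r["id"] != row["id"]]
        self._rows.append(row)

    def append_unchecked(self, row: dict) -> None:
        # append_unchecked: skip the PK check entirely; caller must have deduped.
        self._rows.append(row)

t = ToyTable()
t.add({"id": "a", "v": 1})
t.upsert({"id": "a", "v": 2})            # overwrites v=1 in place
try:
    t.add({"id": "a", "v": 3})           # duplicate PK -> conflict
except CatalogPKConflict:
    conflict = True
t.append_unchecked({"id": "a", "v": 3})  # no check: duplicate now present
```

The last line is exactly why append_unchecked is opt-in: nothing stops the duplicate from landing.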
Single-writer¶
The catalog assumes one writer per URI. Concurrent first-time
Catalog.local / Catalog.from_store calls against an empty catalog may race during table creation. M2 has one ingestion process, so this isn’t a live bug; multi-writer support is a follow-up if the need arises.
Serialization¶
Rows are dumped with model_dump(mode="python") end-to-end so
datetime and timedelta stay typed (no ISO-string churn at the
Arrow boundary). Top-level Pydantic extras ride in a JSON
__pydantic_extra__ column. Nested submodels (TimeWindow,
EmbeddingSource) reject extras at write time even though Pydantic allows them — Arrow struct columns are fixed-schema, so unknown nested keys cannot be persisted.
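The nested-extras rejection can be sketched with plain dicts standing in for submodels. CatalogSchemaError is redeclared here for illustration, and TIME_WINDOW_FIELDS is a hypothetical fixed schema; the real check walks Pydantic model_fields on the row class:

```python
class CatalogSchemaError(Exception):
    """Stand-in for the real CatalogSchemaError."""

# Hypothetical fixed schema for a nested submodel -- an Arrow struct
# column can only ever hold these keys.
TIME_WINDOW_FIELDS = frozenset({"start", "end"})

def reject_nested_extras(name: str, nested: dict) -> None:
    # Unknown nested keys cannot be persisted into a fixed-schema Arrow
    # struct, so fail loudly at write time instead of letting Arrow raise
    # a less specific error later.
    extras = set(nested) - TIME_WINDOW_FIELDS
    if extras:
        raise CatalogSchemaError(f"{name}: unexpected nested keys {sorted(extras)}")

reject_nested_extras("time_window", {"start": 0.0, "end": 1.5})  # fine
try:
    reject_nested_extras("time_window", {"start": 0.0, "end": 1.5, "tz": "UTC"})
except CatalogSchemaError:
    rejected = True
```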
Metadata pushdown¶
MetadataDict columns (metadata, device_info, channel_spec)
serialize to JSON strings in M2. Filters of the form
{"metadata.<key>": ...} raise
MetadataPushdownNotImplemented; tracked by ENG-1066 (Lance MapType + hot-key promotion).
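Since these columns are opaque JSON strings in M2, a dotted filter into them cannot be pushed down to Lance. A minimal sketch of the guard (the error class is redeclared here, and check_filter_keys is a hypothetical helper, not the library's API):

```python
class MetadataPushdownNotImplemented(NotImplementedError):
    """Stand-in for the real error class."""

# The MetadataDict columns that serialize to JSON strings in M2.
JSON_COLS = {"metadata", "device_info", "channel_spec"}

def check_filter_keys(where: dict) -> None:
    # A dotted key into a JSON-string column cannot be pushed down in M2
    # (pending ENG-1066: Lance MapType + hot-key promotion), so reject it
    # up front rather than silently matching nothing.
    for key in where:
        root = key.split(".", 1)[0]
        if "." in key and root in JSON_COLS:
            raise MetadataPushdownNotImplemented(key)

check_filter_keys({"recording_hash": "abc"})       # plain column: fine
try:
    check_filter_keys({"metadata.site": "lab-1"})  # dotted metadata key: rejected
except MetadataPushdownNotImplemented:
    raised = True
```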
Lance-backed catalog of Ursa’s nine tables.
The Catalog exposes a generic core (Catalog.add, Catalog.upsert, Catalog.append_unchecked, Catalog.get, Catalog.scan, Catalog.query, Catalog.count, Catalog.delete, Catalog.tables) under thin per-table typed wrappers (add_recording, list_recordings, etc.). The wrappers exist for ergonomics; the generic surface is the path ursa.query (ENG-889) sits on.
Module Contents¶
Classes¶
Catalog — Lance-backed catalog. See module docstring for the verb taxonomy.
Functions¶
_json_cols_for_class — Return the JSON-encoded MetadataDict columns for a row class.
_pk_in_clause — Render a WHERE clause matching any PK tuple in keys.
_json_default — Best-effort JSON encoder for values that json.dumps doesn’t natively handle.
Data¶
API¶
- ursa.catalog.catalog.__all__¶
['Catalog']
- ursa.catalog.catalog.T¶
'TypeVar(…)'
- ursa.catalog.catalog._FIXED_STRUCT_SUBMODELS: tuple[type[pydantic.BaseModel], ...]¶
()
- class ursa.catalog.catalog.Catalog(db: Any, *, uri: str)[source]¶
Lance-backed catalog. See module docstring for the verb taxonomy.
Initialization
- TABLES: tuple[str, ...]¶
None
- classmethod local(root: str | pathlib.Path) ursa.catalog.catalog.Catalog[source]¶
Open (or create) a catalog under a local directory.
root is the root of the catalog itself — tables land at <root>/<table_name>.lance. The directory is created on first use; subsequent opens are idempotent.
- classmethod from_store(store: ursa.store.base.ObjectStore) ursa.catalog.catalog.Catalog[source]¶
Open a catalog backed by an ObjectStore.
Reads (uri, storage_options) from ObjectStore.lance_connection and connects lancedb at <uri>/catalog. The catalog is the only consumer of this sub-namespace; other Ursa data (Zarr arrays, raw blobs) lives under sibling prefixes within the same store.
- property uri: str¶
The lancedb URI this catalog is rooted at.
- add(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]¶
Append rows. Raises CatalogPKConflict on duplicate PK.
- upsert(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]¶
Overwrite by primary key. The only path that mutates an existing row.
- append_unchecked(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]¶
Skip the PK check and append. Use only if the caller has already deduped — duplicates corrupt the catalog’s lineage assumptions.
- get(table: str, **pk: Any) ursa.catalog.schemas.CatalogRow | None[source]¶
Fetch the row matching the given primary-key fields, or None.
- scan(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None, columns: list[str] | None = None, limit: int | None = None) pyarrow.Table[source]¶
Return a raw pyarrow.Table. Zero-copy escape hatch.
- query(table: str, *, row_cls: type[ursa.catalog.catalog.T], where: str | collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.catalog.T][source]¶
Return validated rows. Use when the table name is dynamic.
- count(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None) int[source]¶
Count rows, optionally filtered.
- _scan_arrow(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None, columns: list[str] | None = None, limit: int | None = None) pyarrow.Table[source]¶
- _rows_to_arrow(table: str, rows: list[ursa.catalog.schemas.CatalogRow]) pyarrow.Table[source]¶
- _arrow_to_rows(arrow: pyarrow.Table, row_cls: type[ursa.catalog.catalog.T]) list[ursa.catalog.catalog.T][source]¶
- _reject_existing_pks(table: str, rows: list[ursa.catalog.schemas.CatalogRow]) None[source]¶
- static _reject_nested_extras(row: ursa.catalog.schemas.CatalogRow) None[source]¶
Walk a row’s declared submodel fields and reject Pydantic extras.
Arrow struct columns are fixed-schema; submodels with unknown keys can’t be persisted. Raised as CatalogSchemaError rather than letting PyArrow surface a less specific ArrowInvalid.
Iterates model_fields rather than vars(row) so we don’t accidentally walk Pydantic internals (__pydantic_extra__, __pydantic_fields_set__) or any non-model attribute attached to the instance — only fields declared on the row class.
- add_participant(row: ursa.catalog.schemas.ParticipantRow) None[source]¶
- add_participants(rows: collections.abc.Iterable[ursa.catalog.schemas.ParticipantRow]) None[source]¶
- upsert_participant(row: ursa.catalog.schemas.ParticipantRow) None[source]¶
- upsert_participants(rows: collections.abc.Iterable[ursa.catalog.schemas.ParticipantRow]) None[source]¶
- get_participant(participant_id: str) ursa.catalog.schemas.ParticipantRow | None[source]¶
- list_participants(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.ParticipantRow][source]¶
- add_recording(row: ursa.catalog.schemas.RecordingRow) None[source]¶
- add_recordings(rows: collections.abc.Iterable[ursa.catalog.schemas.RecordingRow]) None[source]¶
- upsert_recording(row: ursa.catalog.schemas.RecordingRow) None[source]¶
- upsert_recordings(rows: collections.abc.Iterable[ursa.catalog.schemas.RecordingRow]) None[source]¶
- get_recording(recording_hash: str) ursa.catalog.schemas.RecordingRow | None[source]¶
- list_recordings(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.RecordingRow][source]¶
- add_modality(row: ursa.catalog.schemas.ModalityRow) None[source]¶
- add_modalities(rows: collections.abc.Iterable[ursa.catalog.schemas.ModalityRow]) None[source]¶
- upsert_modality(row: ursa.catalog.schemas.ModalityRow) None[source]¶
- upsert_modalities(rows: collections.abc.Iterable[ursa.catalog.schemas.ModalityRow]) None[source]¶
- get_modality(recording_hash: str, modality: str) ursa.catalog.schemas.ModalityRow | None[source]¶
- list_modalities(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.ModalityRow][source]¶
- add_event(row: ursa.catalog.schemas.EventRow) None[source]¶
- add_events(rows: collections.abc.Iterable[ursa.catalog.schemas.EventRow]) None[source]¶
- upsert_event(row: ursa.catalog.schemas.EventRow) None[source]¶
- upsert_events(rows: collections.abc.Iterable[ursa.catalog.schemas.EventRow]) None[source]¶
- get_event(event_id: str) ursa.catalog.schemas.EventRow | None[source]¶
- list_events(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.EventRow][source]¶
- add_embedding(row: ursa.catalog.schemas.EmbeddingRow) None[source]¶
- add_embeddings(rows: collections.abc.Iterable[ursa.catalog.schemas.EmbeddingRow]) None[source]¶
- upsert_embedding(row: ursa.catalog.schemas.EmbeddingRow) None[source]¶
- upsert_embeddings(rows: collections.abc.Iterable[ursa.catalog.schemas.EmbeddingRow]) None[source]¶
- get_embedding(embedding_id: str) ursa.catalog.schemas.EmbeddingRow | None[source]¶
- list_embeddings(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.EmbeddingRow][source]¶
- add_virgo_asset(row: ursa.catalog.schemas.VirgoAssetRow) None[source]¶
- add_virgo_assets(rows: collections.abc.Iterable[ursa.catalog.schemas.VirgoAssetRow]) None[source]¶
- upsert_virgo_asset(row: ursa.catalog.schemas.VirgoAssetRow) None[source]¶
- upsert_virgo_assets(rows: collections.abc.Iterable[ursa.catalog.schemas.VirgoAssetRow]) None[source]¶
- get_virgo_asset(asset_id: str) ursa.catalog.schemas.VirgoAssetRow | None[source]¶
- list_virgo_assets(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.VirgoAssetRow][source]¶
- add_checkpoint(row: ursa.catalog.schemas.CheckpointRow) None[source]¶
- add_checkpoints(rows: collections.abc.Iterable[ursa.catalog.schemas.CheckpointRow]) None[source]¶
- upsert_checkpoint(row: ursa.catalog.schemas.CheckpointRow) None[source]¶
- upsert_checkpoints(rows: collections.abc.Iterable[ursa.catalog.schemas.CheckpointRow]) None[source]¶
- get_checkpoint(checkpoint_id: str) ursa.catalog.schemas.CheckpointRow | None[source]¶
- list_checkpoints(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.CheckpointRow][source]¶
- add_benchmark_suite(row: ursa.catalog.schemas.BenchmarkSuiteRow) None[source]¶
- add_benchmark_suites(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkSuiteRow]) None[source]¶
- upsert_benchmark_suite(row: ursa.catalog.schemas.BenchmarkSuiteRow) None[source]¶
- upsert_benchmark_suites(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkSuiteRow]) None[source]¶
- get_benchmark_suite(suite_name: str, suite_version: int) ursa.catalog.schemas.BenchmarkSuiteRow | None[source]¶
- list_benchmark_suites(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.BenchmarkSuiteRow][source]¶
- add_benchmark_result(row: ursa.catalog.schemas.BenchmarkResultRow) None[source]¶
- add_benchmark_results(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkResultRow]) None[source]¶
- upsert_benchmark_result(row: ursa.catalog.schemas.BenchmarkResultRow) None[source]¶
- upsert_benchmark_results(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkResultRow]) None[source]¶
- get_benchmark_result(result_id: str) ursa.catalog.schemas.BenchmarkResultRow | None[source]¶
- list_benchmark_results(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.BenchmarkResultRow][source]¶
- ursa.catalog.catalog._JSON_COLS_BY_CLASS: dict[type[ursa.catalog.schemas.CatalogRow], frozenset[str]]¶
None
- ursa.catalog.catalog._json_cols_for_class(row_cls: type[ursa.catalog.schemas.CatalogRow]) frozenset[str][source]¶
Return the JSON-encoded MetadataDict columns for a row class.
- ursa.catalog.catalog._pk_in_clause(pk_cols: list[str], keys: list[tuple[Any, ...]]) str[source]¶
Render a WHERE clause matching any PK tuple in keys.
Single-column PKs render as col IN (lit, lit, ...); composite PKs render as (c1 = v1 AND c2 = v2) OR (...) ....
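The rendering rule reads like this in a simplified reimplementation. This is a sketch of the documented shape only: the real helper presumably handles proper SQL literal escaping, which is reduced here to naive quote-wrapping for strings:

```python
def pk_in_clause(pk_cols: list[str], keys: list[tuple]) -> str:
    """Sketch of the documented rendering; naive literal quoting for illustration."""

    def lit(v):
        # Illustrative only -- no escaping of embedded quotes.
        return f"'{v}'" if isinstance(v, str) else str(v)

    if len(pk_cols) == 1:
        # Single-column PK: col IN (lit, lit, ...)
        return f"{pk_cols[0]} IN ({', '.join(lit(k[0]) for k in keys)})"
    # Composite PK: (c1 = v1 AND c2 = v2) OR (...)
    terms = (
        "(" + " AND ".join(f"{c} = {lit(v)}" for c, v in zip(pk_cols, key)) + ")"
        for key in keys
    )
    return " OR ".join(terms)

single = pk_in_clause(["recording_hash"], [("a",), ("b",)])
composite = pk_in_clause(["recording_hash", "modality"], [("a", "eeg"), ("b", "emg")])
```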
- ursa.catalog.catalog._json_default(value: Any) Any[source]¶
Best-effort JSON encoder for values that json.dumps doesn’t natively handle. Used only on the __pydantic_extra__ column, where extras from forward-compat reads may carry oddball types.
Encoding conventions (decoders must mirror these):
- datetime / date → ISO 8601 string.
- timedelta → integer microseconds. Use timedelta(microseconds=n) to decode. Microseconds rather than total_seconds() so sub-second precision survives the JSON round-trip without floating-point churn.
- Pydantic BaseModel → model_dump(mode="json").
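The datetime and timedelta conventions can be sketched with a stdlib-only encoder (the BaseModel branch is omitted to stay dependency-free; json_default is an illustrative stand-in, not the real helper):

```python
import json
from datetime import date, datetime, timedelta, timezone

def json_default(value):
    # datetime / date -> ISO 8601 string.
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    # timedelta -> integer microseconds; decode with timedelta(microseconds=n).
    # Integer microseconds rather than total_seconds() keeps sub-second
    # precision exact -- no floating-point churn.
    if isinstance(value, timedelta):
        return (value.days * 86_400 + value.seconds) * 1_000_000 + value.microseconds
    raise TypeError(f"not JSON-serializable: {type(value)!r}")

extra = {
    "seen_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "window": timedelta(seconds=1, microseconds=500),
}
encoded = json.dumps(extra, default=json_default)
decoded = json.loads(encoded)
# A decoder mirrors the conventions:
roundtrip = timedelta(microseconds=decoded["window"])
```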