ursa.catalog.catalog

Verb taxonomy

  • :meth:Catalog.scan — raw :class:pyarrow.Table. Zero-copy escape hatch for callers that want to handle conversion themselves.

  • :meth:Catalog.query — generic typed rows. Use when the table name is dynamic.

  • list_<table> / get_<table> — sugar over query/get with row_cls baked in. Default for new code.

  • add_<table> / add_<table>s — typed write entry. Raises :class:CatalogPKConflict on duplicate primary key.

  • upsert_<table> — overwrite by primary key. The only path that mutates an existing row.

  • :meth:Catalog.append_unchecked — explicit opt-in for ingestion fast-paths that have already deduped. Use only if you know what you’re doing.

  • :meth:Catalog.delete — raises NotImplementedError in M2; driven by ENG-1069 (lifecycle GC + ursa.query deletion semantics).
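
A hedged sketch of how these verbs compose in practice. The path is hypothetical, and RecordingRow's remaining required constructor fields are elided here; consult ursa.catalog.schemas for the real signature.

```python
from ursa.catalog.catalog import Catalog
from ursa.catalog.schemas import RecordingRow

cat = Catalog.local("/data/ursa-catalog")  # hypothetical path

# NOTE: RecordingRow's other required fields are elided for brevity.
row = RecordingRow(recording_hash="abc123")

cat.add_recording(row)                  # typed write; CatalogPKConflict on duplicate PK
cat.upsert_recording(row)               # overwrite by PK, the only mutation path
hit = cat.get_recording("abc123")       # RecordingRow | None
page = cat.list_recordings(limit=10)    # typed reads with row_cls baked in
```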

Single-writer

The catalog assumes one writer per URI. Concurrent first-time :meth:Catalog.local / :meth:Catalog.from_store calls against an empty catalog may race during table creation. M2 has one ingestion process, so this isn’t a live bug; multi-writer support is a follow-up if the need arises.

Serialization

Rows are dumped with model_dump(mode="python") end-to-end so datetime and timedelta stay typed (no ISO-string churn at the Arrow boundary). Top-level Pydantic extras ride in a JSON __pydantic_extra__ column. Nested submodels (:class:TimeWindow, :class:EmbeddingSource) reject extras at write time even though Pydantic allows them — Arrow struct columns are fixed-schema, so unknown nested keys cannot be persisted.
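
A minimal illustration of why mode="python" matters at the Arrow boundary; ExampleRow is a hypothetical model, not one of the catalog schemas.

```python
from datetime import datetime, timedelta, timezone

from pydantic import BaseModel


class ExampleRow(BaseModel):  # hypothetical, for illustration only
    started_at: datetime
    duration: timedelta


row = ExampleRow(
    started_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    duration=timedelta(seconds=1.5),
)

dumped = row.model_dump(mode="python")
# Native types survive, so Arrow can ingest them without re-parsing:
assert isinstance(dumped["started_at"], datetime)
assert isinstance(dumped["duration"], timedelta)

# model_dump(mode="json") would instead stringify both fields, forcing the
# ISO-string round-trip the catalog deliberately avoids.
```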

Metadata pushdown

MetadataDict columns (metadata, device_info, channel_spec) serialize to JSON strings in M2. Filters of the form {"metadata.<key>": ...} raise :class:MetadataPushdownNotImplemented; tracked by ENG-1066 (Lance MapType + hot-key promotion).
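
A hedged sketch of the current behaviour, reusing the cat handle from the sketch above. The exception's import path, the metadata key, and the presence of a metadata field on RecordingRow are assumptions for illustration.

```python
# Import path is an assumption; adjust to wherever the exception actually lives.
from ursa.catalog.errors import MetadataPushdownNotImplemented

try:
    cat.list_recordings(where={"metadata.site": "lab-a"})
except MetadataPushdownNotImplemented:
    # Until ENG-1066 lands, filter dotted metadata keys client-side instead.
    rows = [r for r in cat.list_recordings() if r.metadata.get("site") == "lab-a"]
```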

Lance-backed catalog of Ursa’s nine tables.

The :class:Catalog exposes a generic core (:meth:Catalog.add, :meth:Catalog.upsert, :meth:Catalog.append_unchecked, :meth:Catalog.get, :meth:Catalog.scan, :meth:Catalog.query, :meth:Catalog.count, :meth:Catalog.delete, :meth:Catalog.tables) under thin per-table typed wrappers (add_recording, list_recordings, etc.). The wrappers exist for ergonomics; the generic surface is the path :mod:ursa.query (ENG-889) sits on.
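
When the table name is only known at runtime, the generic core applies directly. The table name and the filter field below are assumptions for illustration, not exported constants.

```python
from ursa.catalog.schemas import EventRow

table = "events"  # assumed table name, resolved at runtime in real callers

rows = cat.query(table, row_cls=EventRow, where={"recording_hash": "abc123"}, limit=100)
n = cat.count(table, where={"recording_hash": "abc123"})
raw = cat.scan(table, columns=["event_id"], limit=1_000)  # pyarrow.Table escape hatch
```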

Module Contents

Classes

Catalog

Lance-backed catalog. See module docstring for the verb taxonomy.

Functions

_json_cols_for_class

Return the JSON-encoded MetadataDict columns for a row class.

_pk_in_clause

Render a WHERE clause matching any PK tuple in keys.

_json_default

Best-effort JSON encoder for values that json.dumps doesn’t natively handle. Used only on the __pydantic_extra__ column, where extras from forward-compat reads may carry oddball types.

Data

API

ursa.catalog.catalog.__all__

['Catalog']

ursa.catalog.catalog.T

'TypeVar(…)'

ursa.catalog.catalog._FIXED_STRUCT_SUBMODELS: tuple[type[pydantic.BaseModel], ...]

()

class ursa.catalog.catalog.Catalog(db: Any, *, uri: str)[source]

Lance-backed catalog. See module docstring for the verb taxonomy.

Initialization

TABLES: tuple[str, ...]

None

classmethod local(root: str | pathlib.Path) ursa.catalog.catalog.Catalog[source]

Open (or create) a catalog under a local directory.

root is the root of the catalog itself — tables land at <root>/<table_name>.lance. The directory is created on first use; subsequent opens are idempotent.
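
For example (the path and table names are illustrative; the real names are whatever the typed wrappers imply):

```python
from pathlib import Path

cat = Catalog.local("/data/ursa-catalog")        # created on first use
cat_again = Catalog.local("/data/ursa-catalog")  # idempotent re-open

# Each table is a Lance dataset directly under the root, e.g. recordings.lance.
print(sorted(p.name for p in Path(cat.uri).iterdir()))
```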

classmethod from_store(store: ursa.store.base.ObjectStore) ursa.catalog.catalog.Catalog[source]

Open a catalog backed by an :class:ObjectStore.

Reads (uri, storage_options) from :meth:ObjectStore.lance_connection and connects lancedb at <uri>/catalog. The catalog is the only consumer of this sub-namespace; other Ursa data (Zarr arrays, raw blobs) lives under sibling prefixes within the same store.

_ensure_tables() None[source]

Idempotently create every table at its declared Arrow schema.

property uri: str

The lancedb URI this catalog is rooted at.

tables() list[str][source]

Return the canonical list of catalog table names.

add(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]

Append rows. Raises :class:CatalogPKConflict on duplicate PK.

upsert(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]

Overwrite by primary key. The only path that mutates an existing row.

append_unchecked(table: str, rows: collections.abc.Iterable[ursa.catalog.schemas.CatalogRow]) int[source]

Skip the PK check and append. Use only if the caller has already deduped — duplicates corrupt the catalog’s lineage assumptions.

get(table: str, **pk: Any) ursa.catalog.schemas.CatalogRow | None[source]

Fetch the row matching the given primary-key fields, or None.

scan(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None, columns: list[str] | None = None, limit: int | None = None) pyarrow.Table[source]

Return a raw :class:pyarrow.Table. Zero-copy escape hatch.

query(table: str, *, row_cls: type[ursa.catalog.catalog.T], where: str | collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.catalog.T][source]

Return validated rows. Use when the table name is dynamic.

count(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None) int[source]

Count rows, optionally filtered.

abstractmethod delete(table: str, **pk: Any) None[source]
_scan_arrow(table: str, *, where: str | collections.abc.Mapping[str, Any] | None = None, columns: list[str] | None = None, limit: int | None = None) pyarrow.Table[source]
_rows_to_arrow(table: str, rows: list[ursa.catalog.schemas.CatalogRow]) pyarrow.Table[source]
_arrow_to_rows(arrow: pyarrow.Table, row_cls: type[ursa.catalog.catalog.T]) list[ursa.catalog.catalog.T][source]
_reject_existing_pks(table: str, rows: list[ursa.catalog.schemas.CatalogRow]) None[source]
static _reject_nested_extras(row: ursa.catalog.schemas.CatalogRow) None[source]

Walk a row’s declared submodel fields and reject Pydantic extras.

Arrow struct columns are fixed-schema; submodels with unknown keys can’t be persisted. Raised as :class:CatalogSchemaError rather than letting PyArrow surface a less specific ArrowInvalid.

Iterates model_fields rather than vars(row) so we don’t accidentally walk Pydantic internals (__pydantic_extra__, __pydantic_fields_set__) or any non-model attribute attached to the instance — only fields declared on the row class.
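
A minimal sketch of that walk, assuming Pydantic v2 semantics; the real method raises CatalogSchemaError with catalog-specific context.

```python
from pydantic import BaseModel


def reject_nested_extras_sketch(row: BaseModel) -> None:
    """Reject rows whose declared submodel fields carry Pydantic extras."""
    for name in type(row).model_fields:            # declared fields only
        value = getattr(row, name)
        if isinstance(value, BaseModel) and value.__pydantic_extra__:
            raise ValueError(                      # real code: CatalogSchemaError
                f"submodel field {name!r} has extra keys "
                f"{sorted(value.__pydantic_extra__)}; Arrow struct columns "
                "are fixed-schema, so they cannot be persisted"
            )
```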

static _validate_pk_kwargs(table: str, pk: collections.abc.Mapping[str, Any]) None[source]
add_participant(row: ursa.catalog.schemas.ParticipantRow) None[source]
add_participants(rows: collections.abc.Iterable[ursa.catalog.schemas.ParticipantRow]) None[source]
upsert_participant(row: ursa.catalog.schemas.ParticipantRow) None[source]
upsert_participants(rows: collections.abc.Iterable[ursa.catalog.schemas.ParticipantRow]) None[source]
get_participant(participant_id: str) ursa.catalog.schemas.ParticipantRow | None[source]
list_participants(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.ParticipantRow][source]
add_recording(row: ursa.catalog.schemas.RecordingRow) None[source]
add_recordings(rows: collections.abc.Iterable[ursa.catalog.schemas.RecordingRow]) None[source]
upsert_recording(row: ursa.catalog.schemas.RecordingRow) None[source]
upsert_recordings(rows: collections.abc.Iterable[ursa.catalog.schemas.RecordingRow]) None[source]
get_recording(recording_hash: str) ursa.catalog.schemas.RecordingRow | None[source]
list_recordings(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.RecordingRow][source]
add_modality(row: ursa.catalog.schemas.ModalityRow) None[source]
add_modalities(rows: collections.abc.Iterable[ursa.catalog.schemas.ModalityRow]) None[source]
upsert_modality(row: ursa.catalog.schemas.ModalityRow) None[source]
upsert_modalities(rows: collections.abc.Iterable[ursa.catalog.schemas.ModalityRow]) None[source]
get_modality(recording_hash: str, modality: str) ursa.catalog.schemas.ModalityRow | None[source]
list_modalities(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.ModalityRow][source]
add_event(row: ursa.catalog.schemas.EventRow) None[source]
add_events(rows: collections.abc.Iterable[ursa.catalog.schemas.EventRow]) None[source]
upsert_event(row: ursa.catalog.schemas.EventRow) None[source]
upsert_events(rows: collections.abc.Iterable[ursa.catalog.schemas.EventRow]) None[source]
get_event(event_id: str) ursa.catalog.schemas.EventRow | None[source]
list_events(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.EventRow][source]
add_embedding(row: ursa.catalog.schemas.EmbeddingRow) None[source]
add_embeddings(rows: collections.abc.Iterable[ursa.catalog.schemas.EmbeddingRow]) None[source]
upsert_embedding(row: ursa.catalog.schemas.EmbeddingRow) None[source]
upsert_embeddings(rows: collections.abc.Iterable[ursa.catalog.schemas.EmbeddingRow]) None[source]
get_embedding(embedding_id: str) ursa.catalog.schemas.EmbeddingRow | None[source]
list_embeddings(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.EmbeddingRow][source]
add_virgo_asset(row: ursa.catalog.schemas.VirgoAssetRow) None[source]
add_virgo_assets(rows: collections.abc.Iterable[ursa.catalog.schemas.VirgoAssetRow]) None[source]
upsert_virgo_asset(row: ursa.catalog.schemas.VirgoAssetRow) None[source]
upsert_virgo_assets(rows: collections.abc.Iterable[ursa.catalog.schemas.VirgoAssetRow]) None[source]
get_virgo_asset(asset_id: str) ursa.catalog.schemas.VirgoAssetRow | None[source]
list_virgo_assets(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.VirgoAssetRow][source]
add_checkpoint(row: ursa.catalog.schemas.CheckpointRow) None[source]
add_checkpoints(rows: collections.abc.Iterable[ursa.catalog.schemas.CheckpointRow]) None[source]
upsert_checkpoint(row: ursa.catalog.schemas.CheckpointRow) None[source]
upsert_checkpoints(rows: collections.abc.Iterable[ursa.catalog.schemas.CheckpointRow]) None[source]
get_checkpoint(checkpoint_id: str) ursa.catalog.schemas.CheckpointRow | None[source]
list_checkpoints(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.CheckpointRow][source]
add_benchmark_suite(row: ursa.catalog.schemas.BenchmarkSuiteRow) None[source]
add_benchmark_suites(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkSuiteRow]) None[source]
upsert_benchmark_suite(row: ursa.catalog.schemas.BenchmarkSuiteRow) None[source]
upsert_benchmark_suites(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkSuiteRow]) None[source]
get_benchmark_suite(suite_name: str, suite_version: int) ursa.catalog.schemas.BenchmarkSuiteRow | None[source]
list_benchmark_suites(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.BenchmarkSuiteRow][source]
add_benchmark_result(row: ursa.catalog.schemas.BenchmarkResultRow) None[source]
add_benchmark_results(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkResultRow]) None[source]
upsert_benchmark_result(row: ursa.catalog.schemas.BenchmarkResultRow) None[source]
upsert_benchmark_results(rows: collections.abc.Iterable[ursa.catalog.schemas.BenchmarkResultRow]) None[source]
get_benchmark_result(result_id: str) ursa.catalog.schemas.BenchmarkResultRow | None[source]
list_benchmark_results(*, where: collections.abc.Mapping[str, Any] | None = None, limit: int | None = None) list[ursa.catalog.schemas.BenchmarkResultRow][source]
ursa.catalog.catalog._JSON_COLS_BY_CLASS: dict[type[ursa.catalog.schemas.CatalogRow], frozenset[str]]

None

ursa.catalog.catalog._json_cols_for_class(row_cls: type[ursa.catalog.schemas.CatalogRow]) frozenset[str][source]

Return the JSON-encoded MetadataDict columns for a row class.

ursa.catalog.catalog._pk_in_clause(pk_cols: list[str], keys: list[tuple[Any, ...]]) str[source]

Render a WHERE clause matching any PK tuple in keys.

Single-column PKs render as col IN (lit, lit, ...); composite PKs render as (c1 = v1 AND c2 = v2) OR (...) ....
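
A minimal sketch of that rendering; literal quoting and escaping here are simplified relative to whatever the real implementation does.

```python
from typing import Any


def pk_in_clause_sketch(pk_cols: list[str], keys: list[tuple[Any, ...]]) -> str:
    def lit(v: Any) -> str:
        return f"'{v}'" if isinstance(v, str) else str(v)

    if len(pk_cols) == 1:
        return f"{pk_cols[0]} IN ({', '.join(lit(k[0]) for k in keys)})"
    return " OR ".join(
        "(" + " AND ".join(f"{c} = {lit(v)}" for c, v in zip(pk_cols, key)) + ")"
        for key in keys
    )


pk_in_clause_sketch(["recording_hash"], [("a",), ("b",)])
# -> "recording_hash IN ('a', 'b')"
pk_in_clause_sketch(["recording_hash", "modality"], [("a", "eeg")])
# -> "(recording_hash = 'a' AND modality = 'eeg')"
```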

ursa.catalog.catalog._json_default(value: Any) Any[source]

Best-effort JSON encoder for values that json.dumps doesn’t natively handle. Used only on the __pydantic_extra__ column, where extras from forward-compat reads may carry oddball types.

Encoding conventions (decoders must mirror these):

  • datetime/date → ISO 8601 string.

  • timedelta → integer microseconds. Use timedelta(microseconds=n) to decode. Microseconds rather than total_seconds() so sub-second precision survives the JSON round-trip without floating-point churn.

  • Pydantic BaseModel → model_dump(mode="json").
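
A sketch that mirrors those conventions; the shipped encoder may cover more types.

```python
import json
from datetime import date, datetime, timedelta
from typing import Any

from pydantic import BaseModel


def json_default_sketch(value: Any) -> Any:
    if isinstance(value, (datetime, date)):          # datetime is a date subclass
        return value.isoformat()                     # ISO 8601 string
    if isinstance(value, timedelta):
        return value // timedelta(microseconds=1)    # integer microseconds
    if isinstance(value, BaseModel):
        return value.model_dump(mode="json")
    raise TypeError(f"not JSON serializable: {type(value)!r}")


json.dumps({"seen_at": datetime(2024, 1, 1), "lag": timedelta(seconds=1.5)},
           default=json_default_sketch)
# -> '{"seen_at": "2024-01-01T00:00:00", "lag": 1500000}'
```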