"""Lance-backed catalog of Ursa's nine tables.

The :class:`Catalog` exposes a generic core (:meth:`Catalog.add`,
:meth:`Catalog.upsert`, :meth:`Catalog.append_unchecked`,
:meth:`Catalog.get`, :meth:`Catalog.scan`, :meth:`Catalog.query`,
:meth:`Catalog.count`, :meth:`Catalog.delete`, :meth:`Catalog.tables`)
under thin per-table typed wrappers (``add_recording``,
``list_recordings``, etc.). The wrappers exist for ergonomics; the
generic surface is the path :mod:`ursa.query` (ENG-889) sits on.

Verb taxonomy
-------------

* :meth:`Catalog.scan` — raw :class:`pyarrow.Table`. Zero-copy escape
  hatch for callers that want to handle conversion themselves.
* :meth:`Catalog.query` — generic typed rows. Use when the table name
  is dynamic.
* ``list_<table>`` / ``get_<table>`` — sugar over ``query``/``get`` with
  ``row_cls`` baked in. **Default for new code** (see the sketch below
  this list).
* ``add_<table>`` / ``add_<table>s`` — typed write entry. Raises
  :class:`CatalogPKConflict` on duplicate primary key.
* ``upsert_<table>`` — overwrite by primary key. The only path that
  mutates an existing row.
* :meth:`Catalog.append_unchecked` — explicit opt-in for ingestion
  fast-paths that have already deduped. Use only if you know what
  you're doing.
* :meth:`Catalog.delete` — raises ``NotImplementedError`` in M2;
  driven by ENG-1069 (lifecycle GC + ``ursa.query`` deletion semantics).
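
A sketch of the intended call pattern (illustrative only; the local path is
made up and ``RecordingRow`` requires more fields than the primary key)::

    cat = Catalog.local("/data/ursa_catalog")
    cat.add_recording(
        RecordingRow(recording_hash="abc123")  # other required fields elided
    )
    row = cat.get_recording("abc123")    # typed point read
    rows = cat.list_recordings(limit=10)  # typed scan
    raw = cat.scan(TABLE_RECORDINGS, columns=["recording_hash"])  # pyarrow escape hatch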

Single-writer
-------------

The catalog assumes one writer per URI. Concurrent first-time
:meth:`Catalog.local` / :meth:`Catalog.from_store` against an empty
catalog may race during table creation. M2 has one ingestion process so
this isn't a live bug; multi-writer support is a follow-up if the need
arises.

Serialization
-------------

Rows are dumped with ``model_dump(mode="python")`` end-to-end so
``datetime`` and ``timedelta`` stay typed (no ISO-string churn at the
Arrow boundary). Top-level Pydantic extras ride in a JSON
``__pydantic_extra__`` column. Nested submodels (:class:`TimeWindow`,
:class:`EmbeddingSource`) reject extras at write time even though
Pydantic allows them — Arrow struct columns are fixed-schema, so
unknown nested keys cannot be persisted.
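
For illustration (sketch only; ``future_note`` is a hypothetical extra and
required ``ParticipantRow`` fields beyond the primary key are omitted)::

    row = ParticipantRow(participant_id="p01", future_note="kept")
    cat.add_participant(row)
    # ``future_note`` has no Arrow column of its own; it rides in the JSON
    # ``__pydantic_extra__`` column and reappears on the typed read:
    assert cat.get_participant("p01").future_note == "kept"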

Metadata pushdown
-----------------

``MetadataDict`` columns (``metadata``, ``device_info``, ``channel_spec``)
serialize to JSON strings in M2. Filters of the form
``{"metadata.<key>": ...}`` raise
:class:`MetadataPushdownNotImplemented`; tracked by ENG-1066 (Lance
``MapType`` + hot-key promotion).
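
That is, for a table carrying a ``metadata`` column (the ``site`` key is
illustrative)::

    cat.list_recordings(where={"recording_hash": "abc123"})  # real column: fine
    cat.list_recordings(where={"metadata.site": "lab-a"})    # raises MetadataPushdownNotImplemented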
"""

from __future__ import annotations

import json
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar, cast

import pyarrow as pa
from pydantic import BaseModel

from ursa.catalog._arrow import (
    ARROW_SCHEMAS,
    EXTRAS_COLUMN,
    JSON_METADATA_COLUMNS,
    NULLABLE_JSON_METADATA_COLUMNS,
    PRIMARY_KEYS,
    ROW_CLASSES,
)
from ursa.catalog._filters import compile_where
from ursa.catalog.exceptions import (
    CatalogNotInitialized,
    CatalogPKConflict,
    CatalogSchemaError,
)
from ursa.catalog.schemas import (
    BenchmarkResultRow,
    BenchmarkSuiteRow,
    CatalogRow,
    CheckpointRow,
    EmbeddingRow,
    EmbeddingSource,
    EventRow,
    ModalityRow,
    ParticipantRow,
    RecordingRow,
    TimeWindow,
    VirgoAssetRow,
)
from ursa.layout import (
    ALL_CATALOG_TABLES,
    TABLE_BENCHMARK_RESULTS,
    TABLE_BENCHMARK_SUITES,
    TABLE_CHECKPOINTS,
    TABLE_EMBEDDINGS,
    TABLE_EVENTS,
    TABLE_MODALITIES,
    TABLE_PARTICIPANTS,
    TABLE_RECORDINGS,
    TABLE_VIRGO_ASSETS,
)

if TYPE_CHECKING:
    from ursa.store.base import ObjectStore


__all__ = ["Catalog"]


T = TypeVar("T", bound=CatalogRow)


# Submodels with fixed Arrow struct schemas. Extras on these are rejected
# at write time even when Pydantic allows them on the Python side.
_FIXED_STRUCT_SUBMODELS: tuple[type[BaseModel], ...] = (TimeWindow, EmbeddingSource)


class Catalog:
    """Lance-backed catalog. See module docstring for the verb taxonomy."""

    # Public for tests / introspection. Mirrors ``ursa.layout.ALL_CATALOG_TABLES``.
    TABLES: tuple[str, ...] = ALL_CATALOG_TABLES

    def __init__(self, db: Any, *, uri: str) -> None:
        # Private. Use :meth:`Catalog.local` or :meth:`Catalog.from_store`.
        self._db = db
        self._uri = uri
        self._ensure_tables()

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------
    @classmethod
    def local(cls, root: str | Path) -> Catalog:
        """Open (or create) a catalog under a local directory.

        ``root`` is the root of the catalog itself — tables land at
        ``<root>/<table_name>.lance``. The directory is created on first
        use; subsequent opens are idempotent.
        """
        import lancedb  # heavy import; lazy

        root_path = Path(root).expanduser().resolve()
        root_path.mkdir(parents=True, exist_ok=True)
        db = lancedb.connect(str(root_path))
        return cls(db, uri=str(root_path))

    @classmethod
    def from_store(cls, store: ObjectStore) -> Catalog:
        """Open a catalog backed by an :class:`ObjectStore`.

        Reads ``(uri, storage_options)`` from
        :meth:`ObjectStore.lance_connection` and connects lancedb at
        ``<uri>/catalog``. The catalog is the only consumer of this
        sub-namespace; other Ursa data (Zarr arrays, raw blobs) lives
        under sibling prefixes within the same store.
        """
        import lancedb  # heavy import; lazy

        base_uri, storage_options = store.lance_connection()
        catalog_uri = base_uri.rstrip("/") + "/catalog"
        db = lancedb.connect(
            catalog_uri,
            storage_options=storage_options or None,
        )
        return cls(db, uri=catalog_uri)
    # ------------------------------------------------------------------
    # Schema management
    # ------------------------------------------------------------------

    def _ensure_tables(self) -> None:
        """Idempotently create every table at its declared Arrow schema."""
        if self._db is None:
            raise CatalogNotInitialized("Catalog has no underlying lancedb connection")

        # lancedb's list_tables() returns a paginated ListTablesResponse;
        # table_names() returns a flat list[str]. We use table_names() and
        # ignore the deprecation — list_tables() requires walking pages,
        # which is overkill for the catalog's nine-table fixed surface.
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            existing = set(self._db.table_names())

        for name in self.TABLES:
            if name in existing:
                continue
            empty = ARROW_SCHEMAS[name].empty_table()
            self._db.create_table(name, data=empty, schema=ARROW_SCHEMAS[name])

    @property
    def uri(self) -> str:
        """The lancedb URI this catalog is rooted at."""
        return self._uri

    def tables(self) -> list[str]:
        """Return the canonical list of catalog table names."""
        return list(self.TABLES)

    # ------------------------------------------------------------------
    # Generic core
    # ------------------------------------------------------------------
    def add(self, table: str, rows: Iterable[CatalogRow]) -> int:
        """Append rows. Raises :class:`CatalogPKConflict` on duplicate PK."""
        materialized = list(rows)
        if not materialized:
            return 0
        self._reject_existing_pks(table, materialized)
        arrow = self._rows_to_arrow(table, materialized)
        self._db.open_table(table).add(arrow)
        return len(materialized)

    def upsert(self, table: str, rows: Iterable[CatalogRow]) -> int:
        """Overwrite by primary key. The only path that mutates an existing row."""
        materialized = list(rows)
        if not materialized:
            return 0
        arrow = self._rows_to_arrow(table, materialized)
        tbl = self._db.open_table(table)
        (
            tbl.merge_insert(PRIMARY_KEYS[table])
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute(arrow)
        )
        return len(materialized)

    def append_unchecked(self, table: str, rows: Iterable[CatalogRow]) -> int:
        """Skip the PK check and append.

        Use only if the caller has already deduped — duplicates corrupt the
        catalog's lineage assumptions.
        """
        materialized = list(rows)
        if not materialized:
            return 0
        arrow = self._rows_to_arrow(table, materialized)
        self._db.open_table(table).add(arrow)
        return len(materialized)

    def get(self, table: str, **pk: Any) -> CatalogRow | None:
        """Fetch the row matching the given primary-key fields, or ``None``."""
        self._validate_pk_kwargs(table, pk)
        arrow = self._scan_arrow(table, where=pk, limit=1)
        if arrow.num_rows == 0:
            return None
        row_cls = cast("type[CatalogRow]", ROW_CLASSES[table])
        rows = self._arrow_to_rows(arrow, row_cls)
        return rows[0] if rows else None

    def scan(
        self,
        table: str,
        *,
        where: str | Mapping[str, Any] | None = None,
        columns: list[str] | None = None,
        limit: int | None = None,
    ) -> pa.Table:
        """Return a raw :class:`pyarrow.Table`. Zero-copy escape hatch."""
        return self._scan_arrow(table, where=where, columns=columns, limit=limit)

    def query(
        self,
        table: str,
        *,
        row_cls: type[T],
        where: str | Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[T]:
        """Return validated rows. Use when the table name is dynamic."""
        arrow = self._scan_arrow(table, where=where, limit=limit)
        return cast("list[T]", self._arrow_to_rows(arrow, row_cls))

    def count(
        self,
        table: str,
        *,
        where: str | Mapping[str, Any] | None = None,
    ) -> int:
        """Count rows, optionally filtered."""
        tbl = self._db.open_table(table)
        compiled = compile_where(where)
        if compiled is None:
            return int(tbl.count_rows())
        return int(tbl.count_rows(filter=compiled))

    def delete(self, table: str, **pk: Any) -> None:
        raise NotImplementedError(
            "Catalog.delete() is driven by ENG-1069 (lifecycle GC + ENG-889 query semantics)"
        )
    # ------------------------------------------------------------------
    # Internals — Arrow ↔ Pydantic
    # ------------------------------------------------------------------

    def _scan_arrow(
        self,
        table: str,
        *,
        where: str | Mapping[str, Any] | None = None,
        columns: list[str] | None = None,
        limit: int | None = None,
    ) -> pa.Table:
        if table not in ROW_CLASSES:
            raise KeyError(f"unknown catalog table {table!r}")
        tbl = self._db.open_table(table)
        compiled = compile_where(where)
        builder = tbl.search()
        if compiled is not None:
            builder = builder.where(compiled)
        if columns:
            builder = builder.select(columns)
        if limit is not None:
            builder = builder.limit(limit)
        return builder.to_arrow()

    def _rows_to_arrow(self, table: str, rows: list[CatalogRow]) -> pa.Table:
        if table not in ARROW_SCHEMAS:
            raise KeyError(f"unknown catalog table {table!r}")
        schema = ARROW_SCHEMAS[table]
        row_cls = ROW_CLASSES[table]
        if any(not isinstance(r, row_cls) for r in rows):
            wrong = next(r for r in rows if not isinstance(r, row_cls))
            raise CatalogSchemaError(
                f"table {table!r} expects {row_cls.__name__}, got {type(wrong).__name__}"
            )

        json_cols = JSON_METADATA_COLUMNS.get(table, frozenset())
        nullable_json_cols = NULLABLE_JSON_METADATA_COLUMNS.get(table, frozenset())
        known_fields = set(row_cls.model_fields)  # type: ignore[attr-defined]

        records: list[dict[str, Any]] = []
        for row in rows:
            dump = row.model_dump(mode="python")
            self._reject_nested_extras(row)
            extras = {k: v for k, v in dump.items() if k not in known_fields}
            record: dict[str, Any] = {k: dump[k] for k in known_fields}
            for col in json_cols:
                value = record.get(col)
                if value is None and col in nullable_json_cols:
                    record[col] = None
                else:
                    record[col] = json.dumps(value or {}, sort_keys=True)
            # Architecture v0.4: ModalityRow.domain_intervals materializes as
            # list<struct<start, end>>. Pydantic dumps tuples as plain lists
            # (``[[s, e], ...]``); reshape into struct dicts before Arrow
            # construction. ``None`` stays ``None`` (column is nullable).
            if "domain_intervals" in record and record["domain_intervals"] is not None:
                record["domain_intervals"] = [
                    {"start": s, "end": e} for s, e in record["domain_intervals"]
                ]
            record[EXTRAS_COLUMN] = json.dumps(extras, sort_keys=True, default=_json_default)
            records.append(record)
        return pa.Table.from_pylist(records, schema=schema)

    def _arrow_to_rows(self, arrow: pa.Table, row_cls: type[T]) -> list[T]:
        if arrow.num_rows == 0:
            return []
        json_cols = _json_cols_for_class(row_cls)
        rows: list[T] = []
        for record in arrow.to_pylist():
            extras_blob = record.pop(EXTRAS_COLUMN, None)
            for col in json_cols:
                if col in record and isinstance(record[col], str):
                    record[col] = json.loads(record[col])
            # Architecture v0.4: ModalityRow.domain_intervals comes back as a
            # list of struct dicts (``[{"start": s, "end": e}, ...]``);
            # collapse to ``list[tuple[float, float]]`` to match Pydantic.
            if "domain_intervals" in record and record["domain_intervals"] is not None:
                record["domain_intervals"] = [
                    (d["start"], d["end"]) for d in record["domain_intervals"]
                ]
            if extras_blob:
                extras = json.loads(extras_blob)
                if extras:
                    record.update(extras)
            rows.append(row_cls.model_validate(record))
        return rows

    def _reject_existing_pks(self, table: str, rows: list[CatalogRow]) -> None:
        pk_cols = PRIMARY_KEYS[table]
        seen: set[tuple[Any, ...]] = set()
        for row in rows:
            key = tuple(getattr(row, c) for c in pk_cols)
            if key in seen:
                raise CatalogPKConflict(
                    f"duplicate PK {dict(zip(pk_cols, key, strict=True))!r} "
                    f"within batch for {table!r}"
                )
            seen.add(key)
        if not seen:
            return
        clause = _pk_in_clause(pk_cols, list(seen))
        existing = self._scan_arrow(table, where=clause, columns=list(pk_cols), limit=len(seen))
        if existing.num_rows == 0:
            return
        existing_keys = {tuple(row[c] for c in pk_cols) for row in existing.to_pylist()}
        collisions = seen & existing_keys
        if collisions:
            sample = next(iter(collisions))
            # Structured form so callers (notably ``ursa.register``) can read
            # ``exc.table`` / ``exc.primary_key`` without parsing the message.
            raise CatalogPKConflict(
                table,
                tuple(zip(pk_cols, sample, strict=True)),
            )

    @staticmethod
    def _reject_nested_extras(row: CatalogRow) -> None:
        """Walk a row's declared submodel fields and reject Pydantic extras.

        Arrow struct columns are fixed-schema; submodels with unknown keys
        can't be persisted. Raised as :class:`CatalogSchemaError` rather
        than letting PyArrow surface a less specific ``ArrowInvalid``.

        Iterates ``model_fields`` rather than ``vars(row)`` so we don't
        accidentally walk Pydantic internals (``__pydantic_extra__``,
        ``__pydantic_fields_set__``) or any non-model attribute attached to
        the instance — only fields declared on the row class.
        """
        for field_name in row.__class__.model_fields:
            value = getattr(row, field_name, None)
            if not isinstance(value, _FIXED_STRUCT_SUBMODELS):
                continue
            extras = getattr(value, "__pydantic_extra__", None)
            if extras:
                raise CatalogSchemaError(
                    f"row {row.__class__.__name__}.{field_name}: "
                    f"{type(value).__name__} carries extra keys {sorted(extras)!r}; "
                    "nested struct columns are fixed-schema and cannot persist extras"
                )

    @staticmethod
    def _validate_pk_kwargs(table: str, pk: Mapping[str, Any]) -> None:
        expected = set(PRIMARY_KEYS[table])
        actual = set(pk)
        if actual != expected:
            raise ValueError(
                f"get({table!r}) requires PK kwargs {sorted(expected)!r}; "
                f"got {sorted(actual)!r}"
            )
    # ------------------------------------------------------------------
    # Typed wrappers — hand-written, six per table.
    # ------------------------------------------------------------------

    # ---- participants ------------------------------------------------
    def add_participant(self, row: ParticipantRow) -> None:
        self.add(TABLE_PARTICIPANTS, [row])

    def add_participants(self, rows: Iterable[ParticipantRow]) -> None:
        self.add(TABLE_PARTICIPANTS, rows)

    def upsert_participant(self, row: ParticipantRow) -> None:
        self.upsert(TABLE_PARTICIPANTS, [row])

    def upsert_participants(self, rows: Iterable[ParticipantRow]) -> None:
        self.upsert(TABLE_PARTICIPANTS, rows)

    def get_participant(self, participant_id: str) -> ParticipantRow | None:
        return cast(
            "ParticipantRow | None",
            self.get(TABLE_PARTICIPANTS, participant_id=participant_id),
        )

    def list_participants(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[ParticipantRow]:
        return self.query(TABLE_PARTICIPANTS, row_cls=ParticipantRow, where=where, limit=limit)

    # ---- recordings --------------------------------------------------

    def add_recording(self, row: RecordingRow) -> None:
        self.add(TABLE_RECORDINGS, [row])

    def add_recordings(self, rows: Iterable[RecordingRow]) -> None:
        self.add(TABLE_RECORDINGS, rows)

    def upsert_recording(self, row: RecordingRow) -> None:
        self.upsert(TABLE_RECORDINGS, [row])

    def upsert_recordings(self, rows: Iterable[RecordingRow]) -> None:
        self.upsert(TABLE_RECORDINGS, rows)

    def get_recording(self, recording_hash: str) -> RecordingRow | None:
        return cast(
            "RecordingRow | None",
            self.get(TABLE_RECORDINGS, recording_hash=recording_hash),
        )

    def list_recordings(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[RecordingRow]:
        return self.query(TABLE_RECORDINGS, row_cls=RecordingRow, where=where, limit=limit)
    # ---- modalities --------------------------------------------------

    def add_modality(self, row: ModalityRow) -> None:
        self.add(TABLE_MODALITIES, [row])

    def add_modalities(self, rows: Iterable[ModalityRow]) -> None:
        self.add(TABLE_MODALITIES, rows)

    def upsert_modality(self, row: ModalityRow) -> None:
        self.upsert(TABLE_MODALITIES, [row])

    def upsert_modalities(self, rows: Iterable[ModalityRow]) -> None:
        self.upsert(TABLE_MODALITIES, rows)

    def get_modality(self, recording_hash: str, modality: str) -> ModalityRow | None:
        return cast(
            "ModalityRow | None",
            self.get(TABLE_MODALITIES, recording_hash=recording_hash, modality=modality),
        )

    def list_modalities(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[ModalityRow]:
        return self.query(TABLE_MODALITIES, row_cls=ModalityRow, where=where, limit=limit)

    # ---- events ------------------------------------------------------

    def add_event(self, row: EventRow) -> None:
        self.add(TABLE_EVENTS, [row])

    def add_events(self, rows: Iterable[EventRow]) -> None:
        self.add(TABLE_EVENTS, rows)

    def upsert_event(self, row: EventRow) -> None:
        self.upsert(TABLE_EVENTS, [row])

    def upsert_events(self, rows: Iterable[EventRow]) -> None:
        self.upsert(TABLE_EVENTS, rows)

    def get_event(self, event_id: str) -> EventRow | None:
        return cast("EventRow | None", self.get(TABLE_EVENTS, event_id=event_id))

    def list_events(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[EventRow]:
        return self.query(TABLE_EVENTS, row_cls=EventRow, where=where, limit=limit)
    # ---- embeddings --------------------------------------------------

    def add_embedding(self, row: EmbeddingRow) -> None:
        self.add(TABLE_EMBEDDINGS, [row])

    def add_embeddings(self, rows: Iterable[EmbeddingRow]) -> None:
        self.add(TABLE_EMBEDDINGS, rows)

    def upsert_embedding(self, row: EmbeddingRow) -> None:
        self.upsert(TABLE_EMBEDDINGS, [row])

    def upsert_embeddings(self, rows: Iterable[EmbeddingRow]) -> None:
        self.upsert(TABLE_EMBEDDINGS, rows)

    def get_embedding(self, embedding_id: str) -> EmbeddingRow | None:
        return cast(
            "EmbeddingRow | None",
            self.get(TABLE_EMBEDDINGS, embedding_id=embedding_id),
        )

    def list_embeddings(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[EmbeddingRow]:
        return self.query(TABLE_EMBEDDINGS, row_cls=EmbeddingRow, where=where, limit=limit)

    # ---- virgo_assets ------------------------------------------------

    def add_virgo_asset(self, row: VirgoAssetRow) -> None:
        self.add(TABLE_VIRGO_ASSETS, [row])

    def add_virgo_assets(self, rows: Iterable[VirgoAssetRow]) -> None:
        self.add(TABLE_VIRGO_ASSETS, rows)

    def upsert_virgo_asset(self, row: VirgoAssetRow) -> None:
        self.upsert(TABLE_VIRGO_ASSETS, [row])

    def upsert_virgo_assets(self, rows: Iterable[VirgoAssetRow]) -> None:
        self.upsert(TABLE_VIRGO_ASSETS, rows)

    def get_virgo_asset(self, asset_id: str) -> VirgoAssetRow | None:
        return cast(
            "VirgoAssetRow | None",
            self.get(TABLE_VIRGO_ASSETS, asset_id=asset_id),
        )

    def list_virgo_assets(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[VirgoAssetRow]:
        return self.query(TABLE_VIRGO_ASSETS, row_cls=VirgoAssetRow, where=where, limit=limit)
    # ---- checkpoints -------------------------------------------------

    def add_checkpoint(self, row: CheckpointRow) -> None:
        self.add(TABLE_CHECKPOINTS, [row])

    def add_checkpoints(self, rows: Iterable[CheckpointRow]) -> None:
        self.add(TABLE_CHECKPOINTS, rows)

    def upsert_checkpoint(self, row: CheckpointRow) -> None:
        self.upsert(TABLE_CHECKPOINTS, [row])

    def upsert_checkpoints(self, rows: Iterable[CheckpointRow]) -> None:
        self.upsert(TABLE_CHECKPOINTS, rows)

    def get_checkpoint(self, checkpoint_id: str) -> CheckpointRow | None:
        return cast(
            "CheckpointRow | None",
            self.get(TABLE_CHECKPOINTS, checkpoint_id=checkpoint_id),
        )

    def list_checkpoints(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[CheckpointRow]:
        return self.query(TABLE_CHECKPOINTS, row_cls=CheckpointRow, where=where, limit=limit)

    # ---- benchmark_suites --------------------------------------------

    def add_benchmark_suite(self, row: BenchmarkSuiteRow) -> None:
        self.add(TABLE_BENCHMARK_SUITES, [row])

    def add_benchmark_suites(self, rows: Iterable[BenchmarkSuiteRow]) -> None:
        self.add(TABLE_BENCHMARK_SUITES, rows)

    def upsert_benchmark_suite(self, row: BenchmarkSuiteRow) -> None:
        self.upsert(TABLE_BENCHMARK_SUITES, [row])

    def upsert_benchmark_suites(self, rows: Iterable[BenchmarkSuiteRow]) -> None:
        self.upsert(TABLE_BENCHMARK_SUITES, rows)

    def get_benchmark_suite(
        self, suite_name: str, suite_version: int
    ) -> BenchmarkSuiteRow | None:
        return cast(
            "BenchmarkSuiteRow | None",
            self.get(
                TABLE_BENCHMARK_SUITES,
                suite_name=suite_name,
                suite_version=suite_version,
            ),
        )

    def list_benchmark_suites(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[BenchmarkSuiteRow]:
        return self.query(
            TABLE_BENCHMARK_SUITES,
            row_cls=BenchmarkSuiteRow,
            where=where,
            limit=limit,
        )
    # ---- benchmark_results -------------------------------------------

    def add_benchmark_result(self, row: BenchmarkResultRow) -> None:
        self.add(TABLE_BENCHMARK_RESULTS, [row])

    def add_benchmark_results(self, rows: Iterable[BenchmarkResultRow]) -> None:
        self.add(TABLE_BENCHMARK_RESULTS, rows)

    def upsert_benchmark_result(self, row: BenchmarkResultRow) -> None:
        self.upsert(TABLE_BENCHMARK_RESULTS, [row])

    def upsert_benchmark_results(self, rows: Iterable[BenchmarkResultRow]) -> None:
        self.upsert(TABLE_BENCHMARK_RESULTS, rows)

    def get_benchmark_result(self, result_id: str) -> BenchmarkResultRow | None:
        return cast(
            "BenchmarkResultRow | None",
            self.get(TABLE_BENCHMARK_RESULTS, result_id=result_id),
        )

    def list_benchmark_results(
        self,
        *,
        where: Mapping[str, Any] | None = None,
        limit: int | None = None,
    ) -> list[BenchmarkResultRow]:
        return self.query(
            TABLE_BENCHMARK_RESULTS,
            row_cls=BenchmarkResultRow,
            where=where,
            limit=limit,
        )
# ---------------------------------------------------------------------------
# Module-level helpers
# ---------------------------------------------------------------------------

# Module-scope cache of JSON-encoded MetadataDict columns by row class.
# Built once at import time so ``_arrow_to_rows`` doesn't pay the cost
# of resolving the row class back to its table on every read.
_JSON_COLS_BY_CLASS: dict[type[CatalogRow], frozenset[str]] = {
    cls: JSON_METADATA_COLUMNS.get(table, frozenset())  # type: ignore[misc]
    for table, cls in ROW_CLASSES.items()
}


def _json_cols_for_class(row_cls: type[CatalogRow]) -> frozenset[str]:
    """Return the JSON-encoded MetadataDict columns for a row class."""
    return _JSON_COLS_BY_CLASS.get(row_cls, frozenset())


def _pk_in_clause(pk_cols: list[str], keys: list[tuple[Any, ...]]) -> str:
    """Render a WHERE clause matching any PK tuple in ``keys``.

    Single-column PKs render as ``col IN (lit, lit, ...)``; composite PKs
    render as ``(c1 = v1 AND c2 = v2) OR (...) ...``.
    """
    from ursa.catalog._filters import _lit  # internal — keeps escape rules in one place

    if len(pk_cols) == 1:
        col = pk_cols[0]
        rendered = ", ".join(_lit(k[0]) for k in keys)
        return f"{col} IN ({rendered})"
    parts: list[str] = []
    for tup in keys:
        conjuncts = [f"{c} = {_lit(v)}" for c, v in zip(pk_cols, tup, strict=True)]
        parts.append("(" + " AND ".join(conjuncts) + ")")
    return " OR ".join(parts)


def _json_default(value: Any) -> Any:
    """Best-effort JSON encoder for values that ``json.dumps`` doesn't natively handle.

    Used only on the ``__pydantic_extra__`` column, where extras from
    forward-compat reads may carry oddball types. Encoding conventions
    (decoders must mirror these):

    * ``datetime``/``date`` → ISO 8601 string.
    * ``timedelta`` → integer microseconds. Use ``timedelta(microseconds=n)``
      to decode. Microseconds rather than ``total_seconds()`` so sub-second
      precision survives the JSON round-trip without floating-point churn.
    * Pydantic ``BaseModel`` → ``model_dump(mode="json")``.
    """
    from datetime import date, datetime, timedelta

    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, timedelta):
        return value // timedelta(microseconds=1)
    if isinstance(value, BaseModel):
        return value.model_dump(mode="json")
    raise TypeError(f"cannot JSON-encode {type(value).__name__}: {value!r}")
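

# Decoding sketch (illustrative; nothing in this module auto-decodes extras).
# A reader that wants typed values back out of ``__pydantic_extra__`` mirrors
# the conventions documented on ``_json_default``; the field names below are
# hypothetical:
#
#     from datetime import datetime, timedelta
#     raw = json.loads(extras_blob)
#     started_at = datetime.fromisoformat(raw["started_at"])  # ISO 8601 string
#     duration = timedelta(microseconds=raw["duration_us"])   # integer microseconds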