"""Lance-backed catalog of Ursa's nine tables.
The :class:`Catalog` exposes a generic core (:meth:`Catalog.add`,
:meth:`Catalog.upsert`, :meth:`Catalog.append_unchecked`,
:meth:`Catalog.get`, :meth:`Catalog.scan`, :meth:`Catalog.query`,
:meth:`Catalog.count`, :meth:`Catalog.delete`, :meth:`Catalog.tables`)
under thin per-table typed wrappers (``add_recording``,
``list_recordings``, etc.). The wrappers exist for ergonomics; the
generic surface is the path :mod:`ursa.query` (ENG-889) sits on.

Verb taxonomy
-------------

* :meth:`Catalog.scan` — raw :class:`pyarrow.Table`. Zero-copy escape
hatch for callers that want to handle conversion themselves.
* :meth:`Catalog.query` — generic typed rows. Use when the table name
is dynamic.
* ``list_<table>`` / ``get_<table>`` — sugar over ``query``/``get`` with
``row_cls`` baked in. **Default for new code.**
* ``add_<table>`` / ``add_<table>s`` — typed write entry. Raises
:class:`CatalogPKConflict` on duplicate primary key.
* ``upsert_<table>`` — overwrite by primary key. The only path that
mutates an existing row.
* :meth:`Catalog.append_unchecked` — explicit opt-in for ingestion
fast-paths that have already deduped. Use only if you know what
you're doing.
* :meth:`Catalog.delete` — raises ``NotImplementedError`` in M2;
driven by ENG-1069 (lifecycle GC + ``ursa.query`` deletion semantics).
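
A minimal read-side sketch of the taxonomy (illustrative; the import
path and row key are assumptions, and the catalog is presumed already
populated)::

    from ursa.catalog import Catalog
    from ursa.layout import TABLE_RECORDINGS

    cat = Catalog.local("~/ursa-catalog")
    rows = cat.list_recordings(limit=10)        # typed rows; default for new code
    one = cat.get_recording("abc123")           # RecordingRow | None
    raw = cat.scan(TABLE_RECORDINGS, limit=10)  # raw pyarrow.Table escape hatch
    n = cat.count(TABLE_RECORDINGS)             # optionally filtered via ``where``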

Single-writer
-------------

The catalog assumes one writer per URI. Concurrent first-time
:meth:`Catalog.local` / :meth:`Catalog.from_store` against an empty
catalog may race during table creation. M2 has a single ingestion
process, so this isn't a live bug; multi-writer support is a follow-up
if the need arises.

Serialization
-------------

Rows are dumped with ``model_dump(mode="python")`` end-to-end so
``datetime`` and ``timedelta`` stay typed (no ISO-string churn at the
Arrow boundary). Top-level Pydantic extras ride in a JSON
``__pydantic_extra__`` column. Nested submodels (:class:`TimeWindow`,
:class:`EmbeddingSource`) reject extras at write time even though
Pydantic allows them — Arrow struct columns are fixed-schema, so
unknown nested keys cannot be persisted.
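
As a concrete illustration: a row validated with an unknown top-level key
(say ``lab="x"``; ``lab`` is not a schema field) round-trips: the extra is
JSON-serialized into the ``__pydantic_extra__`` column on write and merged
back into the record before ``model_validate`` on read.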

Metadata pushdown
-----------------

``MetadataDict`` columns (``metadata``, ``device_info``, ``channel_spec``)
serialize to JSON strings in M2. Filters of the form
``{"metadata.<key>": ...}`` raise
:class:`MetadataPushdownNotImplemented`; tracked by ENG-1066 (Lance
``MapType`` + hot-key promotion).
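
For example, ``catalog.count(TABLE_RECORDINGS, where={"metadata.lab": "x"})``
raises today (``lab`` is an illustrative key), while a top-level filter such
as ``where={"recording_hash": "..."}`` compiles to an ordinary Lance
predicate.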
"""
from __future__ import annotations
import json
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar, cast
import pyarrow as pa
from pydantic import BaseModel
from ursa.catalog._arrow import (
ARROW_SCHEMAS,
EXTRAS_COLUMN,
JSON_METADATA_COLUMNS,
NULLABLE_JSON_METADATA_COLUMNS,
PRIMARY_KEYS,
ROW_CLASSES,
)
from ursa.catalog._filters import compile_where
from ursa.catalog.exceptions import (
CatalogNotInitialized,
CatalogPKConflict,
CatalogSchemaError,
)
from ursa.catalog.schemas import (
BenchmarkResultRow,
BenchmarkSuiteRow,
CatalogRow,
CheckpointRow,
EmbeddingRow,
EmbeddingSource,
EventRow,
ModalityRow,
ParticipantRow,
RecordingRow,
TimeWindow,
VirgoAssetRow,
)
from ursa.layout import (
ALL_CATALOG_TABLES,
TABLE_BENCHMARK_RESULTS,
TABLE_BENCHMARK_SUITES,
TABLE_CHECKPOINTS,
TABLE_EMBEDDINGS,
TABLE_EVENTS,
TABLE_MODALITIES,
TABLE_PARTICIPANTS,
TABLE_RECORDINGS,
TABLE_VIRGO_ASSETS,
)
if TYPE_CHECKING:
from ursa.store.base import ObjectStore
__all__ = ["Catalog"]
T = TypeVar("T", bound=CatalogRow)
# Submodels with fixed Arrow struct schemas. Extras on these are rejected
# at write time even when Pydantic allows them on the Python side.
_FIXED_STRUCT_SUBMODELS: tuple[type[BaseModel], ...] = (TimeWindow, EmbeddingSource)
class Catalog:
"""Lance-backed catalog. See module docstring for the verb taxonomy."""
# Public for tests / introspection. Mirrors ``ursa.layout.ALL_CATALOG_TABLES``.
TABLES: tuple[str, ...] = ALL_CATALOG_TABLES
def __init__(self, db: Any, *, uri: str) -> None:
# Private. Use :meth:`Catalog.local` or :meth:`Catalog.from_store`.
self._db = db
self._uri = uri
self._ensure_tables()
# ------------------------------------------------------------------
# Construction
# ------------------------------------------------------------------
@classmethod
def local(cls, root: str | Path) -> Catalog:
"""Open (or create) a catalog under a local directory.
``root`` is the root of the catalog itself — tables land at
``<root>/<table_name>.lance``. The directory is created on first
use; subsequent opens are idempotent.
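
        A short sketch (the path is illustrative)::

            cat = Catalog.local("~/data/ursa-catalog")
            assert cat.tables() == list(Catalog.TABLES)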
"""
import lancedb # heavy import; lazy
root_path = Path(root).expanduser().resolve()
root_path.mkdir(parents=True, exist_ok=True)
db = lancedb.connect(str(root_path))
return cls(db, uri=str(root_path))
@classmethod
def from_store(cls, store: ObjectStore) -> Catalog:
"""Open a catalog backed by an :class:`ObjectStore`.
Reads ``(uri, storage_options)`` from
:meth:`ObjectStore.lance_connection` and connects lancedb at
``<uri>/catalog``. The catalog is the only consumer of this
sub-namespace; other Ursa data (Zarr arrays, raw blobs) lives
under sibling prefixes within the same store.
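
        Sketch (``store`` stands for any configured :class:`ObjectStore`)::

            cat = Catalog.from_store(store)
            assert cat.uri.endswith("/catalog")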
"""
import lancedb # heavy import; lazy
base_uri, storage_options = store.lance_connection()
catalog_uri = base_uri.rstrip("/") + "/catalog"
db = lancedb.connect(
catalog_uri,
storage_options=storage_options or None,
)
return cls(db, uri=catalog_uri)
# ------------------------------------------------------------------
# Schema management
# ------------------------------------------------------------------
def _ensure_tables(self) -> None:
"""Idempotently create every table at its declared Arrow schema."""
if self._db is None:
raise CatalogNotInitialized("Catalog has no underlying lancedb connection")
# lancedb's list_tables() returns a paginated ListTablesResponse;
# table_names() returns a flat list[str]. We use table_names() and
# ignore the deprecation — list_tables() requires walking pages,
# which is overkill for the catalog's nine-table fixed surface.
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
existing = set(self._db.table_names())
for name in self.TABLES:
if name in existing:
continue
empty = ARROW_SCHEMAS[name].empty_table()
self._db.create_table(name, data=empty, schema=ARROW_SCHEMAS[name])
@property
def uri(self) -> str:
"""The lancedb URI this catalog is rooted at."""
return self._uri
def tables(self) -> list[str]:
"""Return the canonical list of catalog table names."""
return list(self.TABLES)
# ------------------------------------------------------------------
# Generic core
# ------------------------------------------------------------------
def add(self, table: str, rows: Iterable[CatalogRow]) -> int:
"""Append rows. Raises :class:`CatalogPKConflict` on duplicate PK."""
materialized = list(rows)
if not materialized:
return 0
self._reject_existing_pks(table, materialized)
arrow = self._rows_to_arrow(table, materialized)
self._db.open_table(table).add(arrow)
return len(materialized)
def upsert(self, table: str, rows: Iterable[CatalogRow]) -> int:
"""Overwrite by primary key. The only path that mutates an existing row."""
materialized = list(rows)
if not materialized:
return 0
arrow = self._rows_to_arrow(table, materialized)
tbl = self._db.open_table(table)
(
tbl.merge_insert(PRIMARY_KEYS[table])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(arrow)
)
return len(materialized)
def append_unchecked(self, table: str, rows: Iterable[CatalogRow]) -> int:
"""Skip the PK check and append. Use only if the caller has already
deduped — duplicates corrupt the catalog's lineage assumptions.
"""
materialized = list(rows)
if not materialized:
return 0
arrow = self._rows_to_arrow(table, materialized)
self._db.open_table(table).add(arrow)
return len(materialized)
def get(self, table: str, **pk: Any) -> CatalogRow | None:
"""Fetch the row matching the given primary-key fields, or ``None``."""
self._validate_pk_kwargs(table, pk)
arrow = self._scan_arrow(table, where=pk, limit=1)
if arrow.num_rows == 0:
return None
row_cls = cast("type[CatalogRow]", ROW_CLASSES[table])
rows = self._arrow_to_rows(arrow, row_cls)
return rows[0] if rows else None
def scan(
self,
table: str,
*,
where: str | Mapping[str, Any] | None = None,
columns: list[str] | None = None,
limit: int | None = None,
) -> pa.Table:
"""Return a raw :class:`pyarrow.Table`. Zero-copy escape hatch."""
return self._scan_arrow(table, where=where, columns=columns, limit=limit)
def query(
self,
table: str,
*,
row_cls: type[T],
where: str | Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[T]:
"""Return validated rows. Use when the table name is dynamic."""
arrow = self._scan_arrow(table, where=where, limit=limit)
return cast("list[T]", self._arrow_to_rows(arrow, row_cls))
def count(
self,
table: str,
*,
where: str | Mapping[str, Any] | None = None,
) -> int:
"""Count rows, optionally filtered."""
tbl = self._db.open_table(table)
compiled = compile_where(where)
if compiled is None:
return int(tbl.count_rows())
return int(tbl.count_rows(filter=compiled))
    def delete(self, table: str, **pk: Any) -> None:
        """Raise ``NotImplementedError``; deletion is driven by ENG-1069."""
raise NotImplementedError(
"Catalog.delete() is driven by ENG-1069 (lifecycle GC + ENG-889 query semantics)"
)
# ------------------------------------------------------------------
# Internals — Arrow ↔ Pydantic
# ------------------------------------------------------------------
def _scan_arrow(
self,
table: str,
*,
where: str | Mapping[str, Any] | None = None,
columns: list[str] | None = None,
limit: int | None = None,
) -> pa.Table:
if table not in ROW_CLASSES:
raise KeyError(f"unknown catalog table {table!r}")
tbl = self._db.open_table(table)
compiled = compile_where(where)
builder = tbl.search()
if compiled is not None:
builder = builder.where(compiled)
if columns:
builder = builder.select(columns)
if limit is not None:
builder = builder.limit(limit)
return builder.to_arrow()
def _rows_to_arrow(self, table: str, rows: list[CatalogRow]) -> pa.Table:
if table not in ARROW_SCHEMAS:
raise KeyError(f"unknown catalog table {table!r}")
schema = ARROW_SCHEMAS[table]
row_cls = ROW_CLASSES[table]
if any(not isinstance(r, row_cls) for r in rows):
wrong = next(r for r in rows if not isinstance(r, row_cls))
raise CatalogSchemaError(
f"table {table!r} expects {row_cls.__name__}, got {type(wrong).__name__}"
)
json_cols = JSON_METADATA_COLUMNS.get(table, frozenset())
nullable_json_cols = NULLABLE_JSON_METADATA_COLUMNS.get(table, frozenset())
known_fields = set(row_cls.model_fields) # type: ignore[attr-defined]
records: list[dict[str, Any]] = []
for row in rows:
dump = row.model_dump(mode="python")
self._reject_nested_extras(row)
extras = {k: v for k, v in dump.items() if k not in known_fields}
record: dict[str, Any] = {k: dump[k] for k in known_fields}
for col in json_cols:
value = record.get(col)
if value is None and col in nullable_json_cols:
record[col] = None
else:
record[col] = json.dumps(value or {}, sort_keys=True)
# Architecture v0.4: ModalityRow.domain_intervals materializes as
# list<struct<start, end>>. Pydantic dumps tuples as plain lists
# (``[[s, e], ...]``); reshape into struct dicts before Arrow
# construction. ``None`` stays ``None`` (column is nullable).
if "domain_intervals" in record and record["domain_intervals"] is not None:
record["domain_intervals"] = [
{"start": s, "end": e} for s, e in record["domain_intervals"]
]
record[EXTRAS_COLUMN] = json.dumps(extras, sort_keys=True, default=_json_default)
records.append(record)
return pa.Table.from_pylist(records, schema=schema)
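
    def _reject_nested_extras(self, row: CatalogRow) -> None:
        # Reconstructed sketch: this method is called above but its body is
        # not present in the module as published. Per the module docstring,
        # fixed-schema Arrow struct columns cannot carry unknown nested
        # keys, so refuse to serialize submodels that picked up Pydantic
        # extras.
        for value in row.__dict__.values():
            if isinstance(value, _FIXED_STRUCT_SUBMODELS) and value.__pydantic_extra__:
                raise CatalogSchemaError(
                    f"{type(value).__name__} carries unexpected nested keys "
                    f"{sorted(value.__pydantic_extra__)!r}; Arrow struct "
                    "columns are fixed-schema"
                )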
def _arrow_to_rows(self, arrow: pa.Table, row_cls: type[T]) -> list[T]:
if arrow.num_rows == 0:
return []
json_cols = _json_cols_for_class(row_cls)
rows: list[T] = []
for record in arrow.to_pylist():
extras_blob = record.pop(EXTRAS_COLUMN, None)
for col in json_cols:
if col in record and isinstance(record[col], str):
record[col] = json.loads(record[col])
# Architecture v0.4: ModalityRow.domain_intervals comes back as a
# list of struct dicts (``[{"start": s, "end": e}, ...]``);
# collapse to ``list[tuple[float, float]]`` to match Pydantic.
if "domain_intervals" in record and record["domain_intervals"] is not None:
record["domain_intervals"] = [
(d["start"], d["end"]) for d in record["domain_intervals"]
]
if extras_blob:
extras = json.loads(extras_blob)
if extras:
record.update(extras)
rows.append(row_cls.model_validate(record))
return rows
def _reject_existing_pks(self, table: str, rows: list[CatalogRow]) -> None:
pk_cols = PRIMARY_KEYS[table]
seen: set[tuple[Any, ...]] = set()
for row in rows:
key = tuple(getattr(row, c) for c in pk_cols)
if key in seen:
raise CatalogPKConflict(
f"duplicate PK {dict(zip(pk_cols, key, strict=True))!r} within batch for {table!r}"
)
seen.add(key)
if not seen:
return
clause = _pk_in_clause(pk_cols, list(seen))
existing = self._scan_arrow(table, where=clause, columns=list(pk_cols), limit=len(seen))
if existing.num_rows == 0:
return
existing_keys = {tuple(row[c] for c in pk_cols) for row in existing.to_pylist()}
collisions = seen & existing_keys
if collisions:
sample = next(iter(collisions))
# Structured form so callers (notably ``ursa.register``) can read
# ``exc.table`` / ``exc.primary_key`` without parsing the message.
raise CatalogPKConflict(
table,
tuple(zip(pk_cols, sample, strict=True)),
)
@staticmethod
def _validate_pk_kwargs(table: str, pk: Mapping[str, Any]) -> None:
expected = set(PRIMARY_KEYS[table])
actual = set(pk)
if actual != expected:
raise ValueError(
f"get({table!r}) requires PK kwargs {sorted(expected)!r}; got {sorted(actual)!r}"
)
# ------------------------------------------------------------------
# Typed wrappers — hand-written, four (or more) per table.
# ------------------------------------------------------------------
# ---- participants ------------------------------------------------
def add_participant(self, row: ParticipantRow) -> None:
self.add(TABLE_PARTICIPANTS, [row])
def add_participants(self, rows: Iterable[ParticipantRow]) -> None:
self.add(TABLE_PARTICIPANTS, rows)
def upsert_participant(self, row: ParticipantRow) -> None:
self.upsert(TABLE_PARTICIPANTS, [row])
def upsert_participants(self, rows: Iterable[ParticipantRow]) -> None:
self.upsert(TABLE_PARTICIPANTS, rows)
def get_participant(self, participant_id: str) -> ParticipantRow | None:
return cast(
"ParticipantRow | None",
self.get(TABLE_PARTICIPANTS, participant_id=participant_id),
)
def list_participants(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[ParticipantRow]:
return self.query(TABLE_PARTICIPANTS, row_cls=ParticipantRow, where=where, limit=limit)
# ---- recordings --------------------------------------------------
def add_recording(self, row: RecordingRow) -> None:
self.add(TABLE_RECORDINGS, [row])
def add_recordings(self, rows: Iterable[RecordingRow]) -> None:
self.add(TABLE_RECORDINGS, rows)
def upsert_recording(self, row: RecordingRow) -> None:
self.upsert(TABLE_RECORDINGS, [row])
def upsert_recordings(self, rows: Iterable[RecordingRow]) -> None:
self.upsert(TABLE_RECORDINGS, rows)
def get_recording(self, recording_hash: str) -> RecordingRow | None:
return cast(
"RecordingRow | None",
self.get(TABLE_RECORDINGS, recording_hash=recording_hash),
)
def list_recordings(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[RecordingRow]:
return self.query(TABLE_RECORDINGS, row_cls=RecordingRow, where=where, limit=limit)
# ---- modalities --------------------------------------------------
def add_modality(self, row: ModalityRow) -> None:
self.add(TABLE_MODALITIES, [row])
def add_modalities(self, rows: Iterable[ModalityRow]) -> None:
self.add(TABLE_MODALITIES, rows)
def upsert_modality(self, row: ModalityRow) -> None:
self.upsert(TABLE_MODALITIES, [row])
def upsert_modalities(self, rows: Iterable[ModalityRow]) -> None:
self.upsert(TABLE_MODALITIES, rows)
def get_modality(self, recording_hash: str, modality: str) -> ModalityRow | None:
return cast(
"ModalityRow | None",
self.get(TABLE_MODALITIES, recording_hash=recording_hash, modality=modality),
)
def list_modalities(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[ModalityRow]:
return self.query(TABLE_MODALITIES, row_cls=ModalityRow, where=where, limit=limit)
# ---- events ------------------------------------------------------
def add_event(self, row: EventRow) -> None:
self.add(TABLE_EVENTS, [row])
def add_events(self, rows: Iterable[EventRow]) -> None:
self.add(TABLE_EVENTS, rows)
def upsert_event(self, row: EventRow) -> None:
self.upsert(TABLE_EVENTS, [row])
def upsert_events(self, rows: Iterable[EventRow]) -> None:
self.upsert(TABLE_EVENTS, rows)
def get_event(self, event_id: str) -> EventRow | None:
return cast("EventRow | None", self.get(TABLE_EVENTS, event_id=event_id))
def list_events(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[EventRow]:
return self.query(TABLE_EVENTS, row_cls=EventRow, where=where, limit=limit)
# ---- embeddings --------------------------------------------------
def add_embedding(self, row: EmbeddingRow) -> None:
self.add(TABLE_EMBEDDINGS, [row])
def add_embeddings(self, rows: Iterable[EmbeddingRow]) -> None:
self.add(TABLE_EMBEDDINGS, rows)
def upsert_embedding(self, row: EmbeddingRow) -> None:
self.upsert(TABLE_EMBEDDINGS, [row])
def upsert_embeddings(self, rows: Iterable[EmbeddingRow]) -> None:
self.upsert(TABLE_EMBEDDINGS, rows)
def get_embedding(self, embedding_id: str) -> EmbeddingRow | None:
return cast(
"EmbeddingRow | None",
self.get(TABLE_EMBEDDINGS, embedding_id=embedding_id),
)
def list_embeddings(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[EmbeddingRow]:
return self.query(TABLE_EMBEDDINGS, row_cls=EmbeddingRow, where=where, limit=limit)
# ---- virgo_assets ------------------------------------------------
def add_virgo_asset(self, row: VirgoAssetRow) -> None:
self.add(TABLE_VIRGO_ASSETS, [row])
def add_virgo_assets(self, rows: Iterable[VirgoAssetRow]) -> None:
self.add(TABLE_VIRGO_ASSETS, rows)
def upsert_virgo_asset(self, row: VirgoAssetRow) -> None:
self.upsert(TABLE_VIRGO_ASSETS, [row])
def upsert_virgo_assets(self, rows: Iterable[VirgoAssetRow]) -> None:
self.upsert(TABLE_VIRGO_ASSETS, rows)
def get_virgo_asset(self, asset_id: str) -> VirgoAssetRow | None:
return cast(
"VirgoAssetRow | None",
self.get(TABLE_VIRGO_ASSETS, asset_id=asset_id),
)
def list_virgo_assets(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[VirgoAssetRow]:
return self.query(TABLE_VIRGO_ASSETS, row_cls=VirgoAssetRow, where=where, limit=limit)
# ---- checkpoints -------------------------------------------------
def add_checkpoint(self, row: CheckpointRow) -> None:
self.add(TABLE_CHECKPOINTS, [row])
def add_checkpoints(self, rows: Iterable[CheckpointRow]) -> None:
self.add(TABLE_CHECKPOINTS, rows)
def upsert_checkpoint(self, row: CheckpointRow) -> None:
self.upsert(TABLE_CHECKPOINTS, [row])
def upsert_checkpoints(self, rows: Iterable[CheckpointRow]) -> None:
self.upsert(TABLE_CHECKPOINTS, rows)
def get_checkpoint(self, checkpoint_id: str) -> CheckpointRow | None:
return cast(
"CheckpointRow | None",
self.get(TABLE_CHECKPOINTS, checkpoint_id=checkpoint_id),
)
def list_checkpoints(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[CheckpointRow]:
return self.query(TABLE_CHECKPOINTS, row_cls=CheckpointRow, where=where, limit=limit)
# ---- benchmark_suites --------------------------------------------
def add_benchmark_suite(self, row: BenchmarkSuiteRow) -> None:
self.add(TABLE_BENCHMARK_SUITES, [row])
def add_benchmark_suites(self, rows: Iterable[BenchmarkSuiteRow]) -> None:
self.add(TABLE_BENCHMARK_SUITES, rows)
def upsert_benchmark_suite(self, row: BenchmarkSuiteRow) -> None:
self.upsert(TABLE_BENCHMARK_SUITES, [row])
def upsert_benchmark_suites(self, rows: Iterable[BenchmarkSuiteRow]) -> None:
self.upsert(TABLE_BENCHMARK_SUITES, rows)
def get_benchmark_suite(self, suite_name: str, suite_version: int) -> BenchmarkSuiteRow | None:
return cast(
"BenchmarkSuiteRow | None",
self.get(
TABLE_BENCHMARK_SUITES,
suite_name=suite_name,
suite_version=suite_version,
),
)
def list_benchmark_suites(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[BenchmarkSuiteRow]:
return self.query(
TABLE_BENCHMARK_SUITES,
row_cls=BenchmarkSuiteRow,
where=where,
limit=limit,
)
# ---- benchmark_results -------------------------------------------
def add_benchmark_result(self, row: BenchmarkResultRow) -> None:
self.add(TABLE_BENCHMARK_RESULTS, [row])
def add_benchmark_results(self, rows: Iterable[BenchmarkResultRow]) -> None:
self.add(TABLE_BENCHMARK_RESULTS, rows)
def upsert_benchmark_result(self, row: BenchmarkResultRow) -> None:
self.upsert(TABLE_BENCHMARK_RESULTS, [row])
def upsert_benchmark_results(self, rows: Iterable[BenchmarkResultRow]) -> None:
self.upsert(TABLE_BENCHMARK_RESULTS, rows)
def get_benchmark_result(self, result_id: str) -> BenchmarkResultRow | None:
return cast(
"BenchmarkResultRow | None",
self.get(TABLE_BENCHMARK_RESULTS, result_id=result_id),
)
def list_benchmark_results(
self,
*,
where: Mapping[str, Any] | None = None,
limit: int | None = None,
) -> list[BenchmarkResultRow]:
return self.query(
TABLE_BENCHMARK_RESULTS,
row_cls=BenchmarkResultRow,
where=where,
limit=limit,
)
# ---------------------------------------------------------------------------
# Module-level helpers
# ---------------------------------------------------------------------------
# Module-scope cache of JSON-encoded MetadataDict columns by row class.
# Built once at import time so ``_arrow_to_rows`` doesn't pay the cost
# of resolving the row class back to its table on every read.
_JSON_COLS_BY_CLASS: dict[type[CatalogRow], frozenset[str]] = {
cls: JSON_METADATA_COLUMNS.get(table, frozenset()) # type: ignore[misc]
for table, cls in ROW_CLASSES.items()
}
def _json_cols_for_class(row_cls: type[CatalogRow]) -> frozenset[str]:
"""Return the JSON-encoded MetadataDict columns for a row class."""
return _JSON_COLS_BY_CLASS.get(row_cls, frozenset())
def _pk_in_clause(pk_cols: list[str], keys: list[tuple[Any, ...]]) -> str:
"""Render a WHERE clause matching any PK tuple in ``keys``.
Single-column PKs render as ``col IN (lit, lit, ...)``; composite
PKs render as ``(c1 = v1 AND c2 = v2) OR (...) ...``.
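
    Illustrative renderings (quoting is delegated to ``_lit``, so the
    exact escaping may differ)::

        _pk_in_clause(["recording_hash"], [("a",), ("b",)])
        # → "recording_hash IN ('a', 'b')"
        _pk_in_clause(["suite_name", "suite_version"], [("s", 1)])
        # → "(suite_name = 's' AND suite_version = 1)"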
"""
from ursa.catalog._filters import _lit # internal — keeps escape rules in one place
if len(pk_cols) == 1:
col = pk_cols[0]
rendered = ", ".join(_lit(k[0]) for k in keys)
return f"{col} IN ({rendered})"
parts: list[str] = []
for tup in keys:
conjuncts = [f"{c} = {_lit(v)}" for c, v in zip(pk_cols, tup, strict=True)]
parts.append("(" + " AND ".join(conjuncts) + ")")
return " OR ".join(parts)
def _json_default(value: Any) -> Any:
"""Best-effort JSON encoder for values that ``json.dumps`` doesn't natively
handle. Used only on the ``__pydantic_extra__`` column, where extras
from forward-compat reads may carry oddball types.

    Encoding conventions (decoders must mirror these):

* ``datetime``/``date`` → ISO 8601 string.
* ``timedelta`` → integer microseconds. Use ``timedelta(microseconds=n)``
to decode. Microseconds rather than ``total_seconds()`` so sub-second
precision survives the JSON round-trip without floating-point churn.
* Pydantic ``BaseModel`` → ``model_dump(mode="json")``.
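
    Round-trip sketch for the ``timedelta`` convention::

        from datetime import timedelta

        n = timedelta(seconds=1, microseconds=500) // timedelta(microseconds=1)
        assert timedelta(microseconds=n) == timedelta(seconds=1, microseconds=500)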
"""
from datetime import date, datetime, timedelta
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, timedelta):
return value // timedelta(microseconds=1)
if isinstance(value, BaseModel):
return value.model_dump(mode="json")
raise TypeError(f"cannot JSON-encode {type(value).__name__}: {value!r}")