# Source code for ursa.backends.lance

"""Lance-backed reader for ``StorageFormat.LANCE`` modalities.

Used by genuinely-event modalities (keyboard, mouse, browser, notes,
battery, location). High-density "almost-regular" streams like
``pupillabs-gaze`` deliberately go to Zarr instead — see the
Zarr-vs-Lance rule of thumb (fixed payload + dense → Zarr; variable
payload or sparse → Lance).

Two callable surfaces:

* The materialized opener wired through the temporal-class dispatcher
  (:func:`ursa.IrregularTimeSeries.from_uri` → backend factory) loads
  the whole table and populates an ``IrregularTimeSeries`` instance.
* The free-standing :class:`LazyLanceIrregularTimeSeries` exposes a
  column-on-access ``__getattr__`` that pushes
  ``timestamp >= t0 AND timestamp < t1`` filters down to DataFusion on
  the next read after :meth:`slice`. Reach for it via
  :func:`open_lance_irregular_lazy_freestanding` (or the class directly)
  when ``DataInterface.materialize(lazy=False)`` would materialize too much.

Sort-by-timestamp contract: writers MUST sort by ``timestamp`` and emit
an ``ursa.sorted_by="timestamp"`` flag in the dataset's user metadata.
Readers warn (don't raise) if the flag is absent — the slice still
works but the scan is unbounded.

Timestamp-unit contract: the ``timestamp`` column MUST be float64
recording-relative seconds. The reader rejects other Arrow types
(notably int64 nanoseconds) up-front via :func:`_check_timestamp_column`
so a writer that picks the wrong unit fails fast instead of silently
producing a series with a ~55-billion-year domain.
"""

from __future__ import annotations

import math
import warnings
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

import numpy as np
from temporaldata import IrregularTimeSeries as _td_IrregularTimeSeries

from ursa.catalog.schemas import ModalityRow
from ursa.store.base import ObjectStore
from ursa.temporal import _build_domain, _check_metadata

if TYPE_CHECKING:
    import lance

# Names exported by a star-import of this module. The two underscore-prefixed
# openers are listed explicitly, so they are exported despite their
# private-looking names.
__all__ = [
    "LazyLanceIrregularTimeSeries",
    "open_lance_irregular_lazy_freestanding",
    "_open_lance_irregular_materialized",
    "_open_lance_irregular_lazy",
]


# Schema-metadata key and the value it must carry for a dataset to count as
# timestamp-sorted; ``_check_sort_metadata`` warns when the pair is absent.
_SORT_KEY_METADATA = "ursa.sorted_by"
_SORT_KEY_EXPECTED = "timestamp"


def _open_dataset(uri: str, store: ObjectStore) -> "lance.LanceDataset":
    """Resolve ``uri`` against ``store`` and open the Lance dataset it names.

    Only the path component of ``uri`` is used. It is appended to the base
    URL returned by ``store.lance_connection()``, and the storage options
    from that same call are forwarded to ``lance.dataset``.
    """
    import lance

    root, options = store.lance_connection()
    # Strip the leading slash so the join below never yields a double "/".
    relative_key = urlparse(uri).path.lstrip("/")
    location = "/".join((root.rstrip("/"), relative_key))
    return lance.dataset(location, storage_options=options)


def _check_timestamp_column(ds: "lance.LanceDataset", uri: str) -> None:
    """Reject datasets whose ``timestamp`` column is not a floating type.

    Ursa's Lance event-stream convention is **float64 recording-relative
    seconds**. Integer or Arrow-timestamp columns would cast to float64
    without error but in the wrong unit (e.g. int64 nanoseconds become
    values around 1e18 "seconds"), so the read is refused before any data
    is loaded.

    The gate is ``pyarrow.types.is_floating``: any Arrow floating-point
    type passes, everything else raises.

    A dataset with no ``timestamp`` column at all is silently accepted
    here; callers perform their own missing-column check with a more
    specific message.

    Raises:
        ValueError: if ``timestamp`` exists and is not a floating type.
    """
    import pyarrow as pa

    schema = ds.schema
    position = schema.get_field_index("timestamp")
    if position < 0:
        # Column is absent: leave the diagnosis to the caller's own check
        # instead of failing on ``schema.field(-1)``.
        return

    arrow_type = schema.field(position).type
    if pa.types.is_floating(arrow_type):
        return

    message = (
        f"Lance dataset at {uri!r}: column 'timestamp' has Arrow type "
        f"{arrow_type!r}; Ursa requires float64 recording-relative seconds. "
        "If the writer produced int64 nanoseconds or an Arrow timestamp "
        "type, convert to float64 seconds before writing."
    )
    raise ValueError(message)


def _check_sort_metadata(ds: "lance.LanceDataset", uri: str) -> None:
    """Emit a ``UserWarning`` when the dataset is not tagged timestamp-sorted.

    The time-window filter (`timestamp >= t0 AND timestamp < t1`) can only
    bound a scan when per-fragment min/max statistics are meaningful, which
    needs timestamp-ordered data. Unsorted datasets still read correctly;
    the warning only tells the caller that every fragment will be scanned.
    """

    def _as_text(item: Any) -> Any:
        # Metadata keys/values may arrive as ``bytes`` or ``str``; normalise
        # bytes to text and pass anything else through untouched.
        if isinstance(item, bytes):
            return item.decode()
        return item

    raw_meta = getattr(ds.schema, "metadata", None) or {}
    normalized = {_as_text(key): _as_text(value) for key, value in raw_meta.items()}

    if normalized.get(_SORT_KEY_METADATA) == _SORT_KEY_EXPECTED:
        return

    warnings.warn(
        f"Lance dataset at {uri!r} has no {_SORT_KEY_METADATA}="
        f"{_SORT_KEY_EXPECTED!r} flag — .slice() pushdown will scan "
        "the full table. Re-write through virgo.ingestion.lance_writer "
        "with sort_by_timestamp=True (the default) to fix.",
        UserWarning,
        stacklevel=3,
    )


def _open_lance_irregular_materialized(
    instance: Any,
    uri: str,
    *,
    store: ObjectStore,
    metadata: ModalityRow | None,
    max_fallback_bytes: int | None = None,
) -> None:
    """Backend opener: populate ``instance`` in-place.

    Reads the entire Lance table and initializes ``instance`` via the
    upstream ``temporaldata.IrregularTimeSeries.__init__`` plus an
    ``_metadata`` slot. Mirrors
    :func:`ursa.backends._zarr._open_zarr_irregular_materialized`.

    Called by the dispatcher
    (``_resolve_backend(uri, metadata).materialized_irregular``) when
    ``metadata.format == StorageFormat.LANCE``. Direct callers should use
    :meth:`ursa.IrregularTimeSeries.from_uri` rather than invoking this
    opener.

    Lance lazy access is intentionally NOT plumbed through
    ``LazyIrregularTimeSeries.from_uri`` because the upstream lazy contract
    is built around zarr.Array column handles and Lance's column-scan model
    doesn't fit it cleanly. Use the free-standing
    :class:`LazyLanceIrregularTimeSeries` for column-on-demand Lance access.

    Args:
        instance: Object to initialize in place.
        uri: Location of the Lance dataset; resolved against ``store``.
        store: Object store that supplies the Lance connection details.
        metadata: Catalog row for the modality, or ``None``.
        max_fallback_bytes: Ignored; accepted only for signature uniformity.

    Raises:
        ValueError: if the dataset has no ``timestamp`` column, or the
            column is not a floating-point Arrow type.
    """
    # ``max_fallback_bytes`` is accepted for opener-signature uniformity with
    # the Zarr branches; Lance reads the full table so the per-chunk cap is moot.
    del max_fallback_bytes

    ds = _open_dataset(uri, store)
    if "timestamp" not in ds.schema.names:
        raise ValueError(
            f"Lance dataset at {uri!r} is missing required column "
            "'timestamp'. Did the writer use lance_writer.write_irregular_lance?"
        )
    _check_timestamp_column(ds, uri)
    _check_sort_metadata(ds, uri)

    tbl = ds.to_table().to_pandas()
    timestamps = tbl["timestamp"].to_numpy(dtype=np.float64)
    # Every non-timestamp column becomes a keyword payload array.
    payload = {c: tbl[c].to_numpy() for c in tbl.columns if c != "timestamp"}

    # With a catalog row the timestamps are withheld from ``_build_domain``;
    # without one they are its only input.
    # NOTE(review): presumably the row carries the authoritative domain —
    # confirm against ``ursa.temporal._build_domain``.
    if metadata is not None:
        domain = _build_domain(metadata, timestamps_array=None)
    else:
        domain = _build_domain(metadata, timestamps_array=timestamps)

    _td_IrregularTimeSeries.__init__(instance, timestamps, domain=domain, **payload)
    instance._metadata = metadata


def _open_lance_irregular_lazy(
    instance: Any,
    uri: str,
    *,
    store: ObjectStore,
    metadata: ModalityRow | None,
    max_fallback_bytes: int | None = None,
) -> None:
    """Lance lazy via :class:`ursa.LazyIrregularTimeSeries` is intentionally NYI.

    The upstream :class:`temporaldata.LazyIrregularTimeSeries` contract gates
    column materialization on ``isinstance(handle, h5py.Dataset)``; Ursa's
    Zarr adapter swaps in a ``zarr.Array`` check. Adapting the same gate for
    Lance would require a third isinstance branch or a Lance-aware column
    adapter that quacks like one of those handles — out of scope for the
    initial dispatcher wiring.

    Callers that want column-on-demand Lance access should use the
    free-standing :class:`LazyLanceIrregularTimeSeries` directly (constructed
    from ``ursa.backends.lance.LazyLanceIrregularTimeSeries(uri, store=store,
    metadata=row)``).

    All arguments are accepted only so the signature matches the other
    openers; none of them is used.

    Raises:
        NotImplementedError: always.
    """
    # ``max_fallback_bytes`` accepted for opener-signature uniformity; the
    # body raises ``NotImplementedError`` so the cap never applies anyway.
    del max_fallback_bytes

    raise NotImplementedError(
        "Lance backend lazy mode via LazyIrregularTimeSeries.from_uri is not "
        "supported. Use the free-standing ursa.backends.lance.LazyLanceIrregularTimeSeries "
        "for column-on-demand Lance access, or pass lazy=False / call "
        "IrregularTimeSeries.from_uri for an eager materialized read."
    )


class LazyLanceIrregularTimeSeries:
    """Streaming-pointer Lance reader for irregular event streams.

    Construction reads only dataset metadata (schema + row count +
    sort-metadata check). Column access via ``__getattr__`` triggers a
    per-column Lance scan; if :meth:`slice` was called first, the scan is
    pushed down to a half-open ``timestamp >= t0 AND timestamp < t1`` filter
    via DataFusion.

    Free-standing on purpose: the upstream
    :class:`temporaldata.LazyIrregularTimeSeries` materialization gate
    expects ``h5py.Dataset``- or ``zarr.Array``-shaped column handles, and
    Lance's column-scan model doesn't slot in cleanly. The class exposes the
    same external surface (``.slice()``, attribute-style column access,
    ``.metadata``) so callers don't need to branch on backend type.

    Note that attribute access to the raw ``timestamp`` column returns the
    stored values unshifted; only the :attr:`timestamps` property applies the
    slice-origin shift.
    """

    # Private attributes that ``__getattr__`` must NEVER intercept (would
    # cause infinite recursion / mask AttributeError). Listed explicitly so
    # the schema-typo guard can fire cleanly.
    _RESERVED = frozenset(
        {
            "_ds",
            "_uri",
            "_schema_columns",
            "_n_rows",
            "_slice_window",
            "_materialized",
            "_materialized_shift",
            "_metadata",
        }
    )

    def __init__(
        self,
        uri: str,
        *,
        store: ObjectStore,
        metadata: ModalityRow | None = None,
    ) -> None:
        """Open the dataset at ``uri`` and validate it without reading columns.

        Raises:
            ValueError: if the dataset has no ``timestamp`` column, or the
                column is not a floating-point Arrow type.
        """
        _check_metadata(metadata, ModalityRow, "LazyLanceIrregularTimeSeries")
        ds = _open_dataset(uri, store)
        if "timestamp" not in ds.schema.names:
            raise ValueError(
                f"Lance dataset at {uri!r} is missing required column "
                "'timestamp'. Did the writer use lance_writer.write_irregular_lance?"
            )
        _check_timestamp_column(ds, uri)
        _check_sort_metadata(ds, uri)
        self._ds = ds
        self._uri = uri
        self._metadata = metadata
        self._schema_columns: list[str] = ds.schema.names
        self._n_rows: int = ds.count_rows()
        # Absolute time window recorded by .slice(); flushed to a filter on
        # the next column fetch.
        self._slice_window: tuple[float, float] | None = None
        # Per-column materialization cache so repeated access of the same
        # column doesn't issue a fresh Lance scan.
        self._materialized: dict[str, np.ndarray] = {}
        # ``slice()`` sets this on the returned instance; initialize here so
        # ``timestamps`` never has to fall through ``__getattr__`` on a
        # newly-constructed (unsliced) view.
        self._materialized_shift: float = 0.0

    # ---------------------------------------------------------------------

    @property
    def metadata(self) -> ModalityRow | None:
        """Catalog row supplied at construction, or ``None``."""
        return self._metadata

    def __len__(self) -> int:
        """Row count of the view; a sliced view materializes its timestamps."""
        if self._slice_window is None:
            return self._n_rows
        # Slice known: materialize timestamps to get the exact count.
        return int(self.timestamps.shape[0])

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"uri={self._uri!r}, "
            f"n_rows={self._n_rows}, "
            f"slice_window={self._slice_window})"
        )

    # ---------------------------------------------------------------------
    # Column access — the lazy ``__getattr__`` (plan Medium-10).
    # ---------------------------------------------------------------------

    def __getattr__(self, name: str) -> Any:
        """Fetch (and cache) the schema column called ``name``.

        Raises:
            AttributeError: if ``name`` is a reserved private attribute that
                has not been set, or is not a column of the Lance schema.
        """
        # __getattr__ is only invoked on misses, so accessing self._ds etc.
        # via normal attribute lookup is safe and won't recurse. Reserved
        # names that DO miss are bugs — raise immediately.
        if name in type(self)._RESERVED:
            raise AttributeError(name)
        # Typo guard: a column not in the Lance schema is an AttributeError,
        # NOT an empty-table scan that returns nothing.
        if name not in self._schema_columns:
            raise AttributeError(
                f"{type(self).__name__} has no column {name!r}. "
                f"Available columns: {self._schema_columns}"
            )
        cached = self._materialized.get(name)
        if cached is not None:
            return cached
        arr = self._fetch_column(name)
        self._materialized[name] = arr
        return arr

    def _fetch_column(self, name: str) -> np.ndarray:
        """Issue the Lance scan for one column with optional time pushdown."""
        if self._slice_window is None:
            tbl = self._ds.to_table(columns=[name])
        else:
            t0, t1 = self._slice_window
            tbl = self._ds.to_table(
                columns=[name],
                filter=f"timestamp >= {t0} AND timestamp < {t1}",
            )
        col = tbl.column(name)
        # Cast to numpy where possible; fall back to a Python-list roundtrip
        # for exotic Arrow types (struct, list-of-lists) the writer may emit.
        # The broad catch is a deliberate best-effort fallback.
        try:
            arr: np.ndarray = col.to_numpy(zero_copy_only=False)
            return arr
        except Exception:  # pragma: no cover - exercised on complex payloads
            return np.asarray(col.to_pylist())

    # ---------------------------------------------------------------------
    # Slice — records the window; next column fetch pushes it down.
    # ---------------------------------------------------------------------

    def slice(
        self, start: float, end: float, reset_origin: bool = True
    ) -> "LazyLanceIrregularTimeSeries":
        """Return a new lazy view restricted to ``[start, end)``.

        ``start`` and ``end`` are expressed in this view's own time
        coordinates, i.e. the coordinates of :attr:`timestamps`. On an
        unsliced view those are the stored values. On a view produced by an
        earlier ``slice(..., reset_origin=True)`` they are relative to that
        slice's start, so the bounds are translated back to stored time and
        intersected with the parent window. A nested slice can therefore
        never widen its parent.

        ``reset_origin=True`` (default) is recorded for downstream
        materialization — when :attr:`timestamps` is read, the values are
        shifted so that ``start`` maps to zero, matching
        :meth:`temporaldata.IrregularTimeSeries.slice`'s default. With
        ``reset_origin=False`` the new view keeps this view's origin.

        Both bounds must be finite. ``inf``/``nan`` would flow into the
        DataFusion filter as literal `inf`/`nan` and produce an opaque parse
        error deep inside Lance; we catch that here and raise a clean
        ``ValueError`` instead.

        Raises:
            ValueError: if ``start`` or ``end`` is not finite.
        """
        if not (math.isfinite(start) and math.isfinite(end)):
            raise ValueError(
                f"LazyLanceIrregularTimeSeries.slice bounds must be finite; "
                f"got start={start!r}, end={end!r}."
            )

        # Translate the requested bounds from this view's (possibly shifted)
        # coordinates into stored time, which is what the filter compares.
        parent_shift = self._materialized_shift
        abs_start = float(start) + parent_shift
        abs_end = float(end) + parent_shift

        if self._slice_window is None:
            window = (abs_start, abs_end)
        else:
            # Intersect with the parent window; an empty intersection is
            # collapsed to a zero-width window that matches no rows.
            parent_start, parent_end = self._slice_window
            lo = max(abs_start, parent_start)
            hi = max(lo, min(abs_end, parent_end))
            window = (lo, hi)

        # Build a fresh instance pointing at the same dataset so the cache
        # isn't shared between the parent and the sliced view.
        out = LazyLanceIrregularTimeSeries.__new__(LazyLanceIrregularTimeSeries)
        out._ds = self._ds
        out._uri = self._uri
        out._metadata = self._metadata
        out._schema_columns = self._schema_columns
        out._n_rows = self._n_rows
        out._slice_window = window
        out._materialized = {}
        out._materialized_shift = abs_start if reset_origin else parent_shift
        return out

    @property
    def timestamps(self) -> np.ndarray:
        """Materialize the timestamp column, applying the slice-origin shift."""
        raw = self._materialized.get("timestamp")
        if raw is None:
            raw = self._fetch_column("timestamp")
            self._materialized["timestamp"] = raw
        shift = self._materialized_shift
        # A zero shift returns the cached array itself rather than a copy.
        return raw - shift if shift else raw


def open_lance_irregular_lazy_freestanding(
    uri: str,
    store: ObjectStore,
    metadata: ModalityRow | None = None,
) -> LazyLanceIrregularTimeSeries:
    """Construct the free-standing :class:`LazyLanceIrregularTimeSeries`.

    NOT the dispatcher opener — that's :func:`_open_lance_irregular_lazy`,
    which intentionally raises ``NotImplementedError``. This helper exists
    for callers (notably :meth:`DataInterface.materialize`) that want
    column-on-demand Lance access via the free-standing class rather than
    through ``LazyIrregularTimeSeries.from_uri``.

    Args:
        uri: Location of the Lance dataset; resolved against ``store``.
        store: Object store that supplies the Lance connection details.
        metadata: Optional catalog row. Defaults to ``None``, matching the
            class constructor.

    Returns:
        A lazy view over the whole dataset; no column has been read yet.
    """
    return LazyLanceIrregularTimeSeries(uri, store=store, metadata=metadata)