"""Lance-backed reader for ``StorageFormat.LANCE`` modalities.
Used by genuinely-event modalities (keyboard, mouse, browser, notes,
battery, location). High-density "almost-regular" streams like
``pupillabs-gaze`` deliberately go to Zarr instead — see the
Zarr-vs-Lance rule of thumb (fixed payload + dense → Zarr; variable
payload or sparse → Lance).
Two callable surfaces:
* The materialized opener wired through the temporal-class dispatcher
(:func:`ursa.IrregularTimeSeries.from_uri` → backend factory) loads
the whole table and populates an ``IrregularTimeSeries`` instance.
* The free-standing :class:`LazyLanceIrregularTimeSeries` exposes a
column-on-access ``__getattr__`` that pushes
``timestamp >= t0 AND timestamp < t1`` filters down to DataFusion on
the next read after :meth:`slice`. Reach for it via
:func:`open_lance_irregular_lazy_freestanding` (or the class directly)
when ``DataInterface.materialize(lazy=False)`` would materialize too much.
Sort-by-timestamp contract: writers MUST sort by ``timestamp`` and emit
an ``ursa.sorted_by="timestamp"`` flag in the dataset's user metadata.
Readers warn (don't raise) if the flag is absent — the slice still
works but the scan is unbounded.
Timestamp-unit contract: the ``timestamp`` column MUST hold float64
recording-relative seconds. The reader rejects every non-floating Arrow
type (notably int64 nanoseconds and Arrow timestamp types) up-front via
:func:`_check_timestamp_column`, so a writer that picks the wrong unit
fails fast instead of silently producing a series with a
~55-billion-year domain.
"""
from __future__ import annotations
import math
import warnings
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
import numpy as np
from temporaldata import IrregularTimeSeries as _td_IrregularTimeSeries
from ursa.catalog.schemas import ModalityRow
from ursa.store.base import ObjectStore
from ursa.temporal import _build_domain, _check_metadata
if TYPE_CHECKING:
import lance
# Names exported by ``from <module> import *``. The two underscore-prefixed
# openers are listed explicitly next to the public class and helper.
__all__ = [
"LazyLanceIrregularTimeSeries",
"open_lance_irregular_lazy_freestanding",
"_open_lance_irregular_materialized",
"_open_lance_irregular_lazy",
]
# Schema-metadata key, and the value it must carry, that
# ``_check_sort_metadata`` looks for before deciding whether to warn.
_SORT_KEY_METADATA = "ursa.sorted_by"
_SORT_KEY_EXPECTED = "timestamp"
def _open_dataset(uri: str, store: ObjectStore) -> "lance.LanceDataset":
    """Resolve ``uri`` against ``store`` and return the opened Lance dataset.

    Only the path component of ``uri`` is used: it is stripped of leading
    slashes and appended to the base URL reported by
    ``store.lance_connection()``. The storage options returned alongside
    the base URL are forwarded to ``lance.dataset`` unchanged.
    """
    # Deferred import: ``lance`` is only loaded when a dataset is opened.
    import lance

    root, options = store.lance_connection()
    relative_key = urlparse(uri).path.lstrip("/")
    target = "/".join((root.rstrip("/"), relative_key))
    return lance.dataset(target, storage_options=options)
def _check_timestamp_column(ds: "lance.LanceDataset", uri: str) -> None:
    """Refuse datasets whose ``timestamp`` column is not a floating type.

    Ursa's Lance event-stream convention is **float64 recording-relative
    seconds**. An integer column (for example ``int64`` nanoseconds) would
    cast to float64 without raising and yield values around 1e18 that get
    interpreted as seconds, so the mismatch is rejected here instead.

    The gate is ``pyarrow.types.is_floating``: any Arrow floating-point
    type passes, everything else (including Arrow timestamp types) raises.

    Args:
        ds: Opened Lance dataset whose schema is inspected.
        uri: Dataset location, used only in the error message.

    Raises:
        ValueError: If a ``timestamp`` column exists and is not floating.
    """
    import pyarrow as pa

    dataset_schema = ds.schema
    position = dataset_schema.get_field_index("timestamp")
    if position < 0:
        # Column absent: callers report that with their own, more specific
        # error. Returning here also avoids indexing ``field(-1)``.
        return

    arrow_type = dataset_schema.field(position).type
    if pa.types.is_floating(arrow_type):
        return

    raise ValueError(
        f"Lance dataset at {uri!r}: column 'timestamp' has Arrow type "
        f"{arrow_type!r}; Ursa requires float64 recording-relative seconds. "
        "If the writer produced int64 nanoseconds or an Arrow timestamp "
        "type, convert to float64 seconds before writing."
    )
def _check_sort_metadata(ds: "lance.LanceDataset", uri: str) -> None:
    """Emit a ``UserWarning`` when the dataset lacks the sorted-by flag.

    The time-window filter (`timestamp >= t0 AND timestamp < t1`) is only
    expected to bound the scan when the writer sorted rows by timestamp
    and recorded that fact in the schema metadata. A dataset without the
    flag is still readable; this function never raises for it.

    Args:
        ds: Opened Lance dataset whose schema metadata is inspected.
        uri: Dataset location, used only in the warning text.
    """

    def _as_text(item: Any) -> Any:
        # Metadata entries may arrive as ``bytes`` or ``str``; compare as str.
        return item.decode() if isinstance(item, bytes) else item

    raw_meta = getattr(ds.schema, "metadata", None) or {}
    normalized = {_as_text(key): _as_text(value) for key, value in raw_meta.items()}
    if normalized.get(_SORT_KEY_METADATA) == _SORT_KEY_EXPECTED:
        return

    message = (
        f"Lance dataset at {uri!r} has no {_SORT_KEY_METADATA}="
        f"{_SORT_KEY_EXPECTED!r} flag — .slice() pushdown will scan "
        "the full table. Re-write through virgo.ingestion.lance_writer "
        "with sort_by_timestamp=True (the default) to fix."
    )
    warnings.warn(message, UserWarning, stacklevel=3)
[docs]
def _open_lance_irregular_materialized(
instance: Any,
uri: str,
*,
store: ObjectStore,
metadata: ModalityRow | None,
max_fallback_bytes: int | None = None,
) -> None:
# ``max_fallback_bytes`` is accepted for opener-signature uniformity with
# the Zarr branches; Lance reads the full table so the per-chunk cap is moot.
del max_fallback_bytes
"""Backend opener: populate ``instance`` in-place.
Reads the entire Lance table and initializes ``instance`` via the upstream
``temporaldata.IrregularTimeSeries.__init__`` plus an ``_metadata`` slot.
Mirrors :func:`ursa.backends._zarr._open_zarr_irregular_materialized`.
Called by the dispatcher (``_resolve_backend(uri, metadata).materialized_irregular``)
when ``metadata.format == StorageFormat.LANCE``. Direct callers should use
:meth:`ursa.IrregularTimeSeries.from_uri` rather than invoking this opener.
Lance lazy access is intentionally NOT plumbed through ``LazyIrregularTimeSeries.from_uri``
because the upstream lazy contract is built around zarr.Array column handles
and Lance's column-scan model doesn't fit it cleanly. Use the free-standing
:class:`LazyLanceIrregularTimeSeries` for column-on-demand Lance access.
"""
ds = _open_dataset(uri, store)
if "timestamp" not in ds.schema.names:
raise ValueError(
f"Lance dataset at {uri!r} is missing required column "
"'timestamp'. Did the writer use lance_writer.write_irregular_lance?"
)
_check_timestamp_column(ds, uri)
_check_sort_metadata(ds, uri)
tbl = ds.to_table().to_pandas()
timestamps = tbl["timestamp"].to_numpy(dtype=np.float64)
payload = {c: tbl[c].to_numpy() for c in tbl.columns if c != "timestamp"}
if metadata is not None:
domain = _build_domain(metadata, timestamps_array=None)
else:
domain = _build_domain(metadata, timestamps_array=timestamps)
_td_IrregularTimeSeries.__init__(instance, timestamps, domain=domain, **payload)
instance._metadata = metadata
[docs]
def _open_lance_irregular_lazy(
instance: Any,
uri: str,
*,
store: ObjectStore,
metadata: ModalityRow | None,
max_fallback_bytes: int | None = None,
) -> None:
# ``max_fallback_bytes`` accepted for opener-signature uniformity; the
# body raises ``NotImplementedError`` so the cap never applies anyway.
del max_fallback_bytes
"""Lance lazy via :class:`ursa.LazyIrregularTimeSeries` is intentionally NYI.
The upstream :class:`temporaldata.LazyIrregularTimeSeries` contract gates
column materialization on ``isinstance(handle, h5py.Dataset)``; Ursa's
Zarr adapter swaps in a ``zarr.Array`` check. Adapting the same gate for
Lance would require a third isinstance branch or a Lance-aware column
adapter that quacks like one of those handles — out of scope for the
initial dispatcher wiring.
Callers that want column-on-demand Lance access should use the free-standing
:class:`LazyLanceIrregularTimeSeries` directly (constructed from
``ursa.backends.lance.LazyLanceIrregularTimeSeries(uri, store=store, metadata=row)``).
"""
raise NotImplementedError(
"Lance backend lazy mode via LazyIrregularTimeSeries.from_uri is not "
"supported. Use the free-standing ursa.backends.lance.LazyLanceIrregularTimeSeries "
"for column-on-demand Lance access, or pass lazy=False / call "
"IrregularTimeSeries.from_uri for an eager materialized read."
)
[docs]
class LazyLanceIrregularTimeSeries:
    """Streaming-pointer Lance reader for irregular event streams.

    Construction reads only dataset metadata (schema + row count +
    sort-metadata check). Column access via ``__getattr__`` triggers a
    per-column Lance scan; if :meth:`slice` was called first, the scan is
    pushed down to a half-open ``timestamp >= t0 AND timestamp < t1``
    filter via DataFusion.

    Free-standing on purpose: the upstream
    :class:`temporaldata.LazyIrregularTimeSeries` materialization gate
    expects ``h5py.Dataset``- or ``zarr.Array``-shaped column handles,
    and Lance's column-scan model doesn't slot in cleanly. The class
    exposes the same external surface (``.slice()``, attribute-style
    column access, ``.metadata``) so callers don't need to branch on
    backend type.

    Two ways to read time values exist and they differ:

    * ``view.timestamps`` applies the origin shift recorded by
      :meth:`slice`.
    * ``view.timestamp`` (the raw schema column, served by
      ``__getattr__``) is returned exactly as stored, unshifted.
    """

    # Private attributes that ``__getattr__`` must NEVER intercept (would
    # cause infinite recursion / mask AttributeError). Listed explicitly so
    # the schema-typo guard can fire cleanly.
    _RESERVED = frozenset(
        {
            "_ds",
            "_uri",
            "_schema_columns",
            "_n_rows",
            "_slice_window",
            "_materialized",
            "_materialized_shift",
            "_metadata",
        }
    )

    def __init__(
        self,
        uri: str,
        *,
        store: ObjectStore,
        metadata: ModalityRow | None = None,
    ) -> None:
        """Open the dataset at ``uri`` and record its schema and row count.

        Raises:
            ValueError: If the dataset has no ``timestamp`` column, or the
                column is rejected by ``_check_timestamp_column``.
        """
        _check_metadata(metadata, ModalityRow, "LazyLanceIrregularTimeSeries")
        ds = _open_dataset(uri, store)
        if "timestamp" not in ds.schema.names:
            raise ValueError(
                f"Lance dataset at {uri!r} is missing required column "
                "'timestamp'. Did the writer use lance_writer.write_irregular_lance?"
            )
        _check_timestamp_column(ds, uri)
        _check_sort_metadata(ds, uri)
        self._ds = ds
        self._uri = uri
        self._metadata = metadata
        self._schema_columns: list[str] = ds.schema.names
        self._n_rows: int = ds.count_rows()
        # Dataset-absolute time window recorded by .slice(); applied as a
        # filter on the next column fetch. ``None`` means "whole table".
        self._slice_window: tuple[float, float] | None = None
        # Per-column materialization cache so repeated access of the same
        # column doesn't issue a fresh Lance scan.
        self._materialized: dict[str, np.ndarray] = {}
        # Amount subtracted from raw timestamps by ``timestamps``. Set by
        # ``slice()`` on the returned view; initialized here so the property
        # never has to fall through ``__getattr__`` on an unsliced view.
        self._materialized_shift: float = 0.0

    # ---------------------------------------------------------------------
    @property
    def metadata(self) -> ModalityRow | None:
        """Catalog row supplied at construction (may be ``None``)."""
        return self._metadata

    def __len__(self) -> int:
        """Row count: whole table when unsliced, else rows inside the window."""
        if self._slice_window is None:
            return self._n_rows
        # Slice known: materialize timestamps to get the exact count.
        return int(self.timestamps.shape[0])

    def __repr__(self) -> str:
        # ``n_rows`` is always the full-table count, even on a sliced view.
        return (
            f"{type(self).__name__}("
            f"uri={self._uri!r}, "
            f"n_rows={self._n_rows}, "
            f"slice_window={self._slice_window})"
        )

    # ---------------------------------------------------------------------
    # Column access — the lazy ``__getattr__`` (plan Medium-10).
    # ---------------------------------------------------------------------
    def __getattr__(self, name: str) -> Any:
        """Fetch (and cache) the schema column called ``name``.

        Raises:
            AttributeError: If ``name`` is a reserved private attribute that
                has not been set, or is not a column in the Lance schema.
        """
        # __getattr__ is only invoked on misses, so accessing self._ds etc.
        # via normal attribute lookup is safe and won't recurse. Reserved
        # names that DO miss are bugs — raise immediately.
        if name in type(self)._RESERVED:
            raise AttributeError(name)
        # Typo guard: a column not in the Lance schema is an AttributeError,
        # NOT an empty-table scan that returns nothing.
        if name not in self._schema_columns:
            raise AttributeError(
                f"{type(self).__name__} has no column {name!r}. "
                f"Available columns: {self._schema_columns}"
            )
        cached = self._materialized.get(name)
        if cached is not None:
            return cached
        arr = self._fetch_column(name)
        self._materialized[name] = arr
        return arr

    def _fetch_column(self, name: str) -> np.ndarray:
        """Issue the Lance scan for one column with optional time pushdown."""
        if self._slice_window is None:
            tbl = self._ds.to_table(columns=[name])
        else:
            t0, t1 = self._slice_window
            tbl = self._ds.to_table(
                columns=[name],
                filter=f"timestamp >= {t0} AND timestamp < {t1}",
            )
        col = tbl.column(name)
        # Cast to numpy where possible; fall back to a Python-list roundtrip
        # for exotic Arrow types (struct, list-of-lists) the writer may emit.
        try:
            arr: np.ndarray = col.to_numpy(zero_copy_only=False)
            return arr
        except Exception:  # pragma: no cover - exercised on complex payloads
            return np.asarray(col.to_pylist())

    # ---------------------------------------------------------------------
    # Slice — records the window; next column fetch pushes it down.
    # ---------------------------------------------------------------------
    def slice(
        self, start: float, end: float, reset_origin: bool = True
    ) -> "LazyLanceIrregularTimeSeries":
        """Return a new lazy view restricted to ``[start, end)``.

        ``start`` and ``end`` are expressed in the time coordinates of
        *this* view, i.e. the values :attr:`timestamps` reports. On an
        unsliced view those are the stored timestamps; on a view produced
        by an earlier ``slice(..., reset_origin=True)`` they are relative
        to that slice's start. The requested window is translated to
        dataset-absolute time and intersected with this view's existing
        window, so chained slices can only narrow the selection.

        ``reset_origin=True`` (default) is recorded for downstream
        materialization — when the timestamps column is fetched, the
        values are shifted so that ``start`` maps to zero, matching
        :meth:`temporaldata.IrregularTimeSeries.slice`'s default. With
        ``reset_origin=False`` the returned view keeps this view's origin.

        Both bounds must be finite. ``inf``/``nan`` would flow into the
        DataFusion filter as literal `inf`/`nan` and produce an opaque
        parse error deep inside Lance; we catch that here and raise a
        clean ``ValueError`` instead.

        Raises:
            ValueError: If either bound is not finite.
        """
        if not (math.isfinite(start) and math.isfinite(end)):
            raise ValueError(
                f"LazyLanceIrregularTimeSeries.slice bounds must be finite; "
                f"got start={start!r}, end={end!r}."
            )
        # Translate from this view's coordinates to dataset-absolute time.
        # For an unsliced view the origin is 0.0, so this is a no-op.
        origin = self._materialized_shift
        lo = float(start) + origin
        hi = float(end) + origin
        if self._slice_window is not None:
            # Never widen beyond what the parent view already selected.
            parent_lo, parent_hi = self._slice_window
            lo = max(lo, parent_lo)
            hi = min(hi, parent_hi)
        # Build a fresh instance pointing at the same dataset so the cache
        # isn't shared between the parent and the sliced view.
        out = LazyLanceIrregularTimeSeries.__new__(LazyLanceIrregularTimeSeries)
        out._ds = self._ds
        out._uri = self._uri
        out._metadata = self._metadata
        out._schema_columns = self._schema_columns
        out._n_rows = self._n_rows
        out._slice_window = (lo, hi)
        out._materialized = {}
        # The new origin is the *requested* start (not the clamped one), so
        # the returned timestamps stay relative to what the caller asked for.
        out._materialized_shift = origin + float(start) if reset_origin else origin
        return out

    @property
    def timestamps(self) -> np.ndarray:
        """Materialize the timestamp column, applying the slice-origin shift."""
        raw = self._materialized.get("timestamp")
        if raw is None:
            raw = self._fetch_column("timestamp")
            self._materialized["timestamp"] = raw
        shift = self._materialized_shift
        return raw - shift if shift else raw
[docs]
def open_lance_irregular_lazy_freestanding(
    uri: str,
    store: ObjectStore,
    metadata: ModalityRow | None,
) -> LazyLanceIrregularTimeSeries:
    """Build a :class:`LazyLanceIrregularTimeSeries` for ``uri``.

    This is a convenience constructor, not the dispatcher opener
    (:func:`_open_lance_irregular_lazy`, which always raises). It forwards
    ``store`` and ``metadata`` as keyword arguments to the class.

    Args:
        uri: Location of the Lance dataset.
        store: Object store used to resolve ``uri``.
        metadata: Catalog row for the modality, or ``None``.

    Returns:
        The newly constructed lazy view over the dataset.
    """
    lazy_view = LazyLanceIrregularTimeSeries(
        uri,
        store=store,
        metadata=metadata,
    )
    return lazy_view