"""MP4-backed lazy video reader for ``StorageFormat.MP4_INDEX`` modalities.
Ported from ``neuro-galaxy/temporaldata@ian/lazy-everything/temporaldata/lazy_video.py``
(2026-05 snapshot) with four adaptations:
1. ``video_file`` paths → ``uri: str`` + :class:`ObjectStore`. Segments are
downloaded to a process-global LRU cache keyed by ``(uri, etag)`` on first
slice; PyAV decodes the local copy. Cache directory and quota are
controlled by the module-level :data:`_DEFAULT_CACHE_DIR` and
:data:`_DEFAULT_CACHE_QUOTA_GB` constants — tests and operators
override by monkeypatching them and calling
:func:`_reset_cache_for_tests`.
2. ``segment_frame_counts`` / ``segment_pts_indices`` → :class:`LanceFrameIndex`.
The frame index is a sibling Lance table located at
``ursa.layout.lance_frame_index_uri(uri)`` with columns
``(segment_idx, local_frame_idx, pts, timestamp)``. The original PyAV
demux fallback survives for callers that open an MP4 without an index
(tests, ad-hoc inspection); it is a transitional path slated for removal
once every processed recording carries an index.
3. ``to_hdf5`` / ``from_hdf5`` removed. Persistent state is the catalog
:class:`ModalityRow` + storage URI; HDF5 would just duplicate it.
4. ``metadata: ModalityRow | None`` slot mirroring the rest of
:mod:`ursa.temporal`. ``.slice()`` propagates the same instance to the
returned ``IrregularTimeSeries`` (identity-preserving — pinned by test).
Fork-safety: ``stream.thread_count = 1`` + ``thread_type = "NONE"`` is kept
verbatim — FFmpeg's frame/slice worker threads deadlock in
``avcodec_free_context`` if the codec context was created in the parent and
released in a forked child (PyTorch ``DataLoader(num_workers>0)`` defaults to
fork on Linux). The constraint is read-side; the writer side imposes the
same setting on its own PyAV containers.
PyAV is an optional dep (``ursa[video]``); construction raises a clear
``ImportError`` with the install hint if missing.
"""
from __future__ import annotations
import hashlib
import logging
import os
import shutil
import tempfile
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence
from urllib.parse import urlparse
import numpy as np
from temporaldata import IrregularTimeSeries as _IrregularTimeSeries
from ursa.catalog.schemas import ModalityRow
from ursa.layout import lance_frame_index_uri, mp4_segment_uris
from ursa.store.base import ObjectMeta, ObjectStore
from ursa.temporal import _check_metadata
if TYPE_CHECKING:
import lance # only for type hints; runtime import is lazy
__all__ = ["LazyVideo", "LanceFrameIndex"]
# ---------------------------------------------------------------------------
# PyAV lazy-import — keeps `import ursa.backends.video` cheap when PyAV isn't
# installed and lets the optional-extras model work cleanly.
# ---------------------------------------------------------------------------
#: Memoized PyAV module object; stays ``None`` until :func:`_av_module`
#: has imported PyAV successfully once.
_av: Any = None


def _av_module() -> Any:
    """Import PyAV on first use and return the module object.

    The module is memoized in :data:`_av`, so only the first call pays
    the import cost.

    Raises:
        ImportError: if PyAV is not installed; the message carries the
            ``ursa[video]`` install hint and chains the original error.
    """
    global _av
    if _av is not None:
        return _av
    try:
        import av as _loaded_av
    except ImportError as exc:  # pragma: no cover - optional-extra hint
        raise ImportError(
            "PyAV is not installed. ``ursa.LazyVideo`` requires the "
            "``video`` extra: `uv add 'ursa[video]'`."
        ) from exc
    _av = _loaded_av
    return _av
# ---------------------------------------------------------------------------
# Process-global LRU segment cache. Keyed by (uri, etag) so cache hits across
# processes are safe and overwriting an MP4 in R2 invalidates stale entries.
# ---------------------------------------------------------------------------
#: Root directory into which :class:`LazyVideo` downloads MP4 segments:
#: ``<system temp dir>/ursa-video-cache``. Keeping it under the temp dir
#: (rather than ``$HOME``) means a stale cache never clutters a user's
#: home folder. To redirect it, monkeypatch this constant and then call
#: :func:`_reset_cache_for_tests` so the cache singleton is rebuilt with
#: the new value.
_DEFAULT_CACHE_DIR: Path = Path(tempfile.gettempdir(), "ursa-video-cache")
#: Segment-cache quota in GiB (multiplied by ``2**30`` when the cache is
#: built). LRU eviction enforces it before every new download. Default 10.
_DEFAULT_CACHE_QUOTA_GB: int = 10
class _VideoSegmentCache:
    """LRU cache of locally-materialized MP4 segments.

    When built by :func:`_get_cache` the quota is
    :data:`_DEFAULT_CACHE_QUOTA_GB` × ``2**30`` bytes. Entries are downloaded
    on miss and touched (mtime bumped) on hit. The least-recently-touched
    entries are removed when a new download would exceed the quota.

    Thread-safe via a single coarse lock: concurrent slices on the same
    :class:`LazyVideo` serialize their downloads. The cache is process-global,
    so several :class:`LazyVideo` instances pointing at the same segment
    share one local file.

    The lock does not span processes, yet several processes (e.g. forked
    DataLoader workers) can share one cache root. Each download is therefore
    staged in a writer-unique ``.part`` file and published with an atomic
    rename. Concurrent writers of the same entry never interleave bytes in a
    shared staging file, and the last rename simply wins with identical
    content.
    """

    def __init__(self, root: Path, quota_bytes: int) -> None:
        self._root = root
        self._quota = quota_bytes
        self._lock = threading.Lock()
        self._root.mkdir(parents=True, exist_ok=True)

    def _entry_path(self, uri: str, etag: str) -> Path:
        """Return the on-disk path of the ``(uri, etag)`` cache entry."""
        # Hash the URI so the on-disk name is stable across machines and
        # short enough not to bump into filesystem name limits. Embed the
        # etag so a stale (uri, etag) can't be confused for the current one.
        digest = hashlib.sha256(uri.encode("utf-8")).hexdigest()[:16]
        safe_etag = etag.strip('"').replace("/", "_") or "noetag"
        return self._root / f"{digest}.{safe_etag}.mp4"

    def _staging_path(self, target: Path) -> Path:
        """Return a writer-unique temporary path next to ``target``.

        The PID and thread id are embedded so two processes downloading the
        same entry never share a staging file. The trailing ``.part`` keeps
        the file out of the ``*.mp4`` globs used by eviction and
        :meth:`clear`.
        """
        return target.with_name(
            f"{target.name}.{os.getpid()}.{threading.get_ident()}.part"
        )

    def fetch(self, store: ObjectStore, uri: str) -> Path:
        """Return a local path to ``uri``'s bytes, downloading on a miss.

        ``store.head`` is consulted on every call, so an overwritten object
        (new etag) maps to a new cache entry. A download is written to a
        writer-unique ``.part`` file and atomically renamed into place. The
        staging file is removed if anything fails.
        """
        key = urlparse(uri).path.lstrip("/")
        meta: ObjectMeta = store.head(key)
        etag = meta.etag or "noetag"
        target = self._entry_path(uri, etag)
        with self._lock:
            if target.exists():
                # LRU touch via mtime update; the next eviction sweep sees
                # this entry as most-recently-used.
                os.utime(target, None)
                return target
            # Evict if needed before reserving space for the new entry.
            self._evict_until(meta.size)
            tmp = self._staging_path(target)
            try:
                with store.open(key) as src, tmp.open("wb") as dst:
                    shutil.copyfileobj(src, dst)
                # Atomic publish: readers only ever see a complete file.
                tmp.replace(target)
            finally:
                # No-op after a successful replace; cleans up on failure.
                tmp.unlink(missing_ok=True)
        return target

    def _evict_until(self, headroom_bytes: int) -> None:
        """Drop oldest entries until ``headroom_bytes`` more fits in quota.

        Each entry is stat'ed exactly once and ``(size, mtime, path)`` is
        collected up-front, so the eviction loop never re-stats a file. This
        guards against an external tmp-cleaner (macOS's launchd periodic
        cleaner, systemd-tmpfiles, a colleague's
        ``rm -rf /tmp/ursa-video-cache``) removing an entry between the glob
        and the loop. A race-deleted entry is skipped rather than letting
        ``FileNotFoundError`` propagate out of ``LazyVideo.slice()``.
        """
        import stat

        entries: list[tuple[int, float, Path]] = []
        for p in self._root.glob("*.mp4"):
            try:
                st = p.stat()
            except FileNotFoundError:
                continue
            # Use the stat we already have instead of a second is_file() stat.
            if stat.S_ISREG(st.st_mode):
                entries.append((st.st_size, st.st_mtime, p))
        entries.sort(key=lambda t: t[1])  # oldest mtime first
        used = sum(size for size, _, _ in entries)
        for size, _, victim in entries:
            if used + headroom_bytes <= self._quota:
                break
            used -= size
            victim.unlink(missing_ok=True)

    def clear(self) -> None:
        """Drop every cached entry. Test helper."""
        with self._lock:
            for p in self._root.glob("*.mp4"):
                p.unlink(missing_ok=True)
#: Process-global segment cache; built lazily by :func:`_get_cache`.
_cache_singleton: _VideoSegmentCache | None = None
#: Serializes construction and teardown of :data:`_cache_singleton`.
_cache_singleton_lock = threading.Lock()


def _get_cache() -> _VideoSegmentCache:
    """Return the process-global segment cache, creating it on first call.

    Only the first call reads :data:`_DEFAULT_CACHE_DIR` and
    :data:`_DEFAULT_CACHE_QUOTA_GB` (converted to bytes with ``2**30``).
    Every later call returns that same instance, so monkeypatching those
    constants has no effect until :func:`_reset_cache_for_tests` discards
    the current singleton.
    """
    global _cache_singleton
    with _cache_singleton_lock:
        if _cache_singleton is None:
            quota_bytes = _DEFAULT_CACHE_QUOTA_GB * (2**30)
            _cache_singleton = _VideoSegmentCache(
                root=_DEFAULT_CACHE_DIR,
                quota_bytes=quota_bytes,
            )
        return _cache_singleton
def _reset_cache_for_tests() -> None:
    """Tear down the cache singleton so the next ``_get_cache()`` rebuilds it.

    Any existing singleton first deletes its ``*.mp4`` entries (via
    ``clear()``), then the module-level reference is dropped. Tests call
    this after monkeypatching the cache configuration constants.
    """
    global _cache_singleton
    with _cache_singleton_lock:
        stale = _cache_singleton
        if stale is not None:
            stale.clear()
        _cache_singleton = None
# ---------------------------------------------------------------------------
# Lance frame index — sidecar table written by the upstream MP4 writer.
# ---------------------------------------------------------------------------
class LanceFrameIndex:
    """Read-only view of the sibling Lance frame-index table of one MP4.

    The table's URI is produced by :func:`ursa.layout.lance_frame_index_uri`
    for the MP4. Schema (the writer side emits this; this module only reads):

    .. code-block:: text

        segment_idx:      int64    # which underlying segment file
        local_frame_idx:  int64    # 0..segment_frame_count-1
        pts:              int64    # PyAV PTS (codec-time-base units)
        timestamp:        float64  # recording-relative seconds

    Sorted by ``(segment_idx, local_frame_idx)`` (i.e. presentation order
    within each segment). Construction is metadata-only. Every query result
    (frame counts, per-segment PTS arrays, timestamps) is read on first use
    and cached on the instance.

    Rebuildable from the source MP4 via :func:`_probe_segment` as a fallback.
    """

    def __init__(self, dataset: "lance.LanceDataset", uri: str) -> None:
        self._ds = dataset
        self._uri = uri
        self._pts_cache: dict[int, np.ndarray] = {}
        # Lazy — populated on first call of the matching accessor.
        self._segment_frame_counts_cache: np.ndarray | None = None
        self._timestamps_cache: np.ndarray | None = None

    @classmethod
    def open(cls, uri: str, store: ObjectStore) -> "LanceFrameIndex":
        """Open the frame-index dataset. Lance pulls only metadata + footer."""
        import lance

        url, storage_options = store.lance_connection()
        # Strip the URI's scheme/bucket and use the lance_connection's base
        # URL + the key. lance_connection returns a (base_url, options) pair
        # already pointing at the right bucket; we just append the key.
        key = urlparse(uri).path.lstrip("/")
        full_url = f"{url.rstrip('/')}/{key}"
        ds = lance.dataset(full_url, storage_options=storage_options)
        return cls(ds, uri)

    def segment_frame_counts(self) -> np.ndarray:
        """Frames per segment as an ``int64`` ndarray of length ``n_segments``.

        ``n_segments`` is ``max(segment_idx) + 1``. A segment index with no
        rows reports ``0``, and an empty table yields an empty array.

        Raises:
            ValueError: if any ``segment_idx`` is negative. A negative index
                would otherwise wrap around and corrupt another segment's count.
        """
        if self._segment_frame_counts_cache is None:
            seg_idx = (
                self._ds.to_table(columns=["segment_idx"])
                .to_pandas()["segment_idx"]
                .to_numpy(dtype=np.int64)
            )
            if seg_idx.size and int(seg_idx.min()) < 0:
                raise ValueError(
                    f"LanceFrameIndex at {self._uri!r} has negative segment_idx values"
                )
            # Histogram over 0..max(segment_idx); length 0 for an empty table.
            self._segment_frame_counts_cache = np.bincount(seg_idx).astype(
                np.int64, copy=False
            )
        return self._segment_frame_counts_cache

    def pts_table(self, segment_idx: int) -> np.ndarray:
        """Return the PTS array for ``segment_idx`` in presentation order.

        Raises:
            ValueError: if the table has no rows for ``segment_idx``.
        """
        segment_idx = int(segment_idx)
        cached = self._pts_cache.get(segment_idx)
        if cached is not None:
            return cached
        tbl = self._ds.to_table(
            columns=["local_frame_idx", "pts"],
            filter=f"segment_idx = {segment_idx}",
        ).to_pandas()
        if len(tbl) == 0:
            raise ValueError(
                f"LanceFrameIndex at {self._uri!r} has no rows for segment_idx={segment_idx}"
            )
        tbl = tbl.sort_values("local_frame_idx")
        arr: np.ndarray = tbl["pts"].to_numpy(dtype=np.int64)
        self._pts_cache[segment_idx] = arr
        return arr

    def timestamps(self) -> np.ndarray:
        """Flat presentation-ordered timestamps across all segments.

        The returned array is the cached instance; callers that mutate it
        must copy first.
        """
        if self._timestamps_cache is None:
            tbl = self._ds.to_table(
                columns=["segment_idx", "local_frame_idx", "timestamp"],
            ).to_pandas()
            tbl = tbl.sort_values(["segment_idx", "local_frame_idx"])
            self._timestamps_cache = tbl["timestamp"].to_numpy(dtype=np.float64)
        return self._timestamps_cache
# ---------------------------------------------------------------------------
# PyAV-based fallbacks (used when no Lance frame index exists — tests,
# ad-hoc inspection, or any MP4 opened without a sidecar).
# ---------------------------------------------------------------------------
def _probe_segment_demux(path: str) -> np.ndarray:
    """Collect a segment's frame PTS by demuxing packets, without decoding.

    This is much cheaper than :func:`_probe_segment_decode` because no frame
    is decoded (no IDCT, motion compensation or swscale). Packet PTS values
    are sorted ascending, which equals *presentation* order for any standard
    GOP-structured H.264/H.265 stream. Packets without a PTS are skipped.
    :func:`_probe_segment` falls back to the decode probe when this raises
    or yields no PTS at all.
    """
    container = _av_module().open(path)
    try:
        video_stream = container.streams.video[0]
        presentation_pts = sorted(
            int(pkt.pts) for pkt in container.demux(video_stream) if pkt.pts is not None
        )
        return np.asarray(presentation_pts, dtype=np.int64)
    finally:
        container.close()
def _probe_segment_decode(path: str) -> np.ndarray:
    """Slow path: decode every frame and collect presentation-order PTS.

    Frames without a PTS are skipped. The decoder is forced single-threaded,
    as in ``LazyVideo._load_frames``. FFmpeg's frame/slice worker threads
    can deadlock in ``avcodec_free_context`` across ``os.fork()``, and this
    probe can run inside forked DataLoader workers via the no-frame-index
    fallback.
    """
    av = _av_module()
    container = av.open(path)
    try:
        stream = container.streams.video[0]
        # Single-threaded decode — fork-safety, see docstring.
        stream.thread_count = 1
        stream.thread_type = "NONE"
        ptses = [int(frame.pts) for frame in container.decode(stream) if frame.pts is not None]
        return np.asarray(ptses, dtype=np.int64)
    finally:
        container.close()
def _probe_segment(path: str) -> tuple[int, np.ndarray]:
    """Probe one local MP4 segment and return ``(frame_count, pts_array)``.

    The cheap demux probe is tried first. If it raises, the exception is
    logged at WARNING level. If it raises or finds no PTS, the slower decode
    probe produces the result instead.
    """
    try:
        demuxed = _probe_segment_demux(path)
    except Exception as ex:  # pragma: no cover - exercised on malformed files
        logging.warning(
            "LazyVideo: demux probe failed for %s (%s); falling back to decode probe.",
            path,
            ex,
        )
    else:
        if demuxed.size:
            return len(demuxed), demuxed
    decoded = _probe_segment_decode(path)
    return len(decoded), decoded
# ---------------------------------------------------------------------------
# The class.
# ---------------------------------------------------------------------------
class LazyVideo:
    r"""Lazy-decoded video for ``StorageFormat.MP4_INDEX`` modalities.

    Construction is metadata-only **when a** :class:`LanceFrameIndex`
    **is provided**: no MP4 GET and no frame decode happen until the first
    :meth:`slice` call. That call downloads the required segment(s) into the
    process-global LRU cache and decodes the requested frames via PyAV.
    When ``frame_index=None`` the constructor instead PyAV-demuxes every
    segment up-front to recover frame counts + PTS tables. That path DOES
    download bytes and probe PTS at construction time. Use the
    Lance-frame-index path (the default through :meth:`from_uri`) for the
    metadata-only contract.

    Seeks are keyframe-aligned (``container.seek(pts, any_frame=False,
    backward=True)``) and the decoder is then advanced frame-by-frame to the
    target PTS, so frames returned mid-GOP are bit-correct (no
    ``mmco: unref short failure`` corruption).

    Picklable: no PyAV container/stream/reformatter handles are stored on
    ``self`` (they live as locals inside :meth:`_load_frames`), so a
    ``LazyVideo`` survives the ``pickle.dumps`` that DataLoader workers
    perform across the fork boundary.

    Args:
        uri: ``r2://`` URI of the MP4 (or the first segment if multi-segment).
        store: :class:`ObjectStore` to read from — typically
            :attr:`DataInterface.assets_ro_store`.
        frame_index: :class:`LanceFrameIndex` for the sibling sidecar, or
            ``None`` to PyAV-demux every segment eagerly inside the
            constructor (test-only fallback; see module docstring).
        metadata: optional :class:`ModalityRow` to carry through to
            :meth:`slice` results.
        segment_uris: additional URIs for multi-segment videos. If absent,
            ``uri`` is the sole segment.
        resize: ``(height, width)`` to resize frames to, or ``None`` for
            original dimensions.
        colorspace: ``"RGB"`` or ``"G"``.
        channel_format: ``"NCHW"`` or ``"NHWC"``.

    Raises:
        ImportError: if PyAV is not installed.
        ValueError: on an invalid ``resize`` / ``colorspace`` /
            ``channel_format``, when the segment count disagrees with the
            number of URIs, or when there are fewer frames than timestamps.
    """

    def __init__(
        self,
        uri: str,
        *,
        store: ObjectStore,
        frame_index: LanceFrameIndex | None = None,
        metadata: ModalityRow | None = None,
        segment_uris: Sequence[str] | None = None,
        resize: tuple[int, int] | None = None,
        colorspace: str = "RGB",
        channel_format: str = "NCHW",
    ) -> None:
        # Validate PyAV is importable but do not open any container here —
        # picklability of `self` depends on no `av.Container` ever landing on
        # the instance.
        _av_module()
        _check_metadata(metadata, ModalityRow, "LazyVideo")
        if resize is not None and (not isinstance(resize, tuple) or len(resize) != 2):
            raise ValueError('"resize" must be None or a (height, width) tuple')
        if colorspace not in ("RGB", "G"):
            raise ValueError('"colorspace" must be "RGB" or "G"')
        if channel_format not in ("NCHW", "NHWC"):
            raise ValueError('"channel_format" must be "NCHW" or "NHWC"')

        all_uris: list[str] = [uri]
        if segment_uris:
            all_uris.extend(str(s) for s in segment_uris)

        self._store = store
        self._segment_uris: list[str] = all_uris
        self.uri = uri  # primary URI (used by repr / metadata-style display)
        self.resize = resize
        self.colorspace = colorspace
        self.channel_format = channel_format
        self._metadata = metadata
        self._frame_index = frame_index

        # Per-segment PTS cache (populated on first decode of a segment).
        # When `frame_index` is set, PTS arrays come from Lance; otherwise
        # the demux below fills every slot up-front.
        self._pts_cache: list[np.ndarray | None] = [None] * len(all_uris)

        if frame_index is not None:
            self.segment_frame_counts = np.asarray(
                frame_index.segment_frame_counts(), dtype=np.int64
            )
            # ``copy=True`` is load-bearing: ``LanceFrameIndex.timestamps()``
            # caches the underlying ndarray; the non-monotonic re-sort below
            # would otherwise permute that cache and corrupt subsequent
            # ``LazyVideo`` instances built on the same index.
            self.timestamps = frame_index.timestamps().astype(np.float64, copy=True)
        else:
            # Fallback: pull each segment locally and demux it now.
            counts: list[int] = []
            pts: list[np.ndarray] = []
            for u in all_uris:
                path = _get_cache().fetch(store, u)
                n, p = _probe_segment(str(path))
                counts.append(n)
                pts.append(p)
            self.segment_frame_counts = np.asarray(counts, dtype=np.int64)
            self._pts_cache = list(pts)
            # No real timestamps known without a frame index — synthesize
            # uniform 0..N indices in seconds-of-presentation order.
            n_total = int(self.segment_frame_counts.sum())
            self.timestamps = np.arange(n_total, dtype=np.float64)

        if len(self.segment_frame_counts) != len(all_uris):
            raise ValueError(
                f"frame_index reports {len(self.segment_frame_counts)} segment(s) "
                f"but {len(all_uris)} URI(s) were provided"
            )

        # Exclusive prefix sum of segment frame counts: global index of each
        # segment's first frame, used by _segment_for_frame.
        self.segment_frame_offsets = np.concatenate(
            ([0], np.cumsum(self.segment_frame_counts[:-1]))
        ).astype(np.int64)

        frame_count = int(self.segment_frame_counts.sum())
        ts_len = int(self.timestamps.shape[0])
        if frame_count != ts_len:
            if frame_count > ts_len:
                # Tolerate a trailing overshoot (mirrors Ian's original
                # behavior — codec sometimes emits N+1 frames where the
                # timestamp series tops out at N). Extra frames are dropped.
                frame_count = ts_len
            else:
                raise ValueError(
                    f"Frame count mismatch: counts sum to {frame_count} "
                    f"but timestamps have {ts_len} entries"
                )
        self.frame_count = frame_count
        self.frame_indices = np.arange(frame_count, dtype=np.int64)

        # Defensive: re-sort if the frame_index returned non-monotonic
        # timestamps (shouldn't happen for well-formed Lance indices). A
        # stable sort keeps frames with equal timestamps in index order.
        if frame_count > 1 and np.any(np.diff(self.timestamps[:frame_count]) < 0):
            sort_idx = np.argsort(self.timestamps[:frame_count], kind="stable")
            self.timestamps[:frame_count] = self.timestamps[:frame_count][sort_idx]
            self.frame_indices = sort_idx.astype(np.int64)
            logging.info("LazyVideo: re-sorted %d non-monotonic timestamps", frame_count)

    # ---------------------------------------------------------------------
    # Properties / dunders
    # ---------------------------------------------------------------------

    @property
    def metadata(self) -> ModalityRow | None:
        """The :class:`ModalityRow` given at construction, or ``None``."""
        return self._metadata

    def __len__(self) -> int:
        """Number of frames addressable by this (possibly windowed) video."""
        return self.frame_count

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"uri={self.uri!r}, "
            f"frames={self.frame_count}, "
            f"segments={len(self._segment_uris)})"
        )

    # ---------------------------------------------------------------------
    # Constructors: from_uri / concat (single-recording only).
    # ---------------------------------------------------------------------

    @classmethod
    def from_uri(
        cls,
        uri: str,
        *,
        store: ObjectStore,
        metadata: ModalityRow | None = None,
    ) -> "LazyVideo":
        """Open *uri* as a lazy MP4 video.

        Mirrors :meth:`ursa.RegularTimeSeries.from_uri` /
        :meth:`ursa.LazyIrregularTimeSeries.from_uri` so callers can use a
        uniform construction surface across backends. Resolves the sibling
        Lance frame index via :func:`ursa.layout.lance_frame_index_uri`; on
        a missing sidecar, falls back to demuxing the MP4 itself.

        Video is intentionally lazy-only. An eager equivalent would
        decode every frame at construction, which defeats the point of
        ``MP4_INDEX`` for any realistic recording.
        ``DataInterface.materialize(..., lazy=False)`` therefore still returns a
        ``LazyVideo`` for video subfields.

        Video isn't dispatched through :class:`_BackendOpeners` because the
        regular/irregular split doesn't fit per-frame video access.
        :class:`ursa.DataInterface` invokes this classmethod directly when
        ``ModalityRow.format == StorageFormat.MP4_INDEX``.
        """
        return _open_mp4_video_lazy(uri, store, metadata)

    @classmethod
    def concat(cls, videos: Sequence["LazyVideo"]) -> "LazyVideo":
        """Concatenate segments of one logical video.

        Single-recording only: cross-recording concat is not supported
        because aligned time domains across recordings are only established
        by the per-recording processed path. Every recording hash present
        among the inputs' metadata must agree.

        Multi-video concat with frame indexes is also rejected. The receiver
        would have to merge the ``LanceFrameIndex`` of each input, and the
        writer-side helper for that has not been built yet. Calling
        ``concat([a])`` (single video — i.e. metadata-only re-wrap) stays
        supported.

        Raises:
            ValueError: on an empty input, mixed recordings, or mismatched
                ``resize`` / ``colorspace`` / ``channel_format``.
            TypeError: if any item is not a :class:`LazyVideo`.
            NotImplementedError: for multi-video concat with a frame index.
        """
        if len(videos) == 0:
            raise ValueError("LazyVideo.concat requires at least one video")
        for v in videos:
            if not isinstance(v, cls):
                raise TypeError("LazyVideo.concat: all items must be LazyVideo")
        first = videos[0]

        # Compare every known recording hash, not just against the first
        # video's: a first video without metadata must not disable the check.
        recording_hashes = {
            v._metadata.recording_hash
            for v in videos
            if v._metadata and v._metadata.recording_hash is not None
        }
        if len(recording_hashes) > 1:
            raise ValueError(
                "LazyVideo.concat is single-recording only; cross-recording "
                "concat requires aligned time domains, which are only "
                "established by the per-recording processed path."
            )

        for v in videos[1:]:
            if v.resize != first.resize:
                raise ValueError("LazyVideo.concat: resize must match across videos")
            if v.colorspace != first.colorspace:
                raise ValueError("LazyVideo.concat: colorspace must match")
            if v.channel_format != first.channel_format:
                raise ValueError("LazyVideo.concat: channel_format must match")

        # Multi-video concat with any frame index is unsafe: the first
        # video's index would silently be used to derive frame offsets
        # across every segment, which is wrong. Reject explicitly until
        # the writer ships a LanceFrameIndex-merge helper.
        if len(videos) > 1 and any(v._frame_index is not None for v in videos):
            raise NotImplementedError(
                "LazyVideo.concat with multiple videos and a Lance frame index "
                "is not supported: merging per-video frame-index tables into a "
                "single contiguous index is not implemented. Either materialize "
                "each video's frames separately, or drop the frame indexes "
                "(pass frame_index=None) so concat falls back to the demux path."
            )

        all_uris = [u for v in videos for u in v._segment_uris]
        return cls(
            uri=all_uris[0],
            store=first._store,
            frame_index=first._frame_index,
            metadata=first._metadata,
            segment_uris=all_uris[1:] if len(all_uris) > 1 else None,
            resize=first.resize,
            colorspace=first.colorspace,
            channel_format=first.channel_format,
        )

    # ---------------------------------------------------------------------
    # Lazy slice — returns a windowed LazyVideo with no decode.
    # ---------------------------------------------------------------------

    def lazy_slice(self, start: float, end: float) -> "LazyVideo":
        """Return a new video windowed to ``[start, end)``, with **no PyAV decode**.

        The result is an instance of the same class carrying a frame-range
        window. Frames decode only on its next :meth:`slice` call, and only
        for the windowed range. ``_apply_time_window`` calls this form so
        that ``stream(time_range=…)`` never triggers a segment download or
        decode.

        Implementation: ``np.searchsorted`` on the in-memory timestamp array
        finds ``idx_l:idx_r``. The copy is then made with
        ``object.__new__(type(self))`` and shallow attribute copies, with
        ``timestamps``, ``frame_indices`` and ``frame_count`` replaced by
        their windowed versions. All per-segment metadata
        (``segment_frame_counts``, ``segment_frame_offsets``, ``_pts_cache``,
        etc.) is shared by reference. These use **global** frame indices, so
        they remain correct for the windowed object's
        :meth:`_segment_for_frame` lookups.

        ``reset_origin`` is always ``False``: the returned series keeps
        original recording-relative coordinates.

        If ``start >= end`` or the window covers no frames, an empty video
        (``frame_count=0``) is returned rather than raising.
        """
        ts = self.timestamps[: self.frame_count]
        idx_l = int(np.searchsorted(ts, start, side="left"))
        idx_r = int(np.searchsorted(ts, end, side="left"))
        # Clamp to valid range.
        idx_l = max(0, min(idx_l, self.frame_count))
        idx_r = max(idx_l, min(idx_r, self.frame_count))

        # Build the windowed copy without calling __init__ (which would
        # re-open the Lance frame index or re-demux the MP4 — both are
        # network I/O that must not happen here).
        windowed: LazyVideo = object.__new__(type(self))
        # Shared state — safe because all these are read-only after __init__
        # (except _pts_cache, which is a shared lazily-filled cache).
        windowed._store = self._store
        windowed._segment_uris = self._segment_uris
        windowed.uri = self.uri
        windowed.resize = self.resize
        windowed.colorspace = self.colorspace
        windowed.channel_format = self.channel_format
        windowed._metadata = self._metadata
        windowed._frame_index = self._frame_index
        windowed._pts_cache = self._pts_cache
        windowed.segment_frame_counts = self.segment_frame_counts
        windowed.segment_frame_offsets = self.segment_frame_offsets
        # Windowed state — independent copies of the parent's window.
        windowed.timestamps = ts[idx_l:idx_r].copy()
        windowed.frame_indices = self.frame_indices[idx_l:idx_r].copy()
        windowed.frame_count = idx_r - idx_l
        return windowed

    # ---------------------------------------------------------------------
    # Slice — the main user-facing path (eager decode).
    # ---------------------------------------------------------------------

    def slice(self, start: float, end: float, reset_origin: bool = True) -> _IrregularTimeSeries:
        r"""Return an :class:`IrregularTimeSeries` of decoded frames in
        ``[start, end)`` (end-exclusive).

        ``reset_origin=True`` (default) shifts the returned ``timestamps`` to
        be relative to ``start``; ``False`` keeps absolute camera time. The
        decoded frames are attached as ``.frames`` and this video's
        ``metadata`` object is propagated by identity.
        """
        # Build a one-shot IrregularTimeSeries over (timestamps, frame_indices)
        # so we can leverage upstream's slice() for the time→index filter
        # in one place rather than re-deriving it here.
        scratch = _IrregularTimeSeries(
            timestamps=np.asarray(self.timestamps[: self.frame_count], dtype=np.float64),
            frame_indices=self.frame_indices,
            domain="auto",
        )
        sliced = scratch.slice(start=start, end=end, reset_origin=reset_origin)
        frames = self._load_frames(sliced.frame_indices)
        # Attach the decoded frames + propagate metadata IDENTITY (High-5).
        sliced.frames = frames
        sliced._metadata = self._metadata
        return sliced

    # ---------------------------------------------------------------------
    # Frame decode — Ian's port with the path resolution routed through
    # the cache.
    # ---------------------------------------------------------------------

    def _segment_for_frame(self, frame_index: int) -> tuple[int, int]:
        """Map a global frame index to ``(segment_idx, local_index)``.

        ``side="right"`` picks the last segment whose offset is <= the
        frame, so zero-frame segments (duplicate offsets) are skipped.
        """
        segment_idx = int(
            np.searchsorted(self.segment_frame_offsets, frame_index, side="right") - 1
        )
        segment_start = int(self.segment_frame_offsets[segment_idx])
        local_index = int(frame_index - segment_start)
        return segment_idx, local_index

    def _ensure_pts_table(self, segment_idx: int) -> np.ndarray:
        """Return (and cache) the presentation-ordered PTS table of a segment.

        The table comes from the Lance frame index when present, otherwise
        from probing the cached local segment. Extra trailing PTS entries
        are truncated to the segment's frame count.

        Raises:
            ValueError: if the table has fewer entries than the segment's
                frame count.
        """
        cached = self._pts_cache[segment_idx]
        if cached is not None:
            return cached
        if self._frame_index is not None:
            arr = self._frame_index.pts_table(segment_idx)
        else:
            path = _get_cache().fetch(self._store, self._segment_uris[segment_idx])
            _, arr = _probe_segment(str(path))
        expected = int(self.segment_frame_counts[segment_idx])
        if len(arr) != expected:
            if len(arr) > expected:
                arr = arr[:expected]
            else:
                raise ValueError(
                    f"Segment {segment_idx} reports {expected} frames but "
                    f"its PTS table has {len(arr)} entries"
                )
        self._pts_cache[segment_idx] = arr
        return arr

    def _empty_frames_array(self) -> np.ndarray:
        """Return a zero-frame ``uint8`` array in this video's channel layout."""
        n_channels = 3 if self.colorspace == "RGB" else 1
        if self.channel_format == "NCHW":
            return np.zeros((0, n_channels, 1, 1), dtype="uint8")
        return np.zeros((0, 1, 1, n_channels), dtype="uint8")

    def _resolve_segment_path(self, segment_idx: int) -> str:
        """Cache-resolve a segment URI to a local-fs path for PyAV."""
        local = _get_cache().fetch(self._store, self._segment_uris[segment_idx])
        return str(local)

    def _load_frames(self, frame_indices: np.ndarray) -> np.ndarray:
        """Decode the requested presentation-ordered frames.

        Returns a ``uint8`` array whose row ``i`` is the frame for
        ``frame_indices[i]``. The shape is ``(n, C, H, W)`` for ``NCHW`` and
        ``(n, H, W, C)`` for ``NHWC``. ``C`` is 3 for RGB and 1 for G.
        ``(H, W)`` is ``resize`` if given, else the first opened segment's
        codec dimensions.

        Implementation notes (from Ian's port — keep these):

        * Indices are sorted by ``(segment, local_index)`` so each segment is
          walked forward in presentation order; the only seeks are at
          segment boundaries (or when the caller passes a non-monotonic
          sequence).
        * Single-threaded decode (``stream.thread_count = 1``,
          ``stream.thread_type = "NONE"``) avoids the libavcodec frame/slice
          thread + ``os.fork()`` deadlock in ``avcodec_free_context``. This
          shows up in any consumer that loads ``LazyVideo`` from a forked
          child (PyTorch ``DataLoader(num_workers>0)`` defaults to fork on
          Linux).
        * A single ``av.video.reformatter.VideoReformatter`` does
          colorspace + resize via libswscale.
        * If a segment's decoder reaches end-of-stream before a requested
          frame, that frame and every later requested frame *of the same
          segment* stay zero-filled (one WARNING per segment). Decoding then
          continues with the next segment.
        """
        av = _av_module()
        n_frames = len(frame_indices)
        if n_frames == 0:
            return self._empty_frames_array()

        n_channels = 3 if self.colorspace == "RGB" else 1
        target_format = "rgb24" if self.colorspace == "RGB" else "gray8"

        seg_arr = np.empty(n_frames, dtype=np.int64)
        local_arr = np.empty(n_frames, dtype=np.int64)
        for i, fidx in enumerate(frame_indices):
            s, loc = self._segment_for_frame(int(fidx))
            seg_arr[i] = s
            local_arr[i] = loc
        # Primary key segment, secondary key local index.
        order = np.lexsort((local_arr, seg_arr))

        frames: np.ndarray | None = None
        out_h: int | None = None
        out_w: int | None = None
        container = None
        stream = None
        decode_iter = None
        last_pts: int | None = None
        cur_seg = -1
        pts_table: np.ndarray | None = None
        reformatter = None
        seg_frame_cache: dict[int, np.ndarray] = {}
        # Segment whose decoder hit end-of-stream before a requested frame.
        exhausted_seg = -1
        try:
            for k, sorted_i in enumerate(order):
                sorted_i = int(sorted_i)
                seg_idx = int(seg_arr[sorted_i])
                local_idx = int(local_arr[sorted_i])

                if seg_idx == exhausted_seg:
                    # Later frames of this segment have even larger PTS and
                    # are unreachable too; leave them zero-filled.
                    continue

                if seg_idx != cur_seg:
                    if container is not None:
                        container.close()
                        container = None  # never double-close on a failed open
                    path = self._resolve_segment_path(seg_idx)
                    container = av.open(path)
                    stream = container.streams.video[0]
                    # Single-threaded decode — see docstring.
                    stream.thread_count = 1
                    stream.thread_type = "NONE"
                    pts_table = self._ensure_pts_table(seg_idx)
                    cur_seg = seg_idx
                    decode_iter = None
                    last_pts = None
                    seg_frame_cache = {}

                # Duplicate request of a frame already decoded in this run.
                if local_idx in seg_frame_cache:
                    assert frames is not None
                    frames[sorted_i] = seg_frame_cache[local_idx]
                    continue

                assert pts_table is not None
                target_pts = int(pts_table[local_idx])
                need_seek = decode_iter is None or last_pts is None or target_pts < last_pts
                if need_seek:
                    assert container is not None and stream is not None
                    container.seek(
                        target_pts,
                        any_frame=False,
                        backward=True,
                        stream=stream,
                    )
                    decode_iter = container.decode(video=0)
                    last_pts = None
                    seg_frame_cache.clear()

                if frames is None:
                    assert stream is not None
                    if self.resize is not None:
                        out_h, out_w = self.resize
                    else:
                        out_h = stream.codec_context.height
                        out_w = stream.codec_context.width
                    if self.channel_format == "NCHW":
                        frames = np.zeros((n_frames, n_channels, out_h, out_w), dtype="uint8")
                    else:
                        frames = np.zeros((n_frames, out_h, out_w, n_channels), dtype="uint8")

                # Advance the decoder to the first frame at/after target_pts.
                collected = None
                assert decode_iter is not None
                for frame in decode_iter:
                    if frame.pts is None:
                        continue
                    last_pts = int(frame.pts)
                    if last_pts >= target_pts:
                        collected = frame
                        break

                if collected is None:
                    logging.warning(
                        "LazyVideo: end of segment %d reached early at "
                        "frame %d/%d (target pts=%d); leaving the segment's "
                        "remaining requested frames zero-filled.",
                        seg_idx,
                        k,
                        n_frames,
                        target_pts,
                    )
                    exhausted_seg = seg_idx
                    continue

                if reformatter is None:
                    reformatter = av.video.reformatter.VideoReformatter()
                out_frame = reformatter.reformat(
                    collected,
                    width=out_w,
                    height=out_h,
                    format=target_format,
                )
                arr = out_frame.to_ndarray()
                if self.colorspace == "G" and arr.ndim == 2:
                    # gray8 decodes to (H, W); add the channel axis.
                    arr = np.expand_dims(arr, axis=-1)
                if self.channel_format == "NCHW":
                    arr = np.transpose(arr, (2, 0, 1))
                seg_frame_cache[local_idx] = arr
                assert frames is not None
                frames[sorted_i] = arr
        finally:
            if container is not None:
                container.close()

        if frames is None:
            return self._empty_frames_array()
        return frames
# ---------------------------------------------------------------------------
# Backend factory. MP4 doesn't slot into ``_BackendOpeners`` (which has
# regular/irregular slots and no video slot); :meth:`DataInterface._materialize_modality`
# invokes this directly for ``StorageFormat.MP4_INDEX`` modalities, and
# :meth:`LazyVideo.from_uri` delegates to it for direct callers.
# ---------------------------------------------------------------------------
def _open_mp4_video_lazy(
    uri: str,
    store: ObjectStore,
    metadata: ModalityRow | None,
) -> LazyVideo:
    """Backend factory: resolve the sibling Lance frame index + open lazily.

    Construction is metadata-only **when the sibling Lance frame index
    exists**: it touches only the frame-index Lance table (~footer +
    manifest read). The index may be **missing**: a ``FileNotFoundError``,
    or Lance's "Dataset at path … was not found" ``ValueError``. Then the
    ``LazyVideo`` constructor falls back to PyAV-demuxing each segment to
    recover per-segment frame counts and PTS tables. That path DOES download
    bytes and probe at construction time, and a ``WARNING`` is logged so
    monitoring picks up the unexpected demux.

    Any **other** exception (corrupted Lance footer, schema mismatch,
    permission-denied, partial upload, etc.) is re-raised. The metadata-only
    contract holds: callers see a clean error rather than silently paying
    for a multi-GB demux on a corrupted asset.

    The frame-index URI is built via
    :func:`ursa.layout.lance_frame_index_uri` so the convention is owned
    in one place — writers and readers agree on the location without
    re-deriving it.
    """
    fi_uri = lance_frame_index_uri(uri)
    frame_index: LanceFrameIndex | None
    try:
        frame_index = LanceFrameIndex.open(fi_uri, store)
    except (FileNotFoundError, ValueError) as exc:
        # Lance signals "dataset does not exist" as a ValueError with a
        # recognizable message ("Dataset at path … was not found"). Other
        # ValueErrors (schema mismatch, corrupted footer, etc.) are real
        # problems that must surface to the caller.
        if isinstance(exc, ValueError) and "was not found" not in str(exc):
            raise
        logging.warning(
            "LazyVideo: no Lance frame index at %s (%s); falling back to "
            "PyAV demux of every segment at construction.",
            fi_uri,
            exc,
        )
        frame_index = None

    segment_uris = None
    if frame_index is not None:
        # A multi-segment recording is ONE concatenated MP4 (virgo.io.mp4_writer
        # PTS-offsets each source segment into the single output file). Repeat
        # the same URI per logical segment so LazyVideo's per-segment_idx decode
        # machinery resolves every segment back to the one physical file.
        # mp4_segment_uris returns only the extra URIs (LazyVideo prepends `uri`),
        # so no [1:] slice here. Derive n_segments AFTER the index is open; the
        # same frame_index instance is passed through, so the second
        # segment_frame_counts() call in LazyVideo.__init__ is a cache hit.
        n_segments = int(len(frame_index.segment_frame_counts()))
        # [] (single segment, n==1) -> None; a non-empty list passes through.
        segment_uris = mp4_segment_uris(uri, n_segments) or None

    return LazyVideo(
        uri,
        store=store,
        frame_index=frame_index,
        metadata=metadata,
        segment_uris=segment_uris,
    )