# Source code for ursa.backends.video

"""MP4-backed lazy video reader for ``StorageFormat.MP4_INDEX`` modalities.

Ported from ``neuro-galaxy/temporaldata@ian/lazy-everything/temporaldata/lazy_video.py``
(2026-05 snapshot) with four adaptations:

1. ``video_file`` paths → ``uri: str`` + :class:`ObjectStore`. Segments are
   downloaded to a process-global LRU cache keyed by ``(uri, etag)`` on first
   slice; PyAV decodes the local copy. Cache directory and quota are
   controlled by the module-level :data:`_DEFAULT_CACHE_DIR` and
   :data:`_DEFAULT_CACHE_QUOTA_GB` constants — tests and operators
   override by monkeypatching them and calling
   :func:`_reset_cache_for_tests`.
2. ``segment_frame_counts`` / ``segment_pts_indices`` → :class:`LanceFrameIndex`.
   The frame index is a sibling Lance table at the URI returned by
   :func:`ursa.layout.lance_frame_index_uri` for ``uri``, with columns
   ``(segment_idx, local_frame_idx, pts, timestamp)``. The original PyAV
   demux fallback survives for callers that open an MP4 without an index
   (tests, ad-hoc inspection); it is a transitional path slated for removal
   once every processed recording carries an index.
3. ``to_hdf5`` / ``from_hdf5`` removed. Persistent state is the catalog
   :class:`ModalityRow` + storage URI; HDF5 would just duplicate it.
4. ``metadata: ModalityRow | None`` slot mirroring the rest of
   :mod:`ursa.temporal`. ``.slice()`` propagates the same instance to the
   returned ``IrregularTimeSeries`` (identity-preserving — pinned by test).

Fork-safety: ``stream.thread_count = 1`` + ``thread_type = "NONE"`` is kept
verbatim — FFmpeg's frame/slice worker threads deadlock in
``avcodec_free_context`` if the codec context was created in the parent and
released in a forked child (PyTorch ``DataLoader(num_workers>0)`` defaults to
fork on Linux). The constraint is read-side; the writer side imposes the
same setting on its own PyAV containers.

PyAV is an optional dep (``ursa[video]``); construction raises a clear
``ImportError`` with the install hint if missing.
"""

from __future__ import annotations

import hashlib
import logging
import os
import shutil
import tempfile
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence
from urllib.parse import urlparse

import numpy as np
from temporaldata import IrregularTimeSeries as _IrregularTimeSeries

from ursa.catalog.schemas import ModalityRow
from ursa.layout import lance_frame_index_uri, mp4_segment_uris
from ursa.store.base import ObjectMeta, ObjectStore
from ursa.temporal import _check_metadata

if TYPE_CHECKING:
    import lance  # only for type hints; runtime import is lazy

# Public API: the names exported by ``from ursa.backends.video import *``.
__all__ = ["LazyVideo", "LanceFrameIndex"]


# ---------------------------------------------------------------------------
# PyAV lazy-import — keeps `import ursa.backends.video` cheap when PyAV isn't
# installed and lets the optional-extras model work cleanly.
# ---------------------------------------------------------------------------

# Memoized PyAV module object; stays ``None`` until the first successful import.
_av: Any = None


def _av_module() -> Any:
    """Return the PyAV module, importing it on first use.

    The imported module is stored in the module-level ``_av`` slot so later
    calls return it without going through the import machinery again.

    Raises:
        ImportError: PyAV is not installed. The message carries the install
            hint and the original import failure is chained as the cause.
    """
    global _av
    # Fast path: an earlier call already resolved the module.
    if _av is not None:
        return _av
    try:
        import av
    except ImportError as err:  # pragma: no cover - optional-extra hint
        raise ImportError(
            "PyAV is not installed. ``ursa.LazyVideo`` requires the "
            "``video`` extra: `uv add 'ursa[video]'`."
        ) from err
    _av = av
    return _av


# ---------------------------------------------------------------------------
# Process-global LRU segment cache. Keyed by (uri, etag) so cache hits across
# processes are safe and overwriting an MP4 in R2 invalidates stale entries.
# ---------------------------------------------------------------------------


#: Root directory of the on-disk segment cache: MP4 segments are downloaded
#: here the first time a :class:`LazyVideo` slice needs them. Resolves to
#: ``<system temp dir>/ursa-video-cache`` (i.e. outside ``$HOME``, so a
#: stale cache never ends up in a user's home folder). To override, tests
#: and operators monkeypatch this constant and then call
#: :func:`_reset_cache_for_tests` so the cache singleton is rebuilt with
#: the new value.
_DEFAULT_CACHE_DIR: Path = Path(tempfile.gettempdir(), "ursa-video-cache")

#: Size limit of the segment cache, expressed in gigabytes (default 10).
#: The cache evicts least-recently-used entries before each new download
#: to stay within this quota.
_DEFAULT_CACHE_QUOTA_GB: int = 10


class _VideoSegmentCache:
    """LRU cache of locally-materialized MP4 segments.

    Quota is :data:`_DEFAULT_CACHE_QUOTA_GB` × ``2**30`` bytes by default.
    Entries are downloaded on miss, touched on hit, and the oldest entries
    are removed when a new download would exceed the quota.

    Thread-safe via a single coarse lock — concurrent slices on the same
    :class:`LazyVideo` serialize their downloads. The cache is process-global
    so several :class:`LazyVideo` instances pointing at the same segment
    share one local file.

    Several processes may share one cache directory (e.g. forked
    ``DataLoader`` workers). The lock does not span processes, so each
    download is written to a per-process temporary name and then renamed
    onto the final entry; two processes fetching the same segment never
    write to the same partial file.
    """

    def __init__(self, root: Path, quota_bytes: int) -> None:
        """Create a cache rooted at ``root`` with a ``quota_bytes`` budget.

        Args:
            root: Directory holding the cached ``*.mp4`` entries. Created
                (including parents) if it does not exist.
            quota_bytes: Total size the cached entries may occupy before
                the oldest ones are evicted.
        """
        self._root = root
        self._quota = quota_bytes
        self._lock = threading.Lock()
        self._root.mkdir(parents=True, exist_ok=True)

    def _entry_path(self, uri: str, etag: str) -> Path:
        """Return the on-disk path used for the ``(uri, etag)`` entry."""
        # Hash the URI so the on-disk name is stable across machines and
        # short enough not to bump into filesystem name limits. Embed the
        # etag so a stale (uri, etag) can't be confused for the current one.
        digest = hashlib.sha256(uri.encode("utf-8")).hexdigest()[:16]
        # Drop surrounding quotes and path separators so the etag is safe to
        # use as a single path component.
        safe_etag = etag.strip('"').replace("/", "_") or "noetag"
        return self._root / f"{digest}.{safe_etag}.mp4"

    def fetch(self, store: ObjectStore, uri: str) -> Path:
        """Return a local path to ``uri``'s bytes. Downloads on miss.

        Args:
            store: Object store used for the ``head`` lookup and, on a
                miss, for reading the object's bytes.
            uri: URI of the segment; its path component (without the
                leading ``/``) is used as the store key.

        Returns:
            Path of the cached file inside the cache root.

        Errors raised by ``store.head`` / ``store.open`` or by the local
        filesystem propagate to the caller; a partially written download is
        removed before the error leaves this method.
        """
        key = urlparse(uri).path.lstrip("/")
        meta: ObjectMeta = store.head(key)
        etag = meta.etag or "noetag"
        target = self._entry_path(uri, etag)
        with self._lock:
            # LRU touch via mtime update; the next eviction sweep sees this
            # entry as most-recently-used. Done EAFP-style: an external
            # tmp-cleaner may delete the entry at any moment, and a
            # separate exists() check would leave a window in which
            # utime() raises. A vanished entry is simply a cache miss.
            try:
                os.utime(target, None)
            except FileNotFoundError:
                pass
            else:
                return target
            # The cache root itself may have been removed since
            # construction; re-create it so the download below can proceed.
            self._root.mkdir(parents=True, exist_ok=True)
            # Evict if needed before reserving space for the new entry.
            self._evict_until(meta.size)
            # Per-process temp name: another process downloading the same
            # segment must not truncate or rename away our partial file.
            # The ``.part`` suffix keeps it out of the ``*.mp4`` globs.
            tmp = target.with_name(f"{target.name}.{os.getpid()}.part")
            try:
                with store.open(key) as src, tmp.open("wb") as dst:
                    shutil.copyfileobj(src, dst)
                # Atomic rename: readers never observe a half-written entry.
                tmp.replace(target)
            finally:
                # No-op after a successful rename; cleans up after a failure.
                tmp.unlink(missing_ok=True)
            return target

    def _evict_until(self, headroom_bytes: int) -> None:
        """Drop oldest entries until ``headroom_bytes`` more fits in quota.

        Collects ``(size, mtime, path)`` up-front so the eviction loop never
        re-stats a file — guards against an external tmp-cleaner (macOS's
        launchd periodic cleaner, systemd-tmpfiles, a colleague's
        ``rm -rf /tmp/ursa-video-cache``) removing a cache entry between
        the glob and the loop iteration. The initial ``stat()`` is also
        guarded so a race-deleted entry mid-glob doesn't propagate
        ``FileNotFoundError`` out of ``LazyVideo.slice()``.

        If ``headroom_bytes`` alone exceeds the quota, every entry is
        evicted and the method returns; it never raises for that case.
        """
        entries: list[tuple[int, float, Path]] = []
        for p in self._root.glob("*.mp4"):
            try:
                st = p.stat()
            except FileNotFoundError:
                continue
            if p.is_file():
                entries.append((st.st_size, st.st_mtime, p))
        # Oldest mtime first == least recently used first.
        entries.sort(key=lambda t: t[1])
        used = sum(size for size, _, _ in entries)
        for size, _, victim in entries:
            if used + headroom_bytes <= self._quota:
                break
            victim.unlink(missing_ok=True)
            used -= size

    def clear(self) -> None:
        """Drop every cached entry. Test helper."""
        with self._lock:
            for p in self._root.glob("*.mp4"):
                p.unlink(missing_ok=True)


# Process-wide cache instance (``None`` until first requested) and the lock
# that guards creating / resetting it.
_cache_singleton: _VideoSegmentCache | None = None
_cache_singleton_lock = threading.Lock()


def _get_cache() -> _VideoSegmentCache:
    """Return the process-global segment cache, creating it if necessary.

    The first call builds the cache from the module-level
    :data:`_DEFAULT_CACHE_DIR` and :data:`_DEFAULT_CACHE_QUOTA_GB`
    (gigabytes, converted to bytes here). Every later call returns that
    same instance, so changing the constants afterwards has no effect
    until :func:`_reset_cache_for_tests` discards the singleton.
    """
    global _cache_singleton
    with _cache_singleton_lock:
        cache = _cache_singleton
        if cache is None:
            quota_bytes = _DEFAULT_CACHE_QUOTA_GB * (2**30)
            cache = _VideoSegmentCache(
                root=_DEFAULT_CACHE_DIR,
                quota_bytes=quota_bytes,
            )
            _cache_singleton = cache
        return cache


def _reset_cache_for_tests() -> None:
    """Empty and forget the cache singleton.

    After this call the next :func:`_get_cache` rebuilds the cache from the
    current module-level configuration, which is what tests that
    monkeypatch that configuration rely on.
    """
    global _cache_singleton
    with _cache_singleton_lock:
        existing = _cache_singleton
        if existing is not None:
            # Remove the cached files before dropping the reference.
            existing.clear()
        _cache_singleton = None


# ---------------------------------------------------------------------------
# Lance frame index — sidecar table written by the upstream MP4 writer.
# ---------------------------------------------------------------------------


class LanceFrameIndex:
    """Reader for the sibling Lance table (``<mp4_uri>.lance``) describing one MP4's frames.

    Schema (emitted by the writer side; this class only reads it):

    .. code-block:: text

        segment_idx:     int64    # which underlying segment file
        local_frame_idx: int64    # 0..segment_frame_count-1
        pts:             int64    # PyAV PTS (codec-time-base units)
        timestamp:       float64  # recording-relative seconds

    Rows are sorted by ``(segment_idx, local_frame_idx)``, i.e. presentation
    order within each segment. Constructing an instance runs no query; each
    accessor queries the dataset on first use and memoises the result on the
    instance. The same information can be rebuilt from the source MP4 via
    :func:`_probe_segment` as a fallback.
    """

    def __init__(self, dataset: "lance.LanceDataset", uri: str) -> None:
        """Wrap an already-opened Lance dataset.

        Args:
            dataset: the opened frame-index dataset.
            uri: the index URI, kept only for error messages.
        """
        self._uri = uri
        self._ds = dataset
        # Memoised query results; all start empty and fill on first access.
        self._timestamps_cache: np.ndarray | None = None
        self._segment_frame_counts_cache: np.ndarray | None = None
        self._pts_cache: dict[int, np.ndarray] = {}

    @classmethod
    def open(cls, uri: str, store: ObjectStore) -> "LanceFrameIndex":
        """Open the frame-index dataset at *uri* through *store*.

        ``store.lance_connection()`` supplies a ``(base_url, options)`` pair;
        the dataset URL is that base URL joined with the path component of
        *uri* (its scheme and bucket are discarded).
        """
        import lance

        base_url, storage_options = store.lance_connection()
        object_key = urlparse(uri).path.lstrip("/")
        dataset = lance.dataset(
            f"{base_url.rstrip('/')}/{object_key}",
            storage_options=storage_options,
        )
        return cls(dataset, uri)

    def segment_frame_counts(self) -> np.ndarray:
        """Return frames per segment as an ``int64`` array of length ``max(segment_idx) + 1``.

        A segment index with no rows gets a count of zero; an empty table
        yields a zero-length array. The result is computed once and cached.
        """
        if self._segment_frame_counts_cache is not None:
            return self._segment_frame_counts_cache

        frame = self._ds.to_table(columns=["segment_idx"]).to_pandas()
        if len(frame) == 0:
            per_segment = np.zeros((0,), dtype=np.int64)
        else:
            seg_column = frame["segment_idx"]
            per_segment = np.zeros(int(seg_column.max()) + 1, dtype=np.int64)
            # Histogram of rows per segment index.
            for seg, n_rows in seg_column.value_counts().items():
                per_segment[int(seg)] = int(n_rows)
        self._segment_frame_counts_cache = per_segment
        return per_segment

    def pts_table(self, segment_idx: int) -> np.ndarray:
        """Return the ``int64`` PTS array of *segment_idx*, ordered by ``local_frame_idx``.

        Raises:
            ValueError: if the index holds no rows for *segment_idx*.
        """
        hit = self._pts_cache.get(segment_idx)
        if hit is None:
            rows = self._ds.to_table(
                columns=["local_frame_idx", "pts"],
                filter=f"segment_idx = {segment_idx}",
            ).to_pandas()
            if len(rows) == 0:
                raise ValueError(
                    f"LanceFrameIndex at {self._uri!r} has no rows for segment_idx={segment_idx}"
                )
            hit = rows.sort_values("local_frame_idx")["pts"].to_numpy(dtype=np.int64)
            self._pts_cache[segment_idx] = hit
        return hit

    def timestamps(self) -> np.ndarray:
        """Return all timestamps as one ``float64`` array ordered by ``(segment_idx, local_frame_idx)``.

        The returned array is the cached object itself, not a copy; callers
        that intend to modify it must copy first.
        """
        if self._timestamps_cache is None:
            rows = self._ds.to_table(
                columns=["segment_idx", "local_frame_idx", "timestamp"],
            ).to_pandas()
            ordered = rows.sort_values(["segment_idx", "local_frame_idx"])
            self._timestamps_cache = ordered["timestamp"].to_numpy(dtype=np.float64)
        return self._timestamps_cache
# ---------------------------------------------------------------------------
# PyAV-based fallbacks (used when no Lance frame index exists — tests,
# ad-hoc inspection, or any MP4 opened without a sidecar).
# ---------------------------------------------------------------------------


def _probe_segment_demux(path: str) -> np.ndarray:
    """Collect packet PTS values via ``container.demux()`` and return them sorted ascending.

    This is the fast probe: packets are only demuxed, never decoded. It
    relies on sorted packet PTS matching presentation order, which holds
    for standard GOP-structured H.264/H.265 streams. Packets without a PTS
    are skipped. :func:`_probe_segment` falls back to
    :func:`_probe_segment_decode` when this raises or finds nothing.
    """
    av = _av_module()
    container = av.open(path)
    try:
        video_stream = container.streams.video[0]
        packet_pts = sorted(
            int(pkt.pts) for pkt in container.demux(video_stream) if pkt.pts is not None
        )
        return np.asarray(packet_pts, dtype=np.int64)
    finally:
        container.close()


def _probe_segment_decode(path: str) -> np.ndarray:
    """Slow probe: decode every frame and return the frame PTS values in decode-iteration order.

    Frames without a PTS are skipped.
    """
    av = _av_module()
    container = av.open(path)
    try:
        video_stream = container.streams.video[0]
        frame_pts = [
            int(frm.pts) for frm in container.decode(video_stream) if frm.pts is not None
        ]
        return np.asarray(frame_pts, dtype=np.int64)
    finally:
        container.close()


def _probe_segment(path: str) -> tuple[int, np.ndarray]:
    """Return ``(frame_count, pts_array)`` for the video file at *path*.

    Tries the demux probe first; if it raises (logged as a warning) or
    yields no PTS at all, the decode probe is used instead.
    """
    try:
        pts = _probe_segment_demux(path)
    except Exception as ex:  # pragma: no cover - exercised on malformed files
        logging.warning(
            "LazyVideo: demux probe failed for %s (%s); falling back to decode probe.",
            path,
            ex,
        )
    else:
        if pts.size > 0:
            return len(pts), pts
    pts = _probe_segment_decode(path)
    return len(pts), pts


# ---------------------------------------------------------------------------
# The class.
# ---------------------------------------------------------------------------
[docs] class LazyVideo: r"""Lazy-decoded video for ``StorageFormat.MP4_INDEX`` modalities. Construction is metadata-only **when a** :class:`LanceFrameIndex` **is provided**: no MP4 GET, no frame decode happens until the first :meth:`slice` call, which downloads the required segment(s) into the process-global LRU cache and decodes the requested frames via PyAV. When ``frame_index=None`` the constructor falls back to PyAV-demuxing every segment up-front to recover frame counts + PTS tables; this path DOES download bytes and probe PTS at construction time. Use the Lance-frame-index path (the default through :meth:`from_uri`) for the metadata-only contract. Seeks are keyframe-aligned (``container.seek(pts, any_frame=False, backward=True)``) and the decoder is then advanced frame-by-frame to the target PTS, so frames returned mid-GOP are bit-correct (no ``mmco: unref short failure`` corruption). Picklable: no PyAV container/stream/reformatter handles are stored on ``self`` (they live as locals inside :meth:`_load_frames`), so a ``LazyVideo`` survives the ``pickle.dumps`` that DataLoader workers perform across the fork boundary. Args: uri: ``r2://`` URI of the MP4 (or the first segment if multi-segment). store: :class:`ObjectStore` to read from — typically :attr:`DataInterface.assets_ro_store`. frame_index: :class:`LanceFrameIndex` for the sibling sidecar, or ``None`` to PyAV-demux the segments lazily (test-only fallback; see module docstring). metadata: optional :class:`ModalityRow` to carry through to :meth:`slice` results. segment_uris: additional URIs for multi-segment videos. If absent, ``uri`` is the sole segment. resize: ``(height, width)`` to resize frames to, or ``None`` for original dimensions. colorspace: ``"RGB"`` or ``"G"``. channel_format: ``"NCHW"`` or ``"NHWC"``. 
""" def __init__( self, uri: str, *, store: ObjectStore, frame_index: LanceFrameIndex | None = None, metadata: ModalityRow | None = None, segment_uris: Sequence[str] | None = None, resize: tuple[int, int] | None = None, colorspace: str = "RGB", channel_format: str = "NCHW", ) -> None: # Validate PyAV is importable but do not open any container here — # picklability of `self` depends on no `av.Container` ever landing on # the instance. _av_module() _check_metadata(metadata, ModalityRow, "LazyVideo") if resize is not None and (not isinstance(resize, tuple) or len(resize) != 2): raise ValueError('"resize" must be None or a (height, width) tuple') if colorspace not in ("RGB", "G"): raise ValueError('"colorspace" must be "RGB" or "G"') if channel_format not in ("NCHW", "NHWC"): raise ValueError('"channel_format" must be "NCHW" or "NHWC"') all_uris: list[str] = [uri] if segment_uris: all_uris.extend(str(s) for s in segment_uris) self._store = store self._segment_uris: list[str] = all_uris self.uri = uri # primary URI (used by repr / metadata-style display) self.resize = resize self.colorspace = colorspace self.channel_format = channel_format self._metadata = metadata self._frame_index = frame_index # Per-segment PTS cache (populated on first decode of a segment). # When `frame_index` is set, PTS arrays come from Lance; otherwise # PyAV demuxes the local file on first need. self._pts_cache: list[np.ndarray | None] = [None] * len(all_uris) if frame_index is not None: self.segment_frame_counts = np.asarray( frame_index.segment_frame_counts(), dtype=np.int64 ) # ``copy=True`` is load-bearing: ``LanceFrameIndex.timestamps()`` # caches the underlying ndarray; the non-monotonic re-sort below # would otherwise permute that cache and corrupt subsequent # ``LazyVideo`` instances built on the same index. self.timestamps = frame_index.timestamps().astype(np.float64, copy=True) else: # Fallback: pull each segment locally and demux it. 
counts: list[int] = [] pts: list[np.ndarray] = [] for u in all_uris: path = _get_cache().fetch(store, u) n, p = _probe_segment(str(path)) counts.append(n) pts.append(p) self.segment_frame_counts = np.asarray(counts, dtype=np.int64) self._pts_cache = list(pts) # No real timestamps known without a frame index — synthesize # uniform 0..N indices in seconds-of-presentation order. n_total = int(self.segment_frame_counts.sum()) self.timestamps = np.arange(n_total, dtype=np.float64) if len(self.segment_frame_counts) != len(all_uris): raise ValueError( f"frame_index reports {len(self.segment_frame_counts)} segment(s) " f"but {len(all_uris)} URI(s) were provided" ) # Running prefix sum of segment frame counts for fast segment lookup. self.segment_frame_offsets = np.concatenate( ([0], np.cumsum(self.segment_frame_counts[:-1])) ).astype(np.int64) frame_count = int(self.segment_frame_counts.sum()) ts_len = int(self.timestamps.shape[0]) if frame_count != ts_len: if frame_count > ts_len: # Tolerate a small trailing overshoot (mirrors Ian's # original behavior — codec sometimes emits N+1 frames where # the timestamp series tops out at N). frame_count = ts_len else: raise ValueError( f"Frame count mismatch: counts sum to {frame_count} " f"but timestamps have {ts_len} entries" ) self.frame_count = frame_count self.frame_indices = np.arange(frame_count, dtype=np.int64) # Defensive: re-sort if the frame_index returned non-monotonic # timestamps (shouldn't happen for well-formed Lance indices). 
if frame_count > 1 and np.any(np.diff(self.timestamps[:frame_count]) < 0): sort_idx = np.argsort(self.timestamps[:frame_count]) self.timestamps[:frame_count] = self.timestamps[:frame_count][sort_idx] self.frame_indices = sort_idx.astype(np.int64) logging.info("LazyVideo: re-sorted %d non-monotonic timestamps", frame_count) # --------------------------------------------------------------------- # Properties # --------------------------------------------------------------------- @property def metadata(self) -> ModalityRow | None: return self._metadata
[docs] def __len__(self) -> int: return self.frame_count
[docs] def __repr__(self) -> str: return ( f"{type(self).__name__}(" f"uri={self.uri!r}, " f"frames={self.frame_count}, " f"segments={len(self._segment_uris)})" )
# --------------------------------------------------------------------- # Concat — single-recording only. # ---------------------------------------------------------------------
[docs] @classmethod def from_uri( cls, uri: str, *, store: ObjectStore, metadata: ModalityRow | None = None, ) -> "LazyVideo": """Open *uri* as a lazy MP4 video. Mirrors :meth:`ursa.RegularTimeSeries.from_uri` / :meth:`ursa.LazyIrregularTimeSeries.from_uri` so callers can use a uniform construction surface across backends. Resolves the sibling Lance frame index via :func:`ursa.layout.lance_frame_index_uri`; on a missing sidecar, falls back to demuxing the MP4 itself. Video is intentionally lazy-only. An eager equivalent would decode every frame at construction, which defeats the point of ``MP4_INDEX`` for any realistic recording. ``DataInterface.materialize(..., lazy=False)`` therefore still returns a ``LazyVideo`` for video subfields. Video isn't dispatched through :class:`_BackendOpeners` because the regular/irregular split doesn't fit per-frame video access. :class:`ursa.DataInterface` invokes this classmethod directly when ``ModalityRow.format == StorageFormat.MP4_INDEX``. """ return _open_mp4_video_lazy(uri, store, metadata)
[docs] @classmethod def concat(cls, videos: Sequence["LazyVideo"]) -> "LazyVideo": """Concatenate segments of one logical video. Single-recording only — cross-recording concat is not supported because aligned time domains across recordings are only established by the per-recording processed path. Multi-video concat with frame indexes is also rejected: the receiver would have to merge ``LanceFrameIndex`` instances from each input, which the writer-side helper has not been built yet. Calling ``concat([a])`` (single-video — i.e. metadata-only re-wrap) stays supported. """ if len(videos) == 0: raise ValueError("LazyVideo.concat requires at least one video") first = videos[0] first_hash = first._metadata.recording_hash if first._metadata else None for v in videos[1:]: if not isinstance(v, cls): raise TypeError("LazyVideo.concat: all items must be LazyVideo") v_hash = v._metadata.recording_hash if v._metadata else None if first_hash is not None and v_hash is not None and first_hash != v_hash: raise ValueError( "LazyVideo.concat is single-recording only; cross-recording " "concat requires aligned time domains, which are only " "established by the per-recording processed path." ) if v.resize != first.resize: raise ValueError("LazyVideo.concat: resize must match across videos") if v.colorspace != first.colorspace: raise ValueError("LazyVideo.concat: colorspace must match") if v.channel_format != first.channel_format: raise ValueError("LazyVideo.concat: channel_format must match") # Multi-video concat with any frame index is unsafe: the first # video's index would silently be used to derive frame offsets # across every segment, which is wrong. Reject explicitly until # the writer ships a LanceFrameIndex-merge helper. 
if len(videos) > 1 and any(v._frame_index is not None for v in videos): raise NotImplementedError( "LazyVideo.concat with multiple videos and a Lance frame index " "is not supported: merging per-video frame-index tables into a " "single contiguous index is not implemented. Either materialize " "each video's frames separately, or drop the frame indexes " "(pass frame_index=None) so concat falls back to the demux path." ) all_uris = [u for v in videos for u in v._segment_uris] return cls( uri=all_uris[0], store=first._store, frame_index=first._frame_index, metadata=first._metadata, segment_uris=all_uris[1:] if len(all_uris) > 1 else None, resize=first.resize, colorspace=first.colorspace, channel_format=first.channel_format, )
# --------------------------------------------------------------------- # Lazy slice — returns a windowed LazyVideo with no decode. # ---------------------------------------------------------------------
[docs] def lazy_slice(self, start: float, end: float) -> "LazyVideo": """Return a new :class:`LazyVideo` windowed to ``[start, end)`` with **no PyAV decode** at call time. The returned object is still a :class:`LazyVideo` carrying a recorded frame-range window; frames decode only on the next :meth:`slice` call, and only for the windowed range. ``_apply_time_window`` calls this form so that ``stream(time_range=…)`` never triggers a segment download or decode. Implementation: ``np.searchsorted`` on the in-memory timestamp array to find ``idx_l:idx_r``, then ``object.__new__`` + attribute shallow- copy with the windowed ``timestamps``, ``frame_indices``, and ``frame_count`` substituted. All per-segment metadata (``segment_frame_counts``, ``segment_frame_offsets``, ``_pts_cache``, etc.) is shared by reference — these use **global** frame indices, so they remain correct for the windowed object's :meth:`_segment_for_frame` lookups. ``reset_origin`` is always ``False``: the returned series keeps original recording-relative coordinates. If ``start >= end`` or the window covers no frames, an empty :class:`LazyVideo` (``frame_count=0``) is returned rather than raising. """ ts = self.timestamps[: self.frame_count] idx_l = int(np.searchsorted(ts, start, side="left")) idx_r = int(np.searchsorted(ts, end, side="left")) # Clamp to valid range. idx_l = max(0, min(idx_l, self.frame_count)) idx_r = max(idx_l, min(idx_r, self.frame_count)) # Build the windowed copy without calling __init__ (which would # re-open the Lance frame index or re-demux the MP4 — both are # network I/O that must not happen here). windowed: LazyVideo = object.__new__(LazyVideo) # Shared state — safe because all these are read-only after __init__. 
windowed._store = self._store windowed._segment_uris = self._segment_uris windowed.uri = self.uri windowed.resize = self.resize windowed.colorspace = self.colorspace windowed.channel_format = self.channel_format windowed._metadata = self._metadata windowed._frame_index = self._frame_index windowed._pts_cache = self._pts_cache # shared — populated lazily windowed.segment_frame_counts = self.segment_frame_counts windowed.segment_frame_offsets = self.segment_frame_offsets # Windowed state — new views into the parent's arrays. windowed.timestamps = ts[idx_l:idx_r].copy() windowed.frame_indices = self.frame_indices[idx_l:idx_r].copy() windowed.frame_count = idx_r - idx_l return windowed
# --------------------------------------------------------------------- # Slice — the main user-facing path (eager decode). # ---------------------------------------------------------------------
[docs] def slice(self, start: float, end: float, reset_origin: bool = True) -> _IrregularTimeSeries: r"""Return an :class:`IrregularTimeSeries` of decoded frames in ``[start, end)`` (end-exclusive). ``reset_origin=True`` (default) shifts the returned ``timestamps`` to be relative to ``start``; ``False`` keeps absolute camera time. """ # Build a one-shot IrregularTimeSeries over (timestamps, frame_indices) # so we can leverage upstream's slice() for the time→index filter # in one place rather than re-deriving it here. scratch = _IrregularTimeSeries( timestamps=np.asarray(self.timestamps[: self.frame_count], dtype=np.float64), frame_indices=self.frame_indices, domain="auto", ) sliced = scratch.slice(start=start, end=end, reset_origin=reset_origin) frames = self._load_frames(sliced.frame_indices) # Attach the decoded frames + propagate metadata IDENTITY (High-5). sliced.frames = frames sliced._metadata = self._metadata return sliced
# --------------------------------------------------------------------- # Frame decode — verbatim from Ian's port with the path resolution # routed through the cache. # ---------------------------------------------------------------------
[docs] def _segment_for_frame(self, frame_index: int) -> tuple[int, int]: segment_idx = int( np.searchsorted(self.segment_frame_offsets, frame_index, side="right") - 1 ) segment_start = int(self.segment_frame_offsets[segment_idx]) local_index = int(frame_index - segment_start) return segment_idx, local_index
[docs] def _ensure_pts_table(self, segment_idx: int) -> np.ndarray: cached = self._pts_cache[segment_idx] if cached is not None: return cached if self._frame_index is not None: arr = self._frame_index.pts_table(segment_idx) else: path = _get_cache().fetch(self._store, self._segment_uris[segment_idx]) _, arr = _probe_segment(str(path)) expected = int(self.segment_frame_counts[segment_idx]) if len(arr) != expected: if len(arr) > expected: arr = arr[:expected] else: raise ValueError( f"Segment {segment_idx} reports {expected} frames but " f"PyAV demuxed {len(arr)} PTS entries" ) self._pts_cache[segment_idx] = arr return arr
[docs] def _empty_frames_array(self) -> np.ndarray: n_channels = 3 if self.colorspace == "RGB" else 1 if self.channel_format == "NCHW": return np.zeros((0, n_channels, 1, 1), dtype="uint8") return np.zeros((0, 1, 1, n_channels), dtype="uint8")
[docs] def _resolve_segment_path(self, segment_idx: int) -> str: """Cache-resolve a segment URI to a local-fs path for PyAV.""" local = _get_cache().fetch(self._store, self._segment_uris[segment_idx]) return str(local)
[docs] def _load_frames(self, frame_indices: np.ndarray) -> np.ndarray: """Decode the requested presentation-ordered frames. Implementation notes (verbatim from Ian's port — keep these): * Indices are sorted by ``(segment, local_index)`` so each segment is walked forward in presentation order; the only seeks are at segment boundaries (or when the caller passes a non-monotonic sequence). * Single-threaded decode (``stream.thread_count = 1``, ``stream.thread_type = "NONE"``) avoids the libavcodec frame/slice thread + ``os.fork()`` deadlock in ``avcodec_free_context``. This shows up in any consumer that loads ``LazyVideo`` from a forked child (PyTorch ``DataLoader(num_workers>0)`` defaults to fork on Linux). * A single ``av.video.reformatter.VideoReformatter`` does colorspace + resize via libswscale. """ av = _av_module() n_frames = len(frame_indices) if n_frames == 0: return self._empty_frames_array() n_channels = 3 if self.colorspace == "RGB" else 1 target_format = "rgb24" if self.colorspace == "RGB" else "gray8" seg_arr = np.empty(n_frames, dtype=np.int64) local_arr = np.empty(n_frames, dtype=np.int64) for i, fidx in enumerate(frame_indices): s, loc = self._segment_for_frame(int(fidx)) seg_arr[i] = s local_arr[i] = loc order = np.lexsort((local_arr, seg_arr)) frames: np.ndarray | None = None out_h: int | None = None out_w: int | None = None container = None stream = None decode_iter = None last_pts: int | None = None cur_seg = -1 pts_table: np.ndarray | None = None reformatter = None seg_frame_cache: dict[int, np.ndarray] = {} try: for k, sorted_i in enumerate(order): sorted_i = int(sorted_i) seg_idx = int(seg_arr[sorted_i]) local_idx = int(local_arr[sorted_i]) if seg_idx != cur_seg: if container is not None: container.close() path = self._resolve_segment_path(seg_idx) container = av.open(path) stream = container.streams.video[0] # Single-threaded decode — see docstring. 
stream.thread_count = 1 stream.thread_type = "NONE" pts_table = self._ensure_pts_table(seg_idx) cur_seg = seg_idx decode_iter = None last_pts = None seg_frame_cache = {} if local_idx in seg_frame_cache: assert frames is not None frames[sorted_i] = seg_frame_cache[local_idx] continue assert pts_table is not None target_pts = int(pts_table[local_idx]) need_seek = decode_iter is None or last_pts is None or target_pts < last_pts if need_seek: assert container is not None and stream is not None container.seek( target_pts, any_frame=False, backward=True, stream=stream, ) decode_iter = container.decode(video=0) last_pts = None seg_frame_cache.clear() if frames is None: assert stream is not None if self.resize is not None: out_h, out_w = self.resize else: out_h = stream.codec_context.height out_w = stream.codec_context.width if self.channel_format == "NCHW": frames = np.zeros((n_frames, n_channels, out_h, out_w), dtype="uint8") else: frames = np.zeros((n_frames, out_h, out_w, n_channels), dtype="uint8") collected = None while True: try: frame = next(decode_iter) # type: ignore[arg-type] except StopIteration: break if frame.pts is None: continue last_pts = int(frame.pts) if last_pts < target_pts: continue collected = frame break if collected is None: logging.warning( "LazyVideo: end of segment %d reached early at " "frame %d/%d (target pts=%d); leaving remaining " "frames zero-filled.", seg_idx, k, n_frames, target_pts, ) break if reformatter is None: reformatter = av.video.reformatter.VideoReformatter() out_frame = reformatter.reformat( collected, width=out_w, height=out_h, format=target_format, ) arr = out_frame.to_ndarray() if self.colorspace == "G": if arr.ndim == 2: arr = np.expand_dims(arr, axis=-1) if self.channel_format == "NCHW": arr = np.transpose(arr, (2, 0, 1)) seg_frame_cache[local_idx] = arr assert frames is not None frames[sorted_i] = arr finally: if container is not None: container.close() if frames is None: return self._empty_frames_array() return frames
# ---------------------------------------------------------------------------
# Backend factory. MP4 doesn't slot into ``_BackendOpeners`` (which has
# regular/irregular slots and no video slot); :meth:`DataInterface._materialize_modality`
# invokes this directly for ``StorageFormat.MP4_INDEX`` modalities, and
# :meth:`LazyVideo.from_uri` delegates to it for direct callers.
# ---------------------------------------------------------------------------


def _open_mp4_video_lazy(
    uri: str,
    store: ObjectStore,
    metadata: ModalityRow | None,
) -> LazyVideo:
    """Backend factory: resolve the sibling Lance frame index + open lazily.

    Construction is metadata-only **when the sibling Lance frame index
    exists**: it touches only the frame-index Lance table.

    When the index is **missing** (``FileNotFoundError`` or Lance's
    "Dataset at path … was not found" ``ValueError``), the
    :class:`LazyVideo` constructor falls back to PyAV-demuxing each segment
    to recover per-segment frame counts and PTS tables — that path DOES
    download bytes and probe them at construction time, and a ``WARNING`` is
    logged so monitoring picks up the unexpected demux.

    Any **other** exception (corrupted Lance footer, schema mismatch,
    permission-denied, partial upload, etc.) propagates unchanged, so
    callers see a clean error rather than silently paying for a demux on a
    corrupted asset.

    The frame-index URI is built via
    :func:`ursa.layout.lance_frame_index_uri` so the convention is owned in
    one place — writers and readers agree on the location without
    re-deriving it.

    Args:
        uri: URI of the MP4.
        store: object store used both for the index and for segment reads.
        metadata: optional :class:`ModalityRow` attached to the result.

    Returns:
        The constructed :class:`LazyVideo`.
    """
    fi_uri = lance_frame_index_uri(uri)
    frame_index: LanceFrameIndex | None
    try:
        frame_index = LanceFrameIndex.open(fi_uri, store)
    except (FileNotFoundError, ValueError) as exc:
        # Lance signals "dataset does not exist" as a ValueError with a
        # recognizable message ("Dataset at path … was not found"). Other
        # ValueErrors (schema mismatch, corrupted footer, etc.) are real
        # problems that must surface to the caller.
        if not isinstance(exc, FileNotFoundError) and "was not found" not in str(exc):
            raise
        # The message says "at construction time" because LazyVideo.__init__
        # fetches and probes every segment as soon as frame_index is None.
        logging.warning(
            "LazyVideo: no Lance frame index at %s (%s); falling back to "
            "PyAV demux at construction time.",
            fi_uri,
            exc,
        )
        frame_index = None

    segment_uris = None
    if frame_index is not None:
        # A multi-segment recording is ONE concatenated MP4 (the writer
        # PTS-offsets each source segment into the single output file). Repeat
        # the same URI per logical segment so LazyVideo's per-segment_idx decode
        # machinery resolves every segment back to the one physical file.
        # mp4_segment_uris returns the extras only (LazyVideo prepends `uri`),
        # so no [1:] slice here. n_segments is derived AFTER the index is open;
        # the same frame_index instance is passed through, so the second
        # segment_frame_counts() call in LazyVideo.__init__ is a cache hit.
        n_segments = int(len(frame_index.segment_frame_counts()))
        # [] (single segment, n==1) -> None; a non-empty list passes through.
        segment_uris = mp4_segment_uris(uri, n_segments) or None

    return LazyVideo(
        uri,
        store=store,
        frame_index=frame_index,
        metadata=metadata,
        segment_uris=segment_uris,
    )