# Source code for ursa.download

"""Stream raw-modality bytes from object storage to local disk (ENG-1091).

Third verb in the M2 three-verb read API (``query`` → ``get`` →
``download``): ``query()`` selects, ``get()`` (ENG-890) materializes
bytes into memory, and ``download()`` writes those same bytes to disk
**without parsing**. ``download`` and ``get`` are symmetric — both
consume a :class:`QueryResult` (or iterable of them), both resolve URIs
via :func:`ursa.store.parse_storage_uri`, and both are M2-gated to
``ingestion_status="raw"``.

Phase 1a (M2) — raw path only
-----------------------------

For each modality with ``ingestion_status="raw"``, list the segment
files under ``ModalityRow.raw_storage_uri`` (a prefix), stream each
segment's bytes to disk under a per-modality directory, and return the
list of files written. ``ObjectStore.open()`` is used (not ``get()``) so
multi-GB objects (``RAW_VIDEO``, ``RAW_AUDIO``) never sit in memory.

The ticket's ``{modality}.{ext}`` form presupposes single-file canonical
formats — that's Phase 1b. Raw modalities are multi-segment prefixes
(N segment files under a directory), so M2 writes a directory tree per
modality and the ``layout`` selector controls only the parent path. The
ticket annotation is captured in the PR body.

Phase 1b — processed path
-------------------------

Deferred to ENG-1093. ``download`` currently raises
:class:`NotImplementedError` for any modality with
``ingestion_status="processed"``; the processed-path read (using
``storage_uri`` and canonical Zarr/Lance/MP4_INDEX/Parquet tiers) lands
with the rest of ENG-1093.
"""

from __future__ import annotations

import os
from collections.abc import Iterable
from pathlib import Path
from typing import Literal, NamedTuple

from ursa.catalog.schemas import IngestionStatus, ModalityRow
from ursa.query import QueryResult
from ursa.store import ObjectStore, get_store, parse_storage_uri

__all__ = ["download"]


# Forward-reference markers used in error messages so the next-step
# ticket numbers live in one place.
_PROCESSED_PATH_TICKET: str = "ENG-1093"

# Accepted values for the ``layout`` keyword of :func:`download`. The
# frozenset mirrors the Literal so runtime validation is a cheap set test.
_LayoutMode = Literal["by_recording", "by_modality", "flat"]
_VALID_LAYOUTS: frozenset[str] = frozenset({"by_recording", "by_modality", "flat"})

# Chunk size for the streaming copy. 1 MiB balances syscall overhead
# against memory pressure; even a 10 GB video walks 10240 chunks.
_STREAM_CHUNK: int = 1 << 20


[docs] class _PlannedWrite(NamedTuple): """One (store, source key, destination path) tuple produced by the plan phase and consumed by the execute phase. Holds the :class:`ObjectMeta` too so the execute phase can carry size info for diagnostics without re-listing the prefix. """ store: ObjectStore source_key: str dest_path: Path meta_size: int
# ---------------------------------------------------------------------------
# Public surface
# ---------------------------------------------------------------------------
def download(
    target: QueryResult | Iterable[QueryResult],
    dest: str | os.PathLike[str],
    *,
    layout: _LayoutMode = "by_recording",
    overwrite: bool = False,
) -> list[Path]:
    """Stream raw-modality bytes for ``target`` to disk under ``dest``.

    Parameters
    ----------
    target
        One :class:`QueryResult` or any iterable of them. The two cases
        are told apart by ``isinstance(target, QueryResult)``, never by
        length — a one-element iterable still follows the flat-list
        return contract.
    dest
        Destination directory (created if missing); coerced to
        :class:`pathlib.Path`.
    layout
        How each segment's destination path is formed:

        * ``"by_recording"`` (default) →
          ``dest/{recording_hash}/{modality}/{relative_segment_key}``
        * ``"by_modality"`` →
          ``dest/{modality}/{recording_hash}/{relative_segment_key}``
        * ``"flat"`` →
          ``dest/{recording_hash}__{modality}/{relative_segment_key}``
    overwrite
        With ``False`` (default), any already-existing destination file
        raises :class:`FileExistsError` before any streaming starts.
        With ``True``, the temp-file rename inside
        :func:`_stream_to_disk` atomically replaces each existing
        target; there is no explicit unlink, so a mid-stream crash
        leaves the prior file intact at the canonical path.

    Returns
    -------
    list[Path]
        Every file written — **always a flat list**, even for a single
        :class:`QueryResult`. Ordering is deterministic:

        1. input-recording order (``target`` iteration order),
        2. then ``qr.modalities.items()`` insertion order,
        3. then lexicographic ``ObjectMeta.key`` order per modality.

    Raises
    ------
    NotImplementedError
        A matched modality has ``ingestion_status="processed"`` — the
        processed path is deferred to ENG-1093 / Phase 1b.
    FileNotFoundError
        A registered modality's ``raw_storage_uri`` lists zero objects —
        a catalog/upload bug rather than a silent no-op.
    FileExistsError
        ``overwrite=False`` and one or more destination files already
        exist; every collision is collected into one message (truncated
        past 20 entries).
    ValueError
        ``layout`` is not an accepted mode, the plan maps two source
        keys onto the same destination path (defense-in-depth across
        all layouts), or ``layout="flat"`` meets a recording hash or
        modality name containing ``"__"``.

    Notes
    -----
    Bytes move through :meth:`ObjectStore.open` (forward-only), never
    :meth:`ObjectStore.get`, so multi-GB raw video/audio is streamed
    rather than materialized. An interrupted copy leaves only a
    ``.part`` file, which the except handler unlinks — never a
    half-written file at the canonical destination. The single-file
    ``{modality}.{ext}`` shape implied by the ticket signature lands
    with the Phase 1b processed-path work (ENG-1093); M2 writes a
    directory tree per modality regardless of layout.
    """
    if layout not in _VALID_LAYOUTS:
        raise ValueError(f"layout must be one of {sorted(_VALID_LAYOUTS)!r}; got {layout!r}")

    root = Path(dest)
    results = [target] if isinstance(target, QueryResult) else list(target)

    plan = _plan_writes(results, root, layout=layout, overwrite=overwrite)

    # Even with overwrite=True nothing is unlinked up front:
    # _stream_to_disk stages each copy in a ``.part`` sibling and swaps
    # it in via os.replace(), which both overwrites and is atomic on
    # POSIX. Unlinking first would open a window where a mid-stream
    # crash leaves the user with nothing instead of the old bytes.
    written: list[Path] = []
    for item in plan:
        _stream_to_disk(item.store, item.source_key, item.dest_path)
        written.append(item.dest_path)
    return written
# ---------------------------------------------------------------------------
# Plan phase
# ---------------------------------------------------------------------------
def _plan_writes(
    qrs: list[QueryResult],
    dest_root: Path,
    *,
    layout: _LayoutMode,
    overwrite: bool,
) -> list[_PlannedWrite]:
    """Build the full, ordered write plan before any bytes move.

    A single pass over ``qrs`` yields three things at once:

    * the ordered :class:`_PlannedWrite` list the execute phase consumes
      (object-store prefixes are listed exactly once),
    * an intra-call duplicate-destination check spanning every layout
      mode — a cheap dict lookup that also guards future layout
      additions,
    * when ``overwrite`` is false, one batched
      :class:`FileExistsError` naming every pre-existing destination.
    """
    plan: list[_PlannedWrite] = []
    by_dest: dict[Path, _PlannedWrite] = {}

    for qr in qrs:
        rec_hash = qr.recording_hash
        for name, row in qr.modalities.items():
            _check_modality_eligibility(name, row, layout=layout)

            role, prefix = parse_storage_uri(row.raw_storage_uri)
            store: ObjectStore = get_store(role)
            # Deterministic lexicographic order within the modality.
            listing = sorted(store.list(prefix=prefix), key=lambda m: m.key)
            if not listing:
                raise FileNotFoundError(
                    f"no objects at {row.raw_storage_uri!r} for modality "
                    f"{name!r} on recording {rec_hash!r}; "
                    "registered modality with zero segments is a catalog "
                    "or upload bug."
                )

            for meta in listing:
                # Key path relative to the modality prefix; preserved as
                # a sub-path under the layout-chosen parent directory.
                rel = meta.key.removeprefix(prefix).lstrip("/")
                target = _dest_path(
                    dest_root,
                    layout=layout,
                    recording_hash=rec_hash,
                    modality=name,
                    rel_key=rel,
                )
                clash = by_dest.get(target)
                if clash is not None:
                    raise ValueError(
                        f"layout={layout!r} produces duplicate destination "
                        f"{target}: would be written by both "
                        f"{clash.source_key!r} and {meta.key!r}"
                    )
                entry = _PlannedWrite(
                    store=store,
                    source_key=meta.key,
                    dest_path=target,
                    meta_size=meta.size,
                )
                by_dest[target] = entry
                plan.append(entry)

    if not overwrite:
        existing = [e.dest_path for e in plan if e.dest_path.exists()]
        if existing:
            shown = ", ".join(str(p) for p in existing[:20])
            more = f" …and {len(existing) - 20} more" if len(existing) > 20 else ""
            raise FileExistsError(
                f"refusing to overwrite {len(existing)} existing file(s): "
                f"[{shown}{more}]; pass overwrite=True to replace."
            )

    return plan
def _check_modality_eligibility(
    modality_name: str,
    mrow: ModalityRow,
    *,
    layout: _LayoutMode,
) -> None:
    """Reject modalities the M2 raw-path download cannot handle.

    Raises
    ------
    NotImplementedError
        ``mrow`` is not in raw ingestion status; the processed path is
        Phase 1b (ENG-1093).
    ValueError
        ``layout="flat"`` and either the recording hash or the modality
        name contains ``"__"``.
    """
    if mrow.ingestion_status is not IngestionStatus.RAW:
        raise NotImplementedError(
            f"modality {modality_name!r} on recording {mrow.recording_hash!r} "
            f"has ingestion_status={mrow.ingestion_status.value!r}; "
            f"download for the processed path (storage_uri) lands with "
            f"{_PROCESSED_PATH_TICKET} / Phase 1b. M2 supports raw "
            "modalities only."
        )

    if layout != "flat":
        return

    # "__" is the join token for the flat layout. CatalogID's regex
    # permits "_" (so "__" is technically a valid hash substring) and
    # ModalityName is NonEmptyString with no further constraints — so
    # both are rejected here at the layout boundary instead of crashing
    # the intra-call duplicate-destination check downstream.
    if "__" in mrow.recording_hash:
        raise ValueError(
            f"layout='flat' requires recording_hash without '__'; got {mrow.recording_hash!r}"
        )
    if "__" in modality_name:
        raise ValueError(
            f"layout='flat' requires modality names without '__'; got {modality_name!r}"
        )
[docs] def _dest_path( dest_root: Path, *, layout: _LayoutMode, recording_hash: str, modality: str, rel_key: str, ) -> Path: """Compute the on-disk destination for one source segment. ``rel_key`` is the source key's path relative to its modality prefix; passing it through as a sub-path preserves any data-engine-internal worker structure (e.g. multi-file segments under date-partitioned subdirectories). ``FORMAT_EXT[mrow.format]`` is intentionally unused in M2 — Phase 1b (ENG-1093) consumes it for the single-file ``{modality}.{ext}`` form. """ if layout == "by_recording": return dest_root / recording_hash / modality / rel_key if layout == "by_modality": return dest_root / modality / recording_hash / rel_key # layout == "flat" return dest_root / f"{recording_hash}__{modality}" / rel_key
# ---------------------------------------------------------------------------
# Execute phase
# ---------------------------------------------------------------------------
[docs] def _stream_to_disk(store: ObjectStore, key: str, dest: Path) -> None: """Copy bytes from ``store[key]`` to ``dest`` via a temp-file rename. Uses :meth:`ObjectStore.open` (forward-only ``BinaryIO``), not :meth:`ObjectStore.get`, so multi-GB raw video/audio never sits in memory. The ``.part`` rename ensures partial writes are not visible at the canonical destination — if the stream fails or the process is interrupted, the temp file is unlinked and the canonical path either doesn't exist (first write) or still holds the previous bytes (overwrite case — ``os.replace`` is atomic and only swaps in the new file once it's fully written). """ dest.parent.mkdir(parents=True, exist_ok=True) tmp = dest.with_suffix(dest.suffix + ".part") try: with store.open(key) as src, tmp.open("wb") as out: while chunk := src.read(_STREAM_CHUNK): out.write(chunk) tmp.replace(dest) # atomic same-volume rename (os.replace) except BaseException: # Cover normal exceptions and KeyboardInterrupt / SystemExit # alike — a half-written ``.part`` file is never useful. tmp.unlink(missing_ok=True) raise