"""Stream raw-modality bytes from object storage to local disk (ENG-1091).
Third verb in the M2 three-verb read API (``query`` → ``get`` →
``download``): ``query()`` selects, ``get()`` (ENG-890) materializes
bytes into memory, and ``download()`` writes those same bytes to disk
**without parsing**. ``download`` and ``get`` are symmetric — both
consume a :class:`QueryResult` (or iterable of them), both resolve URIs
via :func:`ursa.store.parse_storage_uri`, and both are M2-gated to
``ingestion_status="raw"``.
Phase 1a (M2) — raw path only
-----------------------------
For each modality with ``ingestion_status="raw"``, list the segment
files under ``ModalityRow.raw_storage_uri`` (a prefix), stream each
segment's bytes to disk under a per-modality directory, and return the
list of files written. ``ObjectStore.open()`` is used (not ``get()``) so
multi-GB objects (``RAW_VIDEO``, ``RAW_AUDIO``) never sit in memory.
The ticket's ``{modality}.{ext}`` form presupposes single-file canonical
formats — that's Phase 1b. Raw modalities are multi-segment prefixes
(N segment files under a directory), so M2 writes a directory tree per
modality and the ``layout`` selector controls only the parent path. The
ticket annotation is captured in the PR body.
Phase 1b — processed path
-------------------------
Deferred to ENG-1093. ``download`` currently raises
:class:`NotImplementedError` for any modality with
``ingestion_status="processed"``; the processed-path read (using
``storage_uri`` and canonical Zarr/Lance/MP4_INDEX/Parquet tiers) lands
with the rest of ENG-1093.
"""
from __future__ import annotations

import os
import shutil
from collections.abc import Iterable
from pathlib import Path
from typing import Literal, NamedTuple

from ursa.catalog.schemas import IngestionStatus, ModalityRow
from ursa.query import QueryResult
from ursa.store import ObjectStore, get_store, parse_storage_uri
__all__ = ["download"]

# Forward-reference markers used in error messages so the next-step
# ticket numbers live in one place.
_PROCESSED_PATH_TICKET = "ENG-1093"

# Accepted ``layout`` values for :func:`download`. ``_LayoutMode`` is the
# static-typing view; ``_VALID_LAYOUTS`` is the runtime validation set —
# keep the two in sync when adding a mode.
_LayoutMode = Literal["by_recording", "by_modality", "flat"]
_VALID_LAYOUTS: frozenset[str] = frozenset({"by_recording", "by_modality", "flat"})

# Chunk size for the streaming copy. 1 MiB balances syscall overhead
# against memory pressure; even a 10 GB video walks 10240 chunks.
_STREAM_CHUNK: int = 1 << 20
class _PlannedWrite(NamedTuple):
    """One (store, source key, destination path, size) record produced by
    the plan phase (:func:`_plan_writes`) and consumed by the execute
    phase (:func:`download` → :func:`_stream_to_disk`).

    Only the listing's ``ObjectMeta.size`` is retained (as ``meta_size``)
    so the execute phase can carry size info for diagnostics without
    re-listing the prefix.
    """

    # Store that can open ``source_key`` for a streaming read.
    store: ObjectStore
    # Full object key in ``store`` (not relative to the modality prefix).
    source_key: str
    # On-disk path the segment's bytes will be written to.
    dest_path: Path
    # Byte size reported by the prefix listing (diagnostics only).
    meta_size: int
# ---------------------------------------------------------------------------
# Public surface
# ---------------------------------------------------------------------------
def download(
    target: QueryResult | Iterable[QueryResult],
    dest: str | os.PathLike[str],
    *,
    layout: _LayoutMode = "by_recording",
    overwrite: bool = False,
) -> list[Path]:
    """Write the raw-modality bytes of ``target`` under ``dest``, streaming.

    Parameters
    ----------
    target
        One :class:`QueryResult` or any iterable of them. The
        single-vs-many decision is made by ``isinstance`` check, never
        by length — a one-element iterable behaves exactly like the
        scalar form.
    dest
        Destination directory (created on demand). Any path-like value
        is coerced to :class:`pathlib.Path`.
    layout
        Destination scheme for each segment file:

        * ``"by_recording"`` (default) →
          ``dest/{recording_hash}/{modality}/{relative_segment_key}``
        * ``"by_modality"`` →
          ``dest/{modality}/{recording_hash}/{relative_segment_key}``
        * ``"flat"`` →
          ``dest/{recording_hash}__{modality}/{relative_segment_key}``
    overwrite
        When ``False`` (default), any pre-existing destination file
        aborts the whole call with :class:`FileExistsError` before a
        single byte is copied. When ``True``, each existing file is
        atomically replaced by the temp-file rename inside
        :func:`_stream_to_disk` — no up-front unlink, so a mid-stream
        crash leaves the previous bytes in place.

    Returns
    -------
    list[Path]
        Every file written, always flattened into one list. Ordering is
        deterministic: input-recording order, then modality insertion
        order (``qr.modalities.items()``), then lexicographic
        ``ObjectMeta.key`` order within a modality.

    Raises
    ------
    NotImplementedError
        For any matched modality with ``ingestion_status="processed"``
        — deferred to ENG-1093 / Phase 1b.
    FileNotFoundError
        When a registered modality's ``raw_storage_uri`` prefix lists
        zero objects (a catalog/upload bug, not a silent no-op).
    FileExistsError
        With ``overwrite=False`` and at least one existing destination
        file; every collision is collected into the message (truncated
        past 20 entries).
    ValueError
        For an unknown ``layout``, a duplicate destination produced by
        the plan (defense-in-depth across all layouts), or a
        ``layout="flat"`` recording hash / modality name containing
        ``"__"``.

    Notes
    -----
    Bytes move through :meth:`ObjectStore.open` (forward-only) — never
    :meth:`ObjectStore.get` — so multi-GB raw video/audio is never fully
    materialized in memory; an interrupted stream leaves at most a
    ``.part`` file that the except handler unlinks, never a half-written
    file at the canonical destination. The single-file
    ``{modality}.{ext}`` form implied by the ticket signature lands with
    the Phase 1b processed-path work (ENG-1093); M2 writes a directory
    tree per modality regardless of layout.
    """
    if layout not in _VALID_LAYOUTS:
        raise ValueError(f"layout must be one of {sorted(_VALID_LAYOUTS)!r}; got {layout!r}")
    if isinstance(target, QueryResult):
        results = [target]
    else:
        results = list(target)
    plan = _plan_writes(results, Path(dest), layout=layout, overwrite=overwrite)
    # Execute phase. No pre-unlink even when overwrite=True —
    # _stream_to_disk writes to a .part sibling and os.replace()-swaps
    # it into the canonical destination, atomic on POSIX *and* able to
    # overwrite. Unlinking first would open a window where a mid-stream
    # crash leaves the user with nothing instead of the old bytes.
    for write in plan:
        _stream_to_disk(write.store, write.source_key, write.dest_path)
    return [write.dest_path for write in plan]
# ---------------------------------------------------------------------------
# Plan phase
# ---------------------------------------------------------------------------
def _plan_writes(
    qrs: list[QueryResult],
    dest_root: Path,
    *,
    layout: _LayoutMode,
    overwrite: bool,
) -> list[_PlannedWrite]:
    """Build the complete, ordered write plan before any byte is copied.

    A single pass over ``qrs`` yields:

    * every :class:`_PlannedWrite`, in deterministic order (input
      recording order, then modality insertion order, then lexicographic
      segment-key order) — the execute phase consumes this list and
      never re-lists the object-store prefixes;
    * an intra-call duplicate-destination check that spans all layouts
      (cheap dict lookup; future-proofs against new layout modes);
    * with ``overwrite=False``, a batched pre-existence check that
      raises one :class:`FileExistsError` naming every collision.
    """
    plan: list[_PlannedWrite] = []
    claimed: dict[Path, _PlannedWrite] = {}
    for result in qrs:
        rec_hash = result.recording_hash
        for name, row in result.modalities.items():
            _check_modality_eligibility(name, row, layout=layout)
            role, prefix = parse_storage_uri(row.raw_storage_uri)
            store: ObjectStore = get_store(role)
            # Lexicographic key order pins the within-modality ordering
            # promised by download()'s docstring.
            segments = sorted(store.list(prefix=prefix), key=lambda m: m.key)
            if not segments:
                # A registered raw modality with zero uploaded segments
                # means catalog and object store disagree — fail loudly.
                raise FileNotFoundError(
                    f"no objects at {row.raw_storage_uri!r} for modality "
                    f"{name!r} on recording {rec_hash!r}; registered "
                    "modality with zero segments is a catalog or upload "
                    "bug."
                )
            for seg in segments:
                relative = seg.key.removeprefix(prefix).lstrip("/")
                target = _dest_path(
                    dest_root,
                    layout=layout,
                    recording_hash=rec_hash,
                    modality=name,
                    rel_key=relative,
                )
                if target in claimed:
                    raise ValueError(
                        f"layout={layout!r} produces duplicate destination "
                        f"{target}: would be written by both "
                        f"{claimed[target].source_key!r} and {seg.key!r}"
                    )
                entry = _PlannedWrite(
                    store=store,
                    source_key=seg.key,
                    dest_path=target,
                    meta_size=seg.size,
                )
                claimed[target] = entry
                plan.append(entry)
    if not overwrite:
        existing = [w.dest_path for w in plan if w.dest_path.exists()]
        if existing:
            shown = ", ".join(str(p) for p in existing[:20])
            extra = len(existing) - 20
            more = f" …and {extra} more" if extra > 0 else ""
            raise FileExistsError(
                f"refusing to overwrite {len(existing)} existing file(s): "
                f"[{shown}{more}]; pass overwrite=True to replace."
            )
    return plan
def _check_modality_eligibility(
    modality_name: str,
    mrow: ModalityRow,
    *,
    layout: _LayoutMode,
) -> None:
    """Reject modalities the M2 raw-path download cannot serve.

    Raises :class:`NotImplementedError` for processed-path rows
    (deferred to Phase 1b) and :class:`ValueError` when
    ``layout='flat'`` would flatten an ambiguous name.
    """
    status = mrow.ingestion_status
    if status is not IngestionStatus.RAW:
        raise NotImplementedError(
            f"modality {modality_name!r} on recording {mrow.recording_hash!r} "
            f"has ingestion_status={status.value!r}; "
            f"download for the processed path (storage_uri) lands with "
            f"{_PROCESSED_PATH_TICKET} / Phase 1b. M2 supports raw "
            "modalities only."
        )
    if layout != "flat":
        return
    # ``flat`` joins hash and modality with ``"__"``, so a ``"__"``
    # inside either component would make the flattened directory name
    # ambiguous. CatalogID's regex permits ``_`` (so ``__`` is
    # technically valid in a hash) and ModalityName is NonEmptyString
    # with no further constraints — reject both at the layout boundary
    # rather than crashing the intra-call dup-dest check downstream.
    if "__" in mrow.recording_hash:
        raise ValueError(
            f"layout='flat' requires recording_hash without '__'; got {mrow.recording_hash!r}"
        )
    if "__" in modality_name:
        raise ValueError(
            f"layout='flat' requires modality names without '__'; got {modality_name!r}"
        )
def _dest_path(
dest_root: Path,
*,
layout: _LayoutMode,
recording_hash: str,
modality: str,
rel_key: str,
) -> Path:
"""Compute the on-disk destination for one source segment.
``rel_key`` is the source key's path relative to its modality
prefix; passing it through as a sub-path preserves any
data-engine-internal worker structure (e.g. multi-file segments
under date-partitioned subdirectories). ``FORMAT_EXT[mrow.format]``
is intentionally unused in M2 — Phase 1b (ENG-1093) consumes it for
the single-file ``{modality}.{ext}`` form.
"""
if layout == "by_recording":
return dest_root / recording_hash / modality / rel_key
if layout == "by_modality":
return dest_root / modality / recording_hash / rel_key
# layout == "flat"
return dest_root / f"{recording_hash}__{modality}" / rel_key
# ---------------------------------------------------------------------------
# Execute phase
# ---------------------------------------------------------------------------
def _stream_to_disk(store: ObjectStore, key: str, dest: Path) -> None:
    """Copy bytes from ``store[key]`` to ``dest`` via a temp-file rename.

    Uses :meth:`ObjectStore.open` (forward-only ``BinaryIO``), not
    :meth:`ObjectStore.get`, so multi-GB raw video/audio never sits in
    memory; :func:`shutil.copyfileobj` performs the chunked copy in the
    stdlib's C-accelerated loop with ``_STREAM_CHUNK``-sized (1 MiB)
    reads instead of a hand-rolled Python read/write loop. The ``.part``
    rename ensures partial writes are not visible at the canonical
    destination — if the stream fails or the process is interrupted, the
    temp file is unlinked and the canonical path either doesn't exist
    (first write) or still holds the previous bytes (overwrite case —
    ``os.replace`` is atomic and only swaps in the new file once it's
    fully written).
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    try:
        with store.open(key) as src, tmp.open("wb") as out:
            # Stdlib chunked copy — same 1 MiB chunking as before,
            # fewer Python-level iterations per segment.
            shutil.copyfileobj(src, out, _STREAM_CHUNK)
        tmp.replace(dest)  # atomic same-volume rename (os.replace)
    except BaseException:
        # Cover normal exceptions and KeyboardInterrupt / SystemExit
        # alike — a half-written ``.part`` file is never useful.
        tmp.unlink(missing_ok=True)
        raise