# Source code for ursa.register.orchestrator

"""``DataInterface.ingest()`` orchestrator (ENG-1130).

The orchestrator is the single high-level write surface that owns *both*
the R2 upload of raw segment files and the catalog write — atomically,
in the sense that catalog rows only appear *after* every modality byte
is in R2. This guarantees ``DataInterface.query()`` never returns a
row whose download would 404.

It inherits the data-engine uploader's existing semantics:

* **Mid-file cooperative cancellation.** Each file body is wrapped in
  a :class:`_CancellableReader` that polls ``is_active()`` before
  every chunk read. When it flips ``True`` (a rig worker started
  recording), the next ``read()`` raises :class:`UploadInterrupted`,
  the backend's ``put`` propagates it out, and :func:`ingest` returns
  ``IngestResult(paused=True, recording=None, ...)``. Latency is
  bounded by the chunks-in-flight that obstore has already pulled from
  the reader: ``chunk_size`` (5 MiB default) × ``max_concurrency``
  (12 default) ≈ 60 MiB of in-flight bytes per file. Those finish
  before the cancellation propagates; subsequent chunks fault on
  the next ``read()`` call. So for an obstore-default 500 MiB PUT
  the worst-case preemption is "remaining time to flush the
  ~60 MiB already in flight" (~1–2 s at typical R2 throughput),
  not "remaining time to finish the whole file" (~30 s). For small
  single-read files the wrapper still polls before the read; sub-
  multipart files complete fast enough that they're effectively
  atomic. **No catalog rows are written on pause.** If preemption
  latency matters more than throughput for a particular caller,
  pass ``max_concurrent_files=1`` and consider plumbing
  ``chunk_size`` / ``max_concurrency`` overrides through
  ``ObjectStore.put`` (not exposed in v0.1.0).
* **Resume.** Re-running with the same manifest is idempotent
  end-to-end: hash-skip + ``_register_*`` skip-on-match. Partially-
  uploaded objects whose ``put`` was interrupted leave nothing intact
  in R2 (obstore aborts the multipart) and the next call re-uploads
  them via the normal path.

The orchestrator trusts the manifest's pre-computed sha256s (written by
``data-engine/uploader/manifest.py:augment_manifest`` before ingest is
called). This avoids re-hashing every file — per-file hashing already
happened at augment time. If the manifest is missing ``files`` or
``recording_hash``, :func:`ingest` raises a clear error pointing to
``augment_manifest``.

The **resume-skip signal is size-only** (obstore's ``head()`` doesn't
surface ``x-amz-meta-sha256`` user metadata; see ``_head_matches``).
The per-file sha256 from the manifest is still written into
``x-amz-meta-sha256`` on each PUT for post-hoc audit via ``get``, but
it is *not* what decides "is this object already in R2?" on a
subsequent ``ingest()`` call. Content drift is caught at the catalog
layer instead: the manifest's content-addressed ``recording_hash``
changes whenever any file's bytes change, so a re-augmented session
with modified content lands as a fresh row rather than overwriting.

For backfill (ENG-1096), :func:`ingest_from_r2` reads the manifest +
file listing from R2 directly and runs only the register step (no
upload).
"""

from __future__ import annotations

import json
import logging
import os
import threading
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO

from ursa.catalog import CatalogRowExists, RecordingRow
from ursa.register.payload import RegisterPayload, manifest_to_register_payload
from ursa.store import ObjectNotFound, ObjectStore

if TYPE_CHECKING:
    from ursa.data_interface import DataInterface

logger = logging.getLogger("ursa.register.orchestrator")


# Public API of the orchestrator module. Everything not listed here is a
# private helper of the ingest pipeline (``_``-prefixed by convention).
__all__ = [
    "IngestResult",
    "UploadInterrupted",
    "UploadProgress",
    "ingest",
    "ingest_from_r2",
]


class UploadInterrupted(RuntimeError):
    """Cooperative-cancellation signal for an in-flight R2 PUT.

    Raised from :meth:`_CancellableReader.read` when ``is_active()``
    flips ``True`` (a rig worker started recording) or the
    orchestrator-wide abort event is set during a chunk read. The
    backend's ``put`` lets it propagate; :func:`ingest` catches it and
    converts it into ``IngestResult(paused=True)``.

    Mirrors ``uploader.r2_client.UploadInterrupted`` in data-engine,
    where the same exception aborts a boto3 ``Callback=``-instrumented
    PUT. Exported so callers *can* tell "the wrapped read was
    interrupted" apart from other failures (typically they shouldn't
    need to — the orchestrator absorbs it before user code sees it).
    """
class _CancellableReader: """File-body proxy whose ``read()`` polls ``is_active()`` before each chunk. obstore's streaming ``put`` reads :class:`BinaryIO` chunk-by-chunk (especially in multipart mode for files above the threshold, ~8 MiB). Each chunk-read is preceded by a callback check; flipping ``is_active`` between two chunks raises :class:`UploadInterrupted`, which propagates out of ``put`` and out of :func:`_upload_one` and is caught by :func:`ingest`'s outer loop. For very small files that complete in one ``read(-1)`` call, the cancellation point is the pre-call ``is_active()`` check — they're small enough that "either we abort instantly or we finish instantly" is acceptable. Delegates non-``read`` attributes via ``__getattr__`` so seekable / tell / readable probes the backend does at setup pass through to the underlying file. Not subclassed from :class:`io.IOBase` because obstore accepts duck-typed file-like objects and the explicit delegate keeps the wrapper's surface tight. """ __slots__ = ("_fileobj", "_is_active", "_abort") def __init__( self, fileobj: BinaryIO, is_active: Callable[[], bool], abort: threading.Event | None = None, ) -> None: self._fileobj = fileobj self._is_active = is_active self._abort = abort def read(self, size: int = -1) -> bytes: # The orchestrator-wide ``abort`` event flips when *any* worker # raised (e.g. an early R2 5xx). Without this check the # ThreadPoolExecutor's __exit__ would block waiting for every # other in-flight PUT to drain — minutes of wasted bandwidth on # the error path (B3). The ``is_active`` callback is the # recording-active cancel path; ``abort`` is the error path. 
if self._abort is not None and self._abort.is_set(): raise UploadInterrupted("orchestrator aborted in-flight uploads") if self._is_active(): raise UploadInterrupted("recording started; aborting in-flight upload") return self._fileobj.read(size) def readable(self) -> bool: return True def __getattr__(self, name: str) -> Any: # Delegate seek / tell / fileno / close to the wrapped file so # backend probes (obstore's content-length probe, fsspec hooks) # see a fully functional file-like object. # # Dunder + private-attribute guard (B2): if anything probes # ``__class__`` or another private name before ``__init__`` # has set ``_fileobj`` (traceback formatting, copy, debugger # introspection), the naive delegation recurses into # ``__getattr__`` forever. Raising AttributeError on the # underscore prefix breaks the cycle. if name.startswith("_"): raise AttributeError(name) return getattr(self._fileobj, name)
[docs] @dataclass(frozen=True, slots=True) class UploadProgress: """Per-file progress signal threaded to caller-supplied callbacks. Daemon mappers (data-engine ENG-1131) turn this into ``extra={"uploader.*": ...}`` structured logs. ``bytes_uploaded`` counts only PUT bytes (excludes hash-skipped files); ``bytes_processed`` counts every file's full size whether uploaded or hash-skipped. The daemon's batch progress bar reads ``bytes_processed`` so resume-after-pause climbs through skips, while ``bytes_uploaded`` drives the per-session network counter. Both are *cumulative* across the events emitted for one session; consumers that need deltas should diff against the prior event. """ files_done: int files_total: int bytes_uploaded: int bytes_processed: int bytes_total: int current_file: str
[docs] @dataclass(frozen=True, slots=True) class IngestResult: """Return value of :func:`ingest` / :func:`ingest_from_r2`. ``paused`` is the daemon's signal to leave the session "pending" and retry later. When ``paused=True``, ``recording`` is ``None`` and **no catalog rows were written**. When ``paused=False``, ``recording`` is the registered :class:`RecordingRow`. """ paused: bool recording: RecordingRow | None files_uploaded: int files_skipped: int bytes_uploaded: int
def _read_manifest(manifest_path: Path) -> dict[str, Any]:
    """Load and minimally validate the local ``manifest.json``.

    Raises ``ValueError`` when the file parses but is not a JSON object
    (e.g. a bare list or string).
    """
    with manifest_path.open() as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError(f"manifest at {manifest_path!r} is not a JSON object")
    return data


def _require_augmented(manifest: dict[str, Any], manifest_path: Path) -> None:
    """Reject manifests written by the dashboard's `_finalize_session`
    *before* the uploader's ``augment_manifest`` step has run."""
    # Truthy check (not ``not in``) so a manifest with
    # ``"recording_hash": ""`` or ``"files": []`` fails *here* with the
    # clear error, not later inside ``manifest_to_register_payload``
    # with a less obvious one (S12).
    missing = [k for k in ("recording_hash", "files") if not manifest.get(k)]
    if missing:
        raise ValueError(
            f"manifest at {manifest_path!r} is missing {missing!r}; call "
            f"augment_manifest(session, files) before ingest(). The base manifest "
            f"written by the dashboard carries display metadata only; per-file "
            f"sha256s + recording_hash are added at upload time."
        )


def _r2_key_for(rec_id: str, rel_path: str) -> str:
    """R2 object key for a session-relative file (mirrors data-engine layout)."""
    return f"recordings/{rec_id}/{rel_path}"


def _head_matches(store: ObjectStore, key: str, *, expected_size: int) -> bool:
    """True if R2 already has an object at ``key`` that we should skip.

    **Size-based skip only.** obstore's ``head()`` does not surface
    ``x-amz-meta-sha256`` user metadata (see
    ``ursa.store.backends._obstore._meta_from_obstore``);
    ``ObjectMeta.sha256`` is always ``None`` from a head response. We
    therefore key the skip decision on size match alone. The original
    PUT stored the sha256 as user metadata on the object, so content
    integrity is still verifiable post-hoc via ``get`` — we just can't
    read it cheaply during a HEAD-only resume sweep.

    Catalog-side defense in depth: the manifest's ``recording_hash`` is
    the content-addressed digest of every per-file sha256 (see
    ``data-engine/uploader/hashing.py:compute_recording_hash``). A rig
    that re-augments a session whose contents changed produces a
    different ``recording_hash`` and lands as a fresh catalog row; there
    is no path where size-match-but-content-differ silently overwrites
    an existing catalog row.
    """
    try:
        meta = store.head(key)
    except ObjectNotFound:
        # Absent object → definitely needs an upload.
        return False
    return meta.size == expected_size


def _process_one_file(
    store: ObjectStore,
    *,
    key: str,
    abs_path: str,
    expected_size: int,
    expected_sha256: str,
    is_active: Callable[[], bool] | None = None,
    abort: threading.Event | None = None,
    overwrite: bool = False,
) -> tuple[str, int]:
    """HEAD-skip + maybe-upload one file. Worker-thread entry point.

    Returns ``(action, bytes_uploaded)`` where action is
    ``"uploaded"``, ``"skipped"``, or ``"interrupted"``. Putting
    HEAD + PUT in one work unit lets the pool parallelize the HEADs
    alongside the uploads (matches the prior data-engine
    ``_process_one``).

    ``abort`` is the orchestrator-wide error-path signal — when set,
    in-flight workers raise :class:`UploadInterrupted` on the next
    chunk read so the executor's ``__exit__`` doesn't block draining
    every other PUT to completion (B3).

    Raises propagate (corrupt manifest, R2 5xx, etc.); the caller
    treats unexpected raises as a hard error.
    """
    if not overwrite and _head_matches(store, key, expected_size=expected_size):
        return ("skipped", 0)
    try:
        n = _upload_one(
            store,
            key=key,
            abs_path=abs_path,
            sha256=expected_sha256,
            is_active=is_active,
            abort=abort,
        )
    except UploadInterrupted:
        # Cooperative cancel — not an error; the caller pauses instead.
        return ("interrupted", 0)
    return ("uploaded", n)


def _upload_one(
    store: ObjectStore,
    *,
    key: str,
    abs_path: str,
    sha256: str,
    is_active: Callable[[], bool] | None = None,
    abort: threading.Event | None = None,
) -> int:
    """Upload one file to R2. Returns bytes uploaded.

    When ``is_active`` is supplied, the file body is wrapped in a
    :class:`_CancellableReader` that lets a recording-start preempt the
    in-flight PUT mid-chunk; ``abort`` is the orchestrator-wide
    error-path signal that does the same when *any* worker raised.
    Raises :class:`UploadInterrupted` in either case; the caller is
    responsible for handling it.
    """
    with open(abs_path, "rb") as f:
        # Stream the file body to the store. ``put`` accepts BinaryIO
        # and the obstore backend pipes it without buffering the whole
        # file in memory. ``sha256=`` lands in x-amz-meta-sha256 so the
        # next ``head()`` can hash-skip without re-fetching the body.
        if is_active is not None or abort is not None:
            body: BinaryIO = _CancellableReader(  # type: ignore[assignment]
                f,
                is_active if is_active is not None else _never_active,
                abort,
            )
        else:
            body = f
        store.put(key, body, sha256=sha256)
    return os.path.getsize(abs_path)


def _never_active() -> bool:
    """Sentinel callback for ``_CancellableReader`` when only the
    ``abort`` signal is in play (no ``is_active`` callback supplied).
    """
    return False


def _maybe_emit(progress: Callable[[UploadProgress], None] | None, evt: UploadProgress) -> None:
    # Progress callbacks must not break the upload: swallow + log any
    # exception the caller's callback raises.
    if progress is None:
        return
    try:
        progress(evt)
    except Exception:  # noqa: BLE001
        logger.exception("progress callback raised; continuing upload")


def _do_register(
    data: DataInterface,
    payload: RegisterPayload,
    *,
    participant_id: str,
    participant_enrolled_at: int,
    participant_display_name: str | None,
) -> RecordingRow:
    """Run the participant + recording + modality writes idempotently.

    Each underlying ``register_*`` is check-and-add: identical re-write
    returns the existing row, mismatched re-write raises
    :class:`CatalogRowExists`. Asymmetric handling by row type:

    * **Recording / modality**: any payload divergence propagates as
      ``CatalogRowExists``. The orchestrator does NOT catch — let the
      daemon's outer error path log + abort. Divergence here means a
      real conflict (two rigs claiming the same recording_hash with
      different metadata, etc.) and silently swallowing it would mask
      a bug.
    * **Participant**: divergence is swallowed and logged at INFO. This
      is deliberate. The dashboard's free-form ``participant``
      display-name slugs into a CatalogID that may collide across
      sessions for the same person; the *first* session's
      ``enrolled_at`` + display-name metadata wins, subsequent ones are
      no-ops. Tracked separately under ENG-1099 (proper participant
      tracking with stable IDs).
    """
    # 1. Participant (idempotent on exact match; divergent metadata
    #    logged + skipped, see docstring).
    try:
        data.register_participant(
            participant_id=participant_id,
            enrolled_at=participant_enrolled_at,
            metadata={"display_name": participant_display_name} if participant_display_name else {},
        )
    except CatalogRowExists:
        logger.info(
            "participant %s already registered with divergent metadata; "
            "skipping update (ENG-1099 follow-up).",
            participant_id,
        )
    # 2. Recording.
    data.register_recording(
        recording_hash=payload.recording.recording_hash,
        participant_ids=payload.recording.participant_ids,
        start_time=payload.recording.start_time,
        duration=payload.recording.duration,
        device_info=payload.recording.device_info,
        metadata=payload.recording.metadata,
    )
    # 3. Modalities.
    for m in payload.modalities:
        data.register_modality(
            recording_hash=m.recording_hash,
            modality=m.modality,
            raw_storage_uri=m.raw_storage_uri,
            storage_uri=m.storage_uri,
            ingestion_status=m.ingestion_status,
            format=m.format,
            metadata=m.metadata,
        )
    return payload.recording
def ingest(
    data: DataInterface,
    manifest_path: Path | str,
    *,
    source_dir: Path | str,
    participant_id: str,
    is_active: Callable[[], bool] | None = None,
    progress: Callable[[UploadProgress], None] | None = None,
    overwrite: bool = False,
    collapse_modalities: bool = True,
    max_concurrent_files: int = 4,
) -> IngestResult:
    """Upload a session's raw files to R2 and register the catalog rows.

    Reads the (augmented) manifest at ``manifest_path``, uploads every
    file listed in ``manifest['files']`` from ``source_dir`` to the raw
    R2 bucket via hash-skip + streaming PUT, and registers the catalog
    rows once every byte is uploaded. Cancellation cooperates with the
    data-engine daemon's worker-activity watcher: when ``is_active()``
    flips True between files, the function drains pending work and
    returns ``paused=True`` *without* writing catalog rows.

    Parameters
    ----------
    data
        A :class:`DataInterface`. Must have called
        ``enable_writes(roles=("assets_rw", "raw_rw"))`` first —
        :func:`ingest` raises :class:`WritesNotEnabled` otherwise.
    manifest_path
        Path to the local ``manifest.json`` (typically
        ``<source_dir>/manifest.json``). Must already be augmented with
        ``recording_hash`` + ``files`` (data-engine's
        ``augment_manifest``).
    source_dir
        Root of the on-disk session, ``rec_<id>/`` typically. File
        paths in ``manifest['files'][*].rel_path`` are resolved against
        this directory.
    participant_id
        Catalog ID for the participant. Schema currently requires
        non-empty ``participant_ids`` on a :class:`RecordingRow`;
        legacy null-participant support is tracked separately
        (ENG-1099).
    is_active
        Optional cooperative-cancellation callback. Polled in three
        places: (1) a pre-flight check before the upload pool runs,
        (2) inside each ``_CancellableReader.read()`` before every
        chunk the backend pulls from disk, and (3) on each completed
        ``as_completed`` future. Returning ``True`` aborts in-flight
        PUTs mid-chunk (the next ``read`` raises
        :class:`UploadInterrupted`), cancels pending futures, and
        triggers a paused return *without writing catalog rows*.

        **Thread-safety contract**: ``is_active`` is invoked from
        ``ThreadPoolExecutor`` worker threads as well as the main
        thread. Callers must ensure their implementation is safe under
        concurrent calls (B8). The data-engine integration
        (``RecordingActivityWatcher.is_recording_active``) reads a
        single bool and is fine; ad-hoc lambdas that mutate state need
        their own synchronisation.
    progress
        Optional per-file progress callback. Receives an
        :class:`UploadProgress` after each file's status is decided
        (uploaded or skipped). Exceptions raised inside the callback
        are caught and logged so they cannot break the upload.
    overwrite
        If ``True``, skip the hash-skip step and re-upload every file.
        Mostly useful for tests; production callers leave it False.
    collapse_modalities
        When ``True`` (default), worker subdir names are collapsed to
        their first underscore segment for the catalog modality name
        (``camera_0`` → ``camera``). If two worker subdirs collapse to
        the same short name (e.g. multi-camera rig ``camera_aaaa`` +
        ``camera_bbbb`` → both ``camera``), the function automatically
        retries with ``collapse_modalities=False`` and logs at WARNING.
        Modality names then equal the worker subdir verbatim.
    max_concurrent_files
        Worker-pool size for the upload step. Default 4 matches the
        prior data-engine uploader (``uploader/config.yaml:71``); set
        to 1 for deterministic test ordering.
    """
    manifest_path = Path(manifest_path)
    source_dir = Path(source_dir)
    data._require_role("raw_rw")
    data._require_role("assets_rw")
    raw_store = data._raw_rw_store
    # Belt-and-braces (N4): unreachable in practice because
    # ``_require_role("raw_rw")`` above either passed (so
    # ``_raw_rw_store`` was set by ``enable_writes``) or raised. The one
    # case where it can be None is ``profile="local"``, which is the
    # explicit branch further below; everywhere else this is a torn
    # invariant worth shouting about.
    if raw_store is None and data._profile != "local":
        raise RuntimeError("internal: _raw_rw_store is None despite role check")
    manifest = _read_manifest(manifest_path)
    _require_augmented(manifest, manifest_path)
    rec_id: str = manifest["recording_id"]
    files: list[dict[str, Any]] = manifest["files"]
    # Build the payload eagerly so manifest-shape errors surface before
    # we put bytes on the wire. Auto-retry with collapse_modalities=False
    # if the rig has two workers of the same modality family (e.g.
    # ``camera_aaaa`` + ``camera_bbbb`` both collapse to ``camera`` and
    # trip the collision check). Logs the fallback so operators can see
    # which sessions ended up with verbatim worker_subdir modality names.
    rel_paths = [f["rel_path"] for f in files]
    try:
        payload = manifest_to_register_payload(
            manifest,
            rel_paths,
            participant_id=participant_id,
            profile=data._profile,
            collapse_modalities=collapse_modalities,
        )
    except ValueError as e:
        if collapse_modalities and "modality-name collision" in str(e):
            logger.warning(
                "ingest: %s; retrying with collapse_modalities=False for rec=%s",
                e,
                rec_id,
            )
            payload = manifest_to_register_payload(
                manifest,
                rel_paths,
                participant_id=participant_id,
                profile=data._profile,
                collapse_modalities=False,
            )
        else:
            raise
    bytes_total = sum(int(f["size"]) for f in files)
    files_total = len(files)
    files_uploaded = 0
    files_skipped = 0
    bytes_uploaded = 0
    bytes_processed = 0  # uploaded + skipped, for batch-bar progress
    paused = False
    if raw_store is None:
        # profile="local" — no real R2; emit a single info log and
        # short-circuit to the register step. Used by tests that don't
        # care about the upload mechanics but want to exercise the
        # register flow.
        logger.info("ingest: profile=local; skipping R2 upload step for rec=%s", rec_id)
    elif is_active is not None and is_active():
        # Pre-flight is_active check — if a recording is already active
        # when ingest() is called, return paused immediately without
        # touching the pool.
        logger.info(
            "ingest: is_active=True at start; pausing rec=%s without uploads",
            rec_id,
        )
        return IngestResult(
            paused=True,
            recording=None,
            files_uploaded=0,
            files_skipped=0,
            bytes_uploaded=0,
        )
    else:
        # Parallel uploads via a worker pool; matches the prior
        # data-engine ``_process_files`` semantics (default 4 workers).
        # Each worker's HEAD-skip-or-PUT is independent; all share the
        # same ``is_active`` callback so a worker start propagates to
        # every in-flight :class:`_CancellableReader` at once.
        max_workers = max(1, max_concurrent_files)
        # Shared error-path signal (B3). Set when any worker raises so
        # every other in-flight :class:`_CancellableReader` aborts on
        # the next chunk read — otherwise the executor's ``__exit__``
        # waits for every in-flight PUT to drain before propagating
        # the exception (minutes of wasted bandwidth on an early 5xx).
        abort = threading.Event()
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {
                pool.submit(
                    _process_one_file,
                    raw_store,
                    key=_r2_key_for(rec_id, entry["rel_path"]),
                    abs_path=str(source_dir / entry["rel_path"]),
                    expected_size=int(entry["size"]),
                    expected_sha256=entry["sha256"],
                    is_active=is_active,
                    abort=abort,
                    overwrite=overwrite,
                ): entry
                for entry in files
            }
            for fut in as_completed(futures):
                if fut.cancelled():
                    # Cancelled before it started (pause/error sweep).
                    continue
                entry = futures[fut]
                rel_path = entry["rel_path"]
                try:
                    action, n = fut.result()
                except Exception:
                    # Unexpected raises (R2 5xx, OS error). Set the
                    # shared abort signal so every in-flight reader
                    # aborts on its next chunk (B3) instead of running
                    # the rest of its PUT to completion under
                    # ThreadPoolExecutor.__exit__. Then cancel pending
                    # and re-raise for the daemon's error path
                    # (matches ``_abort_session`` semantics in the
                    # prior uploader).
                    abort.set()
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
                    raise
                if action == "uploaded":
                    files_uploaded += 1
                    bytes_uploaded += n
                    bytes_processed += n
                elif action == "skipped":
                    files_skipped += 1
                    # Hash-skipped files contribute their full size to
                    # bytes_processed (so the dashboard's batch bar
                    # advances on resume-after-pause) but not to
                    # bytes_uploaded (no network bytes flowed).
                    bytes_processed += int(entry["size"])
                elif action == "interrupted":
                    # Mid-PUT abort — _CancellableReader fired because
                    # is_active flipped. Other in-flight workers will
                    # see the same flag on their next chunk; just mark
                    # paused and let `as_completed` finish collecting.
                    paused = True
                _maybe_emit(
                    progress,
                    UploadProgress(
                        files_done=files_uploaded + files_skipped,
                        files_total=files_total,
                        bytes_uploaded=bytes_uploaded,
                        bytes_processed=bytes_processed,
                        bytes_total=bytes_total,
                        current_file=rel_path,
                    ),
                )
                # Cancel pending the first time we observe paused;
                # subsequent iterations skip the cancel sweep. Already-
                # running futures continue and abort themselves via the
                # shared ``is_active`` callback in their
                # _CancellableReader.
                if not paused and is_active is not None and is_active():
                    paused = True
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
                elif paused and action == "interrupted":
                    # The interrupted future flipped paused this
                    # iteration — cancel pending now (one sweep, not
                    # per iteration).
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
    if paused:
        logger.info(
            "ingest: paused rec=%s files_uploaded=%d files_skipped=%d before catalog write",
            rec_id,
            files_uploaded,
            files_skipped,
        )
        return IngestResult(
            paused=True,
            recording=None,
            files_uploaded=files_uploaded,
            files_skipped=files_skipped,
            bytes_uploaded=bytes_uploaded,
        )
    # Upload succeeded for every file. Register the catalog rows.
    enrolled_at = payload.recording.start_time
    display_name = payload.recording.metadata.get("participant_display_name")
    recording = _do_register(
        data,
        payload,
        participant_id=participant_id,
        participant_enrolled_at=enrolled_at,
        participant_display_name=display_name if isinstance(display_name, str) else None,
    )
    logger.info(
        "ingest: rec=%s files_uploaded=%d files_skipped=%d bytes_uploaded=%d recording_hash=%s",
        rec_id,
        files_uploaded,
        files_skipped,
        bytes_uploaded,
        recording.recording_hash,
    )
    return IngestResult(
        paused=False,
        recording=recording,
        files_uploaded=files_uploaded,
        files_skipped=files_skipped,
        bytes_uploaded=bytes_uploaded,
    )
def ingest_from_r2(
    data: DataInterface,
    rec_id: str,
    *,
    participant_id: str,
) -> IngestResult:
    """Register a recording that's already in R2 — backfill / ENG-1096.

    Reads ``recordings/<rec_id>/manifest.json`` from the raw bucket,
    requires it to be augmented (``recording_hash`` + ``files``
    present), and runs only the register step. No bytes uploaded.

    For legacy recordings without an augmented manifest, a separate
    code path (tracked alongside ENG-1096) lists R2 directly and
    recomputes ``recording_hash`` from object sha256 user metadata.
    That fallback is **not** implemented in this entrypoint to keep the
    contract sharp; ENG-1096 may add it as a sibling function.

    Raises ``NotImplementedError`` for ``profile="local"`` — backfill
    only makes sense against an R2-backed catalog. (Docstring fixed to
    match the code: the raise below is ``NotImplementedError``, not
    ``ValueError``.) Raises ``FileNotFoundError`` when the manifest or
    any file it lists is absent from R2.
    """
    if data._profile == "local":
        raise NotImplementedError(
            "ingest_from_r2 requires an R2 profile (r2 / r2-test); "
            "got profile='local'. There is no R2 to read the manifest "
            "from — use `ingest()` with a local source_dir instead."
        )
    data._require_role("assets_rw")
    # Reading the manifest from R2 needs the raw_ro role rather than
    # raw_rw. Cached on the DataInterface so successive
    # ``ingest_from_r2`` calls (the typical backfill loop pattern)
    # reuse the underlying obstore connection pool instead of leaking
    # a fresh handle per call (S3). No ``enable_writes`` gate needed —
    # raw_ro is a read role.
    if data._raw_ro_store is None:
        from ursa.store import get_store

        data._raw_ro_store = get_store("raw_ro", profile=data._profile)
    raw_ro = data._raw_ro_store
    manifest_key = f"recordings/{rec_id}/manifest.json"
    try:
        body = raw_ro.get(manifest_key)
    except ObjectNotFound as e:
        raise FileNotFoundError(
            f"no manifest at r2://.../{manifest_key}; "
            f"recording {rec_id!r} is missing or pre-augment-era. "
            f"Legacy backfill without an augmented manifest is tracked "
            f"alongside ENG-1096."
        ) from e
    manifest = json.loads(body.decode("utf-8"))
    if not isinstance(manifest, dict):
        raise ValueError(f"manifest at {manifest_key!r} is not a JSON object")
    _require_augmented(manifest, Path(manifest_key))
    rel_paths = [f["rel_path"] for f in manifest["files"]]
    # HEAD-probe every file in the manifest before writing catalog rows
    # (S8). ``ingest()`` exists to enforce "no catalog rows pointing at
    # non-existent R2 objects" via the upload+register atomicity;
    # ``ingest_from_r2`` is the trust-the-manifest fast path, so we
    # have to verify the bytes are actually there ourselves. Without
    # this check, a session whose manifest landed but whose worker
    # files are absent (interrupted upload, accidental delete,
    # manifest-only fast-restore) would produce catalog rows that
    # always 404 on download.
    missing: list[str] = []
    for rel in rel_paths:
        try:
            raw_ro.head(f"recordings/{rec_id}/{rel}")
        except ObjectNotFound:
            missing.append(rel)
    if missing:
        raise FileNotFoundError(
            f"manifest at {manifest_key!r} lists {len(missing)} file(s) "
            f"not present in R2 (first few: {missing[:5]!r}). Refusing "
            f"to register catalog rows that would 404 on download. "
            f"Either re-upload the session via `ingest()` or remove "
            f"the stale manifest from R2."
        )
    payload = manifest_to_register_payload(
        manifest,
        rel_paths,
        participant_id=participant_id,
        profile=data._profile,
    )
    display_name = payload.recording.metadata.get("participant_display_name")
    recording = _do_register(
        data,
        payload,
        participant_id=participant_id,
        participant_enrolled_at=payload.recording.start_time,
        participant_display_name=display_name if isinstance(display_name, str) else None,
    )
    # ``files_skipped`` retains its ``ingest()`` semantics ("HEAD
    # confirmed identical"); ``ingest_from_r2`` only verifies presence
    # (S9). Zero it out so consumers branching on files_skipped don't
    # get a misleading signal — the verification is HEAD-presence, not
    # content match.
    return IngestResult(
        paused=False,
        recording=recording,
        files_uploaded=0,
        files_skipped=0,
        bytes_uploaded=0,
    )