"""``DataInterface.ingest()`` orchestrator (ENG-1130).
The orchestrator is the single high-level write surface that owns *both*
the R2 upload of raw segment files and the catalog write — atomically,
in the sense that catalog rows only appear *after* every modality byte
is in R2. This guarantees ``DataInterface.query()`` never returns a
row whose download would 404.
It inherits the data-engine uploader's existing semantics:
* **Mid-file cooperative cancellation.** Each file body is wrapped in
a :class:`_CancellableReader` that polls ``is_active()`` before
every chunk read. When it flips ``True`` (a rig worker started
recording), the next ``read()`` raises :class:`UploadInterrupted`,
the backend's ``put`` propagates it out, and :func:`ingest` returns
``IngestResult(paused=True, recording=None, ...)``. Latency is
bounded by the chunks-in-flight that obstore has already pulled from
the reader: ``chunk_size`` (5 MiB default) × ``max_concurrency``
(12 default) ≈ 60 MiB of in-flight bytes per file. Those finish
before the cancellation propagates; subsequent chunks fault on
the next ``read()`` call. So for an obstore-default 500 MiB PUT
the worst-case preemption is "remaining time to flush the
~60 MiB already in flight" (~1–2 s at typical R2 throughput),
not "remaining time to finish the whole file" (~30 s). For small
single-read files the wrapper still polls before the read; sub-
multipart files complete fast enough that they're effectively
atomic. **No catalog rows are written on pause.** If preemption
latency matters more than throughput for a particular caller,
pass ``max_concurrent_files=1`` and consider plumbing
``chunk_size`` / ``max_concurrency`` overrides through
``ObjectStore.put`` (not exposed in v0.1.0).
* **Resume.** Re-running with the same manifest is idempotent
end-to-end: hash-skip + ``_register_*`` skip-on-match. Partially-
uploaded objects whose ``put`` was interrupted leave nothing intact
in R2 (obstore aborts the multipart) and the next call re-uploads
them via the normal path.
The orchestrator trusts the manifest's pre-computed sha256s (written by
``data-engine/uploader/manifest.py:augment_manifest`` before ingest is
called). This avoids re-hashing every file — per-file hashing already
happened at augment time. If the manifest is missing ``files`` or
``recording_hash``, :func:`ingest` raises a clear error pointing to
``augment_manifest``.
The **resume-skip signal is size-only** (obstore's ``head()`` doesn't
surface ``x-amz-meta-sha256`` user metadata; see ``_head_matches``).
The per-file sha256 from the manifest is still written into
``x-amz-meta-sha256`` on each PUT for post-hoc audit via ``get``, but
it is *not* what decides "is this object already in R2?" on a
subsequent ``ingest()`` call. Content drift is caught at the catalog
layer instead: the manifest's content-addressed ``recording_hash``
changes whenever any file's bytes change, so a re-augmented session
with modified content lands as a fresh row rather than overwriting.
For backfill (ENG-1096), :func:`ingest_from_r2` reads the manifest +
file listing from R2 directly and runs only the register step (no
upload).
"""
from __future__ import annotations
import json
import logging
import os
import threading
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO
from ursa.catalog import CatalogRowExists, RecordingRow
from ursa.register.payload import RegisterPayload, manifest_to_register_payload
from ursa.store import ObjectNotFound, ObjectStore
if TYPE_CHECKING:
from ursa.data_interface import DataInterface
logger = logging.getLogger("ursa.register.orchestrator")
# Public surface re-exported by ``ursa.register``; the private helpers
# (_CancellableReader, _upload_one, _do_register, ...) are deliberately
# excluded from the wildcard-import contract.
__all__ = [
    "IngestResult",
    "UploadInterrupted",
    "UploadProgress",
    "ingest",
    "ingest_from_r2",
]
class UploadInterrupted(RuntimeError):
    """Signals that an in-flight PUT was cooperatively cancelled.

    Raised by :class:`_CancellableReader` when ``is_active()`` flips
    ``True`` — or the orchestrator-wide abort event is set — between
    two chunk reads. It escapes the backend's ``put`` call and is
    converted by :func:`ingest` into ``IngestResult(paused=True)``.
    Mirrors ``uploader.r2_client.UploadInterrupted`` in data-engine,
    where the same exception aborts a boto3 ``Callback=``-instrumented
    PUT.

    Exported for callers who want to tell "the wrapped read was
    interrupted" apart from other failures (typically unnecessary:
    the orchestrator catches it before user code ever sees it).
    """
class _CancellableReader:
"""File-body proxy whose ``read()`` polls ``is_active()`` before
each chunk.
obstore's streaming ``put`` reads :class:`BinaryIO` chunk-by-chunk
(especially in multipart mode for files above the threshold,
~8 MiB). Each chunk-read is preceded by a callback check; flipping
``is_active`` between two chunks raises :class:`UploadInterrupted`,
which propagates out of ``put`` and out of :func:`_upload_one` and
is caught by :func:`ingest`'s outer loop. For very small files
that complete in one ``read(-1)`` call, the cancellation point is
the pre-call ``is_active()`` check — they're small enough that
"either we abort instantly or we finish instantly" is acceptable.
Delegates non-``read`` attributes via ``__getattr__`` so seekable /
tell / readable probes the backend does at setup pass through to
the underlying file. Not subclassed from :class:`io.IOBase` because
obstore accepts duck-typed file-like objects and the explicit
delegate keeps the wrapper's surface tight.
"""
__slots__ = ("_fileobj", "_is_active", "_abort")
def __init__(
self,
fileobj: BinaryIO,
is_active: Callable[[], bool],
abort: threading.Event | None = None,
) -> None:
self._fileobj = fileobj
self._is_active = is_active
self._abort = abort
def read(self, size: int = -1) -> bytes:
# The orchestrator-wide ``abort`` event flips when *any* worker
# raised (e.g. an early R2 5xx). Without this check the
# ThreadPoolExecutor's __exit__ would block waiting for every
# other in-flight PUT to drain — minutes of wasted bandwidth on
# the error path (B3). The ``is_active`` callback is the
# recording-active cancel path; ``abort`` is the error path.
if self._abort is not None and self._abort.is_set():
raise UploadInterrupted("orchestrator aborted in-flight uploads")
if self._is_active():
raise UploadInterrupted("recording started; aborting in-flight upload")
return self._fileobj.read(size)
def readable(self) -> bool:
return True
def __getattr__(self, name: str) -> Any:
# Delegate seek / tell / fileno / close to the wrapped file so
# backend probes (obstore's content-length probe, fsspec hooks)
# see a fully functional file-like object.
#
# Dunder + private-attribute guard (B2): if anything probes
# ``__class__`` or another private name before ``__init__``
# has set ``_fileobj`` (traceback formatting, copy, debugger
# introspection), the naive delegation recurses into
# ``__getattr__`` forever. Raising AttributeError on the
# underscore prefix breaks the cycle.
if name.startswith("_"):
raise AttributeError(name)
return getattr(self._fileobj, name)
@dataclass(frozen=True, slots=True)
class UploadProgress:
    """Per-file progress signal threaded to caller-supplied callbacks.

    Daemon mappers (data-engine ENG-1131) turn this into
    ``extra={"uploader.*": ...}`` structured logs.

    ``bytes_uploaded`` counts only PUT bytes (excludes hash-skipped
    files); ``bytes_processed`` counts every file's full size whether
    uploaded or hash-skipped. The daemon's batch progress bar reads
    ``bytes_processed`` so resume-after-pause climbs through skips,
    while ``bytes_uploaded`` drives the per-session network counter.
    Both are *cumulative* across the events emitted for one session;
    consumers that need deltas should diff against the prior event.
    """

    # Files whose status has been decided so far (uploaded or skipped).
    files_done: int
    # Total number of files listed in the session manifest.
    files_total: int
    # Cumulative network bytes actually PUT to R2.
    bytes_uploaded: int
    # Cumulative bytes accounted for, including hash-skipped files.
    bytes_processed: int
    # Sum of every manifest file's declared size.
    bytes_total: int
    # Session-relative path of the file this event reports on.
    current_file: str
@dataclass(frozen=True, slots=True)
class IngestResult:
    """Return value of :func:`ingest` / :func:`ingest_from_r2`.

    ``paused`` is the daemon's signal to leave the session "pending"
    and retry later. When ``paused=True``, ``recording`` is ``None``
    and **no catalog rows were written**. When ``paused=False``,
    ``recording`` is the registered :class:`RecordingRow`.
    """

    # True when cooperative cancellation preempted the upload step.
    paused: bool
    # Registered catalog row; None when paused.
    recording: RecordingRow | None
    # Files actually PUT to R2 during this call.
    files_uploaded: int
    # Files skipped because a size-matching object already existed.
    files_skipped: int
    # Network bytes PUT during this call (excludes skipped files).
    bytes_uploaded: int
def _read_manifest(manifest_path: Path) -> dict[str, Any]:
with manifest_path.open() as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError(f"manifest at {manifest_path!r} is not a JSON object")
return data
def _require_augmented(manifest: dict[str, Any], manifest_path: Path) -> None:
"""Reject manifests written by the dashboard's `_finalize_session`
*before* the uploader's ``augment_manifest`` step has run."""
# Truthy check (not ``not in``) so a manifest with
# ``"recording_hash": ""`` or ``"files": []`` fails *here* with
# the clear error, not later inside ``manifest_to_register_payload``
# with a less obvious one (S12).
missing = [k for k in ("recording_hash", "files") if not manifest.get(k)]
if missing:
raise ValueError(
f"manifest at {manifest_path!r} is missing {missing!r}; call "
f"augment_manifest(session, files) before ingest(). The base manifest "
f"written by the dashboard carries display metadata only; per-file "
f"sha256s + recording_hash are added at upload time."
)
def _r2_key_for(rec_id: str, rel_path: str) -> str:
"""R2 object key for a session-relative file (mirrors data-engine layout)."""
return f"recordings/{rec_id}/{rel_path}"
def _head_matches(store: ObjectStore, key: str, *, expected_size: int) -> bool:
"""True if R2 already has an object at ``key`` that we should skip.
**Size-based skip only.** obstore's ``head()`` does not surface
``x-amz-meta-sha256`` user metadata (see
``ursa.store.backends._obstore._meta_from_obstore``);
``ObjectMeta.sha256`` is always ``None`` from a head response.
We therefore key the skip decision on size match alone. The
original PUT stored the sha256 as user metadata on the object,
so content integrity is still verifiable post-hoc via ``get`` —
we just can't read it cheaply during a HEAD-only resume sweep.
Catalog-side defense in depth: the manifest's ``recording_hash``
is the content-addressed digest of every per-file sha256 (see
``data-engine/uploader/hashing.py:compute_recording_hash``). A
rig that re-augments a session whose contents changed produces a
different ``recording_hash`` and lands as a fresh catalog row;
there is no path where size-match-but-content-differ silently
overwrites an existing catalog row.
"""
try:
meta = store.head(key)
except ObjectNotFound:
return False
return meta.size == expected_size
def _process_one_file(
    store: ObjectStore,
    *,
    key: str,
    abs_path: str,
    expected_size: int,
    expected_sha256: str,
    is_active: Callable[[], bool] | None = None,
    abort: threading.Event | None = None,
    overwrite: bool = False,
) -> tuple[str, int]:
    """HEAD-skip + maybe-upload one file. Worker-thread entry point.

    Returns ``(action, bytes_uploaded)`` where action is one of
    ``"uploaded"`` / ``"skipped"`` / ``"interrupted"``. Bundling the
    HEAD probe and the PUT into one work unit lets the pool overlap
    HEADs with uploads (matches the prior data-engine
    ``_process_one``).

    ``abort`` is the orchestrator-wide error-path signal — once set,
    in-flight workers raise :class:`UploadInterrupted` on their next
    chunk read so the executor's ``__exit__`` doesn't block draining
    every other PUT to completion (B3).

    Unexpected raises (corrupt manifest, R2 5xx, etc.) propagate; the
    caller treats them as hard errors.
    """
    already_present = not overwrite and _head_matches(
        store, key, expected_size=expected_size
    )
    if already_present:
        return ("skipped", 0)
    try:
        uploaded = _upload_one(
            store,
            key=key,
            abs_path=abs_path,
            sha256=expected_sha256,
            is_active=is_active,
            abort=abort,
        )
    except UploadInterrupted:
        return ("interrupted", 0)
    else:
        return ("uploaded", uploaded)
def _upload_one(
    store: ObjectStore,
    *,
    key: str,
    abs_path: str,
    sha256: str,
    is_active: Callable[[], bool] | None = None,
    abort: threading.Event | None = None,
) -> int:
    """Stream one local file into R2; return the byte count uploaded.

    When either cancellation signal is supplied, the file body is
    wrapped in a :class:`_CancellableReader` so a recording start
    (``is_active``) or a sibling worker's failure (``abort``) preempts
    the in-flight PUT mid-chunk by raising
    :class:`UploadInterrupted`; the caller is responsible for handling
    it. ``sha256=`` lands in ``x-amz-meta-sha256`` user metadata so
    the object's content stays auditable post-hoc via ``get``.
    """
    wrap_body = is_active is not None or abort is not None
    with open(abs_path, "rb") as f:
        if wrap_body:
            # ``put`` accepts BinaryIO and the obstore backend streams
            # it without buffering the whole file in memory, so the
            # wrapper's per-chunk poll is the cancellation point.
            body: BinaryIO = _CancellableReader(  # type: ignore[assignment]
                f,
                is_active or _never_active,
                abort,
            )
        else:
            body = f
        store.put(key, body, sha256=sha256)
    return os.path.getsize(abs_path)
def _never_active() -> bool:
"""Sentinel callback for ``_CancellableReader`` when only the
``abort`` signal is in play (no ``is_active`` callback supplied).
"""
return False
def _maybe_emit(progress: Callable[[UploadProgress], None] | None, evt: UploadProgress) -> None:
if progress is None:
return
try:
progress(evt)
except Exception: # noqa: BLE001
# Progress callbacks must not break the upload. Log and continue.
logger.exception("progress callback raised; continuing upload")
def _do_register(
data: DataInterface,
payload: RegisterPayload,
*,
participant_id: str,
participant_enrolled_at: int,
participant_display_name: str | None,
) -> RecordingRow:
"""Run the participant + recording + modality writes idempotently.
Each underlying ``register_*`` is check-and-add: identical re-write
returns the existing row, mismatched re-write raises
:class:`CatalogRowExists`.
Asymmetric handling by row type:
* **Recording / modality**: any payload divergence propagates as
``CatalogRowExists``. The orchestrator does NOT catch — let the
daemon's outer error path log + abort. Divergence here means a
real conflict (two rigs claiming the same recording_hash with
different metadata, etc.) and silently swallowing it would mask
a bug.
* **Participant**: divergence is swallowed and logged at INFO.
This is deliberate. The dashboard's free-form ``participant``
display-name slugs into a CatalogID that may collide across
sessions for the same person; the *first* session's
``enrolled_at`` + display-name metadata wins, subsequent ones
are no-ops. Tracked separately under ENG-1099 (proper
participant tracking with stable IDs).
"""
# 1. Participant (idempotent on exact match; divergent metadata
# logged + skipped, see docstring).
try:
data.register_participant(
participant_id=participant_id,
enrolled_at=participant_enrolled_at,
metadata={"display_name": participant_display_name} if participant_display_name else {},
)
except CatalogRowExists:
logger.info(
"participant %s already registered with divergent metadata; "
"skipping update (ENG-1099 follow-up).",
participant_id,
)
# 2. Recording.
data.register_recording(
recording_hash=payload.recording.recording_hash,
participant_ids=payload.recording.participant_ids,
start_time=payload.recording.start_time,
duration=payload.recording.duration,
device_info=payload.recording.device_info,
metadata=payload.recording.metadata,
)
# 3. Modalities.
for m in payload.modalities:
data.register_modality(
recording_hash=m.recording_hash,
modality=m.modality,
raw_storage_uri=m.raw_storage_uri,
storage_uri=m.storage_uri,
ingestion_status=m.ingestion_status,
format=m.format,
metadata=m.metadata,
)
return payload.recording
def ingest(
    data: DataInterface,
    manifest_path: Path | str,
    *,
    source_dir: Path | str,
    participant_id: str,
    is_active: Callable[[], bool] | None = None,
    progress: Callable[[UploadProgress], None] | None = None,
    overwrite: bool = False,
    collapse_modalities: bool = True,
    max_concurrent_files: int = 4,
) -> IngestResult:
    """Upload a session's raw files to R2 and register the catalog rows.

    Reads the (augmented) manifest at ``manifest_path``, uploads every
    file listed in ``manifest['files']`` from ``source_dir`` to the raw
    R2 bucket via hash-skip + streaming PUT, and registers the catalog
    rows once every byte is uploaded. Cancellation cooperates with the
    data-engine daemon's worker-activity watcher: when ``is_active()``
    flips True between files, the function drains pending work and
    returns ``paused=True`` *without* writing catalog rows.

    Parameters
    ----------
    data
        A :class:`DataInterface`. Must have called
        ``enable_writes(roles=("assets_rw", "raw_rw"))`` first —
        :func:`ingest` raises :class:`WritesNotEnabled` otherwise.
    manifest_path
        Path to the local ``manifest.json`` (typically
        ``<source_dir>/manifest.json``). Must already be augmented with
        ``recording_hash`` + ``files`` (data-engine's ``augment_manifest``).
    source_dir
        Root of the on-disk session, ``rec_<id>/`` typically. File
        paths in ``manifest['files'][*].rel_path`` are resolved against
        this directory.
    participant_id
        Catalog ID for the participant. Schema currently requires
        non-empty ``participant_ids`` on a :class:`RecordingRow`; legacy
        null-participant support is tracked separately (ENG-1099).
    is_active
        Optional cooperative-cancellation callback. Polled in three
        places: (1) a pre-flight check before the upload pool runs,
        (2) inside each ``_CancellableReader.read()`` before every
        chunk the backend pulls from disk, and (3) on each completed
        ``as_completed`` future. Returning ``True`` aborts in-flight
        PUTs mid-chunk (the next ``read`` raises
        :class:`UploadInterrupted`), cancels pending futures, and
        triggers a paused return *without writing catalog rows*.
        **Thread-safety contract**: ``is_active`` is invoked from
        ``ThreadPoolExecutor`` worker threads as well as the main
        thread. Callers must ensure their implementation is safe
        under concurrent calls (B8). The data-engine integration
        (``RecordingActivityWatcher.is_recording_active``) reads a
        single bool and is fine; ad-hoc lambdas that mutate state
        need their own synchronisation.
    progress
        Optional per-file progress callback. Receives an
        :class:`UploadProgress` after each file's status is decided
        (uploaded or skipped). Exceptions raised inside the callback
        are caught and logged so they cannot break the upload.
    overwrite
        If ``True``, skip the hash-skip step and re-upload every file.
        Mostly useful for tests; production callers leave it False.
    collapse_modalities
        When ``True`` (default), worker subdir names are collapsed to
        their first underscore segment for the catalog modality name
        (``camera_0`` → ``camera``). If two worker subdirs collapse to
        the same short name (e.g. multi-camera rig ``camera_aaaa`` +
        ``camera_bbbb`` → both ``camera``), the function automatically
        retries with ``collapse_modalities=False`` and logs at WARNING.
        Modality names then equal the worker subdir verbatim.
    max_concurrent_files
        Worker-pool size for the upload step. Default 4 matches the
        prior data-engine uploader (``uploader/config.yaml:71``); set
        to 1 for deterministic test ordering.
    """
    manifest_path = Path(manifest_path)
    source_dir = Path(source_dir)
    data._require_role("raw_rw")
    data._require_role("assets_rw")
    raw_store = data._raw_rw_store
    # Belt-and-braces (N4): unreachable in practice because
    # ``_require_role("raw_rw")`` above either passed (so
    # ``_raw_rw_store`` was set by ``enable_writes``) or raised. The
    # one case where it can be None is ``profile="local"``, which
    # is the explicit branch immediately below; everywhere else
    # this is a torn invariant worth shouting about.
    if raw_store is None and data._profile != "local":
        raise RuntimeError("internal: _raw_rw_store is None despite role check")
    manifest = _read_manifest(manifest_path)
    _require_augmented(manifest, manifest_path)
    rec_id: str = manifest["recording_id"]
    files: list[dict[str, Any]] = manifest["files"]
    # Build the payload eagerly so manifest-shape errors surface before
    # we put bytes on the wire. Auto-retry with collapse_modalities=False
    # if the rig has two workers of the same modality family (e.g.
    # ``camera_aaaa`` + ``camera_bbbb`` both collapse to ``camera`` and
    # trip the collision check). Logs the fallback so operators can see
    # which sessions ended up with verbatim worker_subdir modality names.
    rel_paths = [f["rel_path"] for f in files]
    try:
        payload = manifest_to_register_payload(
            manifest,
            rel_paths,
            participant_id=participant_id,
            profile=data._profile,
            collapse_modalities=collapse_modalities,
        )
    except ValueError as e:
        if collapse_modalities and "modality-name collision" in str(e):
            logger.warning(
                "ingest: %s; retrying with collapse_modalities=False for rec=%s",
                e,
                rec_id,
            )
            payload = manifest_to_register_payload(
                manifest,
                rel_paths,
                participant_id=participant_id,
                profile=data._profile,
                collapse_modalities=False,
            )
        else:
            raise
    bytes_total = sum(int(f["size"]) for f in files)
    files_total = len(files)
    files_uploaded = 0
    files_skipped = 0
    bytes_uploaded = 0
    bytes_processed = 0  # uploaded + skipped, for batch-bar progress
    paused = False
    if raw_store is None:
        # profile="local" — no real R2; emit a single info log and
        # short-circuit to the register step. Used by tests that don't
        # care about the upload mechanics but want to exercise the
        # register flow.
        logger.info("ingest: profile=local; skipping R2 upload step for rec=%s", rec_id)
    elif is_active is not None and is_active():
        # Pre-flight is_active check — if a recording is already active
        # when ingest() is called, return paused immediately without
        # touching the pool.
        logger.info(
            "ingest: is_active=True at start; pausing rec=%s without uploads",
            rec_id,
        )
        return IngestResult(
            paused=True,
            recording=None,
            files_uploaded=0,
            files_skipped=0,
            bytes_uploaded=0,
        )
    else:
        # Parallel uploads via a worker pool; matches the prior
        # data-engine ``_process_files`` semantics (default 4 workers).
        # Each worker's HEAD-skip-or-PUT is independent; all share the
        # same ``is_active`` callback so a worker start propagates to
        # every in-flight :class:`_CancellableReader` at once.
        max_workers = max(1, max_concurrent_files)
        # Shared error-path signal (B3). Set when any worker raises so
        # every other in-flight :class:`_CancellableReader` aborts on
        # the next chunk read — otherwise the executor's ``__exit__``
        # waits for every in-flight PUT to drain before propagating
        # the exception (minutes of wasted bandwidth on an early 5xx).
        abort = threading.Event()
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {
                pool.submit(
                    _process_one_file,
                    raw_store,
                    key=_r2_key_for(rec_id, entry["rel_path"]),
                    abs_path=str(source_dir / entry["rel_path"]),
                    expected_size=int(entry["size"]),
                    expected_sha256=entry["sha256"],
                    is_active=is_active,
                    abort=abort,
                    overwrite=overwrite,
                ): entry
                for entry in files
            }
            for fut in as_completed(futures):
                if fut.cancelled():
                    # Cancelled by a pause/abort sweep before it ran —
                    # nothing to account for (``.result()`` would raise
                    # CancelledError).
                    continue
                entry = futures[fut]
                rel_path = entry["rel_path"]
                try:
                    action, n = fut.result()
                except Exception:
                    # Unexpected raises (R2 5xx, OS error). Set the
                    # shared abort signal so every in-flight reader
                    # aborts on its next chunk (B3) instead of running
                    # the rest of its PUT to completion under
                    # ThreadPoolExecutor.__exit__. Then cancel pending
                    # and re-raise for the daemon's error path
                    # (matches ``_abort_session`` semantics in the
                    # prior uploader).
                    abort.set()
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
                    raise
                if action == "uploaded":
                    files_uploaded += 1
                    bytes_uploaded += n
                    bytes_processed += n
                elif action == "skipped":
                    files_skipped += 1
                    # Hash-skipped files contribute their full size to
                    # bytes_processed (so the dashboard's batch bar
                    # advances on resume-after-pause) but not to
                    # bytes_uploaded (no network bytes flowed).
                    bytes_processed += int(entry["size"])
                elif action == "interrupted":
                    # Mid-PUT abort — _CancellableReader fired because
                    # is_active flipped. Other in-flight workers will
                    # see the same flag on their next chunk; just mark
                    # paused and let `as_completed` finish collecting.
                    paused = True
                _maybe_emit(
                    progress,
                    UploadProgress(
                        files_done=files_uploaded + files_skipped,
                        files_total=files_total,
                        bytes_uploaded=bytes_uploaded,
                        bytes_processed=bytes_processed,
                        bytes_total=bytes_total,
                        current_file=rel_path,
                    ),
                )
                # Pause bookkeeping. The first branch fires when the
                # main-thread poll observes is_active before any worker
                # did; the second fires for each future that came back
                # "interrupted" this iteration (a later interrupted
                # future re-sweeps — harmless, Future.cancel() is
                # idempotent). Already-running futures can't be
                # cancelled; they abort themselves via the shared
                # ``is_active`` callback in their _CancellableReader.
                if not paused and is_active is not None and is_active():
                    paused = True
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
                elif paused and action == "interrupted":
                    for fut_to_cancel in futures:
                        fut_to_cancel.cancel()
    if paused:
        # Preempted mid-upload: report progress but write NO catalog
        # rows — the daemon leaves the session pending and retries.
        logger.info(
            "ingest: paused rec=%s files_uploaded=%d files_skipped=%d before catalog write",
            rec_id,
            files_uploaded,
            files_skipped,
        )
        return IngestResult(
            paused=True,
            recording=None,
            files_uploaded=files_uploaded,
            files_skipped=files_skipped,
            bytes_uploaded=bytes_uploaded,
        )
    # Upload succeeded for every file. Register the catalog rows.
    # Participant enrollment defaults to the recording's start time;
    # the display name rides in recording metadata when the dashboard
    # set one.
    enrolled_at = payload.recording.start_time
    display_name = payload.recording.metadata.get("participant_display_name")
    recording = _do_register(
        data,
        payload,
        participant_id=participant_id,
        participant_enrolled_at=enrolled_at,
        participant_display_name=display_name if isinstance(display_name, str) else None,
    )
    logger.info(
        "ingest: rec=%s files_uploaded=%d files_skipped=%d bytes_uploaded=%d recording_hash=%s",
        rec_id,
        files_uploaded,
        files_skipped,
        bytes_uploaded,
        recording.recording_hash,
    )
    return IngestResult(
        paused=False,
        recording=recording,
        files_uploaded=files_uploaded,
        files_skipped=files_skipped,
        bytes_uploaded=bytes_uploaded,
    )
def ingest_from_r2(
    data: DataInterface,
    rec_id: str,
    *,
    participant_id: str,
) -> IngestResult:
    """Register a recording that's already in R2 — backfill / ENG-1096.

    Reads ``recordings/<rec_id>/manifest.json`` from the raw bucket,
    requires it to be augmented (``recording_hash`` + ``files``
    present), and runs only the register step. No bytes uploaded.

    For legacy recordings without an augmented manifest, a separate
    code path (tracked alongside ENG-1096) lists R2 directly and
    recomputes ``recording_hash`` from object sha256 user metadata. That
    fallback is **not** implemented in this entrypoint to keep the
    contract sharp; ENG-1096 may add it as a sibling function.

    Raises ``NotImplementedError`` for ``profile="local"`` — backfill
    only makes sense against an R2-backed catalog. Raises
    ``FileNotFoundError`` when the manifest, or any file it lists, is
    absent from R2.
    """
    if data._profile == "local":
        raise NotImplementedError(
            "ingest_from_r2 requires an R2 profile (r2 / r2-test); "
            "got profile='local'. There is no R2 to read the manifest "
            "from — use `ingest()` with a local source_dir instead."
        )
    data._require_role("assets_rw")
    # Reading the manifest from R2 needs the raw_ro role rather than
    # raw_rw. Cached on the DataInterface so successive
    # ``ingest_from_r2`` calls (the typical backfill loop pattern)
    # reuse the underlying obstore connection pool instead of leaking
    # a fresh handle per call (S3). No ``enable_writes`` gate needed —
    # raw_ro is a read role.
    if data._raw_ro_store is None:
        from ursa.store import get_store
        data._raw_ro_store = get_store("raw_ro", profile=data._profile)
    raw_ro = data._raw_ro_store
    manifest_key = f"recordings/{rec_id}/manifest.json"
    try:
        body = raw_ro.get(manifest_key)
    except ObjectNotFound as e:
        raise FileNotFoundError(
            f"no manifest at r2://.../{manifest_key}; "
            f"recording {rec_id!r} is missing or pre-augment-era. "
            f"Legacy backfill without an augmented manifest is tracked "
            f"alongside ENG-1096."
        ) from e
    manifest = json.loads(body.decode("utf-8"))
    if not isinstance(manifest, dict):
        raise ValueError(f"manifest at {manifest_key!r} is not a JSON object")
    _require_augmented(manifest, Path(manifest_key))
    rel_paths = [f["rel_path"] for f in manifest["files"]]
    # HEAD-probe every file in the manifest before writing catalog rows
    # (S8). ``ingest()`` exists to enforce "no catalog rows pointing at
    # non-existent R2 objects" via the upload+register atomicity;
    # ``ingest_from_r2`` is the trust-the-manifest fast path, so we
    # have to verify the bytes are actually there ourselves. Without
    # this check, a session whose manifest landed but whose worker
    # files are absent (interrupted upload, accidental delete,
    # manifest-only fast-restore) would produce catalog rows that
    # always 404 on download.
    missing: list[str] = []
    for rel in rel_paths:
        try:
            raw_ro.head(f"recordings/{rec_id}/{rel}")
        except ObjectNotFound:
            missing.append(rel)
    if missing:
        raise FileNotFoundError(
            f"manifest at {manifest_key!r} lists {len(missing)} file(s) "
            f"not present in R2 (first few: {missing[:5]!r}). Refusing "
            f"to register catalog rows that would 404 on download. "
            f"Either re-upload the session via `ingest()` or remove "
            f"the stale manifest from R2."
        )
    payload = manifest_to_register_payload(
        manifest,
        rel_paths,
        participant_id=participant_id,
        profile=data._profile,
    )
    display_name = payload.recording.metadata.get("participant_display_name")
    recording = _do_register(
        data,
        payload,
        participant_id=participant_id,
        participant_enrolled_at=payload.recording.start_time,
        participant_display_name=display_name if isinstance(display_name, str) else None,
    )
    # ``files_skipped`` retains its ``ingest()`` semantics ("HEAD
    # confirmed identical"); ``ingest_from_r2`` only verifies presence
    # (S9). Zero it out so consumers branching on files_skipped don't
    # get a misleading signal — the verification is HEAD-presence, not
    # content match.
    return IngestResult(
        paused=False,
        recording=recording,
        files_uploaded=0,
        files_skipped=0,
        bytes_uploaded=0,
    )