ursa.register.orchestrator

DataInterface.ingest() orchestrator (ENG-1130).

The orchestrator is the single high-level write surface that owns both the R2 upload of raw segment files and the catalog write — atomically, in the sense that catalog rows only appear after every modality byte is in R2. This guarantees DataInterface.query() never returns a row whose download would 404.

It inherits the data-engine uploader’s existing semantics:

  • Mid-file cooperative cancellation. Each file body is wrapped in a :class:_CancellableReader that polls is_active() before every chunk read. When is_active() flips True (a rig worker started recording), the next read() raises :class:UploadInterrupted, the backend’s put propagates it out, and :func:ingest returns IngestResult(paused=True, recording=None, ...). Latency is bounded by the chunks obstore has already pulled from the reader: chunk_size (5 MiB default) × max_concurrency (12 default) ≈ 60 MiB of in-flight bytes per file. Those chunks finish before the cancellation propagates; subsequent chunks fault on the next read() call. So for an obstore-default 500 MiB PUT the worst-case preemption is “remaining time to flush the ~60 MiB already in flight” (~1–2 s at typical R2 throughput), not “remaining time to finish the whole file” (~30 s). For small single-read files the wrapper still polls before the read; sub-multipart files complete fast enough that they are effectively atomic. No catalog rows are written on pause. If preemption latency matters more than throughput for a particular caller, pass max_concurrent_files=1 and consider plumbing chunk_size / max_concurrency overrides through ObjectStore.put (not exposed in v0.1.0).

  • Resume. Re-running with the same manifest is idempotent end-to-end: hash-skip on upload plus skip-on-match in the _register_* steps. Partially uploaded objects whose put was interrupted leave nothing intact in R2 (obstore aborts the multipart), and the next call re-uploads them via the normal path.
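The cancellation mechanics above can be sketched as a minimal file-like wrapper. This is an illustrative stand-in, not the module’s actual :class:_CancellableReader; the names simply mirror the description:

```python
import io


class UploadInterrupted(RuntimeError):
    """Stand-in mirroring the module's UploadInterrupted."""


class CancellableReader:
    """Wraps a readable body; polls is_active() before every chunk read.

    Sketch of the cooperative-cancellation behaviour described above.
    """

    def __init__(self, raw, is_active):
        self._raw = raw
        self._is_active = is_active

    def read(self, size=-1):
        # Fault *before* handing the backend its next chunk, so cancellation
        # latency is bounded by chunks already in flight, not by file size.
        if self._is_active():
            raise UploadInterrupted("rig worker became active mid-upload")
        return self._raw.read(size)


# Demo: the third poll sees the rig go active, so the third read faults.
flags = iter([False, False, True])
reader = CancellableReader(io.BytesIO(b"x" * 10), lambda: next(flags))
reader.read(5)  # ok
reader.read(5)  # ok
try:
    reader.read(5)
except UploadInterrupted:
    pass  # ingest() would convert this into IngestResult(paused=True)
```

The key property is that cancellation is checked on the producer side of the backend’s chunk pipeline, which is why in-flight chunks still flush while subsequent reads fault.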

The orchestrator trusts the manifest’s pre-computed sha256s (written by data-engine/uploader/manifest.py:augment_manifest before ingest is called). This avoids re-hashing every file — per-file hashing already happened at augment time. If the manifest is missing files or recording_hash, :func:ingest raises a clear error pointing to augment_manifest.

The resume-skip signal is size-only (obstore’s head() doesn’t surface x-amz-meta-sha256 user metadata; see _head_matches). The per-file sha256 from the manifest is still written into x-amz-meta-sha256 on each PUT for post-hoc audit via get, but it is not what decides “is this object already in R2?” on a subsequent ingest() call. Content drift is caught at the catalog layer instead: the manifest’s content-addressed recording_hash changes whenever any file’s bytes change, so a re-augmented session with modified content lands as a fresh row rather than overwriting.
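The size-only rule can be sketched as follows; the function name and signature here are illustrative stand-ins for the module’s _head_matches:

```python
from __future__ import annotations


def head_matches(local_size: int, remote_size: int | None) -> bool:
    """Size-only resume-skip rule sketched from the description above.

    remote_size is None when head() reports the object absent. The sha256
    written into x-amz-meta-sha256 at PUT time is deliberately not consulted
    here; content drift is caught at the catalog layer via recording_hash.
    """
    return remote_size is not None and remote_size == local_size
```

A subsequent ingest() call skips the PUT only when the object exists and its size matches the manifest entry; everything else goes through the normal upload path.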

For backfill (ENG-1096), :func:ingest_from_r2 reads the manifest + file listing from R2 directly and runs only the register step (no upload).

Module Contents

Classes

IngestResult

Return value of :func:ingest / :func:ingest_from_r2.

UploadProgress

Per-file progress signal threaded to caller-supplied callbacks.

Functions

ingest

Upload a session’s raw files to R2 and register the catalog rows.

ingest_from_r2

Register a recording that’s already in R2 — backfill / ENG-1096.

API

class ursa.register.orchestrator.IngestResult[source]

Return value of :func:ingest / :func:ingest_from_r2.

paused is the daemon’s signal to leave the session “pending” and retry later. When paused=True, recording is None and no catalog rows were written. When paused=False, recording is the registered :class:RecordingRow.

paused: bool

None

recording: ursa.catalog.RecordingRow | None

None

files_uploaded: int

None

files_skipped: int

None

bytes_uploaded: int

None
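To illustrate the contract, a daemon-side consumer might branch on the result like this (IngestResult stand-in with the fields above; the state names "pending" and "registered" are hypothetical):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IngestResult:  # stand-in mirroring the fields documented above
    paused: bool
    recording: Optional[object]
    files_uploaded: int
    files_skipped: int
    bytes_uploaded: int


def next_session_state(result: IngestResult) -> str:
    """Map a result onto the retry behaviour described above."""
    if result.paused:
        # recording is None and no catalog rows were written:
        # leave the session pending and retry later.
        assert result.recording is None
        return "pending"
    return "registered"
```

Because paused=True guarantees no catalog rows were written, the retry is a plain re-invocation with the same manifest.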

exception ursa.register.orchestrator.UploadInterrupted[source]

Bases: RuntimeError

Raised internally when a :class:_CancellableReader sees is_active() flip True during a chunk read.

Propagates out of the backend’s put call; :func:ingest catches it and converts it to IngestResult(paused=True). Mirrors uploader.r2_client.UploadInterrupted in data-engine, where the same exception aborts a boto3 PUT instrumented via its Callback= hook. Public for callers who want to differentiate “the wrapped read was interrupted” from other errors (typically: don’t); the orchestrator catches it before it bubbles to user code.


class ursa.register.orchestrator.UploadProgress[source]

Per-file progress signal threaded to caller-supplied callbacks.

Daemon mappers (data-engine ENG-1131) turn this into extra={"uploader.*": ...} structured logs.

bytes_uploaded counts only PUT bytes (excludes hash-skipped files); bytes_processed counts every file’s full size whether uploaded or hash-skipped. The daemon’s batch progress bar reads bytes_processed so resume-after-pause climbs through skips, while bytes_uploaded drives the per-session network counter. Both are cumulative across the events emitted for one session; consumers that need deltas should diff against the prior event.

files_done: int

None

files_total: int

None

bytes_uploaded: int

None

bytes_processed: int

None

bytes_total: int

None

current_file: str

None
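Because both byte counters are cumulative per session, a consumer that wants per-event deltas diffs against the prior event, as noted above. A minimal sketch (UploadProgress stand-in; the tracker is hypothetical):

```python
from dataclasses import dataclass


@dataclass
class UploadProgress:  # stand-in with the fields documented above
    files_done: int
    files_total: int
    bytes_uploaded: int
    bytes_processed: int
    bytes_total: int
    current_file: str


class DeltaTracker:
    """Converts cumulative per-session counters into per-event deltas."""

    def __init__(self):
        self._uploaded = 0
        self._processed = 0

    def __call__(self, p: UploadProgress):
        delta = (p.bytes_uploaded - self._uploaded,
                 p.bytes_processed - self._processed)
        self._uploaded = p.bytes_uploaded
        self._processed = p.bytes_processed
        return delta


# A hash-skipped file advances bytes_processed but not bytes_uploaded.
track = DeltaTracker()
track(UploadProgress(1, 3, 100, 100, 300, "a.bin"))             # uploaded
up, proc = track(UploadProgress(2, 3, 100, 200, 300, "b.bin"))  # skipped
assert (up, proc) == (0, 100)
```

This is also why resume-after-pause progress bars should read bytes_processed: skipped files still move it forward.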

ursa.register.orchestrator.ingest(data: ursa.data_interface.DataInterface, manifest_path: pathlib.Path | str, *, source_dir: pathlib.Path | str, participant_id: str, is_active: collections.abc.Callable[[], bool] | None = None, progress: collections.abc.Callable[[ursa.register.orchestrator.UploadProgress], None] | None = None, overwrite: bool = False, collapse_modalities: bool = True, max_concurrent_files: int = 4) -> ursa.register.orchestrator.IngestResult[source]

Upload a session’s raw files to R2 and register the catalog rows.

Reads the (augmented) manifest at manifest_path, uploads every file listed in manifest['files'] from source_dir to the raw R2 bucket via hash-skip + streaming PUT, and registers the catalog rows once every byte is uploaded. Cancellation cooperates with the data-engine daemon’s worker-activity watcher: when is_active() flips True between files, the function drains pending work and returns paused=True without writing catalog rows.

Parameters

data
    A :class:DataInterface. Must have called enable_writes(roles=("assets_rw", "raw_rw")) first — :func:ingest raises :class:WritesNotEnabled otherwise.

manifest_path
    Path to the local manifest.json (typically <source_dir>/manifest.json). Must already be augmented with recording_hash + files (data-engine’s augment_manifest).

source_dir
    Root of the on-disk session, typically rec_<id>/. File paths in manifest['files'][*].rel_path are resolved against this directory.

participant_id
    Catalog ID for the participant. The schema currently requires non-empty participant_ids on a :class:RecordingRow; legacy null-participant support is tracked separately (ENG-1099).

is_active
    Optional cooperative-cancellation callback. Polled in three places: (1) a pre-flight check before the upload pool runs, (2) inside each _CancellableReader.read() before every chunk the backend pulls from disk, and (3) on each completed as_completed future. Returning True aborts in-flight PUTs mid-chunk (the next read raises :class:UploadInterrupted), cancels pending futures, and triggers a paused return without writing catalog rows.

**Thread-safety contract**: ``is_active`` is invoked from
``ThreadPoolExecutor`` worker threads as well as the main
thread. Callers must ensure their implementation is safe
under concurrent calls (B8). The data-engine integration
(``RecordingActivityWatcher.is_recording_active``) reads a
single bool and is fine; ad-hoc lambdas that mutate state
need their own synchronisation.

progress
    Optional per-file progress callback. Receives an :class:UploadProgress after each file’s status is decided (uploaded or skipped). Exceptions raised inside the callback are caught and logged so they cannot break the upload.

overwrite
    If True, skip the hash-skip step and re-upload every file. Mostly useful for tests; production callers leave it False.

collapse_modalities
    When True (default), worker subdir names are collapsed to their first underscore segment for the catalog modality name (camera_0 → camera). If two worker subdirs collapse to the same short name (e.g. multi-camera rig camera_aaaa + camera_bbbb → both camera), the function automatically retries with collapse_modalities=False and logs at WARNING. Modality names then equal the worker subdir verbatim.

max_concurrent_files
    Worker-pool size for the upload step. Default 4 matches the prior data-engine uploader (uploader/config.yaml:71); set to 1 for deterministic test ordering.
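The collapse_modalities rule and its collision fallback can be sketched as follows (illustrative helper; the real ingest retries internally with collapse_modalities=False and logs at WARNING):

```python
def collapse_modality_names(worker_dirs):
    """First-underscore-segment collapse with verbatim fallback on
    collision, per the collapse_modalities description above."""
    short = {d: d.split("_", 1)[0] for d in worker_dirs}
    if len(set(short.values())) < len(worker_dirs):
        # e.g. camera_aaaa and camera_bbbb both collapse to "camera";
        # fall back to verbatim worker-subdir names.
        return {d: d for d in worker_dirs}
    return short


assert collapse_modality_names(["camera_0", "pupil_0"]) == {
    "camera_0": "camera", "pupil_0": "pupil"
}
```

The fallback is all-or-nothing: one collision makes every modality name verbatim, so a session never mixes collapsed and uncollapsed names.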

ursa.register.orchestrator.ingest_from_r2(data: ursa.data_interface.DataInterface, rec_id: str, *, participant_id: str) -> ursa.register.orchestrator.IngestResult[source]

Register a recording that’s already in R2 — backfill / ENG-1096.

Reads recordings/<rec_id>/manifest.json from the raw bucket, requires it to be augmented (recording_hash + files present), and runs only the register step. No bytes uploaded.

For legacy recordings without an augmented manifest, a separate code path (tracked alongside ENG-1096) lists R2 directly and recomputes recording_hash from object sha256 user metadata. That fallback is not implemented in this entrypoint to keep the contract sharp; ENG-1096 may add it as a sibling function.

Raises ValueError for profile="local" — backfill only makes sense against an R2-backed catalog.