ursa.register.orchestrator¶
DataInterface.ingest() orchestrator (ENG-1130).
The orchestrator is the single high-level write surface that owns both
the R2 upload of raw segment files and the catalog write — atomically,
in the sense that catalog rows only appear after every modality byte
is in R2. This guarantees DataInterface.query() never returns a
row whose download would 404.
It inherits the data-engine uploader’s existing semantics:
- Mid-file cooperative cancellation. Each file body is wrapped in a _CancellableReader that polls is_active() before every chunk read (a minimal sketch follows this list). When it flips True (a rig worker started recording), the next read() raises UploadInterrupted, the backend’s put propagates it out, and ingest returns IngestResult(paused=True, recording=None, ...). Latency is bounded by the chunks in flight that obstore has already pulled from the reader: chunk_size (5 MiB default) × max_concurrency (12 default) ≈ 60 MiB of in-flight bytes per file. Those finish before the cancellation propagates; subsequent chunks fault on the next read() call. So for an obstore-default 500 MiB PUT the worst-case preemption is “remaining time to flush the ~60 MiB already in flight” (~1–2 s at typical R2 throughput), not “remaining time to finish the whole file” (~30 s). For small single-read files the wrapper still polls before the read; sub-multipart files complete fast enough that they are effectively atomic. No catalog rows are written on pause. If preemption latency matters more than throughput for a particular caller, pass max_concurrent_files=1 and consider plumbing chunk_size / max_concurrency overrides through ObjectStore.put (not exposed in v0.1.0).
- Resume. Re-running with the same manifest is idempotent end-to-end: hash-skip + _register_* skip-on-match. Partially uploaded objects whose put was interrupted leave nothing intact in R2 (obstore aborts the multipart) and the next call re-uploads them via the normal path.
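The sketch below shows the shape of that wrapper, assuming only what this page states. The real _CancellableReader is private to the module; the class name, constructor, and error message here are illustrative, reusing the module’s public UploadInterrupted.

```python
from typing import BinaryIO, Callable

from ursa.register.orchestrator import UploadInterrupted


class CancellableReader:
    """Illustrative file-like wrapper that polls is_active() before every read."""

    def __init__(self, raw: BinaryIO, is_active: Callable[[], bool]) -> None:
        self._raw = raw
        self._is_active = is_active

    def read(self, size: int = -1) -> bytes:
        # Chunks the uploader has already pulled (up to ~chunk_size ×
        # max_concurrency, ~60 MiB at obstore defaults) are past this check
        # and still flush; every later chunk faults here instead.
        if self._is_active():
            raise UploadInterrupted("rig worker became active mid-upload")
        return self._raw.read(size)
```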
The orchestrator trusts the manifest’s pre-computed sha256s (written by
data-engine/uploader/manifest.py:augment_manifest before ingest is
called). This avoids re-hashing every file — per-file hashing already
happened at augment time. If the manifest is missing files or
recording_hash, ingest raises a clear error pointing to
augment_manifest.
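A hedged sketch of that pre-flight check; the exact exception type and message raised by ingest() are assumptions here, only the required manifest keys come from this page.

```python
import json
from pathlib import Path


def check_augmented(manifest_path: Path) -> dict:
    """Illustrative pre-flight: refuse un-augmented manifests early."""
    manifest = json.loads(manifest_path.read_text())
    missing = [key for key in ("recording_hash", "files") if key not in manifest]
    if missing:
        raise ValueError(
            f"{manifest_path} is missing {missing}; run data-engine's "
            "augment_manifest over the session before calling ingest()."
        )
    return manifest
```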
The resume-skip signal is size-only (obstore’s head() doesn’t
surface x-amz-meta-sha256 user metadata; see _head_matches).
The per-file sha256 from the manifest is still written into
x-amz-meta-sha256 on each PUT for post-hoc audit via get, but
it is not what decides “is this object already in R2?” on a
subsequent ingest() call. Content drift is caught at the catalog
layer instead: the manifest’s content-addressed recording_hash
changes whenever any file’s bytes change, so a re-augmented session
with modified content lands as a fresh row rather than overwriting.
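A sketch of that size-only decision (_head_matches), under the constraint stated above: head() surfaces the object’s size but not its x-amz-meta-sha256 user metadata. The store interface and not-found exception shown are assumptions.

```python
def head_matches(store, key: str, expected_size: int) -> bool:
    """Illustrative resume-skip check: size comparison only."""
    try:
        meta = store.head(key)  # metadata for an object already in R2
    except FileNotFoundError:
        return False            # not uploaded yet: PUT it
    # Size-only comparison; content drift is caught later because any
    # changed byte changes recording_hash, which lands as a fresh row.
    return meta.size == expected_size
```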
For backfill (ENG-1096), ingest_from_r2 reads the manifest +
file listing from R2 directly and runs only the register step (no
upload).
Module Contents¶
Classes¶
- IngestResult: Return value of ingest / ingest_from_r2.
- UploadProgress: Per-file progress signal threaded to caller-supplied callbacks.
Functions¶
- ingest: Upload a session’s raw files to R2 and register the catalog rows.
- ingest_from_r2: Register a recording that’s already in R2 — backfill / ENG-1096.
API¶
- class ursa.register.orchestrator.IngestResult[source]¶
Return value of ingest / ingest_from_r2.
paused is the daemon’s signal to leave the session “pending” and retry later. When paused=True, recording is None and no catalog rows were written. When paused=False, recording is the registered RecordingRow.
- paused: bool¶
None
- recording: ursa.catalog.RecordingRow | None¶
None
- files_uploaded: int¶
None
- files_skipped: int¶
None
- bytes_uploaded: int¶
None
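A sketch of how a daemon might branch on the result, per the contract above; requeue_session and mark_registered are hypothetical helpers, not part of ursa.

```python
result = ingest(
    data,
    src / "manifest.json",
    source_dir=src,
    participant_id=pid,
    is_active=watcher.is_recording_active,
)
if result.paused:
    requeue_session(src)  # hypothetical: recording is None, no rows written
else:
    mark_registered(result.recording)  # hypothetical: the registered RecordingRow
```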
- exception ursa.register.orchestrator.UploadInterrupted[source]¶
Bases: RuntimeError
Raised internally when a _CancellableReader sees is_active() flip True during a chunk read.
Propagates out of the backend’s put call; ingest catches it and converts to IngestResult(paused=True). Mirrors uploader.r2_client.UploadInterrupted in data-engine, where the same exception aborts a boto3 Callback=-instrumented PUT. Public for callers who want to differentiate “the wrapped read was interrupted” from other errors (typically: don’t); the orchestrator catches it before bubbling to user code.
Initialization
Initialize self. See help(type(self)) for accurate signature.
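In sketch form, the conversion the orchestrator performs; _upload_all and the counter values on the paused path are illustrative, not the real internals.

```python
try:
    _upload_all(files)  # hypothetical internal upload step
except UploadInterrupted:
    # Callers never see the exception; they see a paused result instead.
    return IngestResult(paused=True, recording=None,
                        files_uploaded=0, files_skipped=0, bytes_uploaded=0)
```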
- class ursa.register.orchestrator.UploadProgress[source]¶
Per-file progress signal threaded to caller-supplied callbacks.
Daemon mappers (data-engine ENG-1131) turn this into extra={"uploader.*": ...} structured logs. bytes_uploaded counts only PUT bytes (excludes hash-skipped files); bytes_processed counts every file’s full size whether uploaded or hash-skipped. The daemon’s batch progress bar reads bytes_processed so resume-after-pause climbs through skips, while bytes_uploaded drives the per-session network counter. Both are cumulative across the events emitted for one session; consumers that need deltas should diff against the prior event.
- files_done: int¶
None
- files_total: int¶
None
- bytes_uploaded: int¶
None
- bytes_processed: int¶
None
- bytes_total: int¶
None
- current_file: str¶
None
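A delta-consuming callback sketch, since the events are cumulative per session. The “uploader.*” log field names follow the daemon-mapper convention mentioned above but are illustrative, not a fixed schema.

```python
import logging

from ursa.register.orchestrator import UploadProgress

log = logging.getLogger("daemon.uploader")


class ProgressDeltas:
    """Illustrative consumer: diff cumulative counters into per-event deltas."""

    def __init__(self) -> None:
        self._last_uploaded = 0

    def __call__(self, p: UploadProgress) -> None:
        delta = p.bytes_uploaded - self._last_uploaded  # new network bytes only
        self._last_uploaded = p.bytes_uploaded
        log.info(
            "upload progress",
            extra={
                "uploader.current_file": p.current_file,
                "uploader.files_done": p.files_done,
                "uploader.bytes_delta": delta,
                "uploader.pct": 100.0 * p.bytes_processed / p.bytes_total,
            },
        )
```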
- ursa.register.orchestrator.ingest(data: ursa.data_interface.DataInterface, manifest_path: pathlib.Path | str, *, source_dir: pathlib.Path | str, participant_id: str, is_active: collections.abc.Callable[[], bool] | None = None, progress: collections.abc.Callable[[ursa.register.orchestrator.UploadProgress], None] | None = None, overwrite: bool = False, collapse_modalities: bool = True, max_concurrent_files: int = 4) → ursa.register.orchestrator.IngestResult[source]¶
Upload a session’s raw files to R2 and register the catalog rows.
Reads the (augmented) manifest at manifest_path, uploads every file listed in manifest['files'] from source_dir to the raw R2 bucket via hash-skip + streaming PUT, and registers the catalog rows once every byte is uploaded. Cancellation cooperates with the data-engine daemon’s worker-activity watcher: when is_active() flips True between files, the function drains pending work and returns paused=True without writing catalog rows.
Parameters
- data: A DataInterface. Must have called enable_writes(roles=("assets_rw", "raw_rw")) first; ingest raises WritesNotEnabled otherwise.
- manifest_path: Path to the local manifest.json (typically <source_dir>/manifest.json). Must already be augmented with recording_hash + files (data-engine’s augment_manifest).
- source_dir: Root of the on-disk session, typically rec_<id>/. File paths in manifest['files'][*].rel_path are resolved against this directory.
- participant_id: Catalog ID for the participant. Schema currently requires non-empty participant_ids on a RecordingRow; legacy null-participant support is tracked separately (ENG-1099).
- is_active: Optional cooperative-cancellation callback. Polled in three places: (1) a pre-flight check before the upload pool runs, (2) inside each _CancellableReader.read() before every chunk the backend pulls from disk, and (3) on each completed as_completed future. Returning True aborts in-flight PUTs mid-chunk (the next read raises UploadInterrupted), cancels pending futures, and triggers a paused return without writing catalog rows. Thread-safety contract: is_active is invoked from ThreadPoolExecutor worker threads as well as the main thread. Callers must ensure their implementation is safe under concurrent calls (B8). The data-engine integration (RecordingActivityWatcher.is_recording_active) reads a single bool and is fine; ad-hoc lambdas that mutate state need their own synchronisation.
- progress: Optional per-file progress callback. Receives an UploadProgress after each file’s status is decided (uploaded or skipped). Exceptions raised inside the callback are caught and logged so they cannot break the upload.
- overwrite: If True, skip the hash-skip step and re-upload every file. Mostly useful for tests; production callers leave it False.
- collapse_modalities: When True (default), worker subdir names are collapsed to their first underscore segment for the catalog modality name (camera_0 → camera). If two worker subdirs collapse to the same short name (e.g. multi-camera rig camera_aaaa + camera_bbbb → both camera), the function automatically retries with collapse_modalities=False and logs at WARNING. Modality names then equal the worker subdir verbatim.
- max_concurrent_files: Worker-pool size for the upload step. Default 4 matches the prior data-engine uploader (uploader/config.yaml:71); set to 1 for deterministic test ordering.
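An end-to-end call sketch for the parameters above. DataInterface construction arguments are deployment-specific and elided, and the never_active stub stands in for the data-engine watcher; the ingest() keywords match the documented signature.

```python
from pathlib import Path

from ursa.data_interface import DataInterface
from ursa.register.orchestrator import ingest


def never_active() -> bool:
    # Stub for RecordingActivityWatcher.is_recording_active: never pause.
    return False


data = DataInterface()  # construction arguments elided / deployment-specific
data.enable_writes(roles=("assets_rw", "raw_rw"))  # required before ingest()

src = Path("/captures/rec_0042")
result = ingest(
    data,
    src / "manifest.json",
    source_dir=src,
    participant_id="p_017",
    is_active=never_active,
    max_concurrent_files=1,  # favour preemption latency over throughput
)
print("paused" if result.paused else f"uploaded {result.files_uploaded} files")
```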
- ursa.register.orchestrator.ingest_from_r2(data: ursa.data_interface.DataInterface, rec_id: str, *, participant_id: str) → ursa.register.orchestrator.IngestResult[source]¶
Register a recording that’s already in R2 — backfill / ENG-1096.
Reads recordings/<rec_id>/manifest.json from the raw bucket, requires it to be augmented (recording_hash + files present), and runs only the register step. No bytes are uploaded.
For legacy recordings without an augmented manifest, a separate code path (tracked alongside ENG-1096) lists R2 directly and recomputes recording_hash from object sha256 user metadata. That fallback is not implemented in this entrypoint to keep the contract sharp; ENG-1096 may add it as a sibling function.
Raises
- ValueError for profile="local": backfill only makes sense against an R2-backed catalog.
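A backfill usage sketch; it assumes an R2-backed (non-local) DataInterface with writes enabled, and the zero-byte expectation follows from the register-only contract above.

```python
from ursa.register.orchestrator import ingest_from_r2

result = ingest_from_r2(data, "rec_0042", participant_id="p_017")
assert result.paused is False       # no is_active path on backfill
assert result.bytes_uploaded == 0   # register-only: nothing was PUT
```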