Offline ingest: manual NAS-side ingest when a rig has been offline
Danger
ADMIN-ONLY OPERATION. This procedure registers prod catalog rows and (for Path A) uploads raw bytes into the prod recordings bucket. Read it end-to-end before running, and announce the window in #engineering-support if you’ll be ingesting more than a handful of recordings.
When to use this
A rig loses network mid-session, or a rig is on a slow uplink and the uploader queue backs up. The operator pulls recordings off the rig via the NAS (or a USB drive) and brings them back to a laptop or office machine that does have R2 access. This runbook covers getting those local-only recordings into the catalog without waiting for the rig’s uploader to catch up.
If the recordings already finished uploading to R2 from the rig (no NAS detour) and only the catalog row is missing, see Path B below.
Prerequisites
Ursa ≥ v0.3.0 installed on the machine you’re ingesting from (until v0.3.0 is tagged, pin the v0.3.0-dev integration branch):
uv add 'ursa @ git+ssh://git@github.com/constellationlab/ursa@v0.3.0-dev'
1Password CLI (op) authenticated for the Engineering vault. R2 credentials flow through constellation-utils → op read → in-memory; nothing is written to disk. On a laptop, the desktop-app integration is the cleanest path (Settings → Developer → Integrate with 1Password CLI).
The recording directory is reachable at a local path. Typical layouts:
NAS mount: /Volumes/Shared_Drive/constellation-data/recordings/rec_*/ on macOS.
USB drive: any path; subdirectories are named rec_<YYYYMMDD>_<HHMMSS>_<hex>/.
Each recording directory must contain:
At least one worker subdirectory (camera_*, eeg_*, mic_*, notes_*, etc.) with data files.
participant.txt at the recording root if a participant is known (post-v0.1.7 convention: the participant lives outside the manifest body).
Either a top-level manifest.json (rig-written aggregate) or per-node manifests under _node_manifests/<hostname>.json (dual-node sessions).
If the recording has no manifest at all, use Path C (below) to reconstruct first.
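Before touching prod, it can help to check a recording directory locally against the requirements above. The following is a minimal sketch that uses only the standard library. The preflight helper and the Preflight report are hypothetical names and are not part of Ursa. Two rules in it are assumptions: the lowercase-hex suffix pattern, and treating any subdirectory that does not start with an underscore as a worker directory.

```python
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

# Directory-name convention from the prerequisites: rec_<YYYYMMDD>_<HHMMSS>_<hex>.
# ASSUMPTION: the hex suffix is lowercase and of arbitrary length.
REC_NAME = re.compile(r"^rec_\d{8}_\d{6}_[0-9a-f]+$")


@dataclass
class Preflight:
    """Result of a local, read-only check of one recording directory."""
    rec_dir: Path
    participant: Optional[str] = None
    problems: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.problems


def preflight(rec_dir: Path) -> Preflight:
    """Check a recording directory against the prerequisites (hypothetical helper)."""
    report = Preflight(rec_dir=rec_dir)

    if not REC_NAME.match(rec_dir.name):
        report.problems.append(f"unexpected directory name: {rec_dir.name}")

    # ASSUMPTION: a worker directory is any non-underscore subdirectory
    # that contains at least one regular file somewhere below it.
    workers = [
        d for d in rec_dir.iterdir()
        if d.is_dir()
        and not d.name.startswith("_")
        and any(p.is_file() for p in d.rglob("*"))
    ]
    if not workers:
        report.problems.append("no worker subdirectory with data files")

    node_dir = rec_dir / "_node_manifests"
    has_top_manifest = (rec_dir / "manifest.json").is_file()
    has_node_manifests = node_dir.is_dir() and any(node_dir.glob("*.json"))
    if not (has_top_manifest or has_node_manifests):
        report.problems.append("no manifest (reconstruct first, Path C)")

    # participant.txt is optional; an empty file is treated as "unknown".
    pfile = rec_dir / "participant.txt"
    if pfile.is_file():
        report.participant = pfile.read_text().strip() or None

    return report
```

A report with ok set to True only means the directory looks complete on disk; it says nothing about what is already in R2 or in the catalog.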
Config selection: no env vars
Ursa does not read a URSA_CONFIG env var. To point at the prod catalog, construct the DataInterface with the explicit profile="r2" kwarg. The catalog prefix is resolved at Catalog.open from the active-catalog pointer (<assets-bucket>/ursa/ACTIVE.json, src/ursa/catalog/_pointer.py), so you get whatever prefix is live — no override needed:
from ursa import DataInterface
data = DataInterface(profile="r2")
For a non-default catalog (test runs against r2-test, or a one-off staging prefix), pass config_path= explicitly:
from pathlib import Path
data = DataInterface(profile="r2", config_path=Path("/tmp/my-override.yaml"))
For prod-bucket ingest (the offline-ingest use case), the bare DataInterface(profile="r2") is what you want.
Path A: upload + register (files local only)
The recording’s bytes are on your local NAS or USB drive; they are not in R2 yet. You need to both upload them and write the catalog row.
from pathlib import Path
from ursa import DataInterface
data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw")) # ingest() uploads raw bytes — needs raw_rw
rec_dir = Path("/Volumes/Shared_Drive/constellation-data/recordings/rec_20260520_195628_b1dd")
manifest = rec_dir / "manifest.json"
participant_file = rec_dir / "participant.txt"
participant = participant_file.read_text().strip() if participant_file.exists() else None
result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
)
print(f"ingested: {result.recording.recording_hash}")
print(f"files uploaded: {result.files_uploaded}, skipped: {result.files_skipped}")
What this does:
Uploads every raw segment file under rec_dir to s3://constellation-data/recordings/<rec_id>/... (or HEAD-checks and skips files that are already present; ingest() is the upload-or-skip-atomically variant).
Writes the RecordingRow + ModalityRow + ParticipantRow entries into the prod catalog at the active-catalog pointer’s prefix.
Returns an IngestResult with recording, files_uploaded, files_skipped, bytes_uploaded, and paused (used by the rig-side daemon; ignore for one-off offline ingest).
By default ingest() refuses to overwrite an existing recording row. If you’re re-running ingest because the previous attempt failed mid-upload, pass overwrite=True:
result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
    overwrite=True,  # clobber a partial prior row
)
Path B: register only (files already in R2)
The rig’s uploader uploaded the recording’s bytes to R2 but the catalog row was never written (e.g. the daemon crashed after the multipart PUT completed but before the register_recording call). Use ingest_from_r2 to write the catalog row from the existing R2 state:
from ursa import DataInterface
data = DataInterface(profile="r2")
data.enable_writes()
result = data.ingest_from_r2(
    rec_id="rec_20260520_195628_b1dd",
    participant="Aria Lin",  # or None if unknown
)
print(f"registered: {result.recording.recording_hash}")
What this does:
Reads s3://constellation-data/recordings/<rec_id>/manifest.json first (legacy single-key location).
If that’s missing, falls back to merging recordings/<rec_id>/_node_manifests/*.json in memory (post-PR-76 multi-node backfill).
HEAD-probes every file listed in the manifest to verify R2 has the bytes; refuses to register if anything is missing, so it never writes catalog rows that would 404 on download.
Writes the same RecordingRow / ModalityRow / ParticipantRow entries as Path A, but skips the upload phase.
Use Path B when you know the bytes are on R2 (verified via aws s3 ls or equivalent). Use Path A when you have the bytes locally and aren’t sure what made it to R2.
Path C: no manifest, reconstruct then ingest
The recording has data files but no manifest.json (either at the top level or under _node_manifests/). Common causes: a v0.1.0-era recording that predates the daemon’s manifest-write path, or a dashboard-written placeholder stub with empty files[].
Reconstruct the manifest first with examples/reconstruct_missing_manifests.py:
cd /path/to/ursa
uv run python examples/reconstruct_missing_manifests.py \
    --profile r2 \
    --rec-id rec_20260520_195628_b1dd \
    --yes
The script:
Lists every object under recordings/<rec_id>/ in R2.
Streams each file through hashlib.sha256 (one GET per file; expensive for multi-GB recordings).
Derives recording_hash from the sorted per-file sha256s.
Recovers the participant: reads recordings/<rec_id>/participant.txt from R2 and writes the operator-typed display name into the manifest’s participant field. Only when that file is absent or empty/whitespace-only does it fall back to the "__unknown__" sentinel, in which case reconstruction.reconstructed_participant is set True (it is False/absent when a real name was recovered). Note: ingest_from_r2 does not read manifest["participant"]; to attach the name to a catalog row, pass the participant= kwarg on the ingest_from_r2 call (or use scripts/backfill_participants.py).
PUTs the reconstructed manifest.json back to recordings/<rec_id>/manifest.json.
After reconstruction lands, run Path B (ingest_from_r2) to register the catalog row.
The reconstruction script is idempotent + resumable: re-running over a directory that already has a valid manifest is a no-op (one small GET + parse). Safe to interrupt and re-run; only the in-flight rec_id loses progress.
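The hashing steps of the script can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the script’s actual code: sha256_stream shows chunked hashing of a file-like object, and combine_digests shows one plausible way to fold sorted per-file digests into a single value. Both function names are hypothetical, and the exact rule the script uses to derive recording_hash (separator, encoding, what else is mixed in) is not documented here, so the combination step in particular is an assumption.

```python
import hashlib
from typing import BinaryIO, Iterable

CHUNK = 1024 * 1024  # read 1 MiB at a time so multi-GB files never sit in memory


def sha256_stream(stream: BinaryIO) -> str:
    """Hex sha256 of a file-like object, read in fixed-size chunks."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(CHUNK), b""):
        digest.update(chunk)
    return digest.hexdigest()


def combine_digests(file_digests: Iterable[str]) -> str:
    """Fold per-file hex digests into one hex digest.

    ASSUMPTION: sort the digests, join them with newlines, and hash the
    result. Sorting makes the output independent of listing order; the
    real script's combination rule may differ.
    """
    joined = "\n".join(sorted(file_digests)).encode("ascii")
    return hashlib.sha256(joined).hexdigest()
```

The same sha256_stream function works on a local file opened with open(path, "rb"), which gives a way to compare a NAS copy against a digest recorded elsewhere without loading the whole file.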
Progress reporting
For large ingests, surface progress via the progress= callback (Path A) or the structured ursa.register.orchestrator logger:
from ursa import DataInterface, UploadProgress

def on_progress(p: UploadProgress) -> None:
    print(f"  {p.current_file}: {p.bytes_uploaded:,}/{p.bytes_total:,}")

result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
    progress=on_progress,
)
The download counterpart (DataInterface.download(..., progress=...)) fires DownloadProgress events at six phase boundaries (start, plan_done, modality_start, file_done, modality_done, finish); see the downloading tutorial for details.
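A callback that prints on every event can flood the terminal during a multi-GB upload. The sketch below wraps the printing in a throttle that emits one line per percentage step. ProgressEvent is a hypothetical stand-in that carries only the three fields the upload callback above reads (current_file, bytes_uploaded, bytes_total); it is not Ursa’s UploadProgress class, and whether those byte counts are per-file or per-ingest is an assumption the throttle does not depend on.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class ProgressEvent:
    """Hypothetical stand-in with the three fields the upload callback reads."""
    current_file: str
    bytes_uploaded: int
    bytes_total: int


def make_throttled_reporter(
    emit: Callable[[str], None] = print,
    step_pct: int = 10,
) -> Callable[[ProgressEvent], None]:
    """Return a callback that emits once per step_pct bucket per file."""
    last_bucket: Dict[str, int] = {}

    def on_progress(p: ProgressEvent) -> None:
        if p.bytes_total <= 0:
            return  # no meaningful percentage for an empty total
        pct = 100 * p.bytes_uploaded // p.bytes_total
        bucket = pct // step_pct
        if last_bucket.get(p.current_file) == bucket:
            return  # same bucket as the last event for this file: stay quiet
        last_bucket[p.current_file] = bucket
        emit(f"  {p.current_file}: {pct}% ({p.bytes_uploaded:,}/{p.bytes_total:,})")

    return on_progress
```

Because the returned function only reads attributes by name, it could be passed as progress= if the real event object exposes the same three fields, which the callback above suggests but this sketch does not verify.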
Batch loop: many recordings at once
When pulling a backlog off the NAS, loop over recording directories:
from pathlib import Path
from ursa import DataInterface
data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw")) # ingest() uploads raw bytes — needs raw_rw
nas_root = Path("/Volumes/Shared_Drive/constellation-data/recordings")

for rec_dir in sorted(nas_root.glob("rec_*")):
    manifest = rec_dir / "manifest.json"
    if not manifest.exists():
        print(f"SKIP {rec_dir.name}: no manifest (run reconstruct first)")
        continue
    pfile = rec_dir / "participant.txt"
    participant = pfile.read_text().strip() if pfile.exists() else None
    try:
        result = data.ingest(
            manifest_path=manifest,
            source_dir=rec_dir,
            participant=participant,
        )
        print(f"OK {rec_dir.name}: {result.files_uploaded} uploaded, {result.files_skipped} skipped")
    except Exception as exc:  # noqa: BLE001 (broad on purpose: we want to keep going)
        print(f"FAIL {rec_dir.name}: {type(exc).__name__}: {exc}")
Each iteration is independent; a single failed recording doesn’t poison the rest. Failures land in the operator’s stdout; re-run the loop after fixing the root cause (the upload-or-skip-atomically contract makes the second attempt cheap).
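For a long backlog, stdout scrolls away quickly, so it may be worth collecting outcomes in a list and retrying only the failures. The sketch below is one way to structure that, assuming the per-recording work is wrapped in a callable. run_batch, Outcome, and failed_ids are hypothetical helpers, not Ursa APIs; ingest_one stands for whatever function performs the ingest of a single directory and returns a short summary string.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, List


@dataclass
class Outcome:
    """One row of the batch report."""
    rec_id: str
    status: str  # "ok", "skip", or "fail"
    detail: str = ""


def run_batch(
    rec_dirs: Iterable[Path],
    ingest_one: Callable[[Path], str],
) -> List[Outcome]:
    """Apply ingest_one to each directory, recording an outcome instead of raising."""
    outcomes: List[Outcome] = []
    for rec_dir in sorted(rec_dirs):
        if not (rec_dir / "manifest.json").exists():
            outcomes.append(Outcome(rec_dir.name, "skip", "no manifest"))
            continue
        try:
            outcomes.append(Outcome(rec_dir.name, "ok", ingest_one(rec_dir)))
        except Exception as exc:  # broad on purpose: one failure must not stop the batch
            outcomes.append(Outcome(rec_dir.name, "fail", f"{type(exc).__name__}: {exc}"))
    return outcomes


def failed_ids(outcomes: Iterable[Outcome]) -> List[str]:
    """Recording IDs to retry once the root cause is fixed."""
    return [o.rec_id for o in outcomes if o.status == "fail"]
```

Keeping the ingest call behind a callable also makes the loop easy to dry-run with a stub before pointing it at prod.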
Conflict policies
The default overwrite=False is the safe path: a re-ingest of an existing recording aborts. To extend an existing recording’s catalog row with new modalities or participants instead of clobbering, use the lower-level registration verbs directly:
Merge a recording’s duration into an existing row: data.register_recording(..., on_conflict="extend"), the backfill verb that updates duration to max(existing, new). This touches the recordings table only; it does not add modality rows. To add a new modality to an existing recording, call data.register_modality(recording_hash=..., ..., on_conflict="error") (or "overwrite" to replace); modalities are a separate table and are not touched by register_recording. (register_modality accepts only on_conflict="error" / "overwrite"; it has no "extend" policy.)
Add a participant to an existing recording: data.register_recording(..., on_conflict="merge_participants"). Set-union of participant_ids; recording-level fields must otherwise match.
Overwrite duration/ended_at: data.register_recording(..., on_conflict="overwrite") (used by the rebuild script’s pass-2 duration backfill).
Use on_conflict="overwrite" only when you’ve confirmed the existing row is wrong. The catalog-rebuild runbook (catalog-rebuild.md) describes when each policy applies during a fresh prefix bootstrap.
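The semantics of the recording-level policies can be illustrated with a toy model over plain dicts. This is not Ursa’s implementation and does not touch the catalog; resolve_conflict is a hypothetical function, the row is reduced to two illustrative fields (duration and participant_ids), and the "error" policy name for recordings is an assumption carried over from the register_modality description.

```python
from typing import Any, Dict


def resolve_conflict(
    existing: Dict[str, Any],
    new: Dict[str, Any],
    on_conflict: str = "error",
) -> Dict[str, Any]:
    """Toy model of what each policy does to an existing recording row."""
    if on_conflict == "error":
        # ASSUMPTION: the default refuses to touch an existing row.
        raise ValueError("recording already exists")

    if on_conflict == "overwrite":
        return dict(new)  # the new row replaces the old one wholesale

    merged = dict(existing)

    if on_conflict == "extend":
        # Duration only ever grows: max(existing, new).
        merged["duration"] = max(existing["duration"], new["duration"])
        return merged

    if on_conflict == "merge_participants":
        # Every other recording-level field must already agree.
        mismatched = [
            k for k in existing
            if k != "participant_ids" and existing[k] != new.get(k)
        ]
        if mismatched:
            raise ValueError(f"fields differ: {mismatched}")
        merged["participant_ids"] = sorted(
            set(existing["participant_ids"]) | set(new["participant_ids"])
        )
        return merged

    raise ValueError(f"unknown policy: {on_conflict}")
```

In this model only "overwrite" can lose information from the existing row, which is consistent with reserving it for rows that are known to be wrong.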
Verification
After an ingest, confirm the catalog has the rows you expect:
from ursa import DataInterface
data = DataInterface(profile="r2")
rec = data.get_recording("<recording_hash>")
# RecordingRow is keyed by recording_hash; the legacy data-engine rec_id is
# preserved in metadata["manifest_recording_id"].
legacy_id = (rec.metadata or {}).get("manifest_recording_id")
print(f"recording: rec_id={legacy_id}, participants={rec.participant_ids}")
mods = data.list_modalities(where={"recording_hash": rec.recording_hash})
print(f"modalities: {[(m.modality, m.worker_id) for m in mods]}")
Note (post-PR-#73 modality schema): QueryResult.modalities dict keys are now "eeg" (single-worker) or "eeg_a" / "eeg_b" (dual-worker disambiguated); the modality column is the device family and worker_id carries the disambiguating suffix. Old notebooks that worked around the pre-v0.2.0 "eeg_4559..." shape with m.split("_")[0] can drop the workaround.
Round-trip a file to confirm bytes resolve:
import tempfile
from pathlib import Path
results = data.query(recording_hash=rec.recording_hash, modalities=["camera"], status="raw")
data.download(results, dest=tempfile.mkdtemp(prefix="ursa_verify_"))
Dual-node caveat
If the recording was one half of a dual-node session (e.g. the green-mantis Mac mini host + green-motor EEG mini-PC pair) and you only have the host’s bytes on the NAS, ingest that half normally. The peer half can be ingested later under the same recording_hash and ursa merges via the merge_participants / extend conflict policies (post-PR-#61). Do not fabricate the missing half; let it land as a separate ingest when the peer comes back online.
See also
Catalog rebuild: the bulk re-derivation path used during the v0.2.0 cutover. Use that when you’re rebuilding the catalog from R2 raw bytes, not when you’re ingesting individual offline recordings.
Downloading tutorial: the progress= callback contract + ursa.download logger.
Concepts: the day-to-day read-side API; this runbook is the write-side counterpart for one-off offline ingest.