Manually ingesting SSD-copied sessions
Task: rescue sessions whose rig had no network — offload them to an SSD, plug it into a credentialed machine, point Ursa at the folder, and let it ingest every session it can into R2 + the Ursa catalog, using the same code path the data-engine daemon runs.
Note
This tutorial writes to production R2 — it needs both the assets_rw and
raw_rw roles, resolved from 1Password (see setup and the
constellation-utils secrets docs). The end-to-end run below is
unverified — pending example run against live credentials.
The data-engine uploader normally streams a session into R2 from the rig
itself, watches the manifest, and registers the catalog rows once every
file lands. When the rig has no network (offline rig, broken Wi-Fi, lab
with restricted egress) the uploader stays idle and the bytes sit on
local disk. The recovery path is to copy the session directories to an
external SSD, plug it into a machine with R2 credentials, and run
DataInterface.ingest() against each one — same code path the daemon
uses, just invoked by hand.
Getting the sessions onto the SSD
On the rig, use the data-engine deoffload tool rather than a raw
cp/rsync — it only ever writes to external/removable volumes
(never the rig’s boot disk), skips any recording still holding a
session.lock, and deletes each local copy only after the copy is
verified:
deoffload --dry-run # show the plan; transfer nothing
deoffload # guided: pick the SSD, confirm, offload + delete
deoffload --keep # copy only; leave the local copies in place
deoffload preserves each session directory verbatim — including the
manifest.json the daemon wrote at session-start — which is exactly what
ingest() needs on the other end.
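The ordering that matters here (skip locked sessions, copy, verify, and only then delete) can be sketched in a few lines of Python. This is an illustration of the idea under stated assumptions, not deoffload's actual implementation: the helper names `sha256_of`, `tree_digests`, and `offload_session` are hypothetical, and SHA-256 over every file is an assumed verification rule.

```python
import hashlib
import shutil
from pathlib import Path


def sha256_of(path: Path) -> str:
    """Hash a file in 1 MiB chunks so large recordings never load into memory."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def tree_digests(root: Path) -> dict[str, str]:
    """Map each file's path relative to root to its SHA-256 digest."""
    return {
        p.relative_to(root).as_posix(): sha256_of(p)
        for p in sorted(root.rglob("*"))
        if p.is_file()
    }


def offload_session(session: Path, dest_root: Path, keep: bool = False) -> bool:
    """Copy one session dir, verify it, and delete the source only on a match.

    Hypothetical helper: it illustrates the ordering, not deoffload's real code.
    Returns False when the session is skipped because it is still locked.
    """
    if (session / "session.lock").exists():
        return False  # still recording; leave it alone
    dest = dest_root / session.name
    shutil.copytree(session, dest)
    if tree_digests(session) != tree_digests(dest):
        raise RuntimeError(f"verification failed for {session.name}")
    if not keep:
        shutil.rmtree(session)  # safe only because the copy was just verified
    return True
```

Passing `keep=True` corresponds to the copy-only mode: the verified copy lands on the SSD and the local directory is left in place.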
What the SSD should contain
The ingest below scans a parent folder and treats every immediate subdirectory as a candidate session. A single-node rescue looks like:
/Volumes/rescue-ssd/
├── rec_20260507_143022_a1b2/
│   ├── manifest.json
│   ├── eeg_main/
│   │   └── ...
│   └── camera_webcam_0/
│       └── ...
├── rec_20260507_151140_c3d4/
│   ├── manifest.json
│   └── ...
└── ...
Each session’s manifest.json must carry schema_version >= 4,
recording_hash, node_hostname, and a populated files list — every
daemon since the v0.1.7 follow-up writes those live, so a session that
recorded normally will have them. The scan classifies each subdirectory:
- has a manifest.json → ingestable, attempted below.
- a rec_* dir with no manifest.json → manifest-less (a legacy session from before per-node manifests existed, or one whose recording aborted at startup). The scan can’t ingest it as-is, so it reports these separately rather than silently dropping them. Recover one by synthesizing a v4 manifest from the raw files on disk with the data-engine reconstruct script, then re-run the scan:

      # in a data-engine checkout, against the SSD mount
      uv run python scripts/reconstruct_legacy_manifest.py \
          --data-dir /Volumes/rescue-ssd --yes

  It walks the recordings dir, hashes every file with the same rule the rig uses (so the synthesized recording_hash matches byte-for-byte), and writes a manifest.json + .rig_complete sentinel. The result carries a reconstruction block (requires_participant_backfill: true, reconstructed_ended_at: true) whose ended_at is a copy-completion proxy, not a true recording end; downstream consumers must treat it as such.
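Before spending upload time, it can help to check each manifest against the requirements listed above. The following is a minimal sketch of such a preflight check, not part of Ursa: `manifest_problems` is a hypothetical helper, and it assumes schema_version is stored as an integer and files as a JSON list.

```python
import json
from pathlib import Path

# Keys the text above says every ingestable manifest must carry.
REQUIRED_KEYS = ("recording_hash", "node_hostname")


def manifest_problems(manifest_path: Path) -> list[str]:
    """Return human-readable problems; an empty list means the manifest looks ingestable."""
    try:
        manifest = json.loads(manifest_path.read_text())
    except (OSError, json.JSONDecodeError) as e:
        return [f"unreadable manifest: {e}"]
    if not isinstance(manifest, dict):
        return ["manifest is not a JSON object"]

    problems: list[str] = []
    version = manifest.get("schema_version")
    # Assumption: schema_version is a plain integer in the manifest.
    if not isinstance(version, int) or version < 4:
        problems.append(f"schema_version is {version!r}, need >= 4")
    for key in REQUIRED_KEYS:
        if not manifest.get(key):
            problems.append(f"missing {key}")
    if not manifest.get("files"):
        problems.append("files list is empty or missing")
    return problems
```

A session whose list comes back non-empty would most likely fail in ingest() as well, so this only saves time; it does not replace the checks ingest() itself performs.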
For a dual-node rig (Mac mini + EEG mini-PC), each node has its own
session directory and its own manifest.json. Copy both onto the SSD
under distinct folder names (e.g. rec_..._macmini/ and
rec_..._eegpc/) so the scan picks up each one — they ingest
independently and Ursa merges them by recording_hash server-side
regardless of the local directory names.
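To confirm that both halves of a dual-node recording made it onto the SSD, you can group the session directories by the recording_hash in their manifests. This is a read-only sketch that never touches R2; `group_by_recording` is a hypothetical helper, and it relies only on the recording_hash and node_hostname manifest fields named above.

```python
import json
from collections import defaultdict
from pathlib import Path


def group_by_recording(root: Path) -> dict[str, list[tuple[str, str]]]:
    """Map recording_hash -> [(directory name, node_hostname), ...]."""
    groups: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for manifest_path in sorted(root.glob("*/manifest.json")):
        try:
            manifest = json.loads(manifest_path.read_text())
        except (OSError, json.JSONDecodeError):
            continue  # unreadable here; the ingest run will report it as a failure
        if not isinstance(manifest, dict):
            continue
        recording_hash = manifest.get("recording_hash")
        if recording_hash:
            groups[recording_hash].append(
                (manifest_path.parent.name, manifest.get("node_hostname", "?"))
            )
    return dict(groups)
```

On a dual-node rig, a hash that maps to a single directory suggests the other node's copy is still missing from the SSD.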
Running the ingest
Point the script at the SSD root. It scans for sessions, ingests each one independently (a failure on one never aborts the rest), and prints a summary of what succeeded, what was skipped, and what failed.
import json
from pathlib import Path

from ursa import DataInterface

root = Path("/Volumes/rescue-ssd")

data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw"))


def participant_for(session: Path, manifest: dict) -> str | None:
    """Mirror the daemon: read <session>/participant.txt, unless this was
    a test recording. Returns None when absent/empty or testing_mode."""
    if manifest.get("testing_mode"):
        return None
    txt = session / "participant.txt"
    if not txt.exists():
        return None
    return txt.read_text().strip() or None


# Classify each immediate subdirectory of the SSD root.
sessions: list[Path] = []
manifestless: list[Path] = []
for d in sorted(p for p in root.iterdir() if p.is_dir()):
    if (d / "manifest.json").exists():
        sessions.append(d)
    elif d.name.startswith("rec_"):
        manifestless.append(d)

succeeded = []  # list[tuple[Path, IngestResult]]
failed = []     # list[tuple[Path, Exception]]

for session in sessions:
    try:
        manifest = json.loads((session / "manifest.json").read_text())
        result = data.ingest(
            manifest_path=session / "manifest.json",
            source_dir=session,
            participant=participant_for(session, manifest),
        )
    except Exception as e:  # noqa: BLE001
        # Collect and continue, report at the end. A bad manifest (corrupt
        # JSON, unreadable file) or a failed upload is recorded and skipped,
        # so one session never aborts the rest.
        failed.append((session, e))
        continue
    succeeded.append((session, result))

# --- summary ---------------------------------------------------------
print(f"\n=== ingest summary: {len(sessions)} session(s) found ===")
for session, r in succeeded:
    print(
        f"  ok    {session.name}: {r.files_uploaded} uploaded, "
        f"{r.files_skipped} skipped, {r.bytes_uploaded:,} B "
        f"({r.recording.recording_hash[:12]}…)"
    )
for session, e in failed:
    print(f"  FAIL  {session.name}: {type(e).__name__}: {e}")
for d in manifestless:
    print(f"  skip  {d.name}: no manifest.json, reconstruct it first (see above)")
print(
    f"\n{len(succeeded)} ok · {len(failed)} failed · "
    f"{len(manifestless)} need reconstruction"
)
A few things worth knowing about each ingest() call:
- ingest() HEAD-skips files already present in R2 with a matching size, so re-running the whole scan after a partial run (network blip, laptop sleep, one session that failed) is safe and cheap: already-uploaded files are skipped and only the missing bytes transfer.
- result.recording is the registered RecordingRow, always populated here. (ingest() can instead return result.paused = True with recording = None and no catalog rows written, but only when you pass an is_active= callback, the daemon’s “back off while a recording is live” hook. This manual scan never wires one, so every session runs to completion.)
- participant_for mirrors the daemon’s participant.txt convention: it passes the raw operator-typed name (Ursa slugifies internally), or None when there’s no participant.txt or the session was a test. A None ingest still registers the recording row, with participant_ids left for another node’s ingest to fill via the merge_participants policy. If you know a participant the file doesn’t capture, set it per-session instead of relying on the file.
- The per-node manifest is written to R2 alongside the data, so a later ingest_from_r2() can re-derive catalog rows without the SSD.
Watching progress
For a long upload, hook the progress= callback per session. Each event
reports cumulative bytes; divide by bytes_total for a monotonic
0.0 → 1.0:
from ursa.register import UploadProgress


def on_progress(evt: UploadProgress) -> None:
    pct = evt.bytes_processed / max(evt.bytes_total, 1)
    print(f"[{pct:5.1%}] {evt.files_done}/{evt.files_total} {evt.current_file}")
Then pass progress=on_progress to the data.ingest(...) call in the
scan loop above.
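If the callback fires often, printing every event can flood the terminal. A throttled variant that prints only when progress advances by a minimum step might look like the sketch below. `ProgressEvent` is a stand-in dataclass so the example runs without Ursa installed; it mirrors only the fields used above, and `make_throttled_progress` is a hypothetical helper. How frequently Ursa actually emits events is not something this sketch assumes.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ProgressEvent:
    """Stand-in with the fields used above; the real type is ursa's UploadProgress."""
    bytes_processed: int
    bytes_total: int
    files_done: int
    files_total: int
    current_file: str


def make_throttled_progress(
    step: float = 0.05, emit: Callable[[str], None] = print
) -> Callable[[ProgressEvent], None]:
    """Build a callback that emits only when progress advances by at least `step`."""
    last = -1.0  # sentinel so the very first event always emits

    def on_progress(evt: ProgressEvent) -> None:
        nonlocal last
        pct = evt.bytes_processed / max(evt.bytes_total, 1)
        reached_end = pct >= 1.0 and last < 1.0  # always show the final 100% once
        if pct - last >= step or reached_end:
            last = pct
            emit(f"[{pct:5.1%}] {evt.files_done}/{evt.files_total} {evt.current_file}")

    return on_progress
```

Create one callback per session (`progress=make_throttled_progress()`), since each closure tracks its own last-printed percentage.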
You can also subscribe to the ursa.register.orchestrator logger for
ambient observability without threading progress= through every call
site — same pattern as the download tutorial.
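Subscribing to that logger takes only the standard library. The logger name below comes from the text above; the INFO level and the message format are assumptions, since which levels the orchestrator logs at is not stated here.

```python
import logging

# Send the orchestrator's records to stderr with a timestamped format.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

orchestrator_log = logging.getLogger("ursa.register.orchestrator")
orchestrator_log.setLevel(logging.INFO)  # assumed level; lower to DEBUG for more detail
orchestrator_log.addHandler(handler)
# Avoid duplicate lines if the root logger also has a handler attached.
orchestrator_log.propagate = False
```

Run this once before the scan loop; every ingest() call in the process then logs through the same handler.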
What shows up in the FAIL column
The loop above catches per-session exceptions and reports them in the summary instead of aborting the run. The ones you’ll actually see:
- WritesNotEnabled: you forgot enable_writes(roles=("assets_rw", "raw_rw")). Production ingest needs both roles: raw_rw for the R2 PUTs, assets_rw for the catalog writes. This fails every session, so fix it once and re-run.
- UnsupportedManifestVersion: the manifest predates schema_version=4. This shouldn’t happen for any session recorded on current data-engine; surface the rig + recording date so we can check what wrote it.
- ValueError: ... missing 'recording_hash' / 'files': the daemon’s manifest aggregator never finished, or the manifest was hand-edited. The session has no uploadable content from this node; check whether a different node holds the data or whether the recording aborted at startup.
- ObjectStoreError: non-transient R2 failure (permissions, bad key). The failing key is in the message. Transient 5xx are retried internally with exponential backoff and only surface after the retry budget is exhausted. Because failures are isolated per session, re-running the scan retries only what didn’t complete.
Note
If you’d rather branch on specific failure types than catch Exception,
all of these are importable: WritesNotEnabled from ursa (or
ursa.catalog), UnsupportedManifestVersion from ursa.register, and
ObjectStoreError from ursa.store. The bare ValueError for a missing
recording_hash / files has no dedicated subclass.
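A lighter-weight alternative to separate except clauses is to map each failure's class name to a next step when printing the summary. The sketch below matches on class names so it runs without importing Ursa. `HINTS` and `hint_for` are hypothetical, and the hint wording paraphrases the list above. The JSONDecodeError entry is included because json.JSONDecodeError subclasses ValueError and would otherwise pick up the wrong hint.

```python
# Class name -> suggested next step. Matching by name keeps this importable
# without ursa; matching on the real classes would be stricter.
HINTS = {
    "WritesNotEnabled": "call enable_writes(...) with both roles, then re-run",
    "UnsupportedManifestVersion": "manifest predates schema_version 4; report rig + date",
    "ObjectStoreError": "non-transient R2 failure; check the key in the message",
    "JSONDecodeError": "manifest.json is not valid JSON",
    "ValueError": "manifest lacks recording_hash or files; check the other node",
}


def hint_for(exc: BaseException) -> str:
    """Walk the exception's class hierarchy and return the first matching hint."""
    for cls in type(exc).__mro__:
        if cls.__name__ in HINTS:
            return HINTS[cls.__name__]
    return "unexpected failure; inspect the traceback"
```

In the summary loop this becomes `print(f"  FAIL {session.name}: {type(e).__name__}: {e} ({hint_for(e)})")`. Walking the MRO means the most specific class wins, so a dedicated exception that happens to subclass ValueError still gets its own hint.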