Manually ingesting SSD-copied sessions

Task: rescue sessions whose rig had no network — offload them to an SSD, plug it into a credentialed machine, point Ursa at the folder, and let it ingest every session it can into R2 + the Ursa catalog, using the same code path the data-engine daemon runs.

Note

This tutorial writes to production R2 — it needs both the assets_rw and raw_rw roles, resolved from 1Password (see setup and the constellation-utils secrets docs). The end-to-end run below is unverified — pending example run against live credentials.

The data-engine uploader normally streams a session into R2 from the rig itself, watches the manifest, and registers the catalog rows once every file lands. When the rig has no network (offline rig, broken Wi-Fi, lab with restricted egress) the uploader stays idle and the bytes sit on local disk. The recovery path is to copy the session directories to an external SSD, plug it into a machine with R2 credentials, and run DataInterface.ingest() against each one — same code path the daemon uses, just invoked by hand.

Getting the sessions onto the SSD

On the rig, use the data-engine deoffload tool rather than a raw cp/rsync — it only ever writes to external/removable volumes (never the rig’s boot disk), skips any recording still holding a session.lock, and deletes each local copy only after the copy is verified:

deoffload --dry-run     # show the plan; transfer nothing
deoffload               # guided: pick the SSD, confirm, offload + delete
deoffload --keep        # copy only; leave the local copies in place

deoffload preserves each session directory verbatim — including the manifest.json the daemon wrote at session-start — which is exactly what ingest() needs on the other end.

What the SSD should contain

The ingest below scans a parent folder and treats every immediate subdirectory as a candidate session. A single-node rescue looks like:

/Volumes/rescue-ssd/
├── rec_20260507_143022_a1b2/
│   ├── manifest.json
│   ├── eeg_main/
│   │   └── ...
│   └── camera_webcam_0/
│       └── ...
├── rec_20260507_151140_c3d4/
│   ├── manifest.json
│   └── ...
└── ...

Each session’s manifest.json must carry schema_version >= 4, recording_hash, node_hostname, and a populated files list — every daemon since the v0.1.7 follow-up writes those live, so a session that recorded normally will have them. The scan classifies each subdirectory:

  • has a manifest.jsoningestable, attempted below.

  • a rec_* dir with no manifest.jsonmanifest-less (a legacy session from before per-node manifests existed, or one whose recording aborted at startup). The scan can’t ingest it as-is, so it reports these separately rather than silently dropping them. Recover one by synthesizing a v4 manifest from the raw files on disk with the data-engine reconstruct script, then re-run the scan:

    # in a data-engine checkout, against the SSD mount
    uv run python scripts/reconstruct_legacy_manifest.py \
        --data-dir /Volumes/rescue-ssd --yes
    

    It walks the recordings dir, hashes every file with the same rule the rig uses (so the synthesized recording_hash matches byte-for-byte), and writes a manifest.json + .rig_complete sentinel. The result carries a reconstruction block (requires_participant_backfill: true, reconstructed_ended_at: true) whose ended_at is a copy-completion proxy, not a true recording end — downstream consumers must treat it as such.

For a dual-node rig (Mac mini + EEG mini-PC), each node has its own session directory and its own manifest.json. Copy both onto the SSD under distinct folder names (e.g. rec_..._macmini/ and rec_..._eegpc/) so the scan picks up each one — they ingest independently and Ursa merges them by recording_hash server-side regardless of the local directory names.

Running the ingest

Point the script at the SSD root. It scans for sessions, ingests each one independently (a failure on one never aborts the rest), and prints a summary of what succeeded, what was skipped, and what failed.

import json
from pathlib import Path

from ursa import DataInterface

root = Path("/Volumes/rescue-ssd")

data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw"))


def participant_for(session: Path, manifest: dict) -> str | None:
    """Mirror the daemon: read <session>/participant.txt, unless this was
    a test recording. Returns None when absent/empty or testing_mode."""
    if manifest.get("testing_mode"):
        return None
    txt = session / "participant.txt"
    if not txt.exists():
        return None
    return txt.read_text().strip() or None


# Classify each immediate subdirectory of the SSD root.
sessions: list[Path] = []
manifestless: list[Path] = []
for d in sorted(p for p in root.iterdir() if p.is_dir()):
    if (d / "manifest.json").exists():
        sessions.append(d)
    elif d.name.startswith("rec_"):
        manifestless.append(d)

succeeded = []  # list[tuple[Path, IngestResult]]
failed = []     # list[tuple[Path, Exception]]

for session in sessions:
    try:
        manifest = json.loads((session / "manifest.json").read_text())
        result = data.ingest(
            manifest_path=session / "manifest.json",
            source_dir=session,
            participant=participant_for(session, manifest),
        )
    except Exception as e:  # noqa: BLE001 — collect + continue, report at the end
        # A bad manifest (corrupt JSON, unreadable file) or a failed
        # upload is recorded and skipped — one session never aborts the rest.
        failed.append((session, e))
        continue
    succeeded.append((session, result))

# --- summary ---------------------------------------------------------
print(f"\n=== ingest summary: {len(sessions)} session(s) found ===")
for session, r in succeeded:
    print(
        f"  ok    {session.name}: {r.files_uploaded} uploaded, "
        f"{r.files_skipped} skipped, {r.bytes_uploaded:,} B "
        f"({r.recording.recording_hash[:12]}…)"
    )
for session, e in failed:
    print(f"  FAIL  {session.name}: {type(e).__name__}: {e}")
for d in manifestless:
    print(f"  skip  {d.name}: no manifest.json — reconstruct it first (see above)")

print(
    f"\n{len(succeeded)} ok · {len(failed)} failed · "
    f"{len(manifestless)} need reconstruction"
)

A few things worth knowing about each ingest() call:

  • ingest() HEAD-skips files already present in R2 with a matching size, so re-running the whole scan after a partial run (network blip, laptop sleep, one session that failed) is safe and cheap — already-uploaded files are skipped and only the missing bytes transfer.

  • result.recording is the registered RecordingRow — always populated here. (ingest() can instead return result.paused = True with recording = None and no catalog rows written, but only when you pass an is_active= callback — the daemon’s “back off while a recording is live” hook. This manual scan never wires one, so every session runs to completion.)

  • participant_for mirrors the daemon’s participant.txt convention: it passes the raw operator-typed name (Ursa slugifies internally), or None when there’s no participant.txt or the session was a test. A None ingest still registers the recording row, with participant_ids left for another node’s ingest to fill via the merge_participants policy. If you know a participant the file doesn’t capture, set it per-session instead of relying on the file.

  • The per-node manifest is written to R2 alongside the data, so a later ingest_from_r2() can re-derive catalog rows without the SSD.

Watching progress

For a long upload, hook the progress= callback per session. Each event reports cumulative bytes; divide by bytes_total for a monotonic 0.0 → 1.0:

from ursa.register import UploadProgress

def on_progress(evt: UploadProgress) -> None:
    pct = evt.bytes_processed / max(evt.bytes_total, 1)
    print(f"[{pct:5.1%}] {evt.files_done}/{evt.files_total}  {evt.current_file}")

Then pass progress=on_progress to the data.ingest(...) call in the scan loop above.

You can also subscribe to the ursa.register.orchestrator logger for ambient observability without threading progress= through every call site — same pattern as the download tutorial.

What shows up in the FAIL column

The loop above catches per-session exceptions and reports them in the summary instead of aborting the run. The ones you’ll actually see:

  • WritesNotEnabled — you forgot enable_writes(roles=("assets_rw", "raw_rw")). Production ingest needs both roles: raw_rw for the R2 PUTs, assets_rw for the catalog writes. This fails every session, so fix it once and re-run.

  • UnsupportedManifestVersion — manifest predates schema_version=4. This shouldn’t happen for any session recorded on current data-engine; surface the rig + recording date so we can check what wrote it.

  • ValueError: ... missing 'recording_hash' / 'files' — the daemon’s manifest aggregator never finished, or the manifest was hand-edited. The session has no uploadable content from this node; check whether a different node holds the data or whether the recording aborted at startup.

  • ObjectStoreError — non-transient R2 failure (permissions, bad key). The failing key is in the message. Transient 5xx are retried internally with exponential backoff and only surface after the retry budget is exhausted. Because failures are isolated per session, re-running the scan retries only what didn’t complete.

Note

If you’d rather branch on specific failure types than catch Exception, all of these are importable: WritesNotEnabled from ursa (or ursa.catalog), UnsupportedManifestVersion from ursa.register, and ObjectStoreError from ursa.store. The bare ValueError for a missing recording_hash / files has no dedicated subclass.