Offline ingest — manual NAS-side ingest when a rig has been offline

Danger

ADMIN-ONLY OPERATION. This procedure registers prod catalog rows and (for Path A) uploads raw bytes into the prod recordings bucket. Read it end-to-end before running, and announce the window in #engineering-support if you’ll be ingesting more than a handful of recordings.

When to use this

Use this runbook when a rig loses network mid-session, or sits on a slow uplink and its uploader queue backs up. The operator pulls the recordings off the rig via the NAS (or a USB drive) and brings them to a laptop or office machine that does have R2 access. The steps below get those local-only recordings into the catalog without waiting for the rig's uploader to catch up.

If the recordings already finished uploading to R2 from the rig (no NAS detour) and only the catalog row is missing, see Path B below.

Prerequisites

  1. Ursa ≥ v0.3.0 installed on the machine you’re ingesting from (until v0.3.0 is tagged, pin the v0.3.0-dev integration branch):

    uv add 'ursa @ git+ssh://git@github.com/constellationlab/ursa@v0.3.0-dev'
    
  2. 1Password CLI (op) authenticated for the Engineering vault. R2 credentials flow constellation-utils → op read → in-memory; nothing is written to disk. On a laptop, the desktop-app integration is the cleanest path (Settings → Developer → Integrate with 1Password CLI).

  3. The recording directory is reachable at a local path. Typical layouts:

    • NAS mount: /Volumes/Shared_Drive/constellation-data/recordings/rec_*/ on macOS.

    • USB drive: any path; subdirectories are named rec_<YYYYMMDD>_<HHMMSS>_<hex>/.

    Each recording directory must contain:

    • At least one worker subdirectory (camera_*, eeg_*, mic_*, notes_*, etc.) with data files.

    • participant.txt at the recording root if a participant is known (post-v0.1.7 convention — participant lives outside the manifest body).

    • Either a top-level manifest.json (rig-written aggregate) or per-node manifests under _node_manifests/<hostname>.json (dual-node sessions).

    If the recording has no manifest at all, use Path C (below) to reconstruct first.
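Before starting an ingest, it can help to check a directory against the layout requirements above. The following is a minimal sketch using only the standard library; `preflight` and `REC_NAME` are hypothetical names and not part of ursa, and the rule that worker subdirectories are the ones not starting with an underscore is an assumption drawn from the `_node_manifests/` convention.

```python
import re
from pathlib import Path

# Directory-name pattern from the layout above: rec_<YYYYMMDD>_<HHMMSS>_<hex>.
REC_NAME = re.compile(r"^rec_\d{8}_\d{6}_[0-9a-fA-F]+$")


def preflight(rec_dir: Path) -> list[str]:
    """Return a list of problems; an empty list means the layout looks ingestable."""
    problems: list[str] = []
    if not REC_NAME.match(rec_dir.name):
        problems.append(f"unexpected directory name: {rec_dir.name}")

    # ASSUMPTION: worker subdirectories are the ones not prefixed with "_".
    workers = [
        d for d in rec_dir.iterdir()
        if d.is_dir()
        and not d.name.startswith("_")
        and any(p.is_file() for p in d.rglob("*"))
    ]
    if not workers:
        problems.append("no worker subdirectory with data files")

    # Either a top-level manifest or at least one per-node manifest must exist.
    has_top = (rec_dir / "manifest.json").is_file()
    node_dir = rec_dir / "_node_manifests"
    has_nodes = node_dir.is_dir() and any(node_dir.glob("*.json"))
    if not (has_top or has_nodes):
        problems.append("no manifest (see Path C)")
    return problems
```

A directory that returns an empty list satisfies the structural checks only; it says nothing about whether the manifest contents are valid.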

Config selection — no env vars

Ursa does not read a URSA_CONFIG env var. To point at the prod catalog, construct the DataInterface with the explicit profile="r2" kwarg. The catalog prefix is resolved at Catalog.open from the active-catalog pointer (<assets-bucket>/ursa/ACTIVE.json, src/ursa/catalog/_pointer.py), so you get whatever prefix is live — no override needed:

from ursa import DataInterface
data = DataInterface(profile="r2")

For a non-default catalog (test runs against r2-test, or a one-off staging prefix), pass config_path= explicitly:

from pathlib import Path
data = DataInterface(profile="r2", config_path=Path("/tmp/my-override.yaml"))

For prod-bucket ingest (the offline-ingest use case), the bare DataInterface(profile="r2") is what you want.

Path A — upload + register (files local only)

The recording’s bytes are on your local NAS or USB drive; they are not in R2 yet. You need to both upload them and write the catalog row.

from pathlib import Path
from ursa import DataInterface

data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw"))  # ingest() uploads raw bytes — needs raw_rw

rec_dir = Path("/Volumes/Shared_Drive/constellation-data/recordings/rec_20260520_195628_b1dd")
manifest = rec_dir / "manifest.json"
participant_file = rec_dir / "participant.txt"
# Treat a missing, empty, or whitespace-only participant.txt as unknown (None).
participant = (participant_file.read_text().strip() or None) if participant_file.exists() else None

result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
)
print(f"ingested: {result.recording.recording_hash}")
print(f"files uploaded: {result.files_uploaded}, skipped: {result.files_skipped}")

What this does:

  • Uploads every raw segment file under rec_dir to s3://constellation-data/recordings/<rec_id>/... (or HEAD-checks + skips files that are already present — ingest() is the upload-or-skip-atomically variant).

  • Writes the RecordingRow + ModalityRow + ParticipantRow entries into the prod catalog at the active-catalog pointer’s prefix.

  • Returns an IngestResult with recording, files_uploaded, files_skipped, bytes_uploaded, and paused (used by the rig-side daemon; ignore for one-off offline ingest).

By default ingest() refuses to overwrite an existing recording row. If you’re re-running ingest because the previous attempt failed mid-upload, pass overwrite=True:

result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
    overwrite=True,  # clobber a partial prior row
)

Path B — register only (files already in R2)

The rig’s uploader uploaded the recording’s bytes to R2 but the catalog row was never written (e.g. the daemon crashed after the multipart PUT completed but before the register_recording call). Use ingest_from_r2 to write the catalog row from the existing R2 state:

from ursa import DataInterface

data = DataInterface(profile="r2")
data.enable_writes()

result = data.ingest_from_r2(
    rec_id="rec_20260520_195628_b1dd",
    participant="Aria Lin",  # or None if unknown
)
print(f"registered: {result.recording.recording_hash}")

What this does:

  • Reads s3://constellation-data/recordings/<rec_id>/manifest.json first (legacy single-key location).

  • If that’s missing, falls back to merging recordings/<rec_id>/_node_manifests/*.json in memory (post-PR-76 multi-node backfill).

  • HEAD-probes every file listed in the manifest to verify R2 has the bytes, and refuses to register if anything is missing, so the catalog never gains rows that would 404 on download.

  • Writes the same RecordingRow / ModalityRow / ParticipantRow entries as Path A — but skips the upload phase.

Use Path B when you know the bytes are on R2 (verified via aws s3 ls or equivalent). Use Path A when you have the bytes locally and aren’t sure what made it to R2.
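The "refuse to register if anything is missing" rule can be illustrated with a small sketch. This is not ursa's implementation: `missing_keys` and `check_registrable` are hypothetical helpers, the `"path"` field inside each `files[]` entry is an assumed manifest shape, and `head` stands in for whatever existence probe you have available (for example a wrapper around an S3 HEAD request).

```python
from typing import Callable


def missing_keys(rec_id: str, manifest: dict, head: Callable[[str], bool]) -> list[str]:
    """Return the object keys named by the manifest that the store does not hold."""
    # ASSUMPTION: each files[] entry has a "path" relative to recordings/<rec_id>/.
    keys = [f"recordings/{rec_id}/{entry['path']}" for entry in manifest.get("files", [])]
    return [key for key in keys if not head(key)]


def check_registrable(rec_id: str, manifest: dict, head: Callable[[str], bool]) -> None:
    """Raise instead of letting a catalog row point at bytes that are not there."""
    if not manifest.get("files"):
        # An empty files[] is the placeholder-stub case; treat it as no manifest.
        raise ValueError(f"{rec_id}: manifest lists no files")
    missing = missing_keys(rec_id, manifest, head)
    if missing:
        raise FileNotFoundError(
            f"{rec_id}: {len(missing)} file(s) missing from the bucket, e.g. {missing[0]}"
        )
```

Running a check like this before choosing a path gives a concrete answer to "what made it to R2": an empty result from `missing_keys` points to Path B, anything else to Path A.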

Path C — no manifest: reconstruct then ingest

The recording has data files but no manifest: neither a top-level manifest.json nor any per-node manifest under _node_manifests/. Common causes: a v0.1.0-era recording that predates the daemon's manifest-write path, or a dashboard-written placeholder stub with empty files[].

Reconstruct the manifest first with examples/reconstruct_missing_manifests.py:

cd /path/to/ursa
uv run python examples/reconstruct_missing_manifests.py \
    --profile r2 \
    --rec-id rec_20260520_195628_b1dd \
    --yes

The script:

  • Lists every object under recordings/<rec_id>/ in R2.

  • Streams each file through hashlib.sha256 (one GET per file; expensive for multi-GB recordings).

  • Derives recording_hash from the sorted per-file sha256s.

  • Recovers the participant: reads recordings/<rec_id>/participant.txt from R2 and writes the operator-typed display name into the manifest’s participant field. Only when that file is absent or empty/whitespace-only does it fall back to the "__unknown__" sentinel, in which case reconstruction.reconstructed_participant is set True (it is False/absent when a real name was recovered). Note: ingest_from_r2 does not read manifest["participant"] — to attach the name to a catalog row, pass the participant= kwarg on the ingest_from_r2 call (or use scripts/backfill_participants.py).

  • PUTs the reconstructed manifest.json back to recordings/<rec_id>/manifest.json.

After reconstruction lands, run Path B (ingest_from_r2) to register the catalog row.

The reconstruction script is idempotent and resumable: re-running it over a recording that already has a valid manifest is a no-op (one small GET plus a parse). It is safe to interrupt and re-run; only the in-flight rec_id loses progress.
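The hashing and participant-recovery steps the script performs can be sketched with the standard library. The streaming digest is plain `hashlib` usage; the way the sorted per-file digests are combined into `recording_hash` is an assumption made here for illustration (the script's exact rule is not documented in this runbook), and all three function names are hypothetical.

```python
import hashlib
from typing import BinaryIO, Iterable, Optional

UNKNOWN = "__unknown__"  # sentinel named in the step list above


def sha256_stream(fh: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Hash a file-like object in fixed-size chunks so multi-GB files never sit in memory."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: fh.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def derive_recording_hash(file_hashes: Iterable[str]) -> str:
    """Combine per-file digests into one order-independent digest."""
    # ASSUMPTION: newline-joined sorted hex digests, hashed once more.
    joined = "\n".join(sorted(file_hashes)).encode("ascii")
    return hashlib.sha256(joined).hexdigest()


def recover_participant(raw: Optional[str]) -> tuple[str, bool]:
    """Return (participant, reconstructed_participant) from participant.txt contents."""
    name = (raw or "").strip()
    # Absent, empty, or whitespace-only input falls back to the sentinel.
    return (name, False) if name else (UNKNOWN, True)
```

Sorting before combining is what makes the result independent of listing order, which matters because object listings and directory walks do not guarantee the same order across runs.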

Progress reporting

For large ingests, surface progress via the progress= callback (Path A) or the structured ursa.register.orchestrator logger:

from ursa import DataInterface, UploadProgress

def on_progress(p: UploadProgress) -> None:
    print(f"  {p.current_file}: {p.bytes_uploaded:,}/{p.bytes_total:,}")

result = data.ingest(
    manifest_path=manifest,
    source_dir=rec_dir,
    participant=participant,
    progress=on_progress,
)

The download counterpart (DataInterface.download(..., progress=...)) fires DownloadProgress events at six phase boundaries (start, plan_done, modality_start, file_done, modality_done, finish); see the downloading tutorial for details.
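For the upload callback, printing on every event can flood the terminal on large files. Below is a sketch of a throttled callback that prints only when a file crosses a percentage step. It relies solely on the three fields used in the example above (`current_file`, `bytes_uploaded`, `bytes_total`); `make_throttled_progress` and the `FakeProgress` stand-in are hypothetical, and whether the byte counts are per file or per ingest is not stated here, so the sketch only looks at their ratio.

```python
from dataclasses import dataclass
from typing import Callable, Protocol


class ProgressLike(Protocol):
    """The subset of UploadProgress fields this sketch reads."""
    current_file: str
    bytes_uploaded: int
    bytes_total: int


def make_throttled_progress(
    emit: Callable[[str], None] = print, step_pct: int = 10
) -> Callable[[ProgressLike], None]:
    """Build a callback that emits one line per file per step_pct of progress."""
    last_bucket: dict[str, int] = {}

    def on_progress(p: ProgressLike) -> None:
        if p.bytes_total <= 0:
            return  # nothing meaningful to report
        bucket = (100 * p.bytes_uploaded // p.bytes_total) // step_pct
        if last_bucket.get(p.current_file) != bucket:
            last_bucket[p.current_file] = bucket
            emit(f"  {p.current_file}: {bucket * step_pct}% "
                 f"({p.bytes_uploaded:,}/{p.bytes_total:,})")

    return on_progress


@dataclass
class FakeProgress:
    """Hypothetical stand-in for UploadProgress, for trying the callback offline."""
    current_file: str
    bytes_uploaded: int
    bytes_total: int
```

The returned function can be passed as `progress=make_throttled_progress()` in place of `on_progress`.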

Batch loop — many recordings at once

When pulling a backlog off the NAS, loop over recording directories:

from pathlib import Path
from ursa import DataInterface

data = DataInterface(profile="r2")
data.enable_writes(roles=("assets_rw", "raw_rw"))  # ingest() uploads raw bytes — needs raw_rw

nas_root = Path("/Volumes/Shared_Drive/constellation-data/recordings")
for rec_dir in sorted(nas_root.glob("rec_*")):
    manifest = rec_dir / "manifest.json"
    if not manifest.exists():
        print(f"SKIP {rec_dir.name}: no manifest (run reconstruct first)")
        continue
    pfile = rec_dir / "participant.txt"
    # Treat a missing, empty, or whitespace-only participant.txt as unknown (None).
    participant = (pfile.read_text().strip() or None) if pfile.exists() else None
    try:
        result = data.ingest(
            manifest_path=manifest,
            source_dir=rec_dir,
            participant=participant,
        )
        print(f"OK   {rec_dir.name}: {result.files_uploaded} uploaded, {result.files_skipped} skipped")
    except Exception as exc:  # noqa: BLE001  — broad: we want to keep going
        print(f"FAIL {rec_dir.name}: {type(exc).__name__}: {exc}")

Each iteration is independent; a single failed recording doesn’t poison the rest. Failures land in the operator’s stdout; re-run the loop after fixing the root cause (the upload-or-skip-atomically contract makes the second attempt cheap).
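When the backlog is large, scrolling stdout for FAIL lines gets tedious. One option is to record failed directory names in a file and feed only those into the next pass. The sketch below keeps the ingest call abstract: `run_batch` and `ingest_one` are hypothetical names, and in practice `ingest_one` would wrap the `data.ingest(...)` call from the loop above.

```python
import json
from pathlib import Path
from typing import Callable, Iterable


def run_batch(
    rec_dirs: Iterable[Path],
    ingest_one: Callable[[Path], object],
    failures_path: Path,
) -> list[str]:
    """Ingest each directory independently and persist the names that failed."""
    failed: list[str] = []
    for rec_dir in rec_dirs:
        try:
            ingest_one(rec_dir)
            print(f"OK   {rec_dir.name}")
        except Exception as exc:  # broad on purpose: one failure must not stop the batch
            failed.append(rec_dir.name)
            print(f"FAIL {rec_dir.name}: {type(exc).__name__}: {exc}")
    # Overwrite the file each pass so it always reflects the latest run.
    failures_path.write_text(json.dumps(failed, indent=2))
    return failed


def load_failures(failures_path: Path) -> set[str]:
    """Read back the failed names; a missing file means nothing failed yet."""
    if not failures_path.exists():
        return set()
    return set(json.loads(failures_path.read_text()))
```

On the retry pass, filter the directory listing with `load_failures(...)` before calling `run_batch` again, so recordings that already succeeded are not re-probed.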

Conflict policies

The default overwrite=False is the safe path: a re-ingest of an existing recording aborts. To extend an existing recording’s catalog row with new modalities or participants instead of clobbering, use the lower-level registration verbs directly:

  • Merge a recording's duration into an existing row: data.register_recording(..., on_conflict="extend"). This is the backfill verb: it updates duration to max(existing, new) and touches the recordings table only, so it does not add modality rows. Modalities live in a separate table that register_recording never touches. To add a new modality to an existing recording, call data.register_modality(recording_hash=..., ..., on_conflict="error"), or pass "overwrite" to replace an existing one. register_modality accepts only "error" and "overwrite"; it has no "extend" policy.

  • Add a participant to an existing recording: data.register_recording(..., on_conflict="merge_participants"). Set-union of participant_ids; recording-level fields must otherwise match.

  • Overwrite duration/ended_at: data.register_recording(..., on_conflict="overwrite") (used by the rebuild script’s pass-2 duration backfill).

Use on_conflict="overwrite" only when you’ve confirmed the existing row is wrong. The catalog-rebuild runbook (catalog-rebuild.md) describes when each policy applies during a fresh prefix bootstrap.
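The recording-level policies described above can be modelled as a pure function over plain dicts. This is a toy illustration of the stated semantics, not ursa's code: `resolve` is a hypothetical name, the rows are reduced to three fields, and details the runbook does not specify (such as what "extend" does to fields other than duration) are simply left unchanged here.

```python
from typing import Optional


def resolve(existing: Optional[dict], new: dict, on_conflict: str = "error") -> dict:
    """Return the row that would be stored under each conflict policy (toy model)."""
    if existing is None:
        return dict(new)  # no conflict: plain insert
    if on_conflict == "error":
        raise ValueError("recording row already exists")
    if on_conflict == "extend":
        # Duration becomes max(existing, new); nothing else changes in this model.
        return {**existing, "duration": max(existing["duration"], new["duration"])}
    if on_conflict == "merge_participants":
        # Recording-level fields must otherwise match.
        a = {k: v for k, v in existing.items() if k != "participant_ids"}
        b = {k: v for k, v in new.items() if k != "participant_ids"}
        if a != b:
            raise ValueError("recording-level fields differ")
        merged = sorted(set(existing["participant_ids"]) | set(new["participant_ids"]))
        return {**existing, "participant_ids": merged}
    if on_conflict == "overwrite":
        # Replaces duration and ended_at with the incoming values.
        return {**existing, "duration": new["duration"], "ended_at": new["ended_at"]}
    raise ValueError(f"unknown policy: {on_conflict}")
```

Working through a case by hand with a model like this is a cheap way to confirm which policy you need before touching the prod catalog.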

Verification

After an ingest, confirm the catalog has the rows you expect:

from ursa import DataInterface

data = DataInterface(profile="r2")
rec = data.get_recording("<recording_hash>")
# RecordingRow is keyed by recording_hash; the legacy data-engine rec_id is
# preserved in metadata["manifest_recording_id"].
legacy_id = (rec.metadata or {}).get("manifest_recording_id")
print(f"recording: rec_id={legacy_id}, participants={rec.participant_ids}")
mods = data.list_modalities(where={"recording_hash": rec.recording_hash})
print(f"modalities: {[(m.modality, m.worker_id) for m in mods]}")

Note (post-PR-#73 modality schema): QueryResult.modalities dict keys are now "eeg" (single-worker) or "eeg_a" / "eeg_b" (dual-worker disambiguated) — the modality column is the device family and worker_id carries the disambiguating suffix. Old notebooks that worked around the pre-v0.2.0 "eeg_4559..." shape with m.split("_")[0] can drop the workaround.

Round-trip a file to confirm bytes resolve:

# Reuses `data` and `rec` from the verification snippet above.
import tempfile
results = data.query(recording_hash=rec.recording_hash, modalities=["camera"], status="raw")
data.download(results, dest=tempfile.mkdtemp(prefix="ursa_verify_"))
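A download that completes without error shows the keys resolve; comparing digests additionally shows the bytes match what is on the NAS. The sketch below compares by content hash rather than by path, because the directory layout that download() produces is not described here. Both function names are hypothetical.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream a file through SHA-256 in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def not_round_tripped(source_dir: Path, download_dir: Path) -> list[Path]:
    """Return source files whose content does not appear anywhere in the download."""
    downloaded = {file_sha256(p) for p in download_dir.rglob("*") if p.is_file()}
    return sorted(
        p for p in source_dir.rglob("*")
        if p.is_file() and file_sha256(p) not in downloaded
    )
```

Because the query above requests only the camera modality, point `source_dir` at the matching camera worker subdirectory rather than the whole recording; otherwise every other modality's files will be reported as missing.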

Dual-node caveat

If the recording was one half of a dual-node session (e.g. the green-mantis Mac mini host + green-motor EEG mini-PC pair) and you only have the host’s bytes on the NAS, ingest that half normally. The peer half can be ingested later under the same recording_hash and ursa merges via the merge_participants / extend conflict policies (post-PR-#61). Do not fabricate the missing half; let it land as a separate ingest when the peer comes back online.

See also

  • Catalog rebuild — the bulk re-derivation path used during the v0.2.0 cutover. Use that when you’re rebuilding the catalog from R2 raw bytes, not when you’re ingesting individual offline recordings.

  • Downloading tutorial — the progress= callback contract + ursa.download logger.

  • Concepts — the day-to-day read-side API; this runbook is the write-side counterpart for one-off offline ingest.