Downloading session bytes to disk

Task: pull one recording’s bytes from R2 to a local directory, with a live progress UI.

DataInterface.download(target, dest) streams a recording’s bytes to a local directory — raw segments for not-yet-ingested rows, the processed asset (Zarr / Lance / MP4 / Parquet) for ingested rows, selectable via source= (see Choosing raw vs processed bytes). The call is well-suited to interactive UIs (notebooks, Streamlit apps) because it exposes two complementary surfaces for observing progress: a structured progress= callback, and a stable ursa.download logger. Pick whichever shape your front end prefers.

Note

Every snippet on this page talks to production R2 (profile="r2") and needs 1Password credentials resolved on the machine; see setup and the constellation-utils secrets docs. The end-to-end runs below are unverified, pending an example run against a live catalog.

Minimal call

import tempfile
from ursa import DataInterface

data = DataInterface(profile="r2")
results = data.query(
    recording_hash="1ad200d69a2f8d36424e5ad00e1b4196f45c30fca791b623f6a060bddca446a3",
)
paths = data.download(results, dest=tempfile.mkdtemp(prefix="ursa_session_"))

query() defaults to status="ingested", so this resolves the recording’s processed rows and download() (with the default source="auto") writes each row’s processed asset. For a session that hasn’t been ingested yet, pass status="raw" to query(); to pull the original raw segments of an already-ingested session, keep the default query and pass source="raw" to download() (see below).
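The three cases in the paragraph above come down to a small decision. The sketch below is a hypothetical helper, plan_calls (not part of Ursa). It returns the keyword arguments to unpack into query() and download(), and it encodes only the documented defaults.

```python
from typing import Any


def plan_calls(ingested: bool, want_raw: bool) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return (query_kwargs, download_kwargs) for the documented cases.

    Hypothetical helper: it only restates the rules above and never calls Ursa.
    """
    if not ingested:
        # Not ingested yet: only raw rows exist, so ask query() for them.
        # The default source="auto" then makes download() write raw segments.
        return {"status": "raw"}, {}
    if want_raw:
        # Ingested, but the original segments are wanted: keep query()'s
        # default status="ingested" and override source= on download().
        return {}, {"source": "raw"}
    # Ingested, and the processed asset is wanted: both defaults already do this.
    return {}, {}
```

For example, `q, d = plan_calls(ingested=True, want_raw=True)` followed by `data.download(data.query(recording_hash=..., **q), dest=..., **d)` pulls the original segments of an ingested session.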

query() is the sole catalog-resolution verb; download() is one of three delivery verbs (materialize / stream / download) that consume the resolved QueryResult(s). Files land under dest in the by_recording layout by default (pass layout="by_modality" or layout="flat" to change the on-disk tree).

Choosing raw vs processed bytes

source= selects which bytes each modality row contributes:

source=            raw row         processed row
"auto" (default)   raw segments    processed asset (storage_uri)
"raw"              raw segments    raw segments (raw_storage_uri is kept after ingestion)
"processed"        ValueError      processed asset

Rows with status "ingesting" or "failed" (only reachable via status="all") raise ValueError under every source: they are catalog markers, not delivery inputs. Processed downloads need real assets-bucket credentials, i.e. profile="r2" or "r2-test" (profile="local" has no assets store).
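The sketch below restates the table and the ValueError rules above as a pure function. bytes_for is a hypothetical name. The status strings "raw", "ingested", "ingesting", and "failed" come from the surrounding text. Rejecting an unknown source= value with ValueError is an assumption about Ursa, not documented behavior.

```python
def bytes_for(row_status: str, source: str = "auto") -> str:
    """Say which bytes a modality row contributes under source=, or raise ValueError.

    Hypothetical mirror of the documented table; not part of Ursa.
    """
    if source not in {"auto", "raw", "processed"}:
        # Assumption: Ursa's handling of unknown source= values isn't documented here.
        raise ValueError(f"unknown source={source!r}")
    if row_status in {"ingesting", "failed"}:
        # Catalog markers, not delivery inputs: rejected under every source.
        raise ValueError(f"{row_status!r} rows cannot be downloaded")
    if row_status == "raw":
        if source == "processed":
            raise ValueError("raw rows have no processed asset")
        return "raw segments"
    if row_status == "ingested":
        # raw_storage_uri survives ingestion, so source="raw" still works.
        return "raw segments" if source == "raw" else "processed asset"
    raise ValueError(f"unknown row status {row_status!r}")
```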

Processed assets keep their basename on disk, so the result opens in place without re-stitching — under the default by_recording layout:

dest/<recording_hash>/eeg/eeg.zarr/…        # zarr.open(...)
dest/<recording_hash>/events/events.lance/… # lance.dataset(...)
dest/<recording_hash>/camera/camera.mp4     # plays as-is, with…
dest/<recording_hash>/camera/camera.mp4.lance/…  # …its frame index alongside
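After a processed download, you usually need to locate each asset before opening it. Here is a minimal sketch. It assumes the by_recording tree shown above and that assets are recognized purely by suffix. The .parquet entry is a guess, because the example tree doesn't show a Parquet asset. find_assets is a hypothetical helper.

```python
from pathlib import Path
from typing import Union

# Suffixes from the example tree above; ".parquet" is an assumption.
ASSET_SUFFIXES = (".zarr", ".lance", ".mp4", ".parquet")


def find_assets(dest: Union[str, Path], recording_hash: str) -> dict[str, list[Path]]:
    """Map each modality directory under dest/<recording_hash>/ to its top-level assets.

    Raises FileNotFoundError if the recording directory doesn't exist.
    """
    root = Path(dest) / recording_hash
    assets: dict[str, list[Path]] = {}
    for modality_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        # Zarr and Lance assets are directories and MP4 is a file; match on name only.
        assets[modality_dir.name] = sorted(
            child for child in modality_dir.iterdir() if child.name.endswith(ASSET_SUFFIXES)
        )
    return assets
```

You can then open each returned path as the comments in the tree suggest, e.g. `zarr.open(str(assets["eeg"][0]))`.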

download() copies bytes verbatim. To re-assemble processed rows into in-memory Data objects (clipped, windowed, decoded), use materialize() / stream() instead.

paths is a list[Path] in the order the files are streamed. download() returns only after every file has landed; if a transient transport failure exhausts its retry budget, it raises ObjectStoreError instead (see ObjectStoreTransientError).

Driving a Streamlit progress bar with progress=

progress= accepts a callable invoked once per phase boundary plus once per file. Events are typed as ursa.DownloadProgress:

import streamlit as st
from ursa import DataInterface, DownloadProgress

status = st.status("Downloading session…", expanded=True)
bar = st.progress(0.0)

def on_progress(event: DownloadProgress) -> None:
    if event.phase == "start":
        status.update(label="Preparing… (planning object list)")
    elif event.phase == "modality_start":
        status.update(label=f"Downloading {event.modality} / {event.worker_id or 'default'}")
    if event.bytes_total:  # set from plan_done onward
        bar.progress(event.bytes_done / event.bytes_total)
    if event.phase == "finish":
        status.update(label=f"Done — {event.bytes_done:,} bytes in {event.elapsed_s:.1f}s",
                       state="complete")

data = DataInterface(profile="r2")
results = data.query(
    recording_hash="1ad200d69a2f8d36424e5ad00e1b4196f45c30fca791b623f6a060bddca446a3",
)
data.download(results, dest="/tmp/session", progress=on_progress)

Phase order: start → plan_done → (modality_start → file_done* → modality_done)+ → finish. bytes_done is cumulative across the entire call. It never resets at modality boundaries, so dividing by bytes_total (set from plan_done onward) yields a monotonic 0.0 → 1.0 progress value.
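progress= accepts any callable, so a test can record the raw events with a list's append method. It can then check them against the phase order and the monotonic bytes_done guarantee. The checker below is a hypothetical helper. Treating a missing bytes_done as 0 is an assumption about the early phases.

```python
import re
from typing import Any, Sequence

# Documented order: start -> plan_done -> (modality_start -> file_done* -> modality_done)+ -> finish
_ORDER = re.compile(r"start plan_done (modality_start (file_done )*modality_done )+finish")


def check_progress_events(events: Sequence[Any]) -> None:
    """Raise AssertionError unless the recorded events follow the documented contract.

    `events` are objects with .phase and .bytes_done attributes, such as the
    DownloadProgress instances collected by a recording callback.
    """
    phases = " ".join(e.phase for e in events)
    if not _ORDER.fullmatch(phases):
        raise AssertionError(f"unexpected phase order: {phases}")
    # Assumption: bytes_done may be None before planning; count that as 0.
    done = [e.bytes_done or 0 for e in events]
    if any(later < earlier for earlier, later in zip(done, done[1:])):
        raise AssertionError("bytes_done went backwards")
```

For example: `events = []`, then `data.download(results, dest="/tmp/session", progress=events.append)`, then `check_progress_events(events)`.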

Callback exceptions are caught with except Exception, so KeyboardInterrupt and SystemExit still tear the download down. Caught exceptions are logged to ursa.download at WARNING, and streaming continues. There is no dedupe: a callback that raises on every file_done produces N WARNINGs for N files.
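The wrapper below pictures the catch-and-log behavior. It is an illustrative sketch of the documented semantics, not Ursa's source. Storing repr(exc) under callback_error is an assumption, because the contract names only the key.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("ursa.download")


def invoke_callback(callback: Callable[[Any], None], event: Any) -> None:
    """Call a progress callback, downgrading ordinary failures to a WARNING."""
    try:
        callback(event)
    except Exception as exc:
        # KeyboardInterrupt and SystemExit derive from BaseException, not Exception,
        # so they are not caught here and still abort the download.
        # One WARNING per failure, with no dedupe across events.
        logger.warning("progress callback raised", extra={"callback_error": repr(exc)})
```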

Callbacks are invoked synchronously on the thread that called download() — safe for direct Streamlit / Tk / Qt widget mutation.
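Outside Streamlit, the same callable shape works for a terminal. The class below is a hypothetical example, not an Ursa API. It prints a line only when progress has advanced by at least step since the last print, which keeps output readable for a session with thousands of file_done events. It reads only the DownloadProgress fields used in the Streamlit example: phase, modality, bytes_done, bytes_total, and elapsed_s.

```python
import sys
from typing import Any, TextIO


class ConsoleProgress:
    """Hypothetical plain-text progress callback with simple throttling."""

    def __init__(self, step: float = 0.05, out: TextIO = sys.stdout) -> None:
        self.step = step
        self.out = out
        self._last = -1.0  # last printed fraction; -1 forces the first print

    def __call__(self, event: Any) -> None:
        if event.phase == "finish":
            print(f"done: {event.bytes_done:,} bytes in {event.elapsed_s:.1f}s", file=self.out)
            return
        if not event.bytes_total:  # unknown until plan_done
            return
        frac = event.bytes_done / event.bytes_total
        if frac - self._last >= self.step:
            self._last = frac
            print(f"{frac:6.1%} ({event.modality or '-'})", file=self.out)
```

Pass an instance directly: `data.download(results, dest="/tmp/session", progress=ConsoleProgress())`.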

Subscribing to the ursa.download logger

The same events also flow through the ursa.download logger. Useful when you don’t want to thread progress= through every call site — e.g. ambient observability in a CLI tool or background worker:

import logging

logging.getLogger("ursa.download").setLevel(logging.INFO)
logging.getLogger("ursa.download").addHandler(logging.StreamHandler())

logger.info fires at recording / modality boundaries; logger.debug fires per file. Bump the level to DEBUG only when you need the per-segment trace — a session with thousands of segments otherwise floods the handler.
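If you need DEBUG but not every file, a logging.Filter can thin the per-file trace while still letting the INFO phases through. This is a minimal sketch that assumes the documented extra= key phase; the standard logging module copies extra= keys onto the LogRecord as attributes. SampleFileDone is a hypothetical name.

```python
import logging


class SampleFileDone(logging.Filter):
    """Pass every record except file_done, of which only every Nth is kept."""

    def __init__(self, every: int = 100) -> None:
        super().__init__()
        if every < 1:
            raise ValueError("every must be >= 1")
        self.every = every
        self._seen = 0

    def filter(self, record: logging.LogRecord) -> bool:
        # `phase` is a documented extra= key; never parse record.getMessage().
        if getattr(record, "phase", None) != "file_done":
            return True
        self._seen += 1
        return (self._seen - 1) % self.every == 0  # keep the 1st, (N+1)th, (2N+1)th, ...


handler = logging.StreamHandler()
handler.addFilter(SampleFileDone(every=500))
log = logging.getLogger("ursa.download")
log.setLevel(logging.DEBUG)
log.addHandler(handler)
```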

Observability contract

The logger name and structured field set are part of Ursa’s public contract; rename or removal is SemVer-major.

Property           Stable contract
Logger name        ursa.download
INFO phases        start, plan_done, modality_start, modality_done, finish
DEBUG phases       file_done
extra= keys        phase, recording_hash, modality, worker_id, key, bytes_done, bytes_total, elapsed_s (append-only across minor versions)
Callback failure   one WARNING per occurrence, extra={"callback_error": "..."}
Message strings    NOT part of the contract; bind to extra= keys, do not parse the message

worker_id=None only appears on call-scoped phases (start, plan_done, finish); on modality_start, file_done, and modality_done it is always a non-None str (possibly "", the single-worker sentinel — see ursa.catalog.ModalityRow for the canonical convention).
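The formatter below shows what binding to extra= keys instead of message text can look like: it turns each record into one JSON object. This is a sketch. JsonFormatter is a hypothetical name, and ignoring keys outside the table is one way to tolerate the append-only policy. JSON also preserves the distinction between worker_id=None and the "" sentinel (null vs "").

```python
import json
import logging
import sys

# Field names copied from the contract table above.
CONTRACT_KEYS = ("phase", "recording_hash", "modality", "worker_id",
                 "key", "bytes_done", "bytes_total", "elapsed_s")


class JsonFormatter(logging.Formatter):
    """Render an ursa.download record as one JSON line built from extra= keys."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname}
        # Missing keys become null; keys added in later minor versions are ignored.
        payload.update({k: getattr(record, k, None) for k in CONTRACT_KEYS})
        if hasattr(record, "callback_error"):  # only on callback-failure WARNINGs
            payload["callback_error"] = record.callback_error
        return json.dumps(payload, default=str)


handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(JsonFormatter())
logging.getLogger("ursa.download").addHandler(handler)
```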

Error handling

Errors do not route through the callback or the logger. They propagate as exceptions; callers wrap download() in try/except to react. The two terminal classes you’ll see:

  • ObjectStoreError: a non-retryable failure (typically a permissions or pathing issue) or a transient one that exhausted its retry budget. The failing key is named in the message.

  • FileExistsError: destination collision; pass overwrite=True to replace.

Transient transport failures (ObjectStoreTransientError) are retried automatically inside _stream_to_disk with exponential backoff. The failure surfaces, as ObjectStoreError, only when the retry budget is exhausted.
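The retry loop itself is internal, and this page documents only that it uses exponential backoff. For readers unfamiliar with the pattern, here is a generic sketch with full jitter. The exception classes, attempt count, and base delay are all hypothetical and are not Ursa's values.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a retryable transport failure."""


class RetriesExhausted(Exception):
    """Hypothetical stand-in for the terminal error raised once the budget is spent."""


def with_backoff(fn: Callable[[], T], attempts: int = 5, base_s: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying TransientError with exponentially growing, jittered waits."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except TransientError as exc:
            if attempt == attempts - 1:
                raise RetriesExhausted(f"gave up after {attempts} attempts") from exc
            # Full jitter: wait a random time in [0, base_s * 2**attempt].
            sleep(random.uniform(0, base_s * 2 ** attempt))
    raise AssertionError("unreachable")
```

Callers of download() never run a loop like this themselves; they only see the terminal ObjectStoreError.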