Downloading session bytes to disk
Task: pull one recording’s bytes from R2 to a local directory, with a live progress UI.
DataInterface.download(target, dest) streams a recording’s bytes to a
local directory — raw segments for not-yet-ingested rows, the processed
asset (Zarr / Lance / MP4 / Parquet) for ingested rows, selectable via source=
(see Choosing raw vs processed bytes).
The call is well-suited to interactive UIs (notebooks, Streamlit apps)
because it exposes two complementary surfaces for observing progress: a
structured progress= callback, and a stable ursa.download logger. Pick
whichever shape your front end prefers.
Note
Every snippet on this page talks to production R2 (profile="r2") and
needs 1Password credentials resolved on the machine — see
setup and the constellation-utils secrets docs. The
end-to-end runs below have not yet been verified; they are pending an
example run against a live catalog.
Minimal call

```python
import tempfile

from ursa import DataInterface

data = DataInterface(profile="r2")
results = data.query(
    recording_hash="1ad200d69a2f8d36424e5ad00e1b4196f45c30fca791b623f6a060bddca446a3",
)
paths = data.download(results, dest=tempfile.mkdtemp(prefix="ursa_session_"))
```
query() defaults to status="ingested", so this resolves the recording’s
processed rows and download() (with the default source="auto") writes
each row’s processed asset. For a session that hasn’t been ingested yet,
pass status="raw" to query(); to pull the original raw segments of an
already-ingested session, keep the default query and pass source="raw"
to download() (see below).
query() is the sole catalog-resolution verb; download() is one of three
delivery verbs (materialize / stream / download) that consume the
resolved QueryResult(s). Files land under dest
in by_recording layout by default (pass layout="by_modality" or "flat"
to change the on-disk tree).
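Choosing raw vs processed bytes
source= selects which bytes each modality row contributes:

| source= | Not-yet-ingested (raw) row | Ingested row |
|---|---|---|
| "auto" (default) | raw segments | processed asset (Zarr / Lance / MP4 / Parquet) |
| "raw" | raw segments | original raw segments |
| "processed" | | processed asset |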
Rows whose status is ingesting or failed (reachable only via status="all")
raise ValueError under every source; they are catalog markers, not delivery
inputs. Processed downloads need real assets-bucket credentials, i.e.
profile="r2" or "r2-test" (profile="local" has no assets store).
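The rules above can be condensed into a small decision function, which is handy as a mental model or as a test oracle. resolve_bytes is hypothetical (it is not part of Ursa) and only models what this page states for certain: the source="auto" and source="raw" paths, plus the rejection of ingesting / failed rows. Other source values are deliberately left out rather than guessed.

```python
# Hypothetical decision model of the documented source= rules (not Ursa code).
NON_DELIVERABLE = {"ingesting", "failed"}  # catalog markers, never downloadable


def resolve_bytes(status: str, source: str = "auto") -> str:
    """Return which bytes a row with this status contributes under source=."""
    if status in NON_DELIVERABLE:
        raise ValueError(f"{status!r} rows are catalog markers, not delivery inputs")
    if status not in {"raw", "ingested"}:
        raise ValueError(f"unknown status {status!r}")
    if source == "raw":
        return "raw segments"  # raw rows and already-ingested rows alike
    if source == "auto":
        return "processed asset" if status == "ingested" else "raw segments"
    # Other source values are deliberately left out of this sketch.
    raise NotImplementedError(f"source={source!r} is not modelled here")
```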
Processed assets keep their basename on disk, so the result opens in place
without re-stitching — under the default by_recording layout:
```text
dest/<recording_hash>/eeg/eeg.zarr/…              # zarr.open(...)
dest/<recording_hash>/events/events.lance/…       # lance.dataset(...)
dest/<recording_hash>/camera/camera.mp4           # plays as-is, with…
dest/<recording_hash>/camera/camera.mp4.lance/…   # …its frame index alongside
```
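Because basenames are preserved, you can inventory a downloaded recording by listing each modality directory, then hand the entries to zarr.open, lance.dataset, or a video player. find_assets is a hypothetical helper that assumes the default by_recording layout shown above. It does not inspect file contents.

```python
from pathlib import Path


def find_assets(dest, recording_hash: str) -> dict[str, list[str]]:
    """Map each modality directory to the asset names directly inside it.

    Assumes the by_recording layout: dest/<recording_hash>/<modality>/<asset>.
    """
    root = Path(dest) / recording_hash
    return {
        modality_dir.name: sorted(child.name for child in modality_dir.iterdir())
        for modality_dir in sorted(root.iterdir())
        if modality_dir.is_dir()
    }
```

For example, find_assets(dest, recording_hash)["eeg"] would list "eeg.zarr", which you could then open with zarr.open(str(Path(dest) / recording_hash / "eeg" / "eeg.zarr")).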
download() copies bytes verbatim. To re-assemble processed rows into
in-memory Data objects (clipped, windowed, decoded), use
materialize() / stream() instead.
paths is a list[Path] in the same order the segments are streamed.
download() returns only after every file has landed. If a transient
transport failure (ObjectStoreTransientError) exhausts its retry budget, it
raises ObjectStoreError instead.
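Since download() only returns once everything has landed, a cheap post-condition check on the returned list is enough before passing paths to the next stage. summarize_download is a hypothetical helper, not part of Ursa. It assumes entries are regular files, so any directory entry is counted but contributes no bytes.

```python
from pathlib import Path


def summarize_download(paths: list[Path]) -> tuple[int, int]:
    """Return (entry_count, total_bytes) for the list download() returned."""
    missing = [p for p in paths if not p.exists()]
    if missing:
        raise FileNotFoundError(f"{len(missing)} path(s) missing, e.g. {missing[0]}")
    # Only regular files carry a meaningful st_size for this purpose.
    total = sum(p.stat().st_size for p in paths if p.is_file())
    return len(paths), total
```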
Driving a Streamlit progress bar with progress=
progress= accepts a callable invoked once per phase boundary plus once
per file. Events are typed as ursa.DownloadProgress:
```python
import streamlit as st

from ursa import DataInterface, DownloadProgress

status = st.status("Downloading session…", expanded=True)
bar = st.progress(0.0)


def on_progress(event: DownloadProgress) -> None:
    if event.phase == "start":
        status.update(label="Preparing… (planning object list)")
    elif event.phase == "modality_start":
        status.update(label=f"Downloading {event.modality} / {event.worker_id or 'default'}")
    if event.bytes_total:  # set from plan_done onward
        bar.progress(event.bytes_done / event.bytes_total)
    if event.phase == "finish":
        status.update(
            label=f"Done — {event.bytes_done:,} bytes in {event.elapsed_s:.1f}s",
            state="complete",
        )


data = DataInterface(profile="r2")
results = data.query(
    recording_hash="1ad200d69a2f8d36424e5ad00e1b4196f45c30fca791b623f6a060bddca446a3",
)
data.download(results, dest="/tmp/session", progress=on_progress)
```
Phase order: start → plan_done → (modality_start → file_done* →
modality_done)+ → finish. bytes_done is cumulative across the
entire call — it never resets at modality boundaries, so dividing by
bytes_total (set from plan_done onward) yields a monotonic
0.0 → 1.0 progress value.
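To check these ordering and monotonicity guarantees against your own callback, you could record the events and validate them offline. In the sketch below, Event is a hypothetical stand-in for ursa.DownloadProgress that models only phase, bytes_done and bytes_total. The phase order is encoded as a regular expression over the space-joined phase names.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    """Hypothetical stand-in for ursa.DownloadProgress (subset of fields)."""
    phase: str
    bytes_done: int = 0
    bytes_total: Optional[int] = None


# start -> plan_done -> (modality_start -> file_done* -> modality_done)+ -> finish
_PHASE_GRAMMAR = re.compile(
    r"start plan_done (modality_start (file_done )*modality_done )+finish"
)


def check_events(events: list[Event]) -> list[float]:
    """Validate phase order and return the progress fractions observed."""
    phases = " ".join(e.phase for e in events)
    if not _PHASE_GRAMMAR.fullmatch(phases):
        raise ValueError(f"unexpected phase order: {phases}")
    # bytes_total is unset (None) before plan_done, so those events are skipped.
    fractions = [e.bytes_done / e.bytes_total for e in events if e.bytes_total]
    if any(later < earlier for earlier, later in zip(fractions, fractions[1:])):
        raise ValueError("progress went backwards")
    return fractions
```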
Callback exceptions are caught (except Exception — KeyboardInterrupt
and SystemExit still tear the download down) and logged to
ursa.download at WARNING; streaming continues. There is no dedupe —
a callback that raises on every file_done produces N WARNINGs for N
files.
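The isolation rule can be reproduced in a few lines, which helps if you wrap or compose callbacks yourself. This sketches the documented behaviour and is not Ursa's source. emit is a hypothetical name, and it logs to a separate example.download logger so the demo stays out of ursa.download. Catching Exception (not BaseException) is what lets KeyboardInterrupt and SystemExit through.

```python
import logging

log = logging.getLogger("example.download")  # hypothetical demo logger


def emit(callback, event) -> None:
    """Invoke a progress callback without letting its errors stop the transfer."""
    try:
        callback(event)
    except Exception:
        # One WARNING per failure, no dedupe. KeyboardInterrupt and SystemExit
        # derive from BaseException, so they are not caught here and propagate.
        log.warning("progress callback raised on %r", event, exc_info=True)
```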
Callbacks are invoked synchronously on the thread that called
download() — safe for direct Streamlit / Tk / Qt widget mutation.
Subscribing to the ursa.download logger
The same events also flow through the ursa.download logger. Useful when
you don’t want to thread progress= through every call site — e.g.
ambient observability in a CLI tool or background worker:
```python
import logging

logging.getLogger("ursa.download").setLevel(logging.INFO)
logging.getLogger("ursa.download").addHandler(logging.StreamHandler())
```
logger.info fires at recording / modality boundaries; logger.debug
fires per file. Bump the level to DEBUG only when you need the
per-segment trace — a session with thousands of segments otherwise floods
the handler.
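For a notebook log panel or a test, an in-memory handler is often more convenient than a StreamHandler. ListHandler and attach are hypothetical helpers built only on the standard logging module. They make no assumptions about which structured fields Ursa attaches to each record.

```python
import logging


class ListHandler(logging.Handler):
    """Collect log records in memory, e.g. to render a UI log panel."""

    def __init__(self, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def attach(level: int = logging.INFO) -> ListHandler:
    """Attach a ListHandler to ursa.download at the given level."""
    logger = logging.getLogger("ursa.download")
    logger.setLevel(level)  # INFO: boundaries only; DEBUG adds per-file records
    handler = ListHandler()
    logger.addHandler(handler)
    return handler
```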
Observability contract
The logger name and the structured field set are part of Ursa's public contract; renaming or removing either is a SemVer-major change.

| Property | Stable contract |
|---|---|
| Logger name | ursa.download |
| INFO phases | recording and modality boundaries |
| DEBUG phases | per-file events (file_done) |
| Callback failure | one WARNING per occurrence, no dedupe |
| Message strings | NOT part of the contract; bind to the structured fields instead |
worker_id=None only appears on call-scoped phases (start, plan_done,
finish); on modality_start, file_done, and modality_done it is
always a non-None str (possibly "", the single-worker sentinel —
see ursa.catalog.ModalityRow for the canonical convention).
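When a UI labels progress by worker, the rule above determines how each value should be rendered. worker_label is a hypothetical helper: it rejects a None worker_id on modality-scoped phases, maps the "" single-worker sentinel to "default" (the same fallback the Streamlit example uses), and gives call-scoped phases their own label.

```python
from typing import Optional

MODALITY_SCOPED = {"modality_start", "file_done", "modality_done"}


def worker_label(phase: str, worker_id: Optional[str]) -> str:
    """Human-readable worker label that respects the worker_id contract."""
    if phase in MODALITY_SCOPED and not isinstance(worker_id, str):
        raise TypeError(f"{phase} must carry a str worker_id, got {worker_id!r}")
    if worker_id is None:
        return "(call-scoped)"  # start / plan_done / finish
    return worker_id or "default"  # "" is the single-worker sentinel
```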
Error handling
Errors do not route through the callback or the logger. They
propagate as exceptions; callers wrap download() in try/except to
react. The two terminal classes you’ll see:
- ObjectStoreError: non-transient (the failing key is named in the message); typically a permissions or pathing issue.
- FileExistsError: destination collision; pass overwrite=True to replace.
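A common reaction to a destination collision is to retry with overwrite=True. download_replacing is a hypothetical wrapper, not part of Ursa. It takes the bound download method as a parameter so it can be exercised with a stub, and it lets ObjectStoreError propagate on purpose, since retrying will not fix a permissions or pathing problem.

```python
from pathlib import Path
from typing import Any, Callable


def download_replacing(
    download: Callable[..., list[Path]], results: Any, dest: str
) -> list[Path]:
    """Try a non-destructive download first; replace files only on collision.

    ObjectStoreError is intentionally not caught here and reaches the caller.
    """
    try:
        return download(results, dest=dest)
    except FileExistsError:
        # Destination collision: retry, explicitly opting in to replacement.
        return download(results, dest=dest, overwrite=True)
```

Usage would look like paths = download_replacing(data.download, results, "/tmp/session").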
Transient transport failures (ObjectStoreTransientError) are retried
automatically inside _stream_to_disk with exponential backoff; a failure
surfaces, as ObjectStoreError, only once the retry budget is exhausted.
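For readers unfamiliar with the pattern, exponential backoff with a bounded budget looks roughly like the sketch below. It illustrates the general technique only. The attempt count, base delay, and lack of jitter are assumptions rather than _stream_to_disk's actual policy, and TransientError / TerminalError are hypothetical stand-ins for ObjectStoreTransientError / ObjectStoreError.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for ObjectStoreTransientError (a retryable transport failure)."""


class TerminalError(Exception):
    """Stand-in for ObjectStoreError raised once the retry budget is spent."""


def retry_with_backoff(
    fn: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient failures with exponentially growing delays."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except TransientError as exc:
            if attempt == attempts - 1:
                # Budget exhausted: surface a single terminal error.
                raise TerminalError(f"gave up after {attempts} attempts") from exc
            sleep(base_delay * 2**attempt)  # 0.5 s, 1 s, 2 s, ... with the defaults
    raise AssertionError("unreachable")
```

Injecting sleep keeps the helper testable without real waiting.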