"""R2 storage layout conventions for Ursa.
All code that constructs or interprets R2 object keys (ingestion, query,
lifecycle) MUST use the functions here. Keeping a single module as the
canonical source prevents the key structure from diverging across callers.
Two-store model (architecture v0.4)
-----------------------------------
Ursa is a **two-store** database backed by two R2 buckets:
* **Raw store** — ``constellation-data`` (this module's :data:`RAW_BUCKET`).
Cold/infrequent-access tier. Whole-file objects exactly as data-engine
writes them; no chunk indexing or temporal structure. Designed to be read
once by Virgo's ingestion node and rarely accessed afterwards (e.g. only
for re-ingestion).
Lifecycle policy can archive or delete raw segment files after a
configurable retention window once Virgo has produced the processed
artifact (tracked by ENG-1085).
* **Processed store** — ``constellation-assets`` (this module's
:data:`ASSETS_BUCKET`). Hot tier. Populated by Virgo's ingestion node:
Zarr arrays for regular continuous streams, Lance tables for irregular
events and the catalog itself, MP4 + Lance frame indices for video.
Two buckets, two layouts:
**constellation-data** — raw recordings, written by data-engine rigs.
Ursa treats this bucket as read-only. A Cloudflare bucket lock is applied
to the ``recordings/`` prefix so rigs cannot modify already-uploaded objects.
Key structure (per data-engine PR #49)::
recordings/<recording_id>/<worker_subdir>/<file> raw segment files
manifests/<recording_id>/manifest.json upload commit marker
_status/<hostname>.json uploader heartbeat
nodes/<node_id>.json node registry
**constellation-assets** — Ursa-managed objects, organised by *repo header*
so each package owns a distinct prefix and permissions can be scoped
independently::
virgo/<recording_hash>/<modality>.<ext> Virgo canonical outputs
ursa/catalog/<table>.lance Lance catalog tables
orion/checkpoints/<checkpoint_id>/ Orion model checkpoints
orion/benchmark-suites/<name>/<version>.json Benchmark suite configs
orion/benchmark-results/<result_id>.json Benchmark evaluation results
Per-modality lifecycle (architecture v0.4)
------------------------------------------
A :class:`~ursa.catalog.ModalityRow` carries two URI fields plus an
``ingestion_status`` enum:
* ``raw_storage_uri`` — the immutable cold-bucket pointer
(``r2://constellation-data/recordings/...``). Set at registration and
preserved forever, even after Virgo's ingestion node has produced the
processed artifact, so re-ingestion is always possible.
* ``storage_uri`` — the *current authoritative* location. While
``ingestion_status="raw"`` it mirrors ``raw_storage_uri``; once Virgo's
ingestion node runs, the row is upserted with
``ingestion_status="processed"``, ``storage_uri`` swapped to the
``constellation-assets`` key, and ``format`` / ``domain_intervals`` /
``channel_spec`` populated.
``recording_hash`` is the join key throughout — no re-keying during the
raw → processed transition.
"""
from __future__ import annotations
from urllib.parse import urlparse
from ursa.catalog.schemas import StorageFormat
__all__ = [
    # Bucket names
    "RAW_BUCKET", "ASSETS_BUCKET",
    # StorageFormat -> extension map and its canonical (non-raw) subset
    "FORMAT_EXT", "CANONICAL_FORMATS",
    # constellation-assets: Virgo canonical outputs
    "virgo_recording_prefix", "virgo_modality_key", "virgo_modality_uri",
    # constellation-assets: Ursa Lance catalog
    "catalog_prefix", "catalog_table_key", "catalog_table_uri",
    # Catalog table names
    "ALL_CATALOG_TABLES",
    "TABLE_PARTICIPANTS", "TABLE_RECORDINGS", "TABLE_MODALITIES",
    "TABLE_EVENTS", "TABLE_EMBEDDINGS", "TABLE_VIRGO_ASSETS",
    "TABLE_CHECKPOINTS", "TABLE_BENCHMARK_SUITES", "TABLE_BENCHMARK_RESULTS",
    # constellation-assets: Orion checkpoints and benchmarks
    "orion_checkpoint_prefix", "orion_checkpoint_uri",
    "orion_checkpoint_data_hash_key",
    "orion_benchmark_suite_key", "orion_benchmark_suite_uri",
    "orion_benchmark_result_key", "orion_benchmark_result_uri",
    # constellation-data: raw recordings (read-only for Ursa)
    "raw_recording_prefix", "raw_modality_prefix", "raw_modality_uri",
    "raw_commit_marker_key", "raw_status_key", "raw_node_key",
    # Validation
    "validate_storage_uri",
]
# ---------------------------------------------------------------------------
# Bucket constants
# ---------------------------------------------------------------------------
#: Raw store (architecture v0.4): the cold / infrequent-access tier.
#: data-engine rigs upload raw recordings here and Virgo's ingestion node
#: reads them. Only the uploader writes to this bucket — Ursa is strictly a
#: reader. A Cloudflare bucket lock on the ``recordings/`` prefix stops rigs
#: from mutating objects after upload, and lifecycle policy (ENG-1085) may
#: archive or delete raw segment files once Virgo ingestion has succeeded
#: and a retention window has elapsed.
RAW_BUCKET: str = "constellation-data"

#: Processed store (architecture v0.4): the hot tier. Virgo's ingestion
#: node fills it with Zarr / MP4 / Lance outputs and Ursa stores its own
#: Lance catalog under ``ursa/catalog/``. Every package owns one top-level
#: prefix ("repo header") so permissions can be scoped per package:
#: ``virgo/`` (canonical Virgo outputs), ``ursa/catalog/`` (catalog tables)
#: and ``orion/`` (checkpoints and benchmark artifacts).
ASSETS_BUCKET: str = "constellation-assets"
# ---------------------------------------------------------------------------
# Extension map — StorageFormat → file extension
# ---------------------------------------------------------------------------
#: Maps every :class:`StorageFormat` to the file extension used in object keys.
#:
#: Canonical formats use their target format's usual extension. Raw formats
#: use data-engine's native extension; they are listed only so the map is
#: exhaustive. A completeness test checks that no StorageFormat value is
#: missing, so a gap fails at test time rather than at write time. Raw
#: modality URIs must be built with ``raw_modality_uri()``, never with
#: ``virgo_modality_key()``.
FORMAT_EXT: dict[StorageFormat, str] = {
    # --- canonical targets (constellation-assets) ---
    StorageFormat.ZARR: "zarr",
    StorageFormat.LANCE: "lance",
    StorageFormat.MP4_INDEX: "mp4",
    StorageFormat.PARQUET: "parquet",
    # --- data-engine native raw outputs; address via raw_modality_uri() ---
    StorageFormat.RAW_BINARY: "bin",
    StorageFormat.RAW_CSV: "csv",
    StorageFormat.RAW_JSONL: "jsonl",
    StorageFormat.RAW_AUDIO: "wav",
    StorageFormat.RAW_VIDEO: "mkv",
}
#: The subset of :class:`StorageFormat` stored in ``constellation-assets``
#: (written by Virgo or by ingestion conversion): every member whose value
#: does not begin with ``"raw_"``. :func:`virgo_modality_key` accepts only
#: these and raises ``ValueError`` for a ``RAW_*`` format, because raw data
#: stays in ``constellation-data``.
CANONICAL_FORMATS: frozenset[StorageFormat] = frozenset(
    member
    for member in StorageFormat
    if not member.value.startswith("raw_")
)
# ---------------------------------------------------------------------------
# constellation-assets: Virgo canonical layout
# ---------------------------------------------------------------------------
def virgo_recording_prefix(recording_hash: str) -> str:
    """Return the ``constellation-assets`` prefix owning one recording's Virgo outputs.

    Ingestion lists objects under this prefix to enumerate every canonical
    artifact of a recording (for example before lifecycle GC runs). Do not
    build individual object keys from it by hand — use
    :func:`virgo_modality_key` for those.

    Example: ``virgo/abc123def456/``
    """
    return "virgo/{}/".format(recording_hash)
def virgo_modality_key(recording_hash: str, modality: str, fmt: StorageFormat) -> str:
    """Return the ``constellation-assets`` key of one Virgo-processed modality.

    Only canonical formats (``ZARR``, ``LANCE``, ``MP4_INDEX``, ``PARQUET``)
    are accepted. ``RAW_*`` formats belong in ``constellation-data`` and must
    be addressed through :func:`raw_modality_uri` instead.

    Raises:
        ValueError: if ``fmt`` is not a member of :data:`CANONICAL_FORMATS`.

    Example: ``virgo/abc123def456/eeg.zarr``
    """
    if fmt in CANONICAL_FORMATS:
        extension = FORMAT_EXT[fmt]
        return f"{virgo_recording_prefix(recording_hash)}{modality}.{extension}"

    # Non-canonical means raw: point the caller at the raw-store helper.
    allowed = sorted(f.value for f in CANONICAL_FORMATS)
    raise ValueError(
        f"StorageFormat.{fmt.name} is a raw format and belongs in "
        f"{RAW_BUCKET!r}. Use raw_modality_uri() for Phase 1a data, or "
        f"pass a canonical format (one of: {allowed})."
    )
def virgo_modality_uri(recording_hash: str, modality: str, fmt: StorageFormat) -> str:
    """Return the full ``r2://`` URI of a Virgo-processed modality object.

    Prepends the ``constellation-assets`` bucket to
    :func:`virgo_modality_key`, so raw formats raise the same ``ValueError``.

    Example: ``r2://constellation-assets/virgo/abc123def456/eeg.zarr``
    """
    object_key = virgo_modality_key(recording_hash, modality, fmt)
    return f"r2://{ASSETS_BUCKET}/{object_key}"
# ---------------------------------------------------------------------------
# constellation-assets: Ursa catalog layout
# ---------------------------------------------------------------------------
def catalog_prefix() -> str:
    """Return the ``constellation-assets`` prefix holding every Lance catalog table.

    The ``ursa/`` repo header scopes permissions to the Ursa package, just as
    ``virgo/`` scopes Virgo and ``orion/`` scopes Orion.

    Example: ``ursa/catalog/``
    """
    repo_header = "ursa"
    return repo_header + "/catalog/"
def catalog_table_key(table_name: str) -> str:
    """Return the ``constellation-assets`` key of a named Lance catalog table.

    Pass one of this module's ``TABLE_*`` constants instead of a literal
    string, so renaming a table stays a single-file edit.

    Example: ``ursa/catalog/recordings.lance``
    """
    return "ursa/catalog/{}.lance".format(table_name)
def catalog_table_uri(table_name: str) -> str:
    """Return the full ``r2://`` URI of a Lance catalog table.

    Example: ``r2://constellation-assets/ursa/catalog/recordings.lance``
    """
    object_key = catalog_table_key(table_name)
    return f"r2://{ASSETS_BUCKET}/{object_key}"
# ---------------------------------------------------------------------------
# Catalog table name constants
#
# One constant per catalog table, matching the nine tables defined in
# ursa.catalog.schemas. Use these instead of bare strings so a table rename
# is a single-file edit and grep finds every reference.
#
# Grouped by owning system — the same grouping used in the catalog docs table.
# ---------------------------------------------------------------------------
# -- Owned by Ursa: the raw data layer --
TABLE_PARTICIPANTS: str = "participants"
TABLE_RECORDINGS: str = "recordings"
TABLE_MODALITIES: str = "modalities"
TABLE_EVENTS: str = "events"
TABLE_EMBEDDINGS: str = "embeddings"
# -- Owned by Virgo: preprocessing outputs --
TABLE_VIRGO_ASSETS: str = "virgo_assets"
# -- Owned by Orion: model training and evaluation --
TABLE_CHECKPOINTS: str = "checkpoints"
TABLE_BENCHMARK_SUITES: str = "benchmark_suites"
TABLE_BENCHMARK_RESULTS: str = "benchmark_results"

#: Every catalog table name — nine in total, one per
#: :class:`~ursa.catalog.CatalogRow` subclass. Iterate this to enumerate
#: tables (e.g. when creating the Lance datasets on first boot); reach for an
#: individual ``TABLE_*`` constant when one specific table is needed. A
#: completeness test checks that its length equals the number of ``TABLE_*``
#: constants defined in this module.
ALL_CATALOG_TABLES: tuple[str, ...] = (
    # Ursa
    TABLE_PARTICIPANTS,
    TABLE_RECORDINGS,
    TABLE_MODALITIES,
    TABLE_EVENTS,
    TABLE_EMBEDDINGS,
    # Virgo
    TABLE_VIRGO_ASSETS,
    # Orion
    TABLE_CHECKPOINTS,
    TABLE_BENCHMARK_SUITES,
    TABLE_BENCHMARK_RESULTS,
)
# ---------------------------------------------------------------------------
# constellation-assets: Orion layout
# ---------------------------------------------------------------------------
def orion_checkpoint_prefix(checkpoint_id: str) -> str:
    """Prefix for all objects belonging to one Orion model checkpoint.

    This is a bare object-key prefix inside ``constellation-assets`` — it has
    no scheme and no bucket. Catalog ``storage_uri`` fields hold
    scheme-qualified URIs (the accepted schemes are gated by ``URI_PATTERN``
    in :mod:`ursa.catalog.schemas`), so set ``CheckpointRow.storage_uri``
    from :func:`orion_checkpoint_uri`, which wraps this prefix as
    ``r2://constellation-assets/...``, not from this value.

    The data-hash manifest lives at
    ``{orion_checkpoint_prefix(id)}data_hashes/manifest.json`` — use
    :func:`orion_checkpoint_data_hash_key` to construct that path rather
    than string-concatenating.

    Example: ``orion/checkpoints/ckpt-abc123/``
    """
    return f"orion/checkpoints/{checkpoint_id}/"
def orion_checkpoint_uri(checkpoint_id: str) -> str:
    """Return the full ``r2://`` URI of an Orion checkpoint prefix.

    The trailing ``/`` from :func:`orion_checkpoint_prefix` is kept, because
    a checkpoint is a prefix spanning several objects.

    Example: ``r2://constellation-assets/orion/checkpoints/ckpt-abc123/``
    """
    checkpoint_root = orion_checkpoint_prefix(checkpoint_id)
    return f"r2://{ASSETS_BUCKET}/{checkpoint_root}"
def orion_checkpoint_data_hash_key(checkpoint_id: str) -> str:
    """Return the key of the data-hash manifest stored inside a checkpoint.

    Orion writes this manifest to list every recording consumed during the
    training run; it is used to detect train/test overlap.

    Example: ``orion/checkpoints/ckpt-abc123/data_hashes/manifest.json``
    """
    checkpoint_root = f"orion/checkpoints/{checkpoint_id}/"
    return checkpoint_root + "data_hashes/manifest.json"
def orion_benchmark_suite_key(suite_name: str, suite_version: int) -> str:
    """Key for a versioned benchmark suite configuration object.

    The object contains the held-out query spec and metric definitions.

    This returns a bare object key inside ``constellation-assets`` (no scheme,
    no bucket). ``storage_uri`` fields hold scheme-qualified URIs, so set
    ``BenchmarkSuiteRow.storage_uri`` from :func:`orion_benchmark_suite_uri`,
    which wraps this key as a full ``r2://`` URI, not from this value.

    Example: ``orion/benchmark-suites/cognitive_load_eval/1.json``
    """
    return f"orion/benchmark-suites/{suite_name}/{suite_version}.json"
def orion_benchmark_suite_uri(suite_name: str, suite_version: int) -> str:
    """Return the full ``r2://`` URI of a benchmark suite configuration object.

    Example:
    ``r2://constellation-assets/orion/benchmark-suites/cognitive_load_eval/1.json``
    """
    suite_key = orion_benchmark_suite_key(suite_name, suite_version)
    return f"r2://{ASSETS_BUCKET}/{suite_key}"
def orion_benchmark_result_key(result_id: str) -> str:
    """Key for a benchmark evaluation result object.

    This returns a bare object key inside ``constellation-assets`` (no scheme,
    no bucket). ``storage_uri`` fields hold scheme-qualified URIs, so set
    ``BenchmarkResultRow.storage_uri`` from :func:`orion_benchmark_result_uri`,
    which wraps this key as a full ``r2://`` URI, not from this value.

    Example: ``orion/benchmark-results/result-deadbeef.json``
    """
    return f"orion/benchmark-results/{result_id}.json"
def orion_benchmark_result_uri(result_id: str) -> str:
    """Return the full ``r2://`` URI of a benchmark evaluation result.

    Example:
    ``r2://constellation-assets/orion/benchmark-results/result-deadbeef.json``
    """
    result_key = orion_benchmark_result_key(result_id)
    return f"r2://{ASSETS_BUCKET}/{result_key}"
# ---------------------------------------------------------------------------
# constellation-data: raw layout (data-engine-managed, read-only for Ursa)
# ---------------------------------------------------------------------------
def raw_recording_prefix(recording_id: str) -> str:
    """Return the ``constellation-data`` prefix of one recording's raw segment files.

    Follows the key layout introduced in data-engine PR #49,
    ``recordings/<recording_id>/``. Commit markers are not nested here:
    ``manifests/`` is a sibling of ``recordings/`` at the bucket root.

    Example: ``recordings/rec_20260507_143022_a7f3/``
    """
    return "recordings/{}/".format(recording_id)
def raw_modality_prefix(recording_id: str, worker_subdir: str) -> str:
    """Return the prefix of one modality's raw segment files.

    ``worker_subdir`` is the per-worker directory data-engine creates, such as
    ``camera_front_cam`` or ``eeg_default``.

    Example: ``recordings/rec_20260507_143022_a7f3/camera_front_cam/``
    """
    return "recordings/{}/{}/".format(recording_id, worker_subdir)
def raw_modality_uri(recording_id: str, worker_subdir: str) -> str:
    """Return the full ``r2://`` URI of a raw modality prefix in ``constellation-data``.

    The result is a *prefix* (it ends with ``/``) because a raw modality is
    made of several segment files. The ingestion step (ENG-888) resolves the
    individual objects under it when building ``ModalityRow`` entries.

    Example:
    ``r2://constellation-data/recordings/rec_20260507_.../camera_front_cam/``
    """
    modality_root = raw_modality_prefix(recording_id, worker_subdir)
    return f"r2://{RAW_BUCKET}/{modality_root}"
def raw_commit_marker_key(recording_id: str) -> str:
    """Return the key of the upload commit marker the uploader writes after a complete session.

    The marker does not live under ``recordings/``: ``manifests/`` sits at the
    bucket root next to ``recordings/``, ``_status/`` and ``nodes/``.

    Example: ``manifests/rec_20260507_143022_a7f3/manifest.json``
    """
    marker_dir = f"manifests/{recording_id}/"
    return marker_dir + "manifest.json"
def raw_status_key(hostname: str) -> str:
    """Return the key of a rig's uploader status heartbeat file.

    Example: ``_status/green-mantis.json``
    """
    return "_status/{}.json".format(hostname)
def raw_node_key(node_id: str) -> str:
    """Return the key of a node's registry entry in ``constellation-data``.

    Example: ``nodes/green-mantis.json``
    """
    return "nodes/{}.json".format(node_id)
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
#: URI schemes the catalog accepts for object-store locations. Keep this in
#: lock-step with ``URI_PATTERN`` in :mod:`ursa.catalog.schemas`: that regex
#: is the syntactic gate Pydantic enforces, while this tuple is the semantic
#: gate used by :func:`validate_storage_uri`. Supporting a new scheme (say
#: ``azure``) means editing both places; ENG-1071 may merge them into one
#: shared source.
_VALID_URI_SCHEMES: tuple[str, ...] = ("r2", "s3", "gcs", "file")


def validate_storage_uri(uri: str, fmt: StorageFormat) -> None:
    """Reject a ``storage_uri`` whose bucket does not match its format's tier.

    In Phase 1a (M2), raw modalities (``RAW_*``) are registered under
    ``constellation-data`` and canonical modalities (``ZARR``, ``LANCE``,
    ``MP4_INDEX``, ``PARQUET``) under ``constellation-assets``. This check
    enforces that contract before a catalog row is written:

    1. the scheme must be one of :data:`_VALID_URI_SCHEMES`;
    2. ``file://`` URIs have no bucket and are accepted as-is, so local-fs
       stores work for tests and the Polaris cache;
    3. any other URI's netloc must equal the bucket for ``fmt``'s tier.

    Test-profile bucket suffixes (``-test``) are not recognised yet — see
    `ENG-1071 <https://linear.app/constellationlab/issue/ENG-1071>`_.

    Raises:
        ValueError: on an unsupported scheme, or when a well-formed URI points
            at the wrong bucket for its format. Syntactically malformed input
            is expected to be rejected upstream by the Pydantic
            ``URI_PATTERN`` regex.
    """
    parsed = urlparse(uri)
    scheme = parsed.scheme
    if scheme not in _VALID_URI_SCHEMES:
        raise ValueError(
            f"storage_uri {uri!r} has unsupported scheme {scheme!r}; "
            f"expected one of {_VALID_URI_SCHEMES}"
        )

    # Tier is resolved before the file:// exemption, as it always has been.
    is_canonical = fmt in CANONICAL_FORMATS
    expected_bucket = ASSETS_BUCKET if is_canonical else RAW_BUCKET

    if scheme == "file":
        return

    bucket = parsed.netloc
    if bucket == expected_bucket:
        return

    tier = "canonical" if is_canonical else "raw"
    raise ValueError(
        f"storage_uri {uri!r} points at bucket {bucket!r}, but "
        f"format {fmt.name} is a {tier} format and must live under "
        f"{expected_bucket!r}"
    )