# Source code for ursa.layout

"""R2 storage layout conventions for Ursa.

All code that constructs or interprets R2 object keys (ingestion, query,
lifecycle) MUST use the functions here. Having a single module as the
canonical source prevents the key-structure from diverging across callers.

Two-store model (architecture v0.4)
-----------------------------------

Ursa is a **two-store** database backed by two R2 buckets:

* **Raw store** — ``constellation-data`` (this module's :data:`RAW_BUCKET`).
  Cold/infrequent-access tier. Whole-file objects exactly as data-engine
  writes them; no chunk indexing or temporal structure. Designed to be read
  exactly once, by Virgo's ingestion node, then rarely accessed again.
  Lifecycle policy can archive or delete raw segment files after a
  configurable retention window once Virgo has produced the processed
  artifact (tracked by ENG-1085).
* **Processed store** — ``constellation-assets`` (this module's
  :data:`ASSETS_BUCKET`). Hot tier. Populated by Virgo's ingestion node:
  Zarr arrays for regular continuous streams, Lance tables for irregular
  events and the catalog itself, MP4 + Lance frame indices for video.

Two buckets, two layouts:

**constellation-data** — raw recordings, written by data-engine rigs.
Ursa treats this bucket as read-only. A Cloudflare bucket lock is applied
to the ``recordings/`` prefix so rigs cannot modify already-uploaded objects.
Key structure (per data-engine PR #49)::

    recordings/<recording_id>/<worker_subdir>/<file>   raw segment files
    manifests/<recording_id>/manifest.json             upload commit marker
    _status/<hostname>.json                            uploader heartbeat
    nodes/<node_id>.json                               node registry

**constellation-assets** — Ursa-managed objects, organised by *repo header*
so each package owns a distinct prefix and permissions can be scoped
independently::

    virgo/<recording_hash>/<modality>.<ext>              Virgo canonical outputs
    ursa/catalog/<table>.lance                           Lance catalog tables
    orion/checkpoints/<checkpoint_id>/                   Orion model checkpoints
    orion/benchmark-suites/<name>/<version>.json         Benchmark suite configs
    orion/benchmark-results/<result_id>.json             Benchmark evaluation results

Per-modality lifecycle (architecture v0.4)
------------------------------------------

A :class:`~ursa.catalog.ModalityRow` carries two URI fields plus an
``ingestion_status`` enum:

* ``raw_storage_uri`` — the immutable cold-bucket pointer
  (``r2://constellation-data/recordings/...``). Set at registration and
  preserved forever, even after Virgo's ingestion node has produced the
  processed artifact, so re-ingestion is always possible.
* ``storage_uri`` — the *current authoritative* location. While
  ``ingestion_status="raw"`` it mirrors ``raw_storage_uri``; once Virgo's
  ingestion node runs, the row is upserted with
  ``ingestion_status="processed"``, ``storage_uri`` swapped to the
  ``constellation-assets`` key, and ``format`` / ``domain_intervals`` /
  ``channel_spec`` populated.

``recording_hash`` is the join key throughout — no re-keying during the
raw → processed transition.
"""

from __future__ import annotations

from urllib.parse import urlparse

from ursa.catalog.schemas import StorageFormat

# Public names exported by ``from ursa.layout import *``. Entries are grouped
# by layout area; the group comments mirror the section banners further down
# in this module.
__all__ = [
    # Bucket constants
    "RAW_BUCKET",
    "ASSETS_BUCKET",
    # Extension map + canonical format set
    "FORMAT_EXT",
    "CANONICAL_FORMATS",
    # Virgo canonical layout
    "virgo_recording_prefix",
    "virgo_modality_key",
    "virgo_modality_uri",
    # Ursa catalog layout
    "catalog_prefix",
    "catalog_table_key",
    "catalog_table_uri",
    # Catalog table name constants
    "ALL_CATALOG_TABLES",
    "TABLE_PARTICIPANTS",
    "TABLE_RECORDINGS",
    "TABLE_MODALITIES",
    "TABLE_EVENTS",
    "TABLE_EMBEDDINGS",
    "TABLE_VIRGO_ASSETS",
    "TABLE_CHECKPOINTS",
    "TABLE_BENCHMARK_SUITES",
    "TABLE_BENCHMARK_RESULTS",
    # Orion layout
    "orion_checkpoint_prefix",
    "orion_checkpoint_uri",
    "orion_checkpoint_data_hash_key",
    "orion_benchmark_suite_key",
    "orion_benchmark_suite_uri",
    "orion_benchmark_result_key",
    "orion_benchmark_result_uri",
    # Raw layout helpers
    "raw_recording_prefix",
    "raw_modality_prefix",
    "raw_modality_uri",
    "raw_commit_marker_key",
    "raw_status_key",
    "raw_node_key",
    # Validation
    "validate_storage_uri",
]

# ---------------------------------------------------------------------------
# Bucket constants
# ---------------------------------------------------------------------------

#: Name of the raw-store bucket (architecture v0.4). Cold/infrequent-access
#: tier — raw recordings written by data-engine rigs and read exactly once
#: by Virgo's ingestion node. Ursa reads but does not write here; only the
#: uploader writes. A Cloudflare bucket lock on the ``recordings/``
#: prefix prevents post-upload mutation by rigs. Lifecycle policy
#: (ENG-1085) may archive/delete raw segment files after Virgo ingestion
#: success + a retention window.
RAW_BUCKET = "constellation-data"

#: Name of the processed-store bucket (architecture v0.4). Hot tier —
#: populated by Virgo's ingestion node (Zarr / MP4 / Lance) and by Ursa
#: itself (the Lance catalog under ``ursa/catalog/``). Each package owns a
#: distinct prefix (repo header) for permission scoping: ``virgo/`` for
#: canonical Virgo outputs, ``ursa/catalog/`` for catalog tables,
#: ``orion/`` for checkpoints and benchmark artifacts.
ASSETS_BUCKET = "constellation-assets"

# ---------------------------------------------------------------------------
# Extension map — StorageFormat → file extension
# ---------------------------------------------------------------------------

#: File extension for each StorageFormat, stored WITHOUT the leading dot
#: (``virgo_modality_key()`` inserts the ``.`` when it builds a key).
#: Canonical formats use the target format's standard extension. Raw
#: formats use the data-engine native extension (these appear here only so
#: FORMAT_EXT is exhaustive and the completeness test can verify no
#: StorageFormat value is missing — raw modality URIs should be constructed
#: via ``raw_modality_uri()``, not ``virgo_modality_key()``).
#:
#: Every StorageFormat value must appear here — a completeness test
#: enforces this so a missing entry fails at test time, not write time.
FORMAT_EXT: dict[StorageFormat, str] = {
    # Canonical
    StorageFormat.ZARR: "zarr",
    StorageFormat.LANCE: "lance",
    StorageFormat.MP4_INDEX: "mp4",
    StorageFormat.PARQUET: "parquet",
    # Raw (data-engine native — see raw_modality_uri())
    StorageFormat.RAW_BINARY: "bin",
    StorageFormat.RAW_CSV: "csv",
    StorageFormat.RAW_JSONL: "jsonl",
    StorageFormat.RAW_AUDIO: "wav",
    StorageFormat.RAW_VIDEO: "mkv",
}

#: The subset of :class:`StorageFormat` that may live in
#: ``constellation-assets`` — i.e. formats produced by Virgo or by ingestion
#: conversion. ``virgo_modality_key()`` accepts only members of this set and
#: raises ``ValueError`` for any ``RAW_*`` format, because raw data lives in
#: ``constellation-data``.
#:
#: Membership is derived, not listed: a format is canonical unless its enum
#: *value* starts with ``"raw_"``.
#: NOTE(review): assumes every raw member's value carries that prefix —
#: confirm against ``ursa.catalog.schemas.StorageFormat``.
CANONICAL_FORMATS: frozenset[StorageFormat] = frozenset(
    member for member in StorageFormat if not member.value.startswith("raw_")
)

# ---------------------------------------------------------------------------
# constellation-assets: Virgo canonical layout
# ---------------------------------------------------------------------------


def virgo_recording_prefix(recording_hash: str) -> str:
    """Return the key prefix shared by all Virgo-processed objects of one recording.

    Used by ingestion to list every canonical object for a recording (e.g.
    before lifecycle GC runs). Not used for individual object writes — call
    :func:`virgo_modality_key` for those.

    Args:
        recording_hash: Recording identifier; interpolated verbatim.

    Returns:
        ``virgo/<recording_hash>/``, including the trailing slash.

    Example: ``virgo/abc123def456/``
    """
    return f"virgo/{recording_hash}/"


def virgo_modality_key(recording_hash: str, modality: str, fmt: StorageFormat) -> str:
    """Return the key of one modality's Virgo-processed object in ``constellation-assets``.

    Only canonical formats (``ZARR``, ``LANCE``, ``MP4_INDEX``, ``PARQUET``)
    are accepted. ``RAW_*`` formats belong in ``constellation-data`` and must
    be addressed via :func:`raw_modality_uri` instead.

    Args:
        recording_hash: Recording identifier; interpolated verbatim.
        modality: Modality name used as the object's base name.
        fmt: Storage format; selects the extension through ``FORMAT_EXT``.

    Returns:
        ``virgo/<recording_hash>/<modality>.<ext>``.

    Raises:
        ValueError: If ``fmt`` is not a member of ``CANONICAL_FORMATS``.

    Example: ``virgo/abc123def456/eeg.zarr``
    """
    # Reject raw formats up front so a raw object can never be keyed into the
    # processed bucket by mistake.
    if fmt not in CANONICAL_FORMATS:
        raise ValueError(
            f"StorageFormat.{fmt.name} is a raw format and belongs in "
            f"{RAW_BUCKET!r}. Use raw_modality_uri() for Phase 1a data, or "
            f"pass a canonical format (one of: {sorted(f.value for f in CANONICAL_FORMATS)})."
        )
    ext = FORMAT_EXT[fmt]
    return f"virgo/{recording_hash}/{modality}.{ext}"


def virgo_modality_uri(recording_hash: str, modality: str, fmt: StorageFormat) -> str:
    """Return the full ``r2://`` URI of a Virgo-processed modality object.

    Thin wrapper that prefixes :func:`virgo_modality_key` with the
    ``constellation-assets`` bucket.

    Args:
        recording_hash: Recording identifier; interpolated verbatim.
        modality: Modality name used as the object's base name.
        fmt: Canonical storage format of the object.

    Returns:
        ``r2://constellation-assets/virgo/<recording_hash>/<modality>.<ext>``.

    Raises:
        ValueError: Propagated from :func:`virgo_modality_key` when ``fmt``
            is not a canonical format.

    Example: ``r2://constellation-assets/virgo/abc123def456/eeg.zarr``
    """
    return f"r2://{ASSETS_BUCKET}/{virgo_modality_key(recording_hash, modality, fmt)}"


# ---------------------------------------------------------------------------
# constellation-assets: Ursa catalog layout
# ---------------------------------------------------------------------------
def catalog_prefix() -> str:
    """Return the prefix of all Lance catalog tables in ``constellation-assets``.

    Tables live under ``ursa/catalog/`` — the ``ursa/`` repo header scopes
    permissions for the Ursa package, mirroring how ``virgo/`` scopes Virgo
    and ``orion/`` scopes Orion.

    Returns:
        The constant string ``ursa/catalog/`` (trailing slash included).

    Example: ``ursa/catalog/``
    """
    return "ursa/catalog/"


def catalog_table_key(table_name: str) -> str:
    """Return the object key of a named Lance catalog table.

    Prefer using a :ref:`TABLE_* constant <catalog-table-constants>` over a
    bare string so a rename is a single-file edit.

    Args:
        table_name: Catalog table name without extension, e.g. ``recordings``.

    Returns:
        ``ursa/catalog/<table_name>.lance``.

    Example: ``ursa/catalog/recordings.lance``
    """
    return f"ursa/catalog/{table_name}.lance"


def catalog_table_uri(table_name: str) -> str:
    """Return the full ``r2://`` URI of a Lance catalog table.

    Prefixes :func:`catalog_table_key` with the ``constellation-assets``
    bucket.

    Args:
        table_name: Catalog table name without extension, e.g. ``recordings``.

    Returns:
        ``r2://constellation-assets/ursa/catalog/<table_name>.lance``.

    Example: ``r2://constellation-assets/ursa/catalog/recordings.lance``
    """
    return f"r2://{ASSETS_BUCKET}/{catalog_table_key(table_name)}"


# ---------------------------------------------------------------------------
# Catalog table name constants
#
# One constant per catalog table, matching the nine tables defined in
# ursa.catalog.schemas. Use these instead of bare strings so a table rename
# is a single-file edit and grep finds every reference.
#
# Grouped by owning system — the same grouping used in the catalog docs table.
# ---------------------------------------------------------------------------

# Ursa-owned — raw data layer
TABLE_PARTICIPANTS = "participants"
TABLE_RECORDINGS = "recordings"
TABLE_MODALITIES = "modalities"
TABLE_EVENTS = "events"
TABLE_EMBEDDINGS = "embeddings"

# Virgo-owned — preprocessing outputs
TABLE_VIRGO_ASSETS = "virgo_assets"

# Orion-owned — model training and evaluation
TABLE_CHECKPOINTS = "checkpoints"
TABLE_BENCHMARK_SUITES = "benchmark_suites"
TABLE_BENCHMARK_RESULTS = "benchmark_results"

#: All nine catalog table names, one entry per :class:`~ursa.catalog.CatalogRow`
#: subclass. Use to enumerate tables (e.g. when creating Lance datasets at
#: first boot). Use the individual ``TABLE_*`` constants when you need a
#: specific table name. A completeness test asserts ``len(ALL_CATALOG_TABLES)``
#: equals the number of ``TABLE_*`` constants in this module.
ALL_CATALOG_TABLES: tuple[str, ...] = (
    TABLE_PARTICIPANTS,
    TABLE_RECORDINGS,
    TABLE_MODALITIES,
    TABLE_EVENTS,
    TABLE_EMBEDDINGS,
    TABLE_VIRGO_ASSETS,
    TABLE_CHECKPOINTS,
    TABLE_BENCHMARK_SUITES,
    TABLE_BENCHMARK_RESULTS,
)

# ---------------------------------------------------------------------------
# constellation-assets: Orion layout
# ---------------------------------------------------------------------------


def orion_checkpoint_prefix(checkpoint_id: str) -> str:
    """Return the key prefix of all objects belonging to one Orion model checkpoint.

    ``CheckpointRow.storage_uri`` should be set to this prefix. The data-hash
    manifest lives at
    ``{orion_checkpoint_prefix(id)}data_hashes/manifest.json`` — use
    :func:`orion_checkpoint_data_hash_key` to construct that path rather than
    string-concatenating.

    Args:
        checkpoint_id: Checkpoint identifier; interpolated verbatim.

    Returns:
        ``orion/checkpoints/<checkpoint_id>/``, including the trailing slash.

    Example: ``orion/checkpoints/ckpt-abc123/``
    """
    return f"orion/checkpoints/{checkpoint_id}/"


def orion_checkpoint_uri(checkpoint_id: str) -> str:
    """Return the full ``r2://`` URI of an Orion checkpoint prefix.

    Prefixes :func:`orion_checkpoint_prefix` with the ``constellation-assets``
    bucket; the result keeps the prefix's trailing slash.

    Args:
        checkpoint_id: Checkpoint identifier; interpolated verbatim.

    Returns:
        ``r2://constellation-assets/orion/checkpoints/<checkpoint_id>/``.

    Example: ``r2://constellation-assets/orion/checkpoints/ckpt-abc123/``
    """
    return f"r2://{ASSETS_BUCKET}/{orion_checkpoint_prefix(checkpoint_id)}"


def orion_checkpoint_data_hash_key(checkpoint_id: str) -> str:
    """Return the key of the data-hash manifest inside a checkpoint.

    This is the file Orion writes that lists every recording consumed during
    the training run, used for train/test overlap detection.

    Args:
        checkpoint_id: Checkpoint identifier; interpolated verbatim.

    Returns:
        ``orion/checkpoints/<checkpoint_id>/data_hashes/manifest.json``.

    Example: ``orion/checkpoints/ckpt-abc123/data_hashes/manifest.json``
    """
    return f"orion/checkpoints/{checkpoint_id}/data_hashes/manifest.json"


def orion_benchmark_suite_key(suite_name: str, suite_version: int) -> str:
    """Return the key of a versioned benchmark suite configuration object.

    ``BenchmarkSuiteRow.storage_uri`` should point at this key. The object
    contains the held-out query spec and metric definitions.

    Args:
        suite_name: Benchmark suite name; becomes a path segment.
        suite_version: Suite version; formatted as the file's base name.

    Returns:
        ``orion/benchmark-suites/<suite_name>/<suite_version>.json``.

    Example: ``orion/benchmark-suites/cognitive_load_eval/1.json``
    """
    return f"orion/benchmark-suites/{suite_name}/{suite_version}.json"


def orion_benchmark_suite_uri(suite_name: str, suite_version: int) -> str:
    """Return the full ``r2://`` URI of a benchmark suite configuration object.

    Prefixes :func:`orion_benchmark_suite_key` with the
    ``constellation-assets`` bucket.

    Args:
        suite_name: Benchmark suite name; becomes a path segment.
        suite_version: Suite version; formatted as the file's base name.

    Returns:
        ``r2://constellation-assets/orion/benchmark-suites/<suite_name>/<suite_version>.json``.

    Example: ``r2://constellation-assets/orion/benchmark-suites/cognitive_load_eval/1.json``
    """
    return f"r2://{ASSETS_BUCKET}/{orion_benchmark_suite_key(suite_name, suite_version)}"


def orion_benchmark_result_key(result_id: str) -> str:
    """Return the key of a benchmark evaluation result object.

    ``BenchmarkResultRow.storage_uri`` should point at this key.

    Args:
        result_id: Result identifier; used as the file's base name.

    Returns:
        ``orion/benchmark-results/<result_id>.json``.

    Example: ``orion/benchmark-results/result-deadbeef.json``
    """
    return f"orion/benchmark-results/{result_id}.json"


def orion_benchmark_result_uri(result_id: str) -> str:
    """Return the full ``r2://`` URI of a benchmark evaluation result.

    Prefixes :func:`orion_benchmark_result_key` with the
    ``constellation-assets`` bucket.

    Args:
        result_id: Result identifier; used as the file's base name.

    Returns:
        ``r2://constellation-assets/orion/benchmark-results/<result_id>.json``.

    Example: ``r2://constellation-assets/orion/benchmark-results/result-deadbeef.json``
    """
    return f"r2://{ASSETS_BUCKET}/{orion_benchmark_result_key(result_id)}"


# ---------------------------------------------------------------------------
# constellation-data: raw layout (data-engine-managed, read-only for Ursa)
# ---------------------------------------------------------------------------
def raw_recording_prefix(recording_id: str) -> str:
    """Return the key prefix of all raw segment files belonging to one recording.

    Matches the key layout introduced in data-engine PR #49:
    ``recordings/<recording_id>/``. Note: ``manifests/`` is a sibling prefix
    at the bucket root, not nested under ``recordings/``.

    Args:
        recording_id: data-engine recording identifier; interpolated verbatim.

    Returns:
        ``recordings/<recording_id>/``, including the trailing slash.

    Example: ``recordings/rec_20260507_143022_a7f3/``
    """
    return f"recordings/{recording_id}/"


def raw_modality_prefix(recording_id: str, worker_subdir: str) -> str:
    """Return the key prefix of one modality's raw segment files.

    ``worker_subdir`` is the per-worker directory data-engine creates, e.g.
    ``camera_front_cam`` or ``eeg_default``.

    Args:
        recording_id: data-engine recording identifier; interpolated verbatim.
        worker_subdir: Per-worker directory name under the recording.

    Returns:
        ``recordings/<recording_id>/<worker_subdir>/``, including the
        trailing slash.

    Example: ``recordings/rec_20260507_143022_a7f3/camera_front_cam/``
    """
    return f"recordings/{recording_id}/{worker_subdir}/"


def raw_modality_uri(recording_id: str, worker_subdir: str) -> str:
    """Return the full ``r2://`` URI of a raw modality prefix in ``constellation-data``.

    The URI points at the *prefix* (trailing ``/``) — raw modalities consist
    of multiple segment files. The ingestion step (ENG-888) resolves
    individual objects within the prefix when building ``ModalityRow``
    entries.

    Args:
        recording_id: data-engine recording identifier; interpolated verbatim.
        worker_subdir: Per-worker directory name under the recording.

    Returns:
        ``r2://constellation-data/recordings/<recording_id>/<worker_subdir>/``.

    Example: ``r2://constellation-data/recordings/rec_20260507_.../camera_front_cam/``
    """
    return f"r2://{RAW_BUCKET}/{raw_modality_prefix(recording_id, worker_subdir)}"


def raw_commit_marker_key(recording_id: str) -> str:
    """Return the key of the upload commit marker for a recording.

    The marker is written by the uploader after a complete session. It is NOT
    under ``recordings/`` — the ``manifests/`` prefix sits at the bucket root
    alongside ``recordings/``, ``_status/``, and ``nodes/``.

    Args:
        recording_id: data-engine recording identifier; interpolated verbatim.

    Returns:
        ``manifests/<recording_id>/manifest.json``.

    Example: ``manifests/rec_20260507_143022_a7f3/manifest.json``
    """
    return f"manifests/{recording_id}/manifest.json"


def raw_status_key(hostname: str) -> str:
    """Return the key of a rig's uploader status heartbeat file.

    Args:
        hostname: Rig hostname; used as the file's base name.

    Returns:
        ``_status/<hostname>.json``.

    Example: ``_status/green-mantis.json``
    """
    return f"_status/{hostname}.json"


def raw_node_key(node_id: str) -> str:
    """Return the key of a node's registry entry in ``constellation-data``.

    Args:
        node_id: Node identifier; used as the file's base name.

    Returns:
        ``nodes/<node_id>.json``.

    Example: ``nodes/green-mantis.json``
    """
    return f"nodes/{node_id}.json"


# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------

#: Object-store URI schemes accepted by the catalog. **Must stay in sync
#: with** ``URI_PATTERN`` in :mod:`ursa.catalog.schemas` — that regex is the
#: syntactic gate Pydantic enforces, this tuple is the semantic gate
#: :func:`validate_storage_uri` uses. Adding a scheme (e.g. ``azure``)
#: requires updating both. ENG-1071 may consolidate these into a shared
#: source.
_VALID_URI_SCHEMES: tuple[str, ...] = ("r2", "s3", "gcs", "file")


def validate_storage_uri(uri: str, fmt: StorageFormat) -> None:
    """Reject a ``storage_uri`` that doesn't match its ``StorageFormat`` tier.

    Phase 1a (M2) callers register raw modalities (``RAW_*``) under
    ``constellation-data`` and canonical modalities (``ZARR``, ``LANCE``,
    ``MP4_INDEX``, ``PARQUET``) under ``constellation-assets``. This helper
    enforces that contract before any catalog row is written.

    Test-profile bucket suffixes (``-test``) are not yet recognised — see
    `ENG-1071 <https://linear.app/constellationlab/issue/ENG-1071>`_.

    Args:
        uri: Candidate ``storage_uri``; parsed with ``urllib.parse.urlparse``.
        fmt: Storage format the URI is registered with; decides which bucket
            is expected.

    Returns:
        ``None`` when the URI is acceptable.

    Raises:
        ValueError: If the scheme is not one of ``_VALID_URI_SCHEMES``, or if
            a non-``file`` URI names a bucket other than the one required by
            ``fmt``'s tier. (The typed Pydantic ``URI_PATTERN`` regex would
            catch malformed input upstream; this validator handles the
            *semantic* mismatch where a syntactically-valid URI points at the
            wrong bucket for its format.)
    """
    parsed = urlparse(uri)
    if parsed.scheme not in _VALID_URI_SCHEMES:
        raise ValueError(
            f"storage_uri {uri!r} has unsupported scheme {parsed.scheme!r}; "
            f"expected one of {_VALID_URI_SCHEMES}"
        )

    # ``file://`` URIs have no bucket — exempt them so local-fs stores work
    # for tests and Polaris cache without a profile-aware mapping.
    if parsed.scheme == "file":
        return

    # Decide the tier once; it drives both the expected bucket and the label
    # used in the error message.
    is_canonical = fmt in CANONICAL_FORMATS
    expected_bucket = ASSETS_BUCKET if is_canonical else RAW_BUCKET
    if parsed.netloc != expected_bucket:
        tier = "canonical" if is_canonical else "raw"
        raise ValueError(
            f"storage_uri {uri!r} points at bucket {parsed.netloc!r}, but "
            f"format {fmt.name} is a {tier} format and must live under "
            f"{expected_bucket!r}"
        )