# Source code for ursa.register.api

"""Implementations of the four ``ursa.register.*`` write functions.

Each function:

1. Builds the corresponding ``*Row`` from keyword args (Pydantic validates
   types, ranges, and `MetadataDict` shape on construction).
2. For :func:`modality`, additionally validates that ``storage_uri``
   matches the bucket tier implied by ``format`` (raw → ``RAW_BUCKET``;
   canonical → ``ASSETS_BUCKET``) via :func:`ursa.layout.validate_storage_uri`.
3. Inserts the row through the supplied :class:`CatalogWriter`.

Phase 1a (M2) is non-idempotent: a re-register with an existing primary
key raises :class:`ursa.catalog.CatalogRowExists`. Idempotent re-register
is tracked in `ENG-1074
<https://linear.app/constellationlab/issue/ENG-1074>`_.
"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any

from ursa.catalog import (
    EventRow,
    IngestionStatus,
    ModalityRow,
    ParticipantRow,
    RecordingRow,
    StorageFormat,
)
from ursa.layout import validate_storage_uri
from ursa.register._writer import CatalogWriter

__all__ = ["participant", "recording", "modality", "event"]


def participant(
    *,
    participant_id: str,
    enrolled_at: datetime,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ParticipantRow:
    """Register a new participant.

    Not called by data-engine on every recording — the intended caller is a
    separate Constellation enrollment surface (admin script or dashboard).
    Data-engine's recording flow only calls :func:`recording` with an
    existing ``participant_id``.

    .. note::
       Phase 1a (M2): non-idempotent. Re-register with the same
       ``participant_id`` raises :class:`ursa.catalog.CatalogRowExists`.
       Idempotency is tracked in `ENG-1074
       <https://linear.app/constellationlab/issue/ENG-1074>`_.
    """
    # Pydantic validates types and MetadataDict shape on construction;
    # ``metadata or {}`` turns the caller-convenience None into the empty
    # dict the row requires.
    new_row = ParticipantRow(
        participant_id=participant_id,
        enrolled_at=enrolled_at,
        metadata=metadata or {},
    )
    catalog.add_participant(new_row)
    return new_row
def recording(
    *,
    recording_hash: str,
    participant_ids: list[str],
    start_time: datetime,
    duration: timedelta,
    device_info: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> RecordingRow:
    """Register a recording session.

    .. note::
       Phase 1a (M2): ``recording_hash`` MUST conform to
       ``^[A-Za-z0-9_-]+$`` (validated by ``CatalogID``). Data-engine
       passes its native ``recording_id`` verbatim (e.g.
       ``rec_20260507_143022_a7f3``), which already conforms. Phase 1b
       will introduce content-addressed hashing under `ENG-1070
       <https://linear.app/constellationlab/issue/ENG-1070>`_; the change
       is purely additive — the pattern relaxes, no caller change
       required.

    ``participant_ids`` is a non-empty list (architecture v0.4): a
    recording can cover multiple participants (multi-subject experiments,
    dyad sessions, crowd recordings). Single-participant recordings pass a
    one-element list. FK enforcement against the ``participants`` table at
    write time is a Phase 1b concern.

    ``device_info`` is the rig hardware manifest. data-engine populates it
    from its ``nodes/<node_id>.json`` registry entry. Encode per-channel
    structures as parallel lists (``{"channel_names": [...],
    "polarity": [...]}``), not list-of-dicts — that's the ``MetadataDict``
    shape required by Lance MapType.

    Non-idempotent: re-register raises
    :class:`ursa.catalog.CatalogRowExists` (`ENG-1074
    <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # Row construction is where all validation happens (Pydantic); the
    # two optional dicts default to empty rather than None.
    session_row = RecordingRow(
        recording_hash=recording_hash,
        participant_ids=participant_ids,
        start_time=start_time,
        duration=duration,
        device_info=device_info or {},
        metadata=metadata or {},
    )
    catalog.add_recording(session_row)
    return session_row
def modality(
    *,
    recording_hash: str,
    modality: str,
    raw_storage_uri: str,
    storage_uri: str | None = None,
    ingestion_status: IngestionStatus = IngestionStatus.RAW,
    format: StorageFormat | None = None,
    domain_intervals: list[tuple[float, float]] | None = None,
    sampling_rate: float | None = None,
    channel_spec: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ModalityRow:
    """Register a data stream within a recording.

    Phase 1a (M2) is **catalog-only**: the blob at ``raw_storage_uri`` is
    assumed to already exist (data-engine uploads raw segments before
    calling). Existence is not verified — a missing blob surfaces at first
    read, not at register.

    Architecture v0.4 splits a modality's lifecycle into two states:

    * Default (``ingestion_status="raw"``) — register against a
      cold-bucket raw file. ``raw_storage_uri`` is required;
      ``storage_uri`` defaults to mirror it. ``format`` may be a
      ``RAW_*`` value (when known at registration) or null.
      ``domain_intervals`` and ``channel_spec`` are typically null —
      Virgo's ingestion node populates them.
    * After Virgo ingestion (``ingestion_status="processed"``) — the
      caller is Virgo's ingestion node, which converted the raw file to a
      canonical format. ``storage_uri`` points at the processed object;
      ``format`` is canonical (non-``RAW_*``); ``domain_intervals`` and
      ``channel_spec`` are populated. ``raw_storage_uri`` stays as the
      cold-bucket pointer for re-ingestion.

    .. note::
       Phase 1a (M2): URIs must match the bucket tier implied by
       ``format``:

       - Raw formats (``RAW_*``) → ``r2://constellation-data/...``
       - Canonical formats (``ZARR``, ``LANCE``, ``MP4_INDEX``,
         ``PARQUET``) → ``r2://constellation-assets/...``

       Mismatch raises ``ValueError`` before any catalog write. The
       canonical-write surface (``data=`` parameter that uploads the blob
       and registers the row in one call) is deferred to `ENG-1072
       <https://linear.app/constellationlab/issue/ENG-1072>`_. Virgo (M3)
       is the first real consumer of that path.

    Non-idempotent: re-register raises
    :class:`ursa.catalog.CatalogRowExists` (`ENG-1074
    <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # Raw-state callers pass only raw_storage_uri; mirror it into
    # storage_uri so they don't have to pass the same URI twice. Once
    # Virgo converts the file, the ingestion node calls upsert with the
    # canonical storage_uri. (``is None`` check, not ``or`` — an empty
    # string must still reach Pydantic's URI validation.)
    resolved_uri = raw_storage_uri if storage_uri is None else storage_uri

    # Order matters: Pydantic runs first (URI_PATTERN catches
    # scheme/shape errors), then the semantic bucket/format check runs on
    # the row's *stored* value so any future Pydantic normalisation of
    # ``storage_uri`` flows through consistently. The syntactic error
    # message is more useful than "wrong bucket" when the URI is garbage.
    stream_row = ModalityRow(
        recording_hash=recording_hash,
        modality=modality,
        ingestion_status=ingestion_status,
        storage_uri=resolved_uri,
        raw_storage_uri=raw_storage_uri,
        format=format,
        sampling_rate=sampling_rate,
        domain_intervals=domain_intervals,
        channel_spec=channel_spec,
        metadata=metadata or {},
    )
    if stream_row.format is not None:
        validate_storage_uri(stream_row.storage_uri, stream_row.format)
    catalog.add_modality(stream_row)
    return stream_row
def event(
    *,
    event_id: str,
    recording_hash: str,
    event_time: float,
    event_type: str,
    prompt: str | None = None,
    response: str | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> EventRow:
    """Register a time-stamped event within a recording.

    .. note::
       Phase 1a (M2): ``EventRow``'s primary key is currently
       ``(event_id,)`` — a single global namespace across every
       recording. ``event_id`` is caller-generated and must be globally
       unique. Recommended format: ``f"{recording_hash}_{seq:08d}"``
       where ``seq`` is a per-recording monotonic counter. The
       ``recording_hash`` prefix guarantees cross-recording uniqueness.
       `ENG-1073 <https://linear.app/constellationlab/issue/ENG-1073>`_
       will scope the PK to ``(recording_hash, event_id)`` so callers can
       use simple per-recording sequences.

    Auto-generation in Ursa is intentionally not provided: data-engine
    already has a per-modality sequence counter for ZMQ message ordering
    and is the right place to mint IDs.

    Non-idempotent: re-register with the same ``event_id`` raises
    :class:`ursa.catalog.CatalogRowExists` (`ENG-1074
    <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # prompt/response pass through as-is (nullable columns); only
    # metadata gets the None → {} normalisation Pydantic requires.
    event_row = EventRow(
        event_id=event_id,
        recording_hash=recording_hash,
        event_time=event_time,
        event_type=event_type,
        prompt=prompt,
        response=response,
        metadata=metadata or {},
    )
    catalog.add_event(event_row)
    return event_row