"""Implementations of the four ``ursa.register.*`` write functions.
Each function:
1. Builds the corresponding ``*Row`` from keyword args (Pydantic validates
types, ranges, and `MetadataDict` shape on construction).
2. For :func:`modality`, additionally validates that ``storage_uri``
matches the bucket tier implied by ``format`` (raw → ``RAW_BUCKET``;
canonical → ``ASSETS_BUCKET``) via :func:`ursa.layout.validate_storage_uri`.
3. Inserts the row through the supplied :class:`CatalogWriter`.
Phase 1a (M2) is non-idempotent: a re-register with an existing primary
key raises :class:`ursa.catalog.CatalogRowExists`. Idempotent re-register
is tracked in `ENG-1074
<https://linear.app/constellationlab/issue/ENG-1074>`_.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
from ursa.catalog import (
EventRow,
IngestionStatus,
ModalityRow,
ParticipantRow,
RecordingRow,
StorageFormat,
)
from ursa.layout import validate_storage_uri
from ursa.register._writer import CatalogWriter
__all__ = ["participant", "recording", "modality", "event"]
def participant(
    *,
    participant_id: str,
    enrolled_at: datetime,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ParticipantRow:
    """Create a :class:`ParticipantRow` and insert it through *catalog*.

    This is not part of data-engine's per-recording flow: the expected
    caller is a separate Constellation enrollment surface (admin script
    or dashboard). Data-engine only references an already-registered
    ``participant_id`` when it calls :func:`recording`.

    .. note::
        Phase 1a (M2): non-idempotent. Re-registering the same
        ``participant_id`` raises
        :class:`ursa.catalog.CatalogRowExists`; idempotent
        re-registration is tracked in `ENG-1074
        <https://linear.app/constellationlab/issue/ENG-1074>`_.
    """
    # Pydantic validates types/ranges/MetadataDict shape on construction.
    new_row = ParticipantRow(
        participant_id=participant_id,
        enrolled_at=enrolled_at,
        metadata=metadata if metadata else {},
    )
    catalog.add_participant(new_row)
    return new_row
def recording(
    *,
    recording_hash: str,
    participant_ids: list[str],
    start_time: datetime,
    duration: timedelta,
    device_info: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> RecordingRow:
    """Create a :class:`RecordingRow` and insert it through *catalog*.

    .. note::
        Phase 1a (M2): ``recording_hash`` MUST conform to
        ``^[A-Za-z0-9_-]+$`` (validated by ``CatalogID``). Data-engine
        passes its native ``recording_id`` verbatim
        (e.g. ``rec_20260507_143022_a7f3``), which already conforms.
        Phase 1b introduces content-addressed hashing under
        `ENG-1070 <https://linear.app/constellationlab/issue/ENG-1070>`_;
        that change is purely additive — the pattern relaxes and no
        caller changes are required.

        ``participant_ids`` is a non-empty list (architecture v0.4): one
        recording may cover several participants (multi-subject
        experiments, dyad sessions, crowd recordings); single-participant
        recordings pass a one-element list. FK enforcement against the
        ``participants`` table at write time is deferred to Phase 1b.

        ``device_info`` is the rig hardware manifest; data-engine fills
        it from its ``nodes/<node_id>.json`` registry entry. Per-channel
        structures must be encoded as parallel lists
        (``{"channel_names": [...], "polarity": [...]}``), not
        list-of-dicts — that is the ``MetadataDict`` shape required by
        Lance MapType.

        Non-idempotent: re-register raises
        :class:`ursa.catalog.CatalogRowExists`
        (`ENG-1074 <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # Pydantic validates types/ranges/MetadataDict shape on construction.
    new_row = RecordingRow(
        recording_hash=recording_hash,
        participant_ids=participant_ids,
        start_time=start_time,
        duration=duration,
        device_info=device_info if device_info else {},
        metadata=metadata if metadata else {},
    )
    catalog.add_recording(new_row)
    return new_row
def modality(
    *,
    recording_hash: str,
    modality: str,
    raw_storage_uri: str,
    storage_uri: str | None = None,
    ingestion_status: IngestionStatus = IngestionStatus.RAW,
    format: StorageFormat | None = None,
    domain_intervals: list[tuple[float, float]] | None = None,
    sampling_rate: float | None = None,
    channel_spec: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> ModalityRow:
    """Create a :class:`ModalityRow` and insert it through *catalog*.

    Phase 1a (M2) is **catalog-only**: the object at ``raw_storage_uri``
    is assumed to already exist (data-engine uploads raw segments before
    calling), and existence is not checked here — a missing blob
    surfaces at first read, not at registration time.

    Architecture v0.4 gives a modality a two-state lifecycle:

    * Default (``ingestion_status="raw"``) — registered against a
      cold-bucket raw file. ``raw_storage_uri`` is required and
      ``storage_uri`` defaults to mirror it. ``format`` may be a
      ``RAW_*`` value (when known at registration) or null.
      ``domain_intervals`` and ``channel_spec`` are typically null —
      Virgo's ingestion node populates them.
    * After Virgo ingestion (``ingestion_status="processed"``) — the
      caller is Virgo's ingestion node, which converted the raw file to
      a canonical format. ``storage_uri`` points at the processed
      object; ``format`` is canonical (non-``RAW_*``);
      ``domain_intervals`` and ``channel_spec`` are populated.
      ``raw_storage_uri`` remains the cold-bucket pointer for
      re-ingestion.

    .. note::
        Phase 1a (M2): URIs must match the bucket tier implied by
        ``format``:

        - Raw formats (``RAW_*``) → ``r2://constellation-data/...``
        - Canonical formats (``ZARR``, ``LANCE``, ``MP4_INDEX``,
          ``PARQUET``) → ``r2://constellation-assets/...``

        A mismatch raises ``ValueError`` before any catalog write.

        The canonical-write surface (a ``data=`` parameter that uploads
        the blob and registers the row in one call) is deferred to
        `ENG-1072 <https://linear.app/constellationlab/issue/ENG-1072>`_;
        Virgo (M3) is the first real consumer of that path.

        Non-idempotent: re-register raises
        :class:`ursa.catalog.CatalogRowExists`
        (`ENG-1074 <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # Mirror raw_storage_uri into storage_uri so raw-state callers need
    # not pass the same URI twice; Virgo's ingestion node later upserts
    # the canonical storage_uri after conversion.
    resolved_uri = raw_storage_uri if storage_uri is None else storage_uri
    # Construct first so Pydantic's URI_PATTERN rejects malformed URIs
    # with a syntactic error message, then run the semantic bucket/format
    # check against the row's stored value (any future Pydantic
    # normalisation of ``storage_uri`` flows through consistently).
    # "garbage URI" beats "wrong bucket" as a diagnostic, so this order
    # matters.
    new_row = ModalityRow(
        recording_hash=recording_hash,
        modality=modality,
        ingestion_status=ingestion_status,
        storage_uri=resolved_uri,
        raw_storage_uri=raw_storage_uri,
        format=format,
        sampling_rate=sampling_rate,
        domain_intervals=domain_intervals,
        channel_spec=channel_spec,
        metadata=metadata if metadata else {},
    )
    # Only enforce the bucket tier when a format is declared; raw-state
    # rows may register with format=None.
    if new_row.format is not None:
        validate_storage_uri(new_row.storage_uri, new_row.format)
    catalog.add_modality(new_row)
    return new_row
def event(
    *,
    event_id: str,
    recording_hash: str,
    event_time: float,
    event_type: str,
    prompt: str | None = None,
    response: str | None = None,
    metadata: dict[str, Any] | None = None,
    catalog: CatalogWriter,
) -> EventRow:
    """Create an :class:`EventRow` and insert it through *catalog*.

    .. note::
        Phase 1a (M2): ``EventRow``'s primary key is currently
        ``(event_id,)`` — one global namespace shared by every
        recording, so the caller-generated ``event_id`` must be globally
        unique. Recommended format: ``f"{recording_hash}_{seq:08d}"``
        with ``seq`` a per-recording monotonic counter; the
        ``recording_hash`` prefix guarantees cross-recording uniqueness.
        `ENG-1073 <https://linear.app/constellationlab/issue/ENG-1073>`_
        will scope the PK to ``(recording_hash, event_id)`` so callers
        can use simple per-recording sequences.

        Ursa intentionally does not auto-generate IDs: data-engine
        already maintains a per-modality sequence counter for ZMQ
        message ordering and is the right place to mint them.

        Non-idempotent: re-register with the same ``event_id`` raises
        :class:`ursa.catalog.CatalogRowExists`
        (`ENG-1074 <https://linear.app/constellationlab/issue/ENG-1074>`_).
    """
    # Pydantic validates types/ranges/MetadataDict shape on construction.
    new_row = EventRow(
        event_id=event_id,
        recording_hash=recording_hash,
        event_time=event_time,
        event_type=event_type,
        prompt=prompt,
        response=response,
        metadata=metadata if metadata else {},
    )
    catalog.add_event(new_row)
    return new_row