# Source code for ursa.register.payload

"""Manifest → catalog-row payload mapping (ENG-1129).

Pure utilities that turn a data-engine ``manifest.json`` plus a listing
of relative file paths under ``recordings/<rec_id>/`` into a
:class:`RegisterPayload` of catalog rows ready for
``register_recording`` + ``register_modality`` to insert.

No I/O: the caller is responsible for producing the file listing — by
walking a local ``rec_*/`` directory (rig-side ingest, ENG-1130) or by
listing the R2 prefix (backfill, ENG-1096). This keeps the
manifest-interpretation logic in one place and avoids two paths
drifting.

Format inference and modality-name collapse logic are extracted from
``examples/mvp_demo_register.py`` (ENG-1063) so the demo, the
orchestrator, and backfill all agree.
"""

from __future__ import annotations

from collections import Counter
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import PurePosixPath
from typing import Any, cast

from ursa.catalog import IngestionStatus, ModalityRow, RecordingRow, StorageFormat
from ursa.layout import raw_modality_uri

# Public API of this module; kept sorted alphabetically.
__all__ = [
    "ModalitySpec",
    "RegisterPayload",
    "discover_modalities_from_listing",
    "infer_format",
    "manifest_to_register_payload",
]


# Worker-subdir prefix -> StorageFormat. First match wins.
# Per data-engine layout: ``recordings/<rec_id>/<worker_subdir>/<file>``.
# Matched by ``infer_format`` against the first underscore-delimited token
# of the worker subdir, so ``camera_0`` matches ``"camera"``.
_FORMAT_HINTS: tuple[tuple[str, StorageFormat], ...] = (
    ("camera", StorageFormat.RAW_VIDEO),
    ("screen", StorageFormat.RAW_VIDEO),
    ("pupillabs", StorageFormat.RAW_VIDEO),
    ("mic", StorageFormat.RAW_AUDIO),
    ("eeg", StorageFormat.RAW_BINARY),
    ("samsung-watch", StorageFormat.RAW_JSONL),
    ("battery", StorageFormat.RAW_JSONL),
    ("browser", StorageFormat.RAW_JSONL),
    ("keyboard", StorageFormat.RAW_JSONL),
    ("mouse", StorageFormat.RAW_JSONL),
    ("location", StorageFormat.RAW_JSONL),
    ("notes", StorageFormat.RAW_JSONL),
)

# Extension -> StorageFormat. Used to corroborate the worker-name hint.
# Consulted only when no ``_FORMAT_HINTS`` prefix matched; extensions are
# compared lowercased (see ``infer_format``).
_EXT_HINTS: dict[str, StorageFormat] = {
    ".mkv": StorageFormat.RAW_VIDEO,
    ".mp4": StorageFormat.RAW_VIDEO,
    ".wav": StorageFormat.RAW_AUDIO,
    ".jsonl": StorageFormat.RAW_JSONL,
    ".ndjson": StorageFormat.RAW_JSONL,
    ".csv": StorageFormat.RAW_CSV,
    ".bin": StorageFormat.RAW_BINARY,
    ".dat": StorageFormat.RAW_BINARY,
}


@dataclass(frozen=True)
class ModalitySpec:
    """A single modality discovered inside one recording's file listing.

    Attributes
    ----------
    name
        Short modality name (e.g. ``"camera"``).  When collapse is
        enabled this is the first underscore segment of the worker
        subdir; otherwise it equals :attr:`worker_subdir`.
    worker_subdir
        The verbatim R2 path segment (e.g. ``"camera_0"``).  This — not
        :attr:`name` — is what ``raw_storage_uri`` was built from.
    format
        The :class:`StorageFormat` chosen by :func:`infer_format`.
    raw_storage_uri
        Full ``r2://`` URI for the modality prefix, resolved through
        :func:`ursa.layout.raw_modality_uri` under the active profile.
    segment_count
        How many files the supplied listing contained under this
        worker subdir.
    """

    name: str
    worker_subdir: str
    format: StorageFormat
    raw_storage_uri: str
    segment_count: int
@dataclass(frozen=True)
class RegisterPayload:
    """A recording row plus its modality rows, ready for insertion.

    Deliberately a plain dataclass rather than a Pydantic model: it only
    crosses process-local boundaries (manifest util → orchestrator →
    ``DataInterface.register_*``), and the contained
    :class:`RecordingRow` / :class:`ModalityRow` instances were already
    Pydantic-validated at construction — wrapping them again would just
    double-validate.
    """

    recording: RecordingRow
    modalities: list[ModalityRow]
def infer_format(worker_subdir: str, files: Iterable[str]) -> StorageFormat:
    """Infer the :class:`StorageFormat` for one worker subdir.

    Resolution order:

    1. The modality family — the text before the first underscore in
       *worker_subdir* — is looked up in ``_FORMAT_HINTS``
       (``camera_0`` → ``RAW_VIDEO``).
    2. Otherwise the most common recognised file extension among
       *files* decides.
    3. Otherwise ``RAW_BINARY`` — registering with a coarse format and
       refining later beats refusing to register at all.

    Parameters
    ----------
    worker_subdir
        Canonical worker subdir name, e.g. ``"camera_0"``.
    files
        Paths within (or containing) the worker subdir; only each
        path's extension is inspected.
    """
    # Data-engine's worker_subdir convention is ``<modality>_<worker_id>``
    # (e.g. ``camera_d956b74f7625``), so compare the head token exactly
    # rather than via ``startswith`` — "mic_0" matches "mic", "micheal"
    # does not.  (S4)
    family = worker_subdir.split("_", 1)[0]
    for known, fmt in _FORMAT_HINTS:
        if family == known:
            return fmt

    # No name hint: vote by lowercased file extension, most common first.
    suffix_counts: Counter[str] = Counter()
    for path in files:
        suffix = PurePosixPath(path).suffix
        if suffix:
            suffix_counts[suffix.lower()] += 1
    for suffix, _count in suffix_counts.most_common():
        if suffix in _EXT_HINTS:
            return _EXT_HINTS[suffix]

    # Nothing matched — fall back to the coarsest raw format.
    return StorageFormat.RAW_BINARY
def _collapse_worker_names(workers: list[str]) -> dict[str, str]: """Map ``worker_subdir`` -> short modality name. Collapses to the first underscore segment (``camera_0`` -> ``camera``). Raises ``ValueError`` if two workers collapse to the same short name — callers should disable collapse when this happens. """ out: dict[str, str] = {} rev: dict[str, str] = {} for w in workers: short = w.split("_", 1)[0] if short in rev: raise ValueError( f"modality-name collision: workers {rev[short]!r} and {w!r} both " f"collapse to {short!r}; pass collapse_modalities=False to use " f"verbatim worker_subdir names" ) rev[short] = w out[w] = short return out
def discover_modalities_from_listing(
    recording_id: str,
    rel_paths: Iterable[str],
    *,
    collapse_modalities: bool = True,
    profile: str | None = None,
) -> list[ModalitySpec]:
    """Turn a flat file listing into one :class:`ModalitySpec` per subdir.

    Parameters
    ----------
    recording_id
        Used to build each modality's ``raw_storage_uri``.
    rel_paths
        Paths relative to ``recordings/<recording_id>/`` — typically
        ``"camera_0/camera_0000_*.mkv"``.  Paths with no ``/`` (files at
        the recording root, e.g. ``manifest.json``) are skipped
        silently.
    collapse_modalities
        When ``True`` (the default), worker-subdir names collapse to
        their first underscore segment; ``False`` keeps the full
        ``worker_subdir`` as the modality name.
    profile
        Optional profile override forwarded to
        :func:`raw_modality_uri`; ``None`` defers to the ``URSA_PROFILE``
        env var.
    """
    # Bucket files by their leading path segment (the worker subdir).
    grouped: dict[str, list[str]] = {}
    for path in rel_paths:
        worker, sep, _rest = path.partition("/")
        if not sep:
            # Recording-root file (manifest.json copy, etc.) — skip.
            continue
        grouped.setdefault(worker, []).append(path)

    if not grouped:
        return []

    workers = sorted(grouped)
    if collapse_modalities:
        names = _collapse_worker_names(workers)
    else:
        names = {w: w for w in workers}

    return [
        ModalitySpec(
            name=names[worker],
            worker_subdir=worker,
            format=infer_format(worker, grouped[worker]),
            raw_storage_uri=raw_modality_uri(recording_id, worker, profile=profile),
            segment_count=len(grouped[worker]),
        )
        for worker in workers
    ]
def _parse_dt(value: Any) -> datetime: """Parse an ISO-8601 string or pass-through a :class:`datetime`. Naive datetimes are interpreted as UTC for parity with ``examples/mvp_demo_register.py``. """ if isinstance(value, datetime): return value if value.tzinfo else value.replace(tzinfo=timezone.utc) if isinstance(value, str): dt = datetime.fromisoformat(value) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt raise TypeError(f"expected ISO-8601 string or datetime; got {type(value).__name__}")
def manifest_to_register_payload(
    manifest: dict[str, Any],
    rel_paths: Iterable[str],
    *,
    participant_id: str,
    profile: str | None = None,
    collapse_modalities: bool = True,
) -> RegisterPayload:
    """Build a :class:`RegisterPayload` from a manifest + file listing.

    The manifest is the data-engine ``manifest.json`` contents as a
    dict; ``rel_paths`` is the listing under ``recordings/<rec_id>/``.

    Required manifest fields:

    * ``recording_id`` — informational; ``manifest_recording_id``
      metadata for auditability.
    * ``recording_hash`` — content-addressed digest from
      ``data-engine/uploader/hashing.py:compute_recording_hash``.  No
      fallback — a manifest without this field is rejected with
      ``KeyError`` (B4 from round 2 — silent fallback to
      ``recording_id`` could orphan a row when a re-augment produces
      different content under the same recording_id).
    * ``started_at``, ``ended_at`` — ISO-8601 strings.

    Optional manifest fields:

    * ``participant`` — display-name string; copied to ``metadata`` for
      auditability but the catalog ``participant_ids`` always comes from
      the explicit ``participant_id=`` kwarg.
    * ``files`` — list of ``{worker_id, rel_path, size, sha256}``
      entries (added by data-engine's ``augment_manifest``).  Currently
      unused by this util; the orchestrator's hash-skip path consumes it
      directly from the manifest dict.

    Parameters
    ----------
    manifest
        The manifest dict.
    rel_paths
        File paths relative to ``recordings/<rec_id>/`` (output of
        ``store.list`` or ``find rec_*/``).
    participant_id
        The catalog ID for the participant.  Required: the schema
        currently enforces ``participant_ids`` non-empty.
        Null-participant support for legacy recordings is tracked in
        ENG-1099.
    profile
        Optional profile override for URI construction.
    collapse_modalities
        See :func:`discover_modalities_from_listing`.

    Raises
    ------
    KeyError
        If the manifest is missing ``recording_id`` / ``started_at`` /
        ``ended_at`` / ``recording_hash``.
    ValueError
        If ``ended_at < started_at``, or if no modality directories are
        discovered under ``recordings/<rec_id>/`` (likely a bad listing
        or a non-rig prefix).
    """
    # Validate required fields up front so the operator gets a precise
    # error naming the missing key rather than a raw dict KeyError.
    if "recording_id" not in manifest:
        raise KeyError("manifest missing required field 'recording_id'")
    if "started_at" not in manifest:
        raise KeyError("manifest missing required field 'started_at'")
    if "ended_at" not in manifest:
        raise KeyError("manifest missing required field 'ended_at'")
    recording_id: str = manifest["recording_id"]
    # ``recording_hash`` is the content-addressed digest of every per-file
    # sha256, stamped by ``data-engine/uploader/manifest.py:augment_manifest``
    # before ingest. Refusing to fall back to ``recording_id`` here is the
    # no-orphan invariant from B4: a re-augmented session whose contents
    # changed gets a *different* recording_hash and lands as a fresh row,
    # so R2 bytes and catalog rows never disagree. Direct callers of
    # ``manifest_to_register_payload`` that hand in a pre-augment manifest
    # are violating the contract and should fail loudly here rather than
    # quietly using recording_id and risking an orphan write.
    # NOTE: ``not manifest.get(...)`` also rejects a present-but-empty
    # hash, not just a missing key — that is deliberate.
    if not manifest.get("recording_hash"):
        raise KeyError(
            "manifest missing required field 'recording_hash'; run "
            "data-engine/uploader.manifest.augment_manifest before "
            "calling manifest_to_register_payload (the recording_hash "
            "is the content-addressed digest of per-file sha256s)."
        )
    recording_hash: str = manifest["recording_hash"]

    # Naive timestamps are interpreted as UTC (see ``_parse_dt``).
    started_at = _parse_dt(manifest["started_at"])
    ended_at = _parse_dt(manifest["ended_at"])
    duration = ended_at - started_at
    # B2 (round 4): negative durations silently break downstream
    # slicing math. Pydantic's ``timedelta`` field has no sign
    # constraint by default, so a manifest with ``ended_at <
    # started_at`` (clock skew, manual edit, backwards worker
    # timestamp) would pass validation and land a degenerate row.
    # Refuse here so the operator sees a clear error instead.
    if duration < timedelta(0):
        raise ValueError(
            f"manifest has ended_at < started_at "
            f"({manifest['ended_at']!r} < {manifest['started_at']!r}); "
            "recording duration would be negative. Check the rig's "
            "clock or fix the manifest before retrying."
        )

    specs = discover_modalities_from_listing(
        recording_id,
        rel_paths,
        collapse_modalities=collapse_modalities,
        profile=profile,
    )
    if not specs:
        raise ValueError(
            f"no modality subdirs discovered under recordings/{recording_id}/; "
            "every file in the listing was at the recording root"
        )

    participant_display_name = manifest.get("participant")
    recording_row = RecordingRow(
        recording_hash=recording_hash,
        participant_ids=[participant_id],
        # ``EpochNs`` coerces datetime → int64 ns via a BeforeValidator at
        # construction; mypy can't see through the Annotated coercer, so
        # cast here matches the pattern in ``_register_recording``.
        start_time=cast(int, started_at),
        duration=duration,
        metadata={
            "participant_display_name": participant_display_name,
            "manifest_recording_id": recording_id,
        },
    )

    modality_rows: list[ModalityRow] = []
    for spec in specs:
        metadata: dict[str, Any] = {}
        # Record the verbatim worker subdir only when the collapsed
        # modality name differs from it, so the original path segment is
        # still recoverable from the row.
        if spec.name != spec.worker_subdir:
            metadata["worker_subdir"] = spec.worker_subdir
        modality_rows.append(
            ModalityRow(
                recording_hash=recording_hash,
                modality=spec.name,
                raw_storage_uri=spec.raw_storage_uri,
                storage_uri=spec.raw_storage_uri,
                ingestion_status=IngestionStatus.RAW,
                format=spec.format,
                metadata=metadata,
            )
        )
    return RegisterPayload(recording=recording_row, modalities=modality_rows)