"""Manifest → catalog-row payload mapping (ENG-1129).
Pure utilities that turn a data-engine ``manifest.json`` plus a listing
of relative file paths under ``recordings/<rec_id>/`` into a
:class:`RegisterPayload` of catalog rows ready for
``register_recording`` + ``register_modality`` to insert.
No I/O: the caller is responsible for producing the file listing — by
walking a local ``rec_*/`` directory (rig-side ingest, ENG-1130) or by
listing the R2 prefix (backfill, ENG-1096). This keeps the
manifest-interpretation logic in one place and avoids two paths
drifting.
Format inference and modality-name collapse logic are extracted from
``examples/mvp_demo_register.py`` (ENG-1063) so the demo, the
orchestrator, and backfill all agree.
"""
from __future__ import annotations
from collections import Counter
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import PurePosixPath
from typing import Any, cast
from ursa.catalog import IngestionStatus, ModalityRow, RecordingRow, StorageFormat
from ursa.layout import raw_modality_uri
# Public API. NOTE(review): ``infer_format`` is exported here and called by
# ``discover_modalities_from_listing`` below, but its definition is not
# visible in this chunk — confirm it is defined elsewhere in the file.
__all__ = [
    "ModalitySpec",
    "RegisterPayload",
    "discover_modalities_from_listing",
    "infer_format",
    "manifest_to_register_payload",
]
# Worker-subdir prefix -> StorageFormat. First match wins, so this is an
# ordered tuple (not a dict): do not reorder or alphabetize the entries.
# Per data-engine layout: ``recordings/<rec_id>/<worker_subdir>/<file>``.
# NOTE(review): presumably consumed by ``infer_format`` (definition not in
# this chunk) via prefix match on the worker_subdir — confirm it honors order.
_FORMAT_HINTS: tuple[tuple[str, StorageFormat], ...] = (
    ("camera", StorageFormat.RAW_VIDEO),
    ("screen", StorageFormat.RAW_VIDEO),
    ("pupillabs", StorageFormat.RAW_VIDEO),
    ("mic", StorageFormat.RAW_AUDIO),
    ("eeg", StorageFormat.RAW_BINARY),
    ("samsung-watch", StorageFormat.RAW_JSONL),
    ("battery", StorageFormat.RAW_JSONL),
    ("browser", StorageFormat.RAW_JSONL),
    ("keyboard", StorageFormat.RAW_JSONL),
    ("mouse", StorageFormat.RAW_JSONL),
    ("location", StorageFormat.RAW_JSONL),
    ("notes", StorageFormat.RAW_JSONL),
)
# File extension (lowercase, with leading dot) -> StorageFormat.
# Used to corroborate the worker-name hint from ``_FORMAT_HINTS``.
# NOTE(review): how disagreements between the two hints are resolved lives in
# ``infer_format``, which is not visible in this chunk — see that function.
_EXT_HINTS: dict[str, StorageFormat] = {
    ".mkv": StorageFormat.RAW_VIDEO,
    ".mp4": StorageFormat.RAW_VIDEO,
    ".wav": StorageFormat.RAW_AUDIO,
    ".jsonl": StorageFormat.RAW_JSONL,
    ".ndjson": StorageFormat.RAW_JSONL,
    ".csv": StorageFormat.RAW_CSV,
    ".bin": StorageFormat.RAW_BINARY,
    ".dat": StorageFormat.RAW_BINARY,
}
@dataclass(frozen=True)
class ModalitySpec:
    """One discovered modality within a recording.

    Immutable value object produced by
    :func:`discover_modalities_from_listing`.

    Attributes
    ----------
    name
        Short modality name (e.g. ``"camera"``), after optional collapse
        from the worker_subdir's first underscore segment.
    worker_subdir
        Canonical R2 subdir (e.g. ``"camera_0"``) — the actual path
        segment, not the collapsed name. ``raw_storage_uri`` is built
        from this, not from :attr:`name`.
    format
        Inferred :class:`StorageFormat`. See :func:`infer_format`.
    raw_storage_uri
        Full ``r2://`` URI pointing at the modality prefix. Resolved via
        :func:`ursa.layout.raw_modality_uri` against the active profile.
    segment_count
        Number of files under this worker_subdir in the supplied listing.
    """

    name: str
    worker_subdir: str
    format: StorageFormat
    raw_storage_uri: str
    segment_count: int
@dataclass(frozen=True)
class RegisterPayload:
    """A recording + its modalities, ready to insert.

    Not a Pydantic model: this only crosses process-local boundaries
    (manifest util → orchestrator → DataInterface.register_*). The
    contained :class:`RecordingRow` / :class:`ModalityRow` are
    Pydantic-validated at construction; wrapping them again would just
    double-validate.
    """

    # The recording row to pass to ``register_recording``.
    recording: RecordingRow
    # One row per discovered modality, for ``register_modality``.
    modalities: list[ModalityRow]
def _collapse_worker_names(workers: list[str]) -> dict[str, str]:
"""Map ``worker_subdir`` -> short modality name.
Collapses to the first underscore segment (``camera_0`` -> ``camera``).
Raises ``ValueError`` if two workers collapse to the same short
name — callers should disable collapse when this happens.
"""
out: dict[str, str] = {}
rev: dict[str, str] = {}
for w in workers:
short = w.split("_", 1)[0]
if short in rev:
raise ValueError(
f"modality-name collision: workers {rev[short]!r} and {w!r} both "
f"collapse to {short!r}; pass collapse_modalities=False to use "
f"verbatim worker_subdir names"
)
rev[short] = w
out[w] = short
return out
def discover_modalities_from_listing(
    recording_id: str,
    rel_paths: Iterable[str],
    *,
    collapse_modalities: bool = True,
    profile: str | None = None,
) -> list[ModalitySpec]:
    """Group a flat file listing into per-modality :class:`ModalitySpec`\\ s.

    Parameters
    ----------
    recording_id
        Used to construct ``raw_storage_uri`` for each modality.
    rel_paths
        File paths relative to ``recordings/<recording_id>/`` —
        typically ``"camera_0/camera_0000_*.mkv"``. Files at the recording
        root (no ``/`` in the rel path; e.g. ``manifest.json``) are
        skipped silently.
    collapse_modalities
        When ``True`` (the default), collapse worker-subdir names to
        their first underscore segment. Set to ``False`` to register
        with the full ``worker_subdir`` name as the modality.
    profile
        Optional profile override threaded to :func:`raw_modality_uri`.
        When ``None``, ``URSA_PROFILE`` env var is consulted.

    Returns
    -------
    list[ModalitySpec]
        One spec per worker subdir, ordered by worker_subdir; empty when
        the listing contains no subdirectory entries at all.

    Raises
    ------
    ValueError
        Propagated from :func:`_collapse_worker_names` when two worker
        subdirs collapse to the same short modality name.
    """
    # Bucket rel paths by their first path segment (the worker subdir).
    by_worker: dict[str, list[str]] = {}
    for rel in rel_paths:
        if "/" not in rel:
            # Files at the recording root (manifest.json copy, etc.) — skip.
            continue
        worker, _, _ = rel.partition("/")
        by_worker.setdefault(worker, []).append(rel)
    if not by_worker:
        return []
    if collapse_modalities:
        worker_to_modality = _collapse_worker_names(sorted(by_worker.keys()))
    else:
        worker_to_modality = {w: w for w in sorted(by_worker.keys())}
    specs: list[ModalitySpec] = []
    for worker, files in sorted(by_worker.items()):
        modality_name = worker_to_modality[worker]
        fmt = infer_format(worker, files)
        # URI is built from the *worker_subdir* (actual path segment), never
        # from the collapsed modality name — see ModalitySpec docs.
        uri = raw_modality_uri(recording_id, worker, profile=profile)
        specs.append(
            ModalitySpec(
                name=modality_name,
                worker_subdir=worker,
                format=fmt,
                raw_storage_uri=uri,
                segment_count=len(files),
            )
        )
    return specs
def _parse_dt(value: Any) -> datetime:
"""Parse an ISO-8601 string or pass-through a :class:`datetime`.
Naive datetimes are interpreted as UTC for parity with
``examples/mvp_demo_register.py``.
"""
if isinstance(value, datetime):
return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
if isinstance(value, str):
dt = datetime.fromisoformat(value)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
raise TypeError(f"expected ISO-8601 string or datetime; got {type(value).__name__}")
def manifest_to_register_payload(
    manifest: dict[str, Any],
    rel_paths: Iterable[str],
    *,
    participant_id: str,
    profile: str | None = None,
    collapse_modalities: bool = True,
) -> RegisterPayload:
    """Build a :class:`RegisterPayload` from a manifest + file listing.

    The manifest is the data-engine ``manifest.json`` contents as a
    dict; ``rel_paths`` is the listing under ``recordings/<rec_id>/``.

    Required manifest fields:

    * ``recording_id`` — informational; ``manifest_recording_id``
      metadata for auditability.
    * ``recording_hash`` — content-addressed digest from
      ``data-engine/uploader/hashing.py:compute_recording_hash``.
      No fallback — a manifest without this field is rejected with
      ``KeyError`` (B4 from round 2 — silent fallback to
      ``recording_id`` could orphan a row when a re-augment produces
      different content under the same recording_id).
    * ``started_at``, ``ended_at`` — ISO-8601 strings.

    Optional manifest fields:

    * ``participant`` — display-name string; copied to ``metadata`` for
      auditability but the catalog ``participant_ids`` always comes from
      the explicit ``participant_id=`` kwarg.
    * ``files`` — list of ``{worker_id, rel_path, size, sha256}``
      entries (added by data-engine's ``augment_manifest``). Currently
      unused by this util; the orchestrator's hash-skip path consumes
      it directly from the manifest dict.

    Parameters
    ----------
    manifest
        The manifest dict.
    rel_paths
        File paths relative to ``recordings/<rec_id>/`` (output of
        ``store.list`` or ``find rec_*/``).
    participant_id
        The catalog ID for the participant. Required: the schema
        currently enforces ``participant_ids`` non-empty. Null-participant
        support for legacy recordings is tracked in ENG-1099.
    profile
        Optional profile override for URI construction.
    collapse_modalities
        See :func:`discover_modalities_from_listing`.

    Raises
    ------
    KeyError
        If the manifest is missing ``recording_id`` / ``started_at`` /
        ``ended_at`` / ``recording_hash``.
    ValueError
        If ``ended_at`` precedes ``started_at``, or if no modality
        directories are discovered under ``recordings/<rec_id>/``
        (likely a bad listing or a non-rig prefix).
    """
    if "recording_id" not in manifest:
        raise KeyError("manifest missing required field 'recording_id'")
    if "started_at" not in manifest:
        raise KeyError("manifest missing required field 'started_at'")
    if "ended_at" not in manifest:
        raise KeyError("manifest missing required field 'ended_at'")
    recording_id: str = manifest["recording_id"]
    # ``recording_hash`` is the content-addressed digest of every per-file
    # sha256, stamped by ``data-engine/uploader/manifest.py:augment_manifest``
    # before ingest. Refusing to fall back to ``recording_id`` here is the
    # no-orphan invariant from B4: a re-augmented session whose contents
    # changed gets a *different* recording_hash and lands as a fresh row,
    # so R2 bytes and catalog rows never disagree. Direct callers of
    # ``manifest_to_register_payload`` that hand in a pre-augment manifest
    # are violating the contract and should fail loudly here rather than
    # quietly using recording_id and risking an orphan write.
    if not manifest.get("recording_hash"):
        raise KeyError(
            "manifest missing required field 'recording_hash'; run "
            "data-engine/uploader.manifest.augment_manifest before "
            "calling manifest_to_register_payload (the recording_hash "
            "is the content-addressed digest of per-file sha256s)."
        )
    recording_hash: str = manifest["recording_hash"]
    started_at = _parse_dt(manifest["started_at"])
    ended_at = _parse_dt(manifest["ended_at"])
    duration = ended_at - started_at
    # B2 (round 4): negative durations silently break downstream
    # slicing math. Pydantic's ``timedelta`` field has no sign
    # constraint by default, so a manifest with ``ended_at <
    # started_at`` (clock skew, manual edit, backwards worker
    # timestamp) would pass validation and land a degenerate row.
    # Refuse here so the operator sees a clear error instead.
    if duration < timedelta(0):
        raise ValueError(
            f"manifest has ended_at < started_at "
            f"({manifest['ended_at']!r} < {manifest['started_at']!r}); "
            "recording duration would be negative. Check the rig's "
            "clock or fix the manifest before retrying."
        )
    specs = discover_modalities_from_listing(
        recording_id,
        rel_paths,
        collapse_modalities=collapse_modalities,
        profile=profile,
    )
    if not specs:
        raise ValueError(
            f"no modality subdirs discovered under recordings/{recording_id}/; "
            "every file in the listing was at the recording root"
        )
    participant_display_name = manifest.get("participant")
    recording_row = RecordingRow(
        recording_hash=recording_hash,
        participant_ids=[participant_id],
        # ``EpochNs`` coerces datetime → int64 ns via a BeforeValidator at
        # construction; mypy can't see through the Annotated coercer, so
        # cast here matches the pattern in ``_register_recording``.
        start_time=cast(int, started_at),
        duration=duration,
        metadata={
            "participant_display_name": participant_display_name,
            "manifest_recording_id": recording_id,
        },
    )
    modality_rows: list[ModalityRow] = []
    for spec in specs:
        metadata: dict[str, Any] = {}
        if spec.name != spec.worker_subdir:
            # Record the real path segment when the modality name was
            # collapsed, so the original subdir stays auditable.
            metadata["worker_subdir"] = spec.worker_subdir
        modality_rows.append(
            ModalityRow(
                recording_hash=recording_hash,
                modality=spec.name,
                raw_storage_uri=spec.raw_storage_uri,
                storage_uri=spec.raw_storage_uri,
                ingestion_status=IngestionStatus.RAW,
                format=spec.format,
                metadata=metadata,
            )
        )
    return RegisterPayload(recording=recording_row, modalities=modality_rows)