"""Derive a modality's first-sample wall-clock epoch from its raw files.

Companion to :mod:`ursa.recovery.timing` (which derives recording *end*
times for duration fixes). ``scripts/fix_modalities_start_epoch.py``
re-anchors legacy zero-origin ``ModalityRow.domain_intervals`` onto the
shared recording origin (``RecordingRow.start_time``), which requires each
modality's own first-sample absolute epoch — the value Virgo's
shared-origin parsers record as ``start_epoch_ns`` at ingest. The
per-modality derivations here replicate those parsers' first-sample
anchoring (``virgo/src/virgo/adapters/data_engine_parsers/*``) from the raw
*timestamps sidecars alone* — no bulk data is read, so a full-catalog
backfill touches only headers, first CSV/JSONL rows, 16-byte binary
anchors, and small metadata/notes files.

Probe semantics mirror ``timing.py``: a probe returns ``None`` when no
parseable signal exists (missing/empty sidecar, unknown header); read or
parse *failures* raise and are caught per-row by the caller. Derived epochs
are expected to agree with the parser's value to **sub-millisecond
precision** (identical formulas, independent float paths). The consuming
script additionally bounds every epoch against the recording's own time
window, so a glitched first row (e.g. a Pupil-SDK clock step) downgrades its
row to *underivable* instead of committing a wild shift.

Modality routing:

* ``video-webcam`` / ``camera`` / ``screen`` are **excluded** — their MP4
  assets must be re-ingested (the sidecar timestamps change with the
  origin), so the catalog-only fix does not apply.
* ``pupillabs-scene`` is **excluded** — its Pupil Labs
  ``world_timestamps_*.csv`` carries no joinable PTS column, so the
  modality has no aligned first-sample epoch to record.

Per-modality first-sample sources (segment 0 = lowest segment index):

========================== ============================================ ========================================
Modality slug(s)           Raw source (segment 0)                       First-sample epoch formula
========================== ============================================ ========================================
``eeg`` (new format)       ``eeg_0000_timestamps.bin`` pair 0           ``time`` (float64 Unix s) × 1e9
``eeg-old-format``         amp0 ``…_timestamps.bin`` anchor 0           ``(time_s − offset/rate)`` × 1e9
``eeg-cascaded``           ``…_<serial>_timestamps.bin`` anchor 0       ``(time_s − offset/rate)`` × 1e9
``microphone``/``mic``     ``mic_timestamps_0000_*.csv`` anchor 0       ``epoch_ns − offset × 1e9/rate``
``samsung-watch-accel``    ``imu_0000_*.csv`` row 0                     ``timestamp`` (int ns) value
``pupillabs-gaze``…        ``gaze_0000_*.csv``/``gaze.csv`` row 0       ``timestamp_unix_s`` × 1e9
``pupillabs-imu``          ``imu_0000_*.csv`` row 0                     ``timestamp_unix_s`` × 1e9
``samsungwatch``           min across stream CSVs' row 0                ``timestamp`` ns / ``ts`` s × 1e9
``mouse``/…/``battery``    first data row                               ``timestamp`` (float s) value × 1e9
``notes``                  min ``ts_capture`` across all segments       value × 1e9
``browser``                ``events.jsonl`` line 0                      ``timestamp`` value (ms) × 1e6
``location``/``env…``      ``environment_0000_*.jsonl`` line 0          ``timestamp`` ns / legacy ``ts`` × 1e9
========================== ============================================ ========================================
"""
from __future__ import annotations

import csv
import io
import json
import math
import re
import struct
from collections.abc import Callable
from urllib.parse import urlparse

from ursa.recovery.timing import _list_keys_under
from ursa.store import ObjectStore
# Public surface: the two excluded-slug sets (re-ingest video; permanently
# NULL) and the two public functions defined at the bottom of this module.
__all__ = [
    "PERMANENT_NULL_SLUGS",
    "VIDEO_REINGEST_SLUGS",
    "derive_first_sample_epoch_ns",
    "supported_slugs",
]
#: Video modalities whose backfill is a re-ingest (the MP4 frame-index
#: sidecar must be rewritten together with the domain origin) — out of
#: scope for the catalog-only fix.
VIDEO_REINGEST_SLUGS = frozenset({"video-webcam", "camera", "screen"})
#: Modalities with no derivable aligned first-sample epoch; their rows stay
#: ``start_epoch_ns IS NULL`` permanently (part of the expected-NULL set in
#: the backfill's definition of done).
PERMANENT_NULL_SLUGS = frozenset({"pupillabs-scene"})
# Head size for first-row CSV/JSONL reads. First data rows sit within the
# first few hundred bytes; 8 KiB tolerates wide headers and long rows.
_HEAD_BYTES = 8192
def _prefix_of(raw_storage_uri: str) -> str:
"""Modality URI → ``store.list``-ready key prefix (with trailing slash)."""
key = urlparse(raw_storage_uri).path.lstrip("/")
return key if key.endswith("/") else key + "/"
def _pick_first_by_index(
files: list[tuple[str, int]],
*,
index_regex: re.Pattern[str],
) -> tuple[str, int] | None:
"""Pick the ``(key, size)`` whose basename has the *smallest* numeric
segment index (min-analog of ``timing._pick_latest_by_index``).
Non-matching basenames sort last (sentinel ``+inf``-like index) so a
stray file can't out-rank a real segment-0 file. Ties (e.g. old-format
per-amplifier EEG, several ``…_ampN_…`` files at index 0) break
lexicographically, which prefers ``amp0``.
"""
if not files:
return None
def _key(kv: tuple[str, int]) -> tuple[int, str]:
basename = kv[0].rsplit("/", 1)[-1]
m = index_regex.search(basename)
if m is None:
return (1 << 62, kv[0])
try:
return (int(m.group(1)), kv[0])
except (ValueError, IndexError):
return (1 << 62, kv[0])
return min(files, key=_key)
def _head_lines(store: ObjectStore, key: str, size: int) -> list[str]:
    """Complete, non-blank, non-comment text lines from the head of *key*.

    Reads at most :data:`_HEAD_BYTES` bytes starting at offset 0.

    * Lines are split only on LF, CRLF and CR. :meth:`str.splitlines` would
      also break on vertical tab, form feed, FS/GS/RS (0x1C-0x1E), NEL
      (U+0085) and U+2028/U+2029. Those can legitimately appear inside a
      JSON string or a CSV field, and splitting there would tear a record
      apart and make a probe skip to a later one.
    * A leading UTF-8 BOM is dropped so the first header name / JSON line
      parses cleanly.
    * When the read stopped before end-of-file, the final piece is discarded.
      It is either a truncated line, or the empty string left after a
      terminator the read ended on exactly, so no complete line is lost.
    * Blank lines and ``#``-comment lines are skipped.
    """
    length = min(size, _HEAD_BYTES)
    head = store.get_range(key, start=0, length=length)
    # ``utf-8-sig`` strips a leading BOM; ``errors="ignore"`` drops a
    # multi-byte character cut in half by the byte-range read.
    text = head.decode("utf-8-sig", errors="ignore")
    lines = re.split(r"\r\n|\r|\n", text)
    if length < size:
        # Partial read: the last piece is a truncated line or "" — drop it.
        lines = lines[:-1]
    return [ln for ln in lines if ln.strip() and not ln.lstrip().startswith("#")]
def _head_csv(store: ObjectStore, key: str, size: int) -> tuple[list[str], list[list[str]]] | None:
    """Parse the head of a CSV into ``(header, data_rows)``.

    Header names are whitespace-stripped; data cells are returned as read.
    Returns ``None`` when there is no header row or no data row. The check
    is repeated after CSV parsing, because a quoted field spanning several
    physical lines folds them into a single record. So whenever a tuple is
    returned, ``data_rows`` is non-empty and ``data_rows[0]`` is safe.
    """
    lines = _head_lines(store, key, size)
    if len(lines) < 2:
        return None
    records = list(csv.reader(lines))
    if len(records) < 2:
        # Header only (e.g. multi-line quoted fields merged the lines).
        return None
    header, *data_rows = records
    return [name.strip() for name in header], data_rows
def _wav_sample_rate(header: bytes) -> int | None:
"""Sample rate from a RIFF/WAVE header's ``fmt`` chunk, or ``None``."""
if len(header) < 44 or header[:4] != b"RIFF" or header[8:12] != b"WAVE":
return None
offset = 12
while offset + 8 <= len(header):
chunk_id = header[offset : offset + 4]
(chunk_size,) = struct.unpack("<I", header[offset + 4 : offset + 8])
body_offset = offset + 8
if chunk_id == b"fmt ":
if body_offset + 16 > len(header):
return None
_fmt, _ch, sample_rate, _br, _ba, _bps = struct.unpack(
"<HHIIHH", header[body_offset : body_offset + 16]
)
return int(sample_rate)
offset = body_offset + chunk_size + (chunk_size & 1)
return None
# --------------------------------------------------------------------------
# EEG — all three layouts read a 16-byte (int64 offset, float64 time) pair.
# --------------------------------------------------------------------------
# Segment index used for min-index selection: the digits right after a
# leading ``eeg_`` in the basename (e.g. ``eeg_0000_…`` → 0).
_EEG_TS_INDEX_RE = re.compile(r"^eeg_(\d+)")
# New format: per-sample pairs, no epoch suffix in the name. Only an exact
# ``eeg_<digits>_timestamps.bin`` basename matches; a basename with extra
# components between the index and ``_timestamps.bin`` is treated by the
# EEG probe as an anchor-pair file instead.
_EEG_TS_NEW_RE = re.compile(r"^eeg_\d+_timestamps\.bin$")
def _first_epoch_eeg(store: ObjectStore, prefix: str) -> int | None:
    """First-sample epoch (ns) shared by the three EEG layouts, or ``None``.

    Picks the lowest-index ``*_timestamps.bin`` that holds at least one
    16-byte ``(int64 sample_offset, float64 unix_s)`` pair, and reads that
    first pair.

    * New format: every sample is timestamped, so the pair already is
      sample 0.
    * Old per-amplifier / cascaded: the files hold anchor pairs, so sample 0
      is back-computed as ``unix_s - sample_offset / sampling_rate``. A zero
      offset needs no rate; a missing or non-positive rate yields ``None``.
    """
    candidates = [
        (key, size)
        for key, size in _list_keys_under(store, prefix)
        if key.endswith("_timestamps.bin") and size >= 16
    ]
    chosen = _pick_first_by_index(candidates, index_regex=_EEG_TS_INDEX_RE)
    if chosen is None:
        return None
    ts_key = chosen[0]
    anchor = store.get_range(ts_key, start=0, length=16)
    sample_offset, unix_s = struct.unpack("<qd", anchor)
    basename = ts_key.rsplit("/", 1)[-1]
    if _EEG_TS_NEW_RE.match(basename) or sample_offset == 0:
        # New-format pair 0 *is* sample 0; a zero-offset anchor likewise.
        return int(unix_s * 1e9)
    rate = _eeg_sampling_rate(store, prefix)
    if rate is None or rate <= 0:
        return None
    # Anchor pair: step back ``sample_offset`` samples to reach sample 0.
    return int((unix_s - sample_offset / rate) * 1e9)
def _eeg_sampling_rate(store: ObjectStore, prefix: str) -> float | None:
    """EEG ``sampling_rate`` recorded under *prefix*, or ``None``.

    Candidates are the cascaded top-level ``metadata.json`` and per-segment
    ``*_meta.json`` files, tried in sorted-key order. The first candidate
    whose top-level JSON object carries a finite, positive, numeric
    ``sampling_rate`` wins; they all carry the same value.

    A candidate is skipped when it is malformed JSON, is not a JSON object,
    or has a ``sampling_rate`` that is missing, boolean, non-numeric,
    non-finite or non-positive. Read failures propagate, per the module
    contract (the caller handles them per row).
    """
    metas = [
        (k, s)
        for (k, s) in _list_keys_under(store, prefix)
        if k.endswith("metadata.json") or k.endswith("_meta.json")
    ]
    for key, _size in sorted(metas):
        try:
            doc = json.loads(store.get(key))
        except ValueError:
            # Malformed JSON (UnicodeDecodeError is a ValueError too): try
            # the next candidate.
            continue
        if not isinstance(doc, dict):
            # A JSON array/scalar has no ``sampling_rate``; ``.get`` would raise.
            continue
        rate = doc.get("sampling_rate")
        # ``bool`` subclasses ``int`` — ``true`` must not read as 1 Hz.
        if isinstance(rate, bool) or not isinstance(rate, int | float):
            continue
        # ``json.loads`` accepts ``Infinity``/``NaN``; neither is a usable rate.
        if math.isfinite(rate) and rate > 0:
            return float(rate)
    return None
# --------------------------------------------------------------------------
# Microphone — anchor CSV back-computed via the WAV's sample rate.
# --------------------------------------------------------------------------
# Segment index of a WAV basename: ``mic_<N>_…`` or legacy ``segment_<N>_…``.
_MIC_WAV_INDEX_RE = re.compile(r"^(?:mic|segment)_(\d+)_")
# Timestamp sidecars only ever use the ``mic_timestamps_*`` name. The WAV
# itself may carry the legacy ``segment_*`` name, but the mic probe's CSV
# filter requires ``timestamps`` in the basename, so a ``segment_*`` arm
# here would be dead.
_MIC_TS_INDEX_RE = re.compile(r"^mic_timestamps_(\d+)_")
def _first_epoch_mic(store: ObjectStore, prefix: str) -> int | None:
    """First-sample epoch (ns) of the lowest-index microphone WAV segment.

    The preferred source is the first anchor row of that segment's
    ``mic_timestamps_<N>_*.csv`` sidecar, in one of two header layouts:

    * ``sample_offset,epoch_ns`` → ``epoch_ns − offset × 1e9 / rate``
    * ``sample_offset,timestamp_s`` → ``(timestamp_s − offset / rate) × 1e9``

    *rate* is read from the WAV's ``fmt `` chunk; a zero offset needs none.

    The sidecar is used only when its segment index equals the chosen WAV's
    (when both indices parse). Anchoring segment 0's audio on a later
    segment's sidecar would shift the epoch by segment 0's duration. That
    value still lies inside the recording window, so the consumer's bounds
    check cannot catch it.

    Fallback: with no usable anchor, the WAV filename's Unix-seconds suffix
    is used instead (1 s precision, consistent with what a re-ingest would
    record). "No usable anchor" covers:

    * a missing, unparseable or mismatched CSV;
    * an unknown header or no data row;
    * a non-zero offset whose WAV sample rate could not be read.

    Returns ``None`` when there is no WAV or no such suffix. Unparseable
    anchor cells raise.
    """

    def segment_index(key: str, regex: re.Pattern[str]) -> int | None:
        # Basename segment index per *regex*, or None when it doesn't match.
        match = regex.search(key.rsplit("/", 1)[-1])
        return int(match.group(1)) if match is not None else None

    listing = _list_keys_under(store, prefix)
    wavs = [(k, s) for (k, s) in listing if k.endswith(".wav") and s > 44]
    first_wav = _pick_first_by_index(wavs, index_regex=_MIC_WAV_INDEX_RE)
    if first_wav is None:
        return None
    wav_key, _wav_size = first_wav

    def wav_rate() -> int | None:
        # 256 bytes holds RIFF/WAVE + ``fmt `` unless large chunks precede it.
        return _wav_sample_rate(store.get_range(wav_key, start=0, length=256))

    csvs = [
        (k, s)
        for (k, s) in listing
        if k.endswith(".csv") and ("timestamps" in k.rsplit("/", 1)[-1])
    ]
    first_csv = _pick_first_by_index(csvs, index_regex=_MIC_TS_INDEX_RE)
    if first_csv is not None:
        wav_index = segment_index(wav_key, _MIC_WAV_INDEX_RE)
        csv_index = segment_index(first_csv[0], _MIC_TS_INDEX_RE)
        if wav_index is not None and csv_index is not None and wav_index != csv_index:
            # The lowest sidecar belongs to another segment: not an anchor here.
            first_csv = None

    parsed = _head_csv(store, *first_csv) if first_csv is not None else None
    if parsed is not None and parsed[1]:
        header, rows = parsed
        row = rows[0]
        if header[:2] == ["sample_offset", "epoch_ns"] and len(row) >= 2:
            sample_offset, epoch_ns = int(row[0]), int(row[1])
            if sample_offset == 0:
                return epoch_ns
            rate = wav_rate()
            if rate:
                return int(float(epoch_ns) - float(sample_offset) * 1e9 / rate)
            # rate unreadable -> fall through to the WAV filename fallback.
        elif header[:2] == ["sample_offset", "timestamp_s"] and len(row) >= 2:
            sample_offset, t_s = int(row[0]), float(row[1])
            if sample_offset == 0:
                return int(t_s * 1e9)
            rate = wav_rate()
            if rate:
                return int((t_s - sample_offset / rate) * 1e9)
            # rate unreadable -> fall through to the WAV filename fallback.

    # Fallback: the WAV filename's epoch suffix, truncated to whole seconds.
    # 9-10 digits = Unix *seconds* (1973-2286); rejects ms/µs/ns-precision
    # suffixes that would otherwise be multiplied into an absurd epoch below.
    m = re.search(r"_(\d{9,10})\.wav$", wav_key)
    if m is None:
        return None
    return int(m.group(1)) * 1_000_000_000
# --------------------------------------------------------------------------
# CSV / JSONL first-row families.
# --------------------------------------------------------------------------
def _first_epoch_csv(
    store: ObjectStore,
    prefix: str,
    *,
    file_re: re.Pattern[str],
    index_re: re.Pattern[str],
    ts_column: str,
    unit: str,
) -> int | None:
    """First data row's *ts_column* from the min-index file matching *file_re*.

    *file_re* is matched against key basenames. Among matches the lowest
    segment index per *index_re* wins; non-matching names rank last and
    ties break on the key. The cell is converted to epoch-ns per *unit*:

    * ``"ns"`` parses an integer and returns it unchanged;
    * ``"s"`` parses a float and scales it by 1e9.

    Returns ``None`` when no file matches, the head has no header or data
    row, *ts_column* is absent, or the first row is too short to reach it.

    Raises :class:`ValueError` for an unparseable cell (a parse failure, per
    the module contract). Also raises it for any *unit* other than ``"ns"``
    or ``"s"``, rather than silently treating it as seconds and producing a
    1e9-scale error.
    """
    if unit not in ("ns", "s"):
        raise ValueError(f"unsupported timestamp unit {unit!r} (expected 'ns' or 's')")
    files = [
        (k, s) for (k, s) in _list_keys_under(store, prefix) if file_re.match(k.rsplit("/", 1)[-1])
    ]
    first = _pick_first_by_index(files, index_regex=index_re)
    if first is None:
        return None
    parsed = _head_csv(store, *first)
    if parsed is None:
        return None
    header, rows = parsed
    if not rows or ts_column not in header:
        return None
    col = header.index(ts_column)
    row = rows[0]
    if col >= len(row):
        return None
    raw = row[col]
    if unit == "ns":
        return int(raw)
    return int(float(raw) * 1e9)
def _first_epoch_notes(store: ObjectStore, prefix: str) -> int | None:
    """Minimum ``ts_capture`` (Unix s, returned as ns) across all ``notes_*.csv``.

    Notes arrive out of capture order (the parser sorts before anchoring),
    so every row of every ``notes_<N>_<epoch>.csv`` segment is scanned.
    Notes files are tiny, so full reads are fine.

    Files are parsed as real CSV: only LF/CR/CRLF end a record, and quoted
    fields may span lines. They are not pre-split with
    :meth:`str.splitlines`, which would also break a row on separators such
    as U+2028 or NEL that free-text notes can contain, losing that row's
    ``ts_capture``. A leading UTF-8 BOM is ignored.

    Skipped: files without a ``ts_capture`` column, and empty, unparseable
    or non-finite values. Returns ``None`` when nothing usable is found.
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, prefix)
        if re.match(r"^notes_\d+_\d+\.csv$", k.rsplit("/", 1)[-1])
    ]
    best: float | None = None
    for key, _size in files:
        text = store.get(key).decode("utf-8-sig", errors="ignore")
        # ``newline=""`` is what the csv module expects for its input.
        rows = csv.reader(io.StringIO(text, newline=""))
        header = [h.strip() for h in next(rows, [])]
        if "ts_capture" not in header:
            continue
        col = header.index("ts_capture")
        for row in rows:
            if col >= len(row) or not row[col].strip():
                continue
            try:
                val = float(row[col])
            except ValueError:
                continue
            # NaN would poison ``min`` and make ``int()`` raise below.
            if math.isfinite(val):
                best = val if best is None else min(best, val)
    return int(best * 1e9) if best is not None else None
# Per-stream CSV families written by the watch; each is probed on its own.
_WATCH_STREAMS = ("imu", "ppg", "heart_rate", "eda", "skin_temperature", "battery")
# Segment index of a ``<stream>_<N>_<epoch>.csv`` basename (stream names may
# themselves contain underscores, e.g. ``heart_rate``).
_WATCH_INDEX_RE = re.compile(r"^[a-z_]+_(\d+)_")


def _first_epoch_samsungwatch(store: ObjectStore, prefix: str) -> int | None:
    """Earliest first-sample epoch (ns) across the watch's per-stream CSVs.

    For each stream in ``_WATCH_STREAMS``, reads the head of its lowest-index
    ``<stream>_<N>_<epoch>.csv`` and takes row 0's timestamp from one of two
    wire formats:

    * ``timestamp``: int ns (current format);
    * ``ts``: float s (older format), scaled by 1e9.

    The result is the minimum over streams, since the parser anchors the
    multiplexed series at the global minimum timestamp. ``None`` when no
    stream yields a value.
    """
    listing = _list_keys_under(store, prefix)
    firsts: list[int] = []
    for stream in _WATCH_STREAMS:
        stream_re = re.compile(rf"^{stream}_\d+_\d+\.csv$")
        files = [(k, s) for (k, s) in listing if stream_re.match(k.rsplit("/", 1)[-1])]
        first = _pick_first_by_index(files, index_regex=_WATCH_INDEX_RE)
        parsed = None if first is None else _head_csv(store, *first)
        if parsed is None:
            continue
        header, rows = parsed
        row = rows[0]
        if "timestamp" in header:  # current wire format: int64 ns
            col = header.index("timestamp")
            if col >= len(row):
                continue
            firsts.append(int(row[col]))
        elif "ts" in header:  # older wire format: float64 s
            col = header.index("ts")
            if col >= len(row):
                continue
            firsts.append(int(float(row[col]) * 1e9))
    return min(firsts) if firsts else None
def _first_epoch_browser(store: ObjectStore, prefix: str) -> int | None:
    """First rrweb event's ``timestamp`` (epoch ms, returned as ns).

    Uses the lexicographically first key ending in ``events.jsonl`` and
    scans its head for the first JSON *object* line that carries
    ``timestamp``. Malformed lines are skipped, and so are non-object lines
    (arrays, strings, numbers). For those, ``"timestamp" in obj`` either
    raises ``TypeError`` or matches and then fails on subscripting. Integer
    milliseconds are scaled exactly; other values go through ``float``.
    """
    files = [(k, s) for (k, s) in _list_keys_under(store, prefix) if k.endswith("events.jsonl")]
    if not files:
        return None
    key, size = min(files)
    for line in _head_lines(store, key, size):
        try:
            obj = json.loads(line)
        except ValueError:
            continue
        if not isinstance(obj, dict) or "timestamp" not in obj:
            continue
        val = obj["timestamp"]  # ms → ns; integer path avoids float64 rounding
        return int(val) * 1_000_000 if isinstance(val, int) else int(float(val) * 1e6)
    return None
# Segment index of an ``environment_<N>_<epoch>.jsonl`` basename.
_ENV_INDEX_RE = re.compile(r"^environment_(\d+)_")


def _first_epoch_environment(store: ObjectStore, prefix: str) -> int | None:
    """First poll record's epoch (ns) from the min-index ``environment_*.jsonl``.

    Scans the file head for the first JSON *object* line that carries
    either of two fields:

    * ``timestamp``: canonical, int ns, returned as-is;
    * ``ts``: legacy alias, float s, scaled by 1e9.

    ``timestamp`` wins when both are present. Malformed lines and non-object
    lines are skipped. For a JSON number, ``"ts" in obj`` raises
    ``TypeError``; for a list or string it can match and then fail on
    subscripting.
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, prefix)
        if re.match(r"^environment_\d+_\d+\.jsonl$", k.rsplit("/", 1)[-1])
    ]
    first = _pick_first_by_index(files, index_regex=_ENV_INDEX_RE)
    if first is None:
        return None
    for line in _head_lines(store, *first):
        try:
            obj = json.loads(line)
        except ValueError:
            continue
        if not isinstance(obj, dict):
            continue
        if "timestamp" in obj:
            return int(obj["timestamp"])
        if "ts" in obj:
            return int(float(obj["ts"]) * 1e9)
    return None
# --------------------------------------------------------------------------
# Dispatch
# --------------------------------------------------------------------------
def _gaze_probe(store: ObjectStore, prefix: str) -> int | None:
    """First gaze sample's epoch (ns): the first data row's ``timestamp_unix_s``.

    Tries the segmented layout (``gaze_<N>_<epoch>.csv``, lowest index
    wins) first, then the flat legacy ``gaze.csv``. Both layouts go through
    ``_first_epoch_csv``. As a result the flat file is picked
    deterministically (smallest key when several exist) rather than in
    listing order, and gets the same missing-column / short-row handling
    as the segmented one.
    """
    epoch = _first_epoch_csv(
        store,
        prefix,
        file_re=re.compile(r"^gaze_\d+_\d+\.csv$"),
        index_re=re.compile(r"^gaze_(\d+)_"),
        ts_column="timestamp_unix_s",
        unit="s",
    )
    if epoch is not None:
        return epoch
    # Flat ``gaze.csv`` has no segment index: index_re never matches, every
    # candidate ranks on the sentinel, and the key breaks the tie.
    return _first_epoch_csv(
        store,
        prefix,
        file_re=re.compile(r"^gaze\.csv$"),
        index_re=re.compile(r"^gaze_(\d+)_"),
        ts_column="timestamp_unix_s",
        unit="s",
    )
def _csv_probe(
    file_pattern: str,
    index_pattern: str,
    ts_column: str,
    unit: str,
) -> Callable[[ObjectStore, str], int | None]:
    """Bind ``_first_epoch_csv`` to one CSV family: its basename filter,
    segment-index regex, timestamp column and unit (``"ns"`` or ``"s"``)."""
    file_re = re.compile(file_pattern)
    index_re = re.compile(index_pattern)

    def probe(store: ObjectStore, prefix: str) -> int | None:
        return _first_epoch_csv(
            store,
            prefix,
            file_re=file_re,
            index_re=index_re,
            ts_column=ts_column,
            unit=unit,
        )

    return probe


# Modality slug → first-sample probe. Slugs absent here (video, scene
# camera, unknown) have no catalog-only derivation.
_PROBES: dict[str, Callable[[ObjectStore, str], int | None]] = {
    "eeg": _first_epoch_eeg,
    "eeg-old-format": _first_epoch_eeg,
    "eeg-cascaded": _first_epoch_eeg,
    "microphone": _first_epoch_mic,
    "mic": _first_epoch_mic,
    "samsung-watch-accel": _csv_probe(
        r"^imu_\d+_\d+\.csv$",
        r"^imu_(\d+)_",
        "timestamp",
        "ns",
    ),
    "samsungwatch": _first_epoch_samsungwatch,
    "pupillabs-gaze": _gaze_probe,
    "pupillabs": _gaze_probe,
    "pupillabs-segmented": _gaze_probe,
    "pupillabs-imu": _csv_probe(
        r"^imu_\d+_\d+\.csv$",
        r"^imu_(\d+)_",
        "timestamp_unix_s",
        "s",
    ),
    "mouse": _csv_probe(
        r"^(?:mouse|segment)_\d+_\d+\.csv$",
        r"^(?:mouse|segment)_(\d+)_",
        "timestamp",
        "s",
    ),
    "keyboard": _csv_probe(
        r"^(?:keyboard|segment)_\d+_\d+\.csv$",
        r"^(?:keyboard|segment)_(\d+)_",
        "timestamp",
        "s",
    ),
    "notes": _first_epoch_notes,
    # battery.csv carries no segment-index suffix, so the index pattern
    # never matches and the sole file ranks via the non-match sentinel —
    # intentional, since the file pattern already pins a single basename.
    "battery": _csv_probe(
        r"^battery\.csv$",
        r"^battery_(\d+)_",
        "timestamp",
        "s",
    ),
    "browser": _first_epoch_browser,
    "location": _first_epoch_environment,
    "environment": _first_epoch_environment,
}
def supported_slugs() -> frozenset[str]:
    """Modality slugs the catalog-only backfill can derive an epoch for.

    Exactly the keys of the probe dispatch table. Video slugs (re-ingest)
    and ``pupillabs-scene`` (permanently NULL) have no probe and are
    therefore absent.
    """
    return frozenset(_PROBES)
def derive_first_sample_epoch_ns(
    store: ObjectStore, raw_storage_uri: str, modality: str
) -> int | None:
    """First-sample absolute epoch (ns) for *modality* under its raw prefix.

    The probe lists and reads keys under the path component of
    *raw_storage_uri*, with leading ``/`` stripped and a trailing ``/``
    ensured.

    Returns ``None`` for an absent/unparseable signal AND for modality slugs
    with no probe (video → re-ingest; ``pupillabs-scene`` → permanently
    unaligned; unknown slugs). Read/parse failures raise — the caller treats
    them per-row.
    """
    probe = _PROBES.get(modality)
    if probe is None:
        return None
    return probe(store, _prefix_of(raw_storage_uri))