Source code for ursa.recovery.timing

"""Derive a recording's ``ended_at`` from per-modality segment metadata.

When the catalog's ``RecordingRow.duration`` is wrong or missing (e.g.
the 198 legacy rows whose manifests were reconstructed with
``ended_at = max(R2 last_modified)`` = upload time, not session end),
this module re-derives the true session end from the segment files the
rigs wrote at recording time.

Several signal sources are supported, split into **data-stream** probes
(read from actual recorded sample timestamps) and a universal
**worker-report** fallback:

- **EEG** (``eeg_*/eeg_*_timestamps.bin``): per-chunk
  ``(int64 offset, float64 unix_ts)`` pairs packed by
  ``data-engine/eeg/recorder.py``. The last 16-byte struct gives the
  timestamp of the most recent data chunk written before the recorder
  stopped.
- **Camera** (``camera_*/camera_timestamps_NNNN_<first_frame_ts>.csv``
  OR ``camera_*/segment_NNNN_<first_frame_ts>.csv``): per-frame
  ``frame, pts_ns, wall_clock, epoch_ns`` rows written by
  ``data-engine/camera/timestamps.py``. The last row's ``epoch_ns``
  gives the most recent frame's wall-clock. Two filename conventions
  are accepted because the legacy rig writers emit ``segment_*.csv``
  while newer writers emit ``camera_timestamps_*.csv`` — same column
  shape. The filter still requires one of those two prefixes (not
  just ``.csv``) so a stray debug dump can't be lex-max-picked and
  silently downgrade the recording.
- **Microphone** (``mic_*/mic_NNNN_<unix_ts>.wav``): the filename
  encodes the first-frame unix timestamp; the RIFF ``data`` chunk size
  + sample rate + channels + bits-per-sample give the duration.
  End = start + duration.
- **Screen** (``screen_*/screen_timestamps_NNNN_<first_frame_ts>.csv``):
  identical ``frame, pts_ns, wall_clock, epoch_ns`` columns as camera —
  the last row's ``epoch_ns`` is the most recent captured frame.
- **Samsung watch** (``samsungwatch_*/<sensor>_NNNN_<start_ts>.csv`` for
  imu / ppg / eda / heart_rate / battery): each sensor stream's first
  column is a nanosecond ``timestamp``; the max last-row timestamp
  across sensors is the last sample received.
- **Environment** (``environment_*/environment_NNNN_<start_ts>.jsonl``):
  newline-delimited ``{"timestamp": <ns>, "type": ...}`` poll records;
  the max ``timestamp`` is the last poll.
- **Worker report** (``<worker>/worker_report*.json``): a universal
  fallback across every worker type (notes, pupillabs, location,
  keyboard, mouse, battery, etc.). Each worker writes a report on clean
  shutdown carrying an explicit ``stopped_at_utc`` ISO-8601 timestamp.
  Probed for every worker dir alongside the data-stream probe, so a
  recording whose ``eeg_*_timestamps.bin`` is 0-bytes (failed recorder)
  can still derive ``ended_at`` from a sibling worker that shut down
  cleanly.

:func:`derive_recording_end_time` returns a :class:`ModalitySignals`
holding the per-source candidates; the recording's true ``ended_at``
is derived via :meth:`ModalitySignals.max_end`, which prefers the
latest **data-stream** end (eeg / camera / mic / screen / samsungwatch /
environment) when any such signal exists, falling back to
``worker_report`` only when none fired (recordings made up solely of
worker types that still lack a sample-timestamp probe).

Conservative-by-design: parsers return ``None`` when no parseable
signal exists (no ``_timestamps.bin`` present, no
``camera_timestamps_``/``segment_`` CSV, no valid WAV header, no
``worker_report*.json`` with a parseable ``stopped_at_utc`` — the
*absent-signal* case). Exceptions raised by read or parse failures are
caught by :func:`derive_recording_end_time` and appended to
:attr:`ModalitySignals.errors` — the *failed-parse* case.
**A ``None`` end-time with an empty ``errors`` list means "no signal was
found," not "a parse failed."** A single broken segment file downgrades only its own
recording to ``indeterminate`` rather than aborting a multi-recording
rebuild.
"""

from __future__ import annotations

import csv
import json
import re
import struct
from dataclasses import dataclass, field
from datetime import datetime, timezone

from ursa.layout import is_worker_report_basename
from ursa.store import ObjectStore

# Names exported by ``from ursa.recovery.timing import *``: the two read-size
# constants, the result dataclass, and the probe entry point.
__all__ = [
    "CSV_TAIL_BYTES",
    "WAV_HEADER_BYTES",
    "ModalitySignals",
    "derive_recording_end_time",
]


# Number of trailing bytes fetched when tailing a CSV / JSONL data stream
# (camera / screen / samsungwatch / environment). Rows run to roughly 70
# bytes, so an 8 KiB window holds about a hundred of them — plenty, since the
# probes only ever need the final complete row(s).
CSV_TAIL_BYTES = 8 * 1024

# Number of leading bytes fetched from a mic WAV segment. A canonical PCM
# header is 44 bytes; the extra room lets the chunk walker step over small
# ``fmt`` extensions / ``fact`` / ``LIST`` chunks before ``data``. If a
# larger chunk pushes ``data`` beyond this window, the WAV parser gives up
# and reports no signal rather than guessing.
WAV_HEADER_BYTES = 2**8


@dataclass
class ModalitySignals:
    """Per-source ``ended_at`` candidates gathered for a single recording.

    Instances are filled in by :func:`derive_recording_end_time`. Every
    timestamp attribute holds the latest value its source produced, or
    ``None`` when that source yielded nothing parseable. Call
    :meth:`max_end` to reduce the candidates to one ``ended_at``; a
    ``None`` result marks the recording as *indeterminate*.
    """

    eeg: datetime | None = None
    camera: datetime | None = None
    mic: datetime | None = None
    #: Final ``epoch_ns`` from a ``screen_*`` worker's
    #: ``screen_timestamps_*.csv`` — the same ``(frame, pts_ns,
    #: wall_clock, epoch_ns)`` column layout camera uses.
    screen: datetime | None = None
    #: Newest last-row nanosecond ``timestamp`` across a
    #: ``samsungwatch_*`` worker's sensor CSVs (imu / ppg / eda /
    #: heart_rate / battery).
    samsungwatch: datetime | None = None
    #: Newest poll ``timestamp`` (nanoseconds) among an
    #: ``environment_*`` worker's ``environment_*.jsonl`` records.
    environment: datetime | None = None
    #: Newest ``stopped_at_utc`` from any ``worker_report*.json`` in any
    #: worker dir of the recording. It is collected whether or not the
    #: data-stream probes fired, but :meth:`max_end` only falls back to it
    #: when none of them did.
    worker_report: datetime | None = None
    #: One message per parser failure. This can be non-empty even when
    #: :meth:`max_end` returns a value (one worker failed while another
    #: succeeded), so callers should log it unconditionally.
    errors: list[str] = field(default_factory=list)

    def max_end(self) -> datetime | None:
        """Return the best ``ended_at`` estimate, or ``None`` if indeterminate.

        The data-stream candidates (``eeg`` / ``camera`` / ``mic`` /
        ``screen`` / ``samsungwatch`` / ``environment``) are read from
        recorded sample timestamps, and the latest of them wins whenever
        at least one is present.

        ``worker_report`` is the wall-clock of *worker process teardown*,
        not of the last sample. Workers routinely linger long after the
        data stops (hours, sometimes into the next day), and mixing that
        value into the max inflated durations: a 5h session showed up as
        30h, and short single-modality sessions showed up as a uniform ~8h
        once the idle timeout fired. It is therefore consulted **only**
        when no data-stream candidate exists, which is the case for worker
        types that still lack a sample-timestamp probe (notes / pupillabs /
        keyboard / mouse / location / battery).
        """
        data_stream_ends = (
            self.eeg,
            self.camera,
            self.mic,
            self.screen,
            self.samsungwatch,
            self.environment,
        )
        observed = [end for end in data_stream_ends if end is not None]
        return max(observed) if observed else self.worker_report
def _list_keys_under(store: ObjectStore, prefix: str) -> list[tuple[str, int]]:
    """Return ``(key, size)`` for every object directly or transitively under ``prefix``.

    Modality-specific filename filtering happens in the per-modality
    ``_*_end_time`` helpers.
    """
    return [(meta.key, meta.size) for meta in store.list(prefix=prefix)]


def _basename(key: str) -> str:
    """Return the last ``/``-separated component of an object key."""
    return key.rsplit("/", 1)[-1]


def _utc_from_ns(epoch_ns: int) -> datetime:
    """Convert a Unix-epoch nanosecond count into an aware UTC ``datetime``."""
    return datetime.fromtimestamp(epoch_ns / 1_000_000_000, tz=timezone.utc)


def _pick_latest_by_index(
    files: list[tuple[str, int]],
    *,
    index_regex: re.Pattern[str],
) -> tuple[str, int] | None:
    """Pick the ``(key, size)`` whose basename has the largest numeric segment index.

    The index is ``index_regex.search(basename).group(1)``. A basename that
    doesn't match the regex gets the sentinel index ``-1``, and ties are
    broken by the full key (lexicographic). A stray-named file therefore
    can't out-rank a real segment. Returns ``None`` if ``files`` is empty.

    Rationale (F1, 2026-05-20 review): data-engine recorders pad segment
    indices to four digits. Once a recording crosses 10000 segments,
    ``eeg_10000_…`` lex-sorts *before* ``eeg_9999_…``, and a naive
    ``max(files)`` returns an earlier segment. Long-running pilot rigs or
    sleep studies can plausibly cross 10000 EEG segments.
    """
    if not files:
        return None

    def _key(kv: tuple[str, int]) -> tuple[int, str]:
        m = index_regex.search(_basename(kv[0]))
        if m is None:
            return (-1, kv[0])
        try:
            return (int(m.group(1)), kv[0])
        except (ValueError, IndexError):
            return (-1, kv[0])

    return max(files, key=_key)


_EEG_SEGMENT_INDEX_RE = re.compile(r"^eeg_(\d+)_")

#: One packed ``(int64 offset, float64 unix_ts)`` record of an EEG
#: ``*_timestamps.bin`` file: little-endian, 16 bytes.
_EEG_TIMESTAMP_RECORD = struct.Struct("<qd")


def _eeg_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the last EEG timestamp for one ``eeg_*`` worker.

    Returns ``None`` if no ``*_timestamps.bin`` file holding at least one
    full record is present for this worker.

    Read and ``struct`` errors are not caught here. They propagate up to
    :func:`derive_recording_end_time`'s exception guard, so the per-row
    ``errors`` list captures the actual failure mode.
    """
    record_size = _EEG_TIMESTAMP_RECORD.size
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if k.endswith("_timestamps.bin") and s >= record_size
    ]
    latest = _pick_latest_by_index(files, index_regex=_EEG_SEGMENT_INDEX_RE)
    if latest is None:
        return None
    latest_key, latest_size = latest
    # Anchor the read on the last *complete* record. A recorder that died
    # mid-write can leave a partial trailing record. Blindly reading the
    # final 16 bytes would then straddle two records and decode a garbage
    # float as the end time.
    last_record_start = (latest_size // record_size - 1) * record_size
    tail = store.get_range(latest_key, start=last_record_start, length=record_size)
    _offset, unix_ts = _EEG_TIMESTAMP_RECORD.unpack(tail)
    return datetime.fromtimestamp(unix_ts, tz=timezone.utc)


def _read_tail_lines(store: ObjectStore, key: str, size: int) -> list[str]:
    """Return the *complete* lines within the trailing :data:`CSV_TAIL_BYTES` of an object.

    This is shared by the CSV and JSONL tail probes. It applies two
    truncation guards:

    * Drop the leading partial line when the read started mid-file
      (``read_start > 0``).
    * Drop the final line when the buffer doesn't end in ``\\n``. A rig
      crashing mid-flush leaves a truncated final row (e.g. an ``epoch_ns``
      missing its nanosecond suffix, which parses to ~1970), so the
      previous complete row is trusted instead.

    Blank and comment lines are *not* filtered here; callers decide.
    """
    read_start = max(0, size - CSV_TAIL_BYTES)
    tail = store.get_range(key, start=read_start, length=size - read_start)
    text = tail.decode("utf-8", errors="ignore")
    lines = text.splitlines()
    if read_start > 0 and lines:
        lines = lines[1:]
    if not text.endswith("\n") and lines:
        lines = lines[:-1]
    return lines


def _read_last_csv_row(store: ObjectStore, key: str, size: int) -> list[str] | None:
    """Return the last *complete* data row of a CSV as a field list.

    Returns ``None`` if the tail holds no usable row. This is shared by
    every CSV-tailing probe (camera / screen / samsungwatch). Truncation
    guards are those of :func:`_read_tail_lines`; blank and
    ``#``-comment lines are additionally skipped.

    The CSV *header* is not filtered here. Callers validate the row shape
    (e.g. a numeric first column) and treat a surviving header as "no
    signal", so a header-only file yields ``None`` downstream.
    """
    candidates = [
        ln
        for ln in _read_tail_lines(store, key, size)
        if ln.strip() and not ln.lstrip().startswith("#")
    ]
    if not candidates:
        return None
    return next(csv.reader([candidates[-1]]))


def _frame_csv_end_time(
    store: ObjectStore,
    rec_prefix: str,
    worker_dir: str,
    *,
    basename_prefixes: tuple[str, ...],
    index_regex: re.Pattern[str],
) -> datetime | None:
    """Return the last frame timestamp from a ``(frame, pts_ns, wall_clock, epoch_ns)`` CSV stream.

    Returns ``None`` if no readable timestamps CSV is present. The
    data-engine camera and screen recorders write identical column shapes
    (``epoch_ns`` in column 3), and this helper backs both
    :func:`_camera_end_time` and :func:`_screen_end_time`.

    ``basename_prefixes`` restricts which ``.csv`` files count as a
    timestamps stream. Accepting any ``.csv`` would let a stray debug dump
    be picked and silently downgrade the recording to indeterminate.
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if k.endswith(".csv") and _basename(k).startswith(basename_prefixes) and s > 0
    ]
    latest = _pick_latest_by_index(files, index_regex=index_regex)
    if latest is None:
        return None
    last_row = _read_last_csv_row(store, *latest)
    if last_row is None or len(last_row) < 4:
        return None
    # Verify the row looks like a real data row (numeric frame index) before
    # trusting column 3. Any non-numeric row (e.g. a surviving header)
    # yields ``None``, so a schema-drift artifact downgrades only its own
    # recording to ``indeterminate`` instead of aborting the backfill.
    try:
        int(last_row[0])
        epoch_ns = int(last_row[3])
    except (ValueError, IndexError):
        return None
    return _utc_from_ns(epoch_ns)


_CAMERA_SEGMENT_INDEX_RE = re.compile(r"^(?:camera_timestamps|segment)_(\d+)_")

#: Basename prefixes accepted by :func:`_camera_end_time`. Both shapes emit
#: identical ``(frame, pts_ns, wall_clock, epoch_ns)`` columns.
#: ``segment_*.csv`` is the legacy rig-writer convention (pre-2026-Q2),
#: still present in the v0.1.0-backfill cohort under R2.
#: ``camera_timestamps_*.csv`` is the post-pipeline-rewrite shape.
_CAMERA_CSV_BASENAME_PREFIXES = ("camera_timestamps_", "segment_")


def _camera_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the last camera frame timestamp for one ``camera_*`` worker.

    Returns ``None`` if no readable timestamps CSV is present. Reads
    ``camera_timestamps_{idx:04d}_{first_frame_ts}.csv`` (written by
    ``data-engine/camera/pipeline.py``) or the legacy
    ``segment_{idx:04d}_{first_frame_ts}.csv`` shape from older rig
    writers, which has the same column layout. See
    :func:`_frame_csv_end_time`.
    """
    return _frame_csv_end_time(
        store,
        rec_prefix,
        worker_dir,
        basename_prefixes=_CAMERA_CSV_BASENAME_PREFIXES,
        index_regex=_CAMERA_SEGMENT_INDEX_RE,
    )


_SCREEN_SEGMENT_INDEX_RE = re.compile(r"^screen_timestamps_(\d+)_")

#: Basename prefix for the screen recorder's per-frame timestamps CSV
#: (``screen_timestamps_{idx:04d}_{first_frame_ts}.csv``). It has the same
#: ``(frame, pts_ns, wall_clock, epoch_ns)`` columns as camera.
_SCREEN_CSV_BASENAME_PREFIXES = ("screen_timestamps_",)


def _screen_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the last screen-capture frame timestamp for one ``screen_*`` worker.

    Returns ``None`` if no readable timestamps CSV is present. The screen
    recorder emits the same column shape as camera, so this reuses
    :func:`_frame_csv_end_time`.

    Recovering the real last frame here, rather than falling back to
    ``worker_report``, is what stops a short screen session from being
    recorded as ~8h when its worker lingered to the idle timeout (ENG-1295
    modality-only caveat).
    """
    return _frame_csv_end_time(
        store,
        rec_prefix,
        worker_dir,
        basename_prefixes=_SCREEN_CSV_BASENAME_PREFIXES,
        index_regex=_SCREEN_SEGMENT_INDEX_RE,
    )


#: Matches a data-engine segment basename ``<sensor>_{idx}_{start_ts}.csv``
#: and captures the segment index. It is used to pick the latest segment
#: per samsungwatch sensor stream. ``<sensor>`` may contain underscores
#: (``heart_rate``), so the index is anchored as the second-to-last
#: ``_``-delimited numeric field before ``.csv``.
_WATCH_SEGMENT_INDEX_RE = re.compile(r"_(\d+)_\d+\.csv$")
_WATCH_CSV_RE = re.compile(r"^(?P<sensor>.+?)_(?P<idx>\d+)_(?P<start>\d+)\.csv$")


def _samsungwatch_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the latest per-sample timestamp across a ``samsungwatch_*`` worker's sensor CSVs.

    The sensors are imu / ppg / eda / heart_rate / battery. Returns
    ``None`` if no sensor CSV carries a parseable timestamp.

    Each sensor stream is a separate ``<sensor>_{idx}_{start_ts}.csv``
    whose first column is a nanosecond ``timestamp``. For each sensor, the
    latest segment's last complete row is read, and the max timestamp
    across sensors is returned. The sensors stop emitting at the real
    session end even though the worker process lingers to its idle timeout
    (ENG-1295 modality-only caveat).
    """
    by_sensor: dict[str, list[tuple[str, int]]] = {}
    for k, s in _list_keys_under(store, f"{rec_prefix}{worker_dir}/"):
        if s <= 0 or not k.endswith(".csv"):
            continue
        m = _WATCH_CSV_RE.match(_basename(k))
        if m is None:
            continue
        by_sensor.setdefault(m.group("sensor"), []).append((k, s))

    latest_ns: int | None = None
    for files in by_sensor.values():
        latest = _pick_latest_by_index(files, index_regex=_WATCH_SEGMENT_INDEX_RE)
        if latest is None:
            continue
        last_row = _read_last_csv_row(store, *latest)
        if not last_row:
            continue
        try:
            ts_ns = int(last_row[0])
        except (ValueError, IndexError):
            # Non-numeric first column (e.g. a surviving header) → no signal
            # from this sensor.
            continue
        if latest_ns is None or ts_ns > latest_ns:
            latest_ns = ts_ns
    if latest_ns is None:
        return None
    return _utc_from_ns(latest_ns)


_ENV_SEGMENT_INDEX_RE = re.compile(r"^environment_(\d+)_")


def _environment_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the last poll timestamp from an ``environment_*`` worker.

    Reads ``environment_{idx}_{start_ts}.jsonl``. Returns ``None`` if no
    readable record carries a ``timestamp``.

    Each line is a JSON object ``{"timestamp": <ns>, "type": ..., ...}``,
    and one poll emits several records (location / weather / aqi / alerts)
    sharing a timestamp. The complete lines in the tail of the latest
    segment are scanned (same truncation guards as the CSV probes) and the
    max ``timestamp`` is taken. Like the other data-stream probes, this
    recovers the real last poll instead of the worker's lingering
    ``stopped_at_utc`` (ENG-1295 modality-only caveat).
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if k.endswith(".jsonl") and _basename(k).startswith("environment_") and s > 0
    ]
    latest = _pick_latest_by_index(files, index_regex=_ENV_SEGMENT_INDEX_RE)
    if latest is None:
        return None

    latest_ns: int | None = None
    for line in _read_tail_lines(store, *latest):
        if not line.strip():
            continue
        # Skip records that don't parse or lack a usable numeric timestamp.
        # A partial or garbled line downgrades only itself, not the
        # recording. ``OverflowError`` covers ``Infinity`` (which
        # ``json.loads`` accepts) hitting ``int()``.
        try:
            ts_ns = int(json.loads(line)["timestamp"])
        except (ValueError, KeyError, TypeError, OverflowError, json.JSONDecodeError):
            continue
        if latest_ns is None or ts_ns > latest_ns:
            latest_ns = ts_ns
    if latest_ns is None:
        return None
    return _utc_from_ns(latest_ns)


_MIC_FILENAME_RE = re.compile(r"^mic_\d+_([\d.]+)\.wav$")


def _parse_wav_duration_seconds(header: bytes, file_size: int | None = None) -> float | None:
    """Compute the duration of a standard PCM WAV from its leading bytes.

    ``header`` is the first ~256 bytes of the file. That is enough to step
    over small ``fmt`` / ``LIST`` / ``fact`` subchunks and reach the
    ``data`` subchunk header.

    Args:
        header: Leading bytes of the WAV object.
        file_size: Total object size in bytes, when known. The ``data``
            chunk's declared size is clamped to the bytes actually present
            after the chunk header. A never-finalised header (e.g. a
            ``0xFFFFFFFF`` streaming placeholder) or a truncated upload
            therefore can't claim audio that isn't in the file. ``None``
            trusts the header as-is.

    Returns:
        The duration in seconds, or ``None`` in any of these cases:

        * the bytes don't form a valid RIFF / WAVE header;
        * no ``data`` chunk is reached within ``header``;
        * the format isn't standard PCM. A non-multiple-of-8
          ``bits_per_sample`` triggers an early return. Without it, ADPCM
          and similar would yield a ``bytes_per_sample`` of 0 and
          ``ZeroDivisionError``.
    """
    if len(header) < 44 or header[:4] != b"RIFF" or header[8:12] != b"WAVE":
        return None
    # Walk subchunks starting at offset 12.
    offset = 12
    sample_rate: int | None = None
    channels: int | None = None
    bits_per_sample: int | None = None
    while offset + 8 <= len(header):
        chunk_id = header[offset : offset + 4]
        (chunk_size,) = struct.unpack("<I", header[offset + 4 : offset + 8])
        body_offset = offset + 8
        if chunk_id == b"fmt ":
            if body_offset + 16 > len(header):
                return None
            (
                _audio_format,
                channels,
                sample_rate,
                _byte_rate,
                _block_align,
                bits_per_sample,
            ) = struct.unpack("<HHIIHH", header[body_offset : body_offset + 16])
        elif chunk_id == b"data":
            if not (sample_rate and channels and bits_per_sample):
                return None
            if bits_per_sample < 8 or bits_per_sample % 8 != 0:
                return None
            data_size = chunk_size
            if file_size is not None:
                # Never count samples beyond the end of the object.
                data_size = min(data_size, max(0, file_size - body_offset))
            bytes_per_sample = bits_per_sample // 8
            total_samples = data_size // (bytes_per_sample * channels)
            return float(total_samples) / float(sample_rate)
        # RIFF requires chunks to be word-aligned, so pad the offset by one
        # byte when ``chunk_size`` is odd. Data-engine emits only ``fmt`` +
        # ``data`` (both even-sized). A hand-edited WAV with a 5-byte
        # LIST/INFO chunk would otherwise mis-locate the ``data`` chunk.
        offset = body_offset + chunk_size + (chunk_size & 1)
    return None


_MIC_SEGMENT_INDEX_RE = re.compile(r"^mic_(\d+)_")


def _mic_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Return the last mic frame timestamp for one ``mic_*`` worker.

    Returns ``None`` if no readable WAV is present. The start time comes
    from the ``mic_NNNN_<unix_ts>.wav`` filename and the duration from the
    RIFF header, clamped to the object's real size.
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if k.endswith(".wav") and s > 44
    ]
    latest = _pick_latest_by_index(files, index_regex=_MIC_SEGMENT_INDEX_RE)
    if latest is None:
        return None
    latest_key, latest_size = latest
    m = _MIC_FILENAME_RE.match(_basename(latest_key))
    if not m:
        return None
    try:
        start_ts = float(m.group(1))
    except ValueError:
        return None
    # Don't request past EOF for segments shorter than the header window.
    header = store.get_range(latest_key, start=0, length=min(WAV_HEADER_BYTES, latest_size))
    duration_s = _parse_wav_duration_seconds(header, file_size=latest_size)
    if duration_s is None:
        return None
    return datetime.fromtimestamp(start_ts + duration_s, tz=timezone.utc)


def _parse_stopped_at_utc(text: str) -> datetime | None:
    """Parse the ``stopped_at_utc`` field from a ``worker_report*.json`` blob.

    Returns ``None`` only for the *absent-signal* shapes: the top-level
    value is not a JSON object, the ``stopped_at_utc`` key is missing, or
    its value is not a string.

    **Parse failures propagate.** A malformed JSON body raises
    :class:`json.JSONDecodeError`, and an unparseable timestamp string
    raises :class:`ValueError` from :func:`datetime.strptime`. This matches
    the module's "absent-signal vs. failed-parse" contract: callers
    (currently :func:`_worker_report_end_time`) catch these exceptions
    per-file and surface them in :attr:`ModalitySignals.errors`, rather
    than silently treating a corrupt file as "no signal here".

    The on-disk format from ``data-engine/common/base_worker.py`` writes
    ``stopped_at_utc`` as an ISO-8601 timestamp with an explicit timezone
    offset (e.g. ``"2026-05-04T01:05:43-0800"``). The output is
    normalised to UTC.
    """
    data = json.loads(text)
    if not isinstance(data, dict):
        return None
    ts_str = data.get("stopped_at_utc")
    if not isinstance(ts_str, str):
        return None
    dt = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
    return dt.astimezone(timezone.utc)


def _worker_report_end_time(
    store: ObjectStore,
    rec_prefix: str,
    worker_dir: str,
    *,
    errors: list[str],
) -> datetime | None:
    """Return the latest ``stopped_at_utc`` from any ``worker_report*.json`` in one worker dir.

    Returns ``None`` if no such file yields a timestamp.

    Every worker type writes a ``worker_report*.json`` on clean shutdown,
    carrying an explicit ``stopped_at_utc`` field. This is the fallback
    session-end signal for two cases: the modality-specific probe (EEG /
    camera / mic / screen / samsungwatch / environment) has no usable
    signal, or the worker type has no data-stream probe at all (notes,
    pupillabs, location, keyboard, mouse, battery, etc.).

    Per-file exception guard: a single corrupt or transiently-missing
    report is recorded in ``errors`` and skipped, preserving any ``latest``
    already accumulated from earlier files in this worker dir. Such
    failures include an R2 5xx, an eventual-consistency ``ObjectNotFound``
    between list and get, malformed JSON, or an unparseable timestamp.
    Listing failures are not caught here and propagate to the caller.
    """
    files = [
        (k, s)
        for (k, s) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if is_worker_report_basename(_basename(k)) and s > 0
    ]
    latest: datetime | None = None
    for key, _size in files:
        try:
            body = store.get(key)
            text = body.decode("utf-8", errors="ignore")
            dt = _parse_stopped_at_utc(text)
        except Exception as exc:
            errors.append(f"{worker_dir}/{_basename(key)}: {exc!r}")
            continue
        if dt is not None and (latest is None or dt > latest):
            latest = dt
    return latest
def derive_recording_end_time(store: ObjectStore, rec_id: str) -> ModalitySignals:
    """Probe every worker dir under ``recordings/<rec_id>/`` and collect end-time candidates.

    :meth:`ModalitySignals.max_end` collapses the returned candidates into
    a single ``ended_at`` value.

    Each worker dir gets up to two independent probes:

    1. the data-stream probe matching its prefix (``eeg_`` / ``camera_`` /
       ``mic_`` / ``screen_`` / ``samsungwatch_`` / ``environment_``), and
    2. the universal ``worker_report*.json`` probe.

    Each probe has its own exception guard. A corrupt segment file that
    makes the data-stream probe raise is recorded in
    :attr:`ModalitySignals.errors`, and the same worker's ``worker_report``
    is still consulted; the walk then continues with the next worker.
    Callers should log ``errors`` regardless of the ``max_end()`` outcome,
    so future parser regressions don't hide behind a successful sibling
    modality.

    Args:
        store: Object store holding the ``recordings/`` tree.
        rec_id: Recording identifier; the probed prefix is
            ``recordings/<rec_id>/``.

    Returns:
        A :class:`ModalitySignals` with the latest candidate per source.
        ``errors`` holds any per-worker / per-file failures, plus a
        ``"no prefix at …"`` entry when the recording has no worker dirs.
    """
    # (worker-dir prefix, ModalitySignals attribute, data-stream probe).
    # The prefixes are mutually exclusive, so at most one probe fires per dir.
    stream_probes = (
        ("eeg_", "eeg", _eeg_end_time),
        ("camera_", "camera", _camera_end_time),
        ("mic_", "mic", _mic_end_time),
        ("screen_", "screen", _screen_end_time),
        ("samsungwatch_", "samsungwatch", _samsungwatch_end_time),
        ("environment_", "environment", _environment_end_time),
    )

    sig = ModalitySignals()
    rec_prefix = f"recordings/{rec_id}/"
    # ``list_prefixes`` returns an empty iterator (not an exception) when the
    # prefix has been moved or cleaned up. That case is distinguished from
    # "prefix exists but has no parseable modality files" so operators see
    # the right diagnostic.
    worker_dirs = sorted(
        p.removeprefix(rec_prefix).rstrip("/") for p in store.list_prefixes(rec_prefix)
    )
    if not worker_dirs:
        sig.errors.append(f"no prefix at {rec_prefix}")
        return sig

    for worker in worker_dirs:
        for prefix, attr, probe in stream_probes:
            if not worker.startswith(prefix):
                continue
            try:
                ts = probe(store, rec_prefix, worker)
            except Exception as exc:
                # One broken segment file degrades only this worker's
                # data-stream signal; the worker_report probe below still runs.
                sig.errors.append(f"{worker}: {exc!r}")
            else:
                current = getattr(sig, attr)
                if ts is not None and (current is None or ts > current):
                    setattr(sig, attr, ts)
            break

        # Universal fallback: probe ``worker_report*.json`` in every worker
        # dir, including ones already covered by a data-stream probe above.
        # A clean-shutdown report still yields a signal when the
        # modality-specific file is missing or corrupt. Worker types without
        # a dedicated probe (notes, pupillabs, location, keyboard, mouse,
        # battery) depend on it entirely. Per-file parse failures are
        # appended to ``sig.errors`` by the helper itself; this guard
        # catches failures outside its per-file loop (e.g. listing).
        try:
            ts = _worker_report_end_time(store, rec_prefix, worker, errors=sig.errors)
        except Exception as exc:
            sig.errors.append(f"{worker}: {exc!r}")
        else:
            if ts is not None and (sig.worker_report is None or ts > sig.worker_report):
                sig.worker_report = ts
    return sig