"""Derive a recording's ``ended_at`` from per-modality segment metadata.
When the catalog's ``RecordingRow.duration`` is wrong or missing (e.g.
the 198 legacy rows whose manifests were reconstructed with
``ended_at = max(R2 last_modified)`` = upload time, not session end),
this module re-derives the true session end from the segment files the
rigs wrote at recording time.
Several signal sources are supported, split into **data-stream** probes
(read from actual recorded sample timestamps) and a universal
**worker-report** fallback:
- **EEG** (``eeg_*/eeg_*_timestamps.bin``): per-chunk
``(int64 offset, float64 unix_ts)`` pairs packed by
``data-engine/eeg/recorder.py``. The last 16-byte struct gives the
timestamp of the most recent data chunk written before the recorder
stopped.
- **Camera** (``camera_*/camera_timestamps_NNNN_<first_frame_ts>.csv``
OR ``camera_*/segment_NNNN_<first_frame_ts>.csv``): per-frame
``frame, pts_ns, wall_clock, epoch_ns`` rows written by
``data-engine/camera/timestamps.py``. The last row's ``epoch_ns``
gives the most recent frame's wall-clock. Two filename conventions
are accepted because the legacy rig writers emit ``segment_*.csv``
while newer writers emit ``camera_timestamps_*.csv`` — same column
shape. The filter still requires one of those two prefixes (not
just ``.csv``) so a stray debug dump can't be lex-max-picked and
silently downgrade the recording.
- **Microphone** (``mic_*/mic_NNNN_<unix_ts>.wav``): the filename
encodes the first-frame unix timestamp; the RIFF ``data`` chunk size
+ sample rate + channels + bits-per-sample give the duration.
End = start + duration.
- **Screen** (``screen_*/screen_timestamps_NNNN_<first_frame_ts>.csv``):
identical ``frame, pts_ns, wall_clock, epoch_ns`` columns as camera —
the last row's ``epoch_ns`` is the most recent captured frame.
- **Samsung watch** (``samsungwatch_*/<sensor>_NNNN_<start_ts>.csv`` for
imu / ppg / eda / heart_rate / battery): each sensor stream's first
column is a nanosecond ``timestamp``; the max last-row timestamp
across sensors is the last sample received.
- **Environment** (``environment_*/environment_NNNN_<start_ts>.jsonl``):
newline-delimited ``{"timestamp": <ns>, "type": ...}`` poll records;
the max ``timestamp`` is the last poll.
- **Worker report** (``<worker>/worker_report*.json``): a universal
fallback across every worker type (notes, pupillabs, location,
keyboard, mouse, battery, etc.). Each worker writes a report on clean
shutdown carrying an explicit ``stopped_at_utc`` ISO-8601 timestamp.
Probed for every worker dir alongside the data-stream probe, so a
recording whose ``eeg_*_timestamps.bin`` is 0-bytes (failed recorder)
can still derive ``ended_at`` from a sibling worker that shut down
cleanly.
:func:`derive_recording_end_time` returns a :class:`ModalitySignals`
holding the per-source candidates; the recording's true ``ended_at``
is derived via :meth:`ModalitySignals.max_end`, which prefers the
latest **data-stream** end (eeg / camera / mic / screen / samsungwatch /
environment) when any such signal exists, falling back to
``worker_report`` only when none fired (recordings made up solely of
worker types that still lack a sample-timestamp probe).
Conservative-by-design: parsers return ``None`` when no parseable
signal exists (no ``_timestamps.bin`` present, no
``camera_timestamps_``/``segment_`` CSV, no valid WAV header, no
``worker_report*.json`` with a parseable ``stopped_at_utc`` — the
*absent-signal* case). Exceptions raised by read or parse failures are
caught by :func:`derive_recording_end_time` and appended to
:attr:`ModalitySignals.errors` — the *failed-parse* case.
**An empty ``errors`` list with a ``None`` end-time means "no signal,"
not "no failure."** A single broken segment file downgrades only its own
recording to ``indeterminate`` rather than aborting a multi-recording
rebuild.
"""
from __future__ import annotations
import csv
import json
import re
import struct
from dataclasses import dataclass, field
from datetime import datetime, timezone
from ursa.layout import is_worker_report_basename
from ursa.store import ObjectStore
# Public surface of this module: the two read-size constants, the result
# container, and the single entry point. Every ``_*`` helper is private.
__all__ = [
    "CSV_TAIL_BYTES",
    "WAV_HEADER_BYTES",
    "ModalitySignals",
    "derive_recording_end_time",
]

# Number of trailing bytes fetched when tailing a CSV/JSONL data stream
# (camera / screen / samsungwatch / environment). Each row is ~70 bytes,
# so 8 KiB covers ~100 rows comfortably; only the last row(s) are needed.
CSV_TAIL_BYTES = 8192

# Number of leading bytes fetched from a WAV file. The canonical PCM
# header is 44 bytes; 256 bytes leaves room to walk past small extra
# subchunks ahead of ``data``. A WAV whose ``data`` subchunk header lies
# beyond this window is reported as "no signal" by the WAV parser.
WAV_HEADER_BYTES = 256
@dataclass
class ModalitySignals:
    """End-time candidates per signal source for one recording.

    Populated by :func:`derive_recording_end_time`. Use :meth:`max_end`
    to collapse the candidates into a single ``ended_at`` value; ``None``
    indicates no signal source produced a parseable result (the row is
    *indeterminate*).
    """

    #: Latest EEG chunk timestamp across ``eeg_*`` workers.
    eeg: datetime | None = None
    #: Latest camera frame timestamp across ``camera_*`` workers.
    camera: datetime | None = None
    #: Latest microphone end (segment start + WAV duration) across
    #: ``mic_*`` workers.
    mic: datetime | None = None
    #: Last per-sample timestamp from a ``screen_*`` worker's
    #: ``screen_timestamps_*.csv`` — identical ``(frame, pts_ns,
    #: wall_clock, epoch_ns)`` shape as camera.
    screen: datetime | None = None
    #: Last per-sample timestamp across a ``samsungwatch_*`` worker's
    #: sensor CSVs (imu / ppg / eda / heart_rate / battery), each with a
    #: nanosecond ``timestamp`` first column.
    samsungwatch: datetime | None = None
    #: Last poll timestamp from an ``environment_*`` worker's
    #: ``environment_*.jsonl`` (``{"timestamp": <ns>, ...}`` records).
    environment: datetime | None = None
    #: Latest ``stopped_at_utc`` across any ``worker_report*.json`` in
    #: any worker dir under this recording. Universal across worker
    #: types — populated whenever at least one worker shut down cleanly
    #: enough to write a report, regardless of whether the data-stream
    #: probes also fired.
    worker_report: datetime | None = None
    #: Per-worker error strings recorded when a parser raised. A
    #: non-empty list does NOT necessarily mean :meth:`max_end` is
    #: ``None`` — a single worker can fail while another succeeds; the
    #: caller should log these regardless of the ``max_end()`` outcome.
    errors: list[str] = field(default_factory=list)

    def max_end(self) -> datetime | None:
        """Return the best ``ended_at`` estimate, or ``None`` if no signal
        produced a candidate (the row is *indeterminate*).

        Prefers the latest **data-stream** end (``eeg`` / ``camera`` /
        ``mic`` / ``screen`` / ``samsungwatch`` / ``environment``), each
        read from actual recorded sample timestamps. ``worker_report``
        (``stopped_at_utc``) is wall-clock at *worker process teardown*,
        not the last sample — workers routinely linger hours past the
        last data, so including it in the max inflated durations (e.g. a
        5h session reported as 30h). It is therefore used **only as a
        fallback** when no data-stream signal exists, i.e. for worker
        types that still lack a sample-timestamp probe.
        """
        stream_ends = [
            candidate
            for candidate in (
                self.eeg,
                self.camera,
                self.mic,
                self.screen,
                self.samsungwatch,
                self.environment,
            )
            if candidate is not None
        ]
        # Any data-stream signal beats the worker report, even an earlier one.
        if stream_ends:
            return max(stream_ends)
        return self.worker_report
def _list_keys_under(store: ObjectStore, prefix: str) -> list[tuple[str, int]]:
    """Collect ``(key, size)`` for every object that
    ``store.list(prefix=prefix)`` yields, in listing order.

    No filename filtering happens here; each per-modality
    ``_*_end_time`` helper applies its own filter to the result.
    """
    listing: list[tuple[str, int]] = []
    for obj in store.list(prefix=prefix):
        listing.append((obj.key, obj.size))
    return listing
def _pick_latest_by_index(
    files: list[tuple[str, int]],
    *,
    index_regex: re.Pattern[str],
) -> tuple[str, int] | None:
    """Return the ``(key, size)`` entry with the highest numeric segment
    index, or ``None`` when ``files`` is empty.

    The index is ``int(index_regex.search(basename).group(1))``. Entries
    whose basename does not match (or whose capture is not an integer)
    rank at ``-1``, below every real segment; ties on the index are
    broken by the lexicographically larger full key.

    Comparing numerically matters because a plain string ``max`` ranks
    ``eeg_10000_…`` *before* ``eeg_9999_…``, which would select a stale
    segment once a recording exceeds the four-digit padded range.
    """
    winner: tuple[str, int] | None = None
    winner_rank: tuple[int, str] | None = None
    for entry in files:
        key = entry[0]
        found = index_regex.search(key.rsplit("/", 1)[-1])
        try:
            index = -1 if found is None else int(found.group(1))
        except (ValueError, IndexError):
            index = -1
        rank = (index, key)
        # Strict ``>`` keeps the first of equal-ranked entries, like ``max``.
        if winner_rank is None or rank > winner_rank:
            winner, winner_rank = entry, rank
    return winner
#: Captures the numeric segment index of an ``eeg_<idx>_…`` basename.
_EEG_SEGMENT_INDEX_RE = re.compile(r"^eeg_(\d+)_")


def _eeg_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Last EEG timestamp for one ``eeg_*`` worker, or ``None`` if no
    ``*_timestamps.bin`` file holding at least one record is present.

    The file is treated as a headerless sequence of little-endian
    ``(int64 offset, float64 unix_ts)`` records. The read is aligned to
    the last *complete* record: a recorder killed mid-write can leave a
    trailing partial record, and blindly taking the final 16 bytes would
    then decode a misaligned, meaningless timestamp.

    Does not catch read or ``struct.unpack`` errors — those propagate up
    to :func:`derive_recording_end_time` so the per-row ``errors`` list
    captures the actual failure mode.
    """
    record_size = struct.calcsize("<qd")  # 16 bytes per record
    files = [
        (key, size)
        for (key, size) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if key.endswith("_timestamps.bin") and size >= record_size
    ]
    latest = _pick_latest_by_index(files, index_regex=_EEG_SEGMENT_INDEX_RE)
    if latest is None:
        return None
    latest_key, latest_size = latest
    # Start of the last whole record; ignores any trailing partial write.
    last_record_start = (latest_size // record_size - 1) * record_size
    tail = store.get_range(latest_key, start=last_record_start, length=record_size)
    _offset, unix_ts = struct.unpack("<qd", tail)
    return datetime.fromtimestamp(unix_ts, tz=timezone.utc)
def _read_last_csv_row(store: ObjectStore, key: str, size: int) -> list[str] | None:
    """Fetch the tail of a CSV object and return its last *complete*,
    non-blank, non-comment line parsed into fields, or ``None`` when the
    tail contains no such line.

    Only the final :data:`CSV_TAIL_BYTES` of the object are read. Two
    truncation guards apply before the last line is chosen:

    * when the read began mid-file, the first line is discarded because
      it is probably a fragment of a longer row;
    * when the data does not end with a newline, the final line is
      discarded because the writer may have stopped mid-row (a cut-off
      ``epoch_ns`` would parse to a date near 1970).

    The header row is *not* recognised here. Callers check that the
    returned row looks numeric and treat a surviving header as
    "no signal".
    """
    offset = max(0, size - CSV_TAIL_BYTES)
    raw = store.get_range(key, start=offset, length=size - offset)
    decoded = raw.decode("utf-8", errors="ignore")
    rows = decoded.splitlines()
    if offset > 0:
        # Leading fragment of a row that started before the read window.
        del rows[:1]
    if not decoded.endswith("\n"):
        # Unterminated final row: possibly truncated by a crash mid-flush.
        del rows[-1:]
    for row in reversed(rows):
        if row.strip() and not row.lstrip().startswith("#"):
            return next(csv.reader([row]))
    return None
def _frame_csv_end_time(
    store: ObjectStore,
    rec_prefix: str,
    worker_dir: str,
    *,
    basename_prefixes: tuple[str, ...],
    index_regex: re.Pattern[str],
) -> datetime | None:
    """Return the ``epoch_ns`` of the last row of the newest
    ``(frame, pts_ns, wall_clock, epoch_ns)`` timestamps CSV in one
    worker dir, as a UTC datetime, or ``None`` if nothing usable exists.

    Shared implementation behind :func:`_camera_end_time` and
    :func:`_screen_end_time`. Only non-empty ``.csv`` objects whose
    basename starts with one of ``basename_prefixes`` are considered, so
    an unrelated CSV in the same directory is never selected. The newest
    segment is chosen by ``index_regex`` via
    :func:`_pick_latest_by_index`.

    ``None`` is also returned when the last row has fewer than four
    fields or when field 0 / field 3 is not an integer (for example a
    header row that survived tailing).
    """
    candidates: list[tuple[str, int]] = []
    for key, size in _list_keys_under(store, f"{rec_prefix}{worker_dir}/"):
        if not key.endswith(".csv"):
            continue
        if not key.rsplit("/", 1)[-1].startswith(basename_prefixes):
            continue
        if size > 0:
            candidates.append((key, size))

    newest = _pick_latest_by_index(candidates, index_regex=index_regex)
    if newest is None:
        return None

    newest_key, newest_size = newest
    row = _read_last_csv_row(store, newest_key, newest_size)
    if row is None or len(row) < 4:
        return None

    try:
        # Field 0 must be a numeric frame index, otherwise the row is not
        # a data row and field 3 cannot be trusted.
        int(row[0])
        epoch_ns = int(row[3])
    except (ValueError, IndexError):
        return None
    return datetime.fromtimestamp(epoch_ns / 1_000_000_000, tz=timezone.utc)
#: Captures the segment index from either accepted camera CSV basename.
_CAMERA_SEGMENT_INDEX_RE = re.compile(r"^(?:camera_timestamps|segment)_(\d+)_")

#: Basename prefixes accepted by :func:`_camera_end_time`. Both shapes
#: emit identical ``(frame, pts_ns, wall_clock, epoch_ns)`` columns —
#: ``segment_*.csv`` is the legacy rig-writer convention (pre-2026-Q2),
#: still present in the v0.1.0-backfill cohort under R2;
#: ``camera_timestamps_*.csv`` is the post-pipeline-rewrite shape.
_CAMERA_CSV_BASENAME_PREFIXES = ("camera_timestamps_", "segment_")


def _camera_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Timestamp of the last camera frame for one ``camera_*`` worker,
    or ``None`` if no readable timestamps CSV is present.

    Accepts both ``camera_timestamps_{idx}_{first_frame_ts}.csv`` and the
    legacy ``segment_{idx}_{first_frame_ts}.csv`` naming; the parsing is
    delegated to :func:`_frame_csv_end_time`.
    """
    camera_options = {
        "basename_prefixes": _CAMERA_CSV_BASENAME_PREFIXES,
        "index_regex": _CAMERA_SEGMENT_INDEX_RE,
    }
    return _frame_csv_end_time(store, rec_prefix, worker_dir, **camera_options)
#: Captures the segment index from a ``screen_timestamps_<idx>_…`` basename.
_SCREEN_SEGMENT_INDEX_RE = re.compile(r"^screen_timestamps_(\d+)_")

#: Basename prefix for the screen recorder's per-frame timestamps CSV
#: (``screen_timestamps_{idx:04d}_{first_frame_ts}.csv``). Same
#: ``(frame, pts_ns, wall_clock, epoch_ns)`` columns as camera.
_SCREEN_CSV_BASENAME_PREFIXES = ("screen_timestamps_",)


def _screen_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Timestamp of the last captured screen frame for one ``screen_*``
    worker, or ``None`` if no readable timestamps CSV is present.

    The column layout matches the camera CSV, so the work is delegated to
    :func:`_frame_csv_end_time`. Reading the real last frame, rather than
    relying on the worker report, keeps a short screen session whose
    worker idled until its timeout from being recorded as a ~8h recording
    (ENG-1295 modality-only caveat).
    """
    last_frame = _frame_csv_end_time(
        store=store,
        rec_prefix=rec_prefix,
        worker_dir=worker_dir,
        basename_prefixes=_SCREEN_CSV_BASENAME_PREFIXES,
        index_regex=_SCREEN_SEGMENT_INDEX_RE,
    )
    return last_frame
#: Captures the segment index of ``<sensor>_{idx}_{start_ts}.csv``. Sensor
#: names may themselves contain underscores (``heart_rate``), so the index
#: is located from the right: the numeric field just before the final
#: ``_<digits>.csv``.
_WATCH_SEGMENT_INDEX_RE = re.compile(r"_(\d+)_\d+\.csv$")

#: Splits a watch CSV basename into ``sensor`` / ``idx`` / ``start``; used
#: to group segment files by sensor stream.
_WATCH_CSV_RE = re.compile(r"^(?P<sensor>.+?)_(?P<idx>\d+)_(?P<start>\d+)\.csv$")


def _samsungwatch_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Newest sample timestamp across all sensor CSVs of one
    ``samsungwatch_*`` worker, or ``None`` if no sensor yields one.

    Non-empty ``<sensor>_{idx}_{start_ts}.csv`` files are grouped by
    sensor. For each sensor the highest-index segment is tailed and the
    first field of its last complete row is read as an integer
    nanosecond timestamp. Sensors whose last row is missing or
    non-numeric are skipped; the maximum over the remaining sensors is
    returned as a UTC datetime.
    """
    segments_by_sensor: dict[str, list[tuple[str, int]]] = {}
    for key, size in _list_keys_under(store, f"{rec_prefix}{worker_dir}/"):
        if size > 0 and key.endswith(".csv"):
            parsed = _WATCH_CSV_RE.match(key.rsplit("/", 1)[-1])
            if parsed is not None:
                sensor = parsed.group("sensor")
                if sensor not in segments_by_sensor:
                    segments_by_sensor[sensor] = []
                segments_by_sensor[sensor].append((key, size))

    sample_times_ns: list[int] = []
    for segments in segments_by_sensor.values():
        newest = _pick_latest_by_index(segments, index_regex=_WATCH_SEGMENT_INDEX_RE)
        if newest is None:
            continue
        row = _read_last_csv_row(store, *newest)
        if not row:
            continue
        try:
            sample_times_ns.append(int(row[0]))
        except (ValueError, IndexError):
            # Header or malformed row: this sensor contributes nothing.
            continue

    if not sample_times_ns:
        return None
    return datetime.fromtimestamp(max(sample_times_ns) / 1_000_000_000, tz=timezone.utc)
#: Captures the segment index from an ``environment_<idx>_…`` basename.
_ENV_SEGMENT_INDEX_RE = re.compile(r"^environment_(\d+)_")


def _environment_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Last poll timestamp from an ``environment_*`` worker's
    ``environment_{idx}_{start_ts}.jsonl``, or ``None`` if no readable
    record carries a ``timestamp``.

    Each line is a JSON object ``{"timestamp": <ns>, "type": ..., ...}``.
    Only the trailing :data:`CSV_TAIL_BYTES` of the highest-index segment
    are scanned, with the same truncation guards as the CSV probes: the
    first line is dropped when the read started mid-file and the final
    line is dropped when the data does not end in a newline. The maximum
    ``timestamp`` among the remaining records is returned as a UTC
    datetime.

    Records that are not valid JSON, are not objects, lack ``timestamp``,
    or carry a value ``int()`` cannot convert are skipped individually.
    That includes non-finite numbers such as ``Infinity`` or ``1e400``,
    for which ``int()`` raises :class:`OverflowError`.
    """
    files = [
        (key, size)
        for (key, size) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if key.endswith(".jsonl") and key.rsplit("/", 1)[-1].startswith("environment_") and size > 0
    ]
    latest = _pick_latest_by_index(files, index_regex=_ENV_SEGMENT_INDEX_RE)
    if latest is None:
        return None
    latest_key, latest_size = latest

    read_start = max(0, latest_size - CSV_TAIL_BYTES)
    tail = store.get_range(latest_key, start=read_start, length=latest_size - read_start)
    text = tail.decode("utf-8", errors="ignore")
    lines = text.splitlines()
    if read_start > 0:
        # Leading fragment of a record that began before the read window.
        lines = lines[1:]
    if not text.endswith("\n"):
        # Unterminated final record: possibly truncated mid-write.
        lines = lines[:-1]

    latest_ns: int | None = None
    for line in lines:
        if not line.strip():
            continue
        # A partial/garbled record downgrades only itself, not the
        # recording. ``json.JSONDecodeError`` is a ``ValueError``;
        # ``OverflowError`` covers ``int(float("inf"))``.
        try:
            ts_ns = int(json.loads(line)["timestamp"])
        except (ValueError, KeyError, TypeError, OverflowError):
            continue
        if latest_ns is None or ts_ns > latest_ns:
            latest_ns = ts_ns

    if latest_ns is None:
        return None
    return datetime.fromtimestamp(latest_ns / 1_000_000_000, tz=timezone.utc)
#: Matches ``mic_<idx>_<unix_ts>.wav`` and captures the first-frame unix
#: timestamp encoded in the basename.
_MIC_FILENAME_RE = re.compile(r"^mic_\d+_([\d.]+)\.wav$")


def _parse_wav_duration_seconds(header: bytes, file_size: int | None = None) -> float | None:
    """Compute the duration in seconds of a PCM-style WAV from its
    leading bytes.

    Walks the RIFF subchunks starting at offset 12, reads channels /
    sample rate / bits-per-sample from ``fmt``, and derives the duration
    from the ``data`` subchunk size.

    Args:
        header: The first bytes of the file (at least 44). The ``data``
            subchunk header must lie inside this buffer.
        file_size: Optional total size of the file in bytes. When given,
            the ``data`` size declared in the header is cross-checked
            against the bytes that actually follow the ``data`` header:
            a declared size of ``0`` or one larger than what is present
            (an unfinalised streaming header such as ``0xFFFFFFFF``, or a
            truncated file) is replaced by the available byte count.
            When omitted, the declared size is trusted as-is.

    Returns:
        The duration in seconds, or ``None`` if the bytes are not a
        RIFF/WAVE header, ``data`` appears before a usable ``fmt``, the
        ``data`` subchunk is not reached within ``header``, or
        ``bits_per_sample`` is below 8 or not a multiple of 8 (which
        would otherwise give a zero bytes-per-sample divisor).
    """
    if len(header) < 44 or header[:4] != b"RIFF" or header[8:12] != b"WAVE":
        return None

    offset = 12
    sample_rate: int | None = None
    channels: int | None = None
    bits_per_sample: int | None = None
    while offset + 8 <= len(header):
        chunk_id = header[offset : offset + 4]
        (chunk_size,) = struct.unpack_from("<I", header, offset + 4)
        body_offset = offset + 8
        if chunk_id == b"fmt ":
            if body_offset + 16 > len(header):
                return None
            (
                _audio_format,
                channels,
                sample_rate,
                _byte_rate,
                _block_align,
                bits_per_sample,
            ) = struct.unpack_from("<HHIIHH", header, body_offset)
        elif chunk_id == b"data":
            if not (sample_rate and channels and bits_per_sample):
                return None
            if bits_per_sample < 8 or bits_per_sample % 8 != 0:
                return None
            data_bytes = chunk_size
            if file_size is not None:
                # A file cannot hold more audio than the bytes after the
                # ``data`` header; 0 means the writer never patched it.
                available = max(0, file_size - body_offset)
                if data_bytes == 0 or data_bytes > available:
                    data_bytes = available
            bytes_per_frame = (bits_per_sample // 8) * channels
            return float(data_bytes // bytes_per_frame) / float(sample_rate)
        # RIFF chunks are word-aligned: skip one pad byte after an
        # odd-sized chunk so the next chunk header is located correctly.
        offset = body_offset + chunk_size + (chunk_size & 1)
    return None


#: Captures the segment index from a ``mic_<idx>_…`` basename.
_MIC_SEGMENT_INDEX_RE = re.compile(r"^mic_(\d+)_")


def _mic_end_time(store: ObjectStore, rec_prefix: str, worker_dir: str) -> datetime | None:
    """Last mic frame timestamp for one ``mic_*`` worker, or ``None`` if
    no readable WAV is present.

    Picks the highest-index ``.wav`` larger than 44 bytes, takes the
    segment start from the unix timestamp in its basename, and adds the
    duration computed from the WAV header. The object's listed size is
    passed to the header parser so an unfinalised or truncated file is
    measured by the bytes actually present, not by a stale ``data`` size
    field.

    Returns ``None`` when the basename does not match
    ``mic_<idx>_<unix_ts>.wav``, the timestamp is not a valid float, or
    the header cannot be parsed.
    """
    files = [
        (key, size)
        for (key, size) in _list_keys_under(store, f"{rec_prefix}{worker_dir}/")
        if key.endswith(".wav") and size > 44
    ]
    latest = _pick_latest_by_index(files, index_regex=_MIC_SEGMENT_INDEX_RE)
    if latest is None:
        return None
    latest_key, latest_size = latest

    m = _MIC_FILENAME_RE.match(latest_key.rsplit("/", 1)[-1])
    if not m:
        return None
    try:
        start_ts = float(m.group(1))
    except ValueError:
        # ``[\d.]+`` also admits strings such as ``1.2.3``.
        return None

    # Never request bytes past the end of a short object.
    header = store.get_range(latest_key, start=0, length=min(WAV_HEADER_BYTES, latest_size))
    duration_s = _parse_wav_duration_seconds(header, file_size=latest_size)
    if duration_s is None:
        return None
    return datetime.fromtimestamp(start_ts + duration_s, tz=timezone.utc)
def _parse_stopped_at_utc(text: str) -> datetime | None:
    """Parse the ``stopped_at_utc`` field from a ``worker_report*.json``
    blob and normalise it to UTC.

    The expected value is an ISO-8601 timestamp with an explicit UTC
    offset, e.g. ``"2026-05-04T01:05:43-0800"``. A fractional-seconds
    form (``"2026-05-04T01:05:43.250-0800"``) is accepted as well, so a
    writer that includes sub-second precision is not reported as a parse
    failure.

    Returns:
        The timestamp converted to UTC, or ``None`` for the
        *absent-signal* shapes: the top-level JSON value is not an
        object, the ``stopped_at_utc`` key is missing, or its value is
        not a string.

    Raises:
        json.JSONDecodeError: ``text`` is not valid JSON.
        ValueError: the timestamp string matches neither accepted format
            (including a string without a UTC offset).

    Parse failures propagate on purpose: the caller records them
    per-file in :attr:`ModalitySignals.errors` rather than treating a
    corrupt file as "no signal here".
    """
    data = json.loads(text)
    if not isinstance(data, dict):
        return None
    ts_str = data.get("stopped_at_utc")
    if not isinstance(ts_str, str):
        return None
    try:
        dt = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        # Only retry when a fraction could be present; otherwise surface
        # the original error unchanged.
        if "." not in ts_str:
            raise
        dt = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S.%f%z")
    return dt.astimezone(timezone.utc)
def _worker_report_end_time(
    store: ObjectStore,
    rec_prefix: str,
    worker_dir: str,
    *,
    errors: list[str],
) -> datetime | None:
    """Return the newest ``stopped_at_utc`` found in the non-empty
    ``worker_report*.json`` files of one worker dir, or ``None`` if no
    file yields one.

    This probe works for every worker type, so it serves as the fallback
    session-end signal when a worker has no data-stream probe or that
    probe found nothing.

    Failures are contained per file: any exception raised while fetching,
    decoding or parsing one report is appended to ``errors`` as
    ``"<worker_dir>/<basename>: <repr>"`` and that file is skipped, so
    timestamps already gathered from other reports in the same directory
    are preserved.
    """
    report_keys: list[str] = []
    for key, size in _list_keys_under(store, f"{rec_prefix}{worker_dir}/"):
        if is_worker_report_basename(key.rsplit("/", 1)[-1]) and size > 0:
            report_keys.append(key)

    newest: datetime | None = None
    for key in report_keys:
        try:
            raw = store.get(key)
            stopped_at = _parse_stopped_at_utc(raw.decode("utf-8", errors="ignore"))
        except Exception as exc:
            name = key.rsplit("/", 1)[-1]
            errors.append(f"{worker_dir}/{name}: {exc!r}")
            continue
        if stopped_at is None:
            continue
        if newest is None or stopped_at > newest:
            newest = stopped_at
    return newest
def derive_recording_end_time(store: ObjectStore, rec_id: str) -> ModalitySignals:
    """Probe every worker dir under ``recordings/<rec_id>/`` and return
    the per-source end-time candidates.

    For each worker dir, the data-stream probe matching its name prefix
    (``eeg_`` / ``camera_`` / ``mic_`` / ``screen_`` / ``samsungwatch_`` /
    ``environment_``) runs first, followed by the universal
    ``worker_report*.json`` probe. Each field of the result keeps the
    latest timestamp seen across worker dirs of that kind.

    The two probes are guarded independently: if the data-stream probe
    raises (for example on a corrupt segment file), the error is recorded
    in :attr:`ModalitySignals.errors` and the worker report in the same
    directory is still read, so a clean-shutdown report remains available
    as a fallback.

    Args:
        store: Object store to list and read from.
        rec_id: Recording identifier; objects are looked up under
            ``recordings/<rec_id>/``.

    Returns:
        A :class:`ModalitySignals`; collapse it with
        :meth:`ModalitySignals.max_end`. When the prefix has no worker
        dirs, the result carries a single ``"no prefix at …"`` error and
        no candidates. Callers should log ``errors`` regardless of the
        ``max_end()`` outcome so a parser regression cannot hide behind a
        successful sibling modality.
    """
    sig = ModalitySignals()
    rec_prefix = f"recordings/{rec_id}/"
    # An empty listing (prefix moved or cleaned up) is reported explicitly
    # so it is distinguishable from "prefix exists but nothing parseable".
    worker_dirs = sorted(
        p.removeprefix(rec_prefix).rstrip("/") for p in store.list_prefixes(rec_prefix)
    )
    if not worker_dirs:
        sig.errors.append(f"no prefix at {rec_prefix}")
        return sig

    # (worker-dir prefix, ModalitySignals field, probe). The prefixes are
    # mutually exclusive, so at most one entry matches a worker dir.
    stream_probes = (
        ("eeg_", "eeg", _eeg_end_time),
        ("camera_", "camera", _camera_end_time),
        ("mic_", "mic", _mic_end_time),
        ("screen_", "screen", _screen_end_time),
        ("samsungwatch_", "samsungwatch", _samsungwatch_end_time),
        ("environment_", "environment", _environment_end_time),
    )

    def _keep_latest(attr: str, ts: datetime | None) -> None:
        """Store ``ts`` in ``sig.<attr>`` when it is later than the current value."""
        if ts is None:
            return
        current = getattr(sig, attr)
        if current is None or ts > current:
            setattr(sig, attr, ts)

    for worker in worker_dirs:
        for dir_prefix, attr, probe in stream_probes:
            if not worker.startswith(dir_prefix):
                continue
            try:
                _keep_latest(attr, probe(store, rec_prefix, worker))
            except Exception as exc:  # pragma: no cover — defensive
                sig.errors.append(f"{worker}: {exc!r}")
            break

        # Universal fallback, probed for every worker dir and isolated
        # from the data-stream probe above so a corrupt segment file does
        # not also discard this worker's report. Per-file parse failures
        # are appended to ``sig.errors`` by the probe itself.
        try:
            _keep_latest(
                "worker_report",
                _worker_report_end_time(store, rec_prefix, worker, errors=sig.errors),
            )
        except Exception as exc:  # pragma: no cover — defensive
            sig.errors.append(f"{worker}: {exc!r}")
    return sig