"""Read-only ``ursa.status.*`` namespace — catalog observability.
Follows the per-package ``*.status.*`` observability convention: a read-only namespace
returning Pydantic models with ``to_dataframe()`` / ``to_markdown()``. This is the
**single per-recording ingestion-status contract** consumed by Cepheus's ``/data``
page and the ``constellation-status-api`` HTTP wrapper — the recording-level
aggregation lives here so consumers don't each re-derive the precedence rules.
Built over the catalog's ingestion-lifecycle columns (``ingestion_status`` +
``ingestion_error`` / ``ingestion_attempts`` / ``last_attempt_at`` + the durable
``metadata["dead_letter"]`` flag). Read-only: no writes, no side effects.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel, ConfigDict
from ursa.catalog import IngestionStatus, ModalityRow
from ursa.data_interface import DataInterface
if TYPE_CHECKING:
import pandas as pd
from ursa._query_types import QueryResult
# Public API of this module: the result models plus the functions that build
# them. ``_STATUS_RANK`` and ``_modality_view`` are deliberately not exported.
__all__ = [
    "ActiveJobs",
    "ModalityIngestion",
    "RecentIngestions",
    "RecordingIngestion",
    "active_jobs",
    "aggregate_recording_ingestion",
    "recent_ingestions",
]
# Recording-level aggregate precedence: a recording reads as the most-urgent
# state across its modalities. A higher rank means more urgent, so the
# aggregate is the status with the maximum rank.
_STATUS_RANK: dict[IngestionStatus, int] = {
    IngestionStatus.FAILED: 3,
    IngestionStatus.INGESTING: 2,
    IngestionStatus.RAW: 1,
    # PROCESSED and EXPANDED are both terminal-success at the same precedence
    # (0). The fold in aggregate_recording_ingestion collapses a rank-0 max to
    # PROCESSED, so an EXPANDED-bearing recording never surfaces an
    # ``expanded`` aggregate status to Cepheus (which only knows the four
    # pre-existing statuses).
    IngestionStatus.PROCESSED: 0,
    IngestionStatus.EXPANDED: 0,
}
# Exhaustiveness invariant: every IngestionStatus must have a rank, or the
# ``max(..., key=lambda s: _STATUS_RANK[s])`` in aggregate_recording_ingestion
# raises KeyError on the first row carrying the un-ranked status. A unit test
# pins this so a future enum addition fails in CI, not in prod.
# NOTE: this is an import-time ``assert``, so it is stripped when Python runs
# with ``-O``; in that mode only the unit test guards the invariant.
assert set(_STATUS_RANK) == set(IngestionStatus), (
    "every IngestionStatus needs a _STATUS_RANK entry; "
    f"missing: {set(IngestionStatus) - set(_STATUS_RANK)}"
)
[docs]
class ModalityIngestion(BaseModel):
    """One modality's ingestion state (a flattened view of its ``ModalityRow``)."""

    # Frozen model: fields cannot be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    # Each field is copied one-to-one from a ``ModalityRow`` by ``_modality_view``.
    #: ``ModalityRow.modality``.
    modality: str
    #: ``ModalityRow.worker_id``.
    worker_id: str
    #: ``ModalityRow.ingestion_status``.
    status: IngestionStatus
    #: ``ModalityRow.ingestion_error``; ``None`` when the row carries no error.
    error: str | None
    #: ``ModalityRow.ingestion_attempts``.
    attempts: int
    #: ``ModalityRow.last_attempt_at``; ``None`` when the row has no attempt
    #: timestamp (presumably epoch-ns like the recording-level field — confirm
    #: against the catalog schema).
    last_attempt_at: int | None
[docs]
class RecordingIngestion(BaseModel):
    """Recording-level ingestion status, aggregated across all its modalities."""

    # Frozen model: fields cannot be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    #: Hash identifying the recording (taken from the query result).
    recording_hash: str
    #: Aggregate of the per-modality statuses (``failed`` > ``ingesting`` > ``raw`` > ``processed``).
    #: As built by ``aggregate_recording_ingestion`` this is never ``expanded``:
    #: rank-0 statuses fold to ``processed``.
    status: IngestionStatus
    #: True if any failed modality carries the durable ``metadata["dead_letter"]`` flag.
    dead_letter: bool
    #: First non-``None`` ``ingestion_error`` among the failed modalities, taken in
    #: ``(modality, worker_id)`` order — ``None`` if nothing failed or no failed
    #: modality recorded an error, and possibly ``""`` if that modality recorded
    #: an empty-string error.
    error: str | None
    started_at: int  # recording start_time, epoch-ns
    last_attempt_at: int | None  # max across modalities (None if never attempted)
    #: Per-modality views; ``aggregate_recording_ingestion`` omits ``expanded``
    #: parent rows from this tuple.
    modalities: tuple[ModalityIngestion, ...]
[docs]
class RecentIngestions(BaseModel):
    """Result of :func:`recent_ingestions`. ``rows`` is the (limited) recent slice;
    the ``n_*`` counts are over the full post-``since`` set (pre-limit)."""

    # Frozen model: fields cannot be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    #: Recording-level aggregates, possibly truncated by the caller's ``limit``.
    rows: tuple[RecordingIngestion, ...]
    #: Per-status recording counts over the whole windowed set, not just ``rows``.
    n_processed: int
    n_failed: int
    n_ingesting: int
    n_raw: int

    def to_dataframe(self) -> "pd.DataFrame":
        """Return ``rows`` as a pandas ``DataFrame``, one row per recording.

        Columns: ``recording_hash``, ``status`` (the enum's ``.value``),
        ``dead_letter``, ``error``, ``started_at``, ``last_attempt_at`` and
        ``n_modalities`` (length of the recording's ``modalities`` tuple).

        The column list is passed explicitly so an empty ``rows`` still yields
        a frame with these columns; ``pd.DataFrame([])`` alone has no columns,
        which makes ``df["status"]`` raise ``KeyError`` for consumers.
        """
        # Local import: at module level pandas is only imported under TYPE_CHECKING.
        import pandas as pd

        columns = [
            "recording_hash",
            "status",
            "dead_letter",
            "error",
            "started_at",
            "last_attempt_at",
            "n_modalities",
        ]
        records = [
            {
                "recording_hash": r.recording_hash,
                "status": r.status.value,
                "dead_letter": r.dead_letter,
                "error": r.error,
                "started_at": r.started_at,
                "last_attempt_at": r.last_attempt_at,
                "n_modalities": len(r.modalities),
            }
            for r in self.rows
        ]
        return pd.DataFrame(records, columns=columns)

    def to_markdown(self) -> str:
        """Render ``rows`` as a Markdown table followed by a totals line.

        Each row shows the first 16 characters of the recording hash, the
        status value, ``yes`` when dead-lettered, the last attempt timestamp
        and the first line of the error (at most 80 characters). The totals
        line sums the four ``n_*`` counts and adds ``(showing N)`` when that
        total differs from ``len(rows)``.
        """
        lines = [
            "| recording | status | dead_letter | last_attempt_at | error |",
            "|---|---|---|---|---|",
        ]
        for r in self.rows:
            # `is None` vs "": a FAILED row with an empty-string error (a bare
            # `raise`) is distinct from no error — render it visibly, and avoid
            # `"".splitlines()[0]` (IndexError on empty).
            if r.error is None:
                err = ""
            elif r.error == "":
                err = "<empty error>"
            else:
                # Truncate first, then escape: a literal "|" in the message
                # would otherwise be read as a cell delimiter and break the
                # table row. Escaping after the cut keeps an escape sequence
                # from being split by the 80-character limit.
                err = r.error.splitlines()[0][:80].replace("|", "\\|")
            # ``is None`` (not falsy-or): epoch-ns 0 is a valid timestamp, distinct
            # from a never-attempted (None) row.
            last = "" if r.last_attempt_at is None else r.last_attempt_at
            lines.append(
                f"| {r.recording_hash[:16]} | {r.status.value} | "
                f"{'yes' if r.dead_letter else ''} | {last} | {err} |"
            )
        total = self.n_processed + self.n_failed + self.n_ingesting + self.n_raw
        shown = f" (showing {len(self.rows)})" if len(self.rows) != total else ""
        return "\n".join(lines) + (
            f"\n\n**{total} recordings**{shown} — processed={self.n_processed}, "
            f"failed={self.n_failed}, ingesting={self.n_ingesting}, raw={self.n_raw}"
        )
[docs]
class ActiveJobs(BaseModel):
    """In-flight ingestion work — modalities currently leased (``ingesting``)."""

    # Frozen model: fields cannot be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    #: One view per modality row returned by the ``ingesting`` scan in ``active_jobs``.
    ingesting: tuple[ModalityIngestion, ...]
    #: Number of entries in ``ingesting`` (``active_jobs`` sets it to ``len(ingesting)``).
    n_ingesting: int
def _modality_view(m: ModalityRow) -> ModalityIngestion:
    """Project a catalog ``ModalityRow`` onto the public ``ModalityIngestion`` model.

    The ``ingestion_``-prefixed lifecycle columns are renamed to the shorter
    field names (``status``, ``error``, ``attempts``); the other three fields
    keep their names.
    """
    fields = {
        "modality": m.modality,
        "worker_id": m.worker_id,
        "status": m.ingestion_status,
        "error": m.ingestion_error,
        "attempts": m.ingestion_attempts,
        "last_attempt_at": m.last_attempt_at,
    }
    return ModalityIngestion(**fields)
[docs]
def aggregate_recording_ingestion(qr: "QueryResult") -> RecordingIngestion:
    """Collapse one recording's modality rows into a single :class:`RecordingIngestion`.

    ``qr`` is a mixed-status ``QueryResult`` (from ``query(status="all")``).

    The recording takes the most urgent status found on any of its modalities:
    ``failed`` > ``ingesting`` > ``raw`` > ``processed``. A recording with
    **no** modality rows aggregates to ``raw`` (registered but never ingested),
    never ``processed``.

    ``expanded`` rows (combined modalities split into per-stream children)
    rank alongside ``processed`` and **fold to** ``processed``, so the
    recording-level ``status`` is never ``expanded`` (Cepheus only knows the
    four pre-existing statuses). They are also left out of the ``modalities``
    tuple — their data lives in the children, which are ``processed`` rows in
    the same result.

    ``dead_letter`` is true when any failed modality carries a truthy
    ``metadata["dead_letter"]``; ``error`` is the first non-``None``
    ``ingestion_error`` among the failed modalities; ``last_attempt_at`` is
    the maximum over modalities that have one, else ``None``.
    """
    # Sort for a stable result; the mapping's own iteration order is incidental.
    ordered = sorted(qr.modalities.values(), key=lambda row: (row.modality, row.worker_id))

    # EXPANDED parents are bookkeeping rows whose data lives in their children,
    # so they are hidden from the per-modality view. They are NOT hidden from
    # the status fold below, otherwise an all-EXPANDED recording would look
    # empty and be misreported as raw.
    visible = tuple(
        _modality_view(row)
        for row in ordered
        if row.ingestion_status is not IngestionStatus.EXPANDED
    )

    statuses = [row.ingestion_status for row in ordered]
    if statuses:
        worst = max(statuses, key=_STATUS_RANK.__getitem__)
    else:
        worst = IngestionStatus.RAW
    # PROCESSED and EXPANDED tie at rank 0; always report the tie as PROCESSED.
    if _STATUS_RANK[worst] == 0:
        worst = IngestionStatus.PROCESSED

    failures = [row for row in ordered if row.ingestion_status is IngestionStatus.FAILED]
    dead_letter = any(bool(row.metadata.get("dead_letter")) for row in failures)

    # Compare against None rather than truthiness: an empty-string error (e.g.
    # from a bare `raise SomeError()`) is still a real failure signal and must
    # not be skipped in favour of a later message or None.
    first_error = None
    for row in failures:
        message = row.ingestion_error
        if message is not None:
            first_error = message
            break

    attempt_times = [row.last_attempt_at for row in ordered if row.last_attempt_at is not None]
    return RecordingIngestion(
        recording_hash=qr.recording_hash,
        status=worst,
        dead_letter=dead_letter,
        error=first_error,
        started_at=qr.start_time,
        last_attempt_at=max(attempt_times, default=None),
        modalities=visible,
    )
[docs]
def recent_ingestions(
    *,
    profile: str = "r2",
    since: int | None = None,
    limit: int | None = 50,
    data: DataInterface | None = None,
) -> RecentIngestions:
    """Per-recording ingestion status, most-recently-attempted first.

    Queries the catalog once with ``status="all"`` and aggregates each recording
    via :func:`aggregate_recording_ingestion`.

    ``profile`` is only used to build a ``DataInterface`` when ``data`` is not
    supplied. ``since`` (epoch-ns) keeps only recordings whose
    ``last_attempt_at`` is set **and** ``>= since`` — this drops
    never-attempted ``raw`` rows *and* any pre-migration ``processed`` rows
    whose ``last_attempt_at`` was backfilled to ``NULL`` by
    ``migrate_modalities_ingestion_state``. ``limit`` truncates ``rows`` (pass
    ``None`` for the full list, as Cepheus's recordings page does). The ``n_*``
    counts cover the full post-``since`` set (the same windowed scope as
    ``rows``, not just the limit-truncated slice).

    Raises ``ValueError`` if ``limit`` is negative, and ``AssertionError`` if a
    recording-level aggregate is unexpectedly ``expanded``.
    """
    # A negative limit would fall through to ``aggregates[:limit]`` and silently
    # drop rows from the *end* of the list (e.g. ``limit=-1`` returns all but
    # the last recording). Reject it before doing any catalog work.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be None or a non-negative integer, got {limit!r}")

    di = data if data is not None else DataInterface(profile=profile)
    aggregates = [aggregate_recording_ingestion(qr) for qr in di.query(status="all")]
    if since is not None:
        aggregates = [
            a for a in aggregates if a.last_attempt_at is not None and a.last_attempt_at >= since
        ]
    # Never-attempted recordings (None) sort after every real timestamp.
    aggregates.sort(
        key=lambda a: a.last_attempt_at if a.last_attempt_at is not None else -1,
        reverse=True,
    )
    counts = {s: 0 for s in IngestionStatus}
    for a in aggregates:
        counts[a.status] += 1
    # aggregate_recording_ingestion folds every rank-0 (processed/expanded) recording
    # to PROCESSED, so no recording-level aggregate is ever EXPANDED. Enforce it with
    # an explicit raise (NOT assert — this is a prod read path feeding Cepheus, and
    # `python -O` strips asserts; a fold regression would then silently undercount
    # n_processed, since the four returned n_* fields don't read counts[EXPANDED]).
    if counts[IngestionStatus.EXPANDED] != 0:
        raise AssertionError(
            "aggregate_recording_ingestion must fold expanded → processed; "
            "an EXPANDED recording-level aggregate leaked into recent_ingestions counts"
        )
    rows = aggregates[:limit] if limit is not None else aggregates
    return RecentIngestions(
        rows=tuple(rows),
        n_processed=counts[IngestionStatus.PROCESSED],
        n_failed=counts[IngestionStatus.FAILED],
        n_ingesting=counts[IngestionStatus.INGESTING],
        n_raw=counts[IngestionStatus.RAW],
    )
[docs]
def active_jobs(*, profile: str = "r2", data: DataInterface | None = None) -> ActiveJobs:
    """Modalities currently being ingested (``ingestion_status="ingesting"``) — the
    in-flight leases held by the Virgo ingestion daemon.

    ``profile`` is only used to build a ``DataInterface`` when ``data`` is not
    supplied. The returned ``n_ingesting`` is the length of ``ingesting``.
    """
    if data is None:
        data = DataInterface(profile=profile)
    # Ask the catalog for just the ingesting modality rows; a full
    # status="all" recording query plus hydration would fetch far more than
    # this view needs.
    leased = data.list_modalities(where={"ingestion_status": IngestionStatus.INGESTING.value})
    views = tuple(_modality_view(row) for row in leased)
    return ActiveJobs(ingesting=views, n_ingesting=len(views))