Source code for ursa.status

"""Read-only ``ursa.status.*`` namespace — catalog observability.

Implements the per-package ``*.status.*`` observability convention: a read-only
namespace whose functions return frozen Pydantic models (``RecentIngestions``
additionally offers ``to_dataframe()`` / ``to_markdown()``). This is the
**single per-recording ingestion-status contract** consumed by Cepheus's ``/data``
page and the ``constellation-status-api`` HTTP wrapper — the recording-level
aggregation lives here so consumers don't each re-derive the precedence rules.

Built over the catalog's ingestion-lifecycle columns (``ingestion_status`` +
``ingestion_error`` / ``ingestion_attempts`` / ``last_attempt_at`` + the durable
``metadata["dead_letter"]`` flag). Read-only: no writes, no side effects.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from pydantic import BaseModel, ConfigDict

from ursa.catalog import IngestionStatus, ModalityRow
from ursa.data_interface import DataInterface

if TYPE_CHECKING:
    import pandas as pd

    from ursa._query_types import QueryResult

__all__ = [
    "ActiveJobs",
    "ModalityIngestion",
    "RecentIngestions",
    "RecordingIngestion",
    "active_jobs",
    "aggregate_recording_ingestion",
    "recent_ingestions",
]

# Recording-level aggregate precedence: a recording reads as the most-urgent
# state across its modalities (most → least urgent).
_STATUS_RANK: dict[IngestionStatus, int] = {
    IngestionStatus.FAILED: 3,
    IngestionStatus.INGESTING: 2,
    IngestionStatus.RAW: 1,
    # PROCESSED and EXPANDED are both terminal-success at the same precedence
    # (0). The aggregate fold below collapses a rank-0 max to PROCESSED, so an
    # EXPANDED-bearing recording never surfaces an ``expanded`` aggregate status
    # to Cepheus (which only knows the four pre-existing statuses).
    IngestionStatus.PROCESSED: 0,
    IngestionStatus.EXPANDED: 0,
}
# Exhaustiveness invariant: every IngestionStatus must have a rank, or the
# ``max(..., key=_STATUS_RANK[s])`` in aggregate_recording_ingestion KeyErrors on
# the first row carrying the un-ranked status. A unit test pins this so a future
# enum addition fails in CI, not in prod.
#
# Explicit raise rather than ``assert``: ``python -O`` strips asserts, which
# would silently drop this import-time guard and defer the failure to that
# KeyError on the read path. The exception type and message are unchanged.
if set(_STATUS_RANK) != set(IngestionStatus):
    raise AssertionError(
        "every IngestionStatus needs a _STATUS_RANK entry; "
        f"missing: {set(IngestionStatus) - set(_STATUS_RANK)}"
    )


class ModalityIngestion(BaseModel):
    """One modality's ingestion state (a flattened view of its ``ModalityRow``)."""

    # frozen=True: instances are built once (see ``_modality_view``) and are not
    # meant to be mutated afterwards.
    model_config = ConfigDict(frozen=True)

    # ``modality`` and ``worker_id`` are copied through unchanged from the row;
    # together they are the sort key used when a recording's rows are aggregated.
    modality: str
    worker_id: str
    # Copied from ``ModalityRow.ingestion_status``.
    status: IngestionStatus
    # Copied from ``ModalityRow.ingestion_error``. ``None`` and ``""`` are kept
    # distinct by the consumers in this module.
    error: str | None
    # Copied from ``ModalityRow.ingestion_attempts``.
    attempts: int
    # Copied from ``ModalityRow.last_attempt_at``; ``None`` when the row carries
    # no attempt timestamp. Compared against epoch-ns values elsewhere in this
    # module.
    last_attempt_at: int | None
class RecordingIngestion(BaseModel):
    """Recording-level ingestion status, aggregated across all its modalities."""

    # frozen=True: the aggregate is a read-only snapshot.
    model_config = ConfigDict(frozen=True)

    recording_hash: str
    #: Aggregate of the per-modality statuses (``failed`` > ``ingesting`` > ``raw`` > ``processed``).
    #: ``aggregate_recording_ingestion`` folds ``expanded`` into ``processed``, so
    #: aggregates it builds never carry ``expanded`` here.
    status: IngestionStatus
    #: True if any failed modality carries the durable ``metadata["dead_letter"]`` flag.
    dead_letter: bool
    #: First failed modality's ``ingestion_error`` — ``None`` if nothing failed,
    #: or possibly ``""`` if that modality recorded an empty-string error.
    #: "First" is in ``(modality, worker_id)`` order, skipping failed rows whose
    #: error is ``None``.
    error: str | None
    started_at: int  # recording start_time, epoch-ns
    last_attempt_at: int | None  # max across modalities (None if never attempted)
    #: Per-modality views in ``(modality, worker_id)`` order; ``expanded`` rows
    #: are omitted by ``aggregate_recording_ingestion``.
    modalities: tuple[ModalityIngestion, ...]
class RecentIngestions(BaseModel):
    """Result of :func:`recent_ingestions`. ``rows`` is the (limited) recent slice;
    the ``n_*`` counts are over the full post-``since`` set (pre-limit)."""

    model_config = ConfigDict(frozen=True)

    rows: tuple[RecordingIngestion, ...]
    n_processed: int
    n_failed: int
    n_ingesting: int
    n_raw: int

    def to_dataframe(self) -> "pd.DataFrame":
        """Return ``rows`` as a DataFrame with one row per recording.

        The column set is passed explicitly so that an empty ``rows`` still
        yields a frame with the expected (empty) columns instead of a
        column-less frame that breaks ``df["status"]``-style access.
        """
        import pandas as pd

        columns = [
            "recording_hash",
            "status",
            "dead_letter",
            "error",
            "started_at",
            "last_attempt_at",
            "n_modalities",
        ]
        records = [
            {
                "recording_hash": r.recording_hash,
                "status": r.status.value,
                "dead_letter": r.dead_letter,
                "error": r.error,
                "started_at": r.started_at,
                "last_attempt_at": r.last_attempt_at,
                "n_modalities": len(r.modalities),
            }
            for r in self.rows
        ]
        return pd.DataFrame(records, columns=columns)

    def to_markdown(self) -> str:
        """Render ``rows`` as a Markdown table followed by a one-line summary.

        Each row shows the first 16 characters of the recording hash, the
        status value, a ``yes`` marker for dead-lettered recordings, the last
        attempt timestamp and the first line of the error (at most 80
        characters, with ``|`` escaped so it cannot split the table cell).
        The summary totals the four ``n_*`` counts and notes how many rows are
        shown when that differs from the total.
        """
        lines = [
            "| recording | status | dead_letter | last_attempt_at | error |",
            "|---|---|---|---|---|",
        ]
        for r in self.rows:
            # `is None` vs "": a FAILED row with an empty-string error (a bare
            # `raise`) is distinct from no error — render it visibly, and avoid
            # `"".splitlines()[0]` (IndexError on empty).
            if r.error is None:
                err = ""
            elif r.error == "":
                # NOTE(review): some Markdown renderers may treat this
                # placeholder as an inline HTML tag and hide it — confirm it is
                # visible where this table is rendered.
                err = "<empty error>"
            else:
                # Truncate first, then escape: an unescaped pipe would end the
                # cell early and shift every following column.
                err = r.error.splitlines()[0][:80].replace("|", "\\|")
            # ``is None`` (not falsy-or): epoch-ns 0 is a valid timestamp, distinct
            # from a never-attempted (None) row.
            last = "" if r.last_attempt_at is None else r.last_attempt_at
            lines.append(
                f"| {r.recording_hash[:16]} | {r.status.value} | "
                f"{'yes' if r.dead_letter else ''} | {last} | {err} |"
            )
        total = self.n_processed + self.n_failed + self.n_ingesting + self.n_raw
        shown = f" (showing {len(self.rows)})" if len(self.rows) != total else ""
        return "\n".join(lines) + (
            f"\n\n**{total} recordings**{shown} — processed={self.n_processed}, "
            f"failed={self.n_failed}, ingesting={self.n_ingesting}, raw={self.n_raw}"
        )
class ActiveJobs(BaseModel):
    """In-flight ingestion work — modalities currently leased (``ingesting``)."""

    # frozen=True: the result is a read-only snapshot.
    model_config = ConfigDict(frozen=True)

    # One view per modality row returned by the ``ingestion_status="ingesting"``
    # scan in ``active_jobs``.
    ingesting: tuple[ModalityIngestion, ...]
    # ``active_jobs`` sets this to ``len(ingesting)``.
    n_ingesting: int
def _modality_view(m: ModalityRow) -> ModalityIngestion:
    """Project a catalog ``ModalityRow`` onto the flat ``ModalityIngestion`` view.

    ``modality``, ``worker_id`` and ``last_attempt_at`` keep their names; the
    ``ingestion_status`` / ``ingestion_error`` / ``ingestion_attempts`` columns
    are exposed as ``status`` / ``error`` / ``attempts``.
    """
    fields = {
        "modality": m.modality,
        "worker_id": m.worker_id,
        "status": m.ingestion_status,
        "error": m.ingestion_error,
        "attempts": m.ingestion_attempts,
        "last_attempt_at": m.last_attempt_at,
    }
    return ModalityIngestion(**fields)
def aggregate_recording_ingestion(qr: "QueryResult") -> RecordingIngestion:
    """Fold one recording's modality rows into a single recording-level status.

    ``qr`` is a mixed-status ``QueryResult`` (from ``query(status="all")``).

    * ``status`` is the most urgent per-modality status, with precedence
      ``failed`` > ``ingesting`` > ``raw`` > ``processed``. A recording with no
      modality rows reads ``raw`` (registered but never ingested), never
      ``processed``.
    * ``expanded`` rows (combined modalities split into per-stream children)
      rank with ``processed`` and fold to ``processed``, so the aggregate
      ``status`` is never ``expanded``. They are also left out of
      ``modalities``; their data lives in the child rows of the same result.
    * ``dead_letter`` is true when any failed modality carries a truthy
      ``metadata["dead_letter"]``.
    * ``error`` is the first failed modality's error that is not ``None``
      (an empty string counts), in ``(modality, worker_id)`` order.
    * ``last_attempt_at`` is the maximum over all rows that have one, else
      ``None``.
    """
    # Sort explicitly: the dict order handed back by the query is incidental.
    ordered = sorted(qr.modalities.values(), key=lambda row: (row.modality, row.worker_id))

    # EXPANDED parents are bookkeeping rows, so they are hidden from the
    # per-modality view; consumers only know the four original statuses.
    visible = tuple(
        _modality_view(row)
        for row in ordered
        if row.ingestion_status is not IngestionStatus.EXPANDED
    )

    # The urgency ranking looks at every row, EXPANDED included, so an
    # all-EXPANDED recording is not mistaken for an empty (raw) one.
    if ordered:
        status = max(
            (row.ingestion_status for row in ordered),
            key=_STATUS_RANK.__getitem__,
        )
    else:
        status = IngestionStatus.RAW
    # Rank 0 covers both PROCESSED and EXPANDED; always report PROCESSED.
    if _STATUS_RANK[status] == 0:
        status = IngestionStatus.PROCESSED

    dead_letter = False
    error = None
    for row in ordered:
        if row.ingestion_status is not IngestionStatus.FAILED:
            continue
        if not dead_letter and row.metadata.get("dead_letter"):
            dead_letter = True
        # Compare against None rather than truthiness: an empty-string error
        # is still a recorded failure and must not be skipped.
        if error is None and row.ingestion_error is not None:
            error = row.ingestion_error

    latest_attempt = max(
        (row.last_attempt_at for row in ordered if row.last_attempt_at is not None),
        default=None,
    )

    return RecordingIngestion(
        recording_hash=qr.recording_hash,
        status=status,
        dead_letter=dead_letter,
        error=error,
        started_at=qr.start_time,
        last_attempt_at=latest_attempt,
        modalities=visible,
    )
def recent_ingestions(
    *,
    profile: str = "r2",
    since: int | None = None,
    limit: int | None = 50,
    data: DataInterface | None = None,
) -> RecentIngestions:
    """Return per-recording ingestion status, most-recently-attempted first.

    The catalog is queried once with ``status="all"`` and every recording is
    aggregated with :func:`aggregate_recording_ingestion`.

    ``profile`` is used to build a ``DataInterface`` only when ``data`` is not
    supplied.

    ``since`` (epoch-ns), when given, keeps only recordings whose
    ``last_attempt_at`` is set **and** ``>= since``. That drops never-attempted
    ``raw`` rows as well as pre-migration ``processed`` rows whose
    ``last_attempt_at`` was backfilled to ``NULL`` by
    ``migrate_modalities_ingestion_state``.

    ``limit`` truncates ``rows``; pass ``None`` for the full list (as Cepheus's
    recordings page does). The ``n_*`` counts always cover the whole
    post-``since`` set, not only the truncated slice.

    Raises ``AssertionError`` if any recording-level aggregate is ``expanded``.
    """
    source = DataInterface(profile=profile) if data is None else data

    summaries = []
    for qr in source.query(status="all"):
        summary = aggregate_recording_ingestion(qr)
        in_window = since is None or (
            summary.last_attempt_at is not None and summary.last_attempt_at >= since
        )
        if in_window:
            summaries.append(summary)

    def _recency(summary: RecordingIngestion) -> int:
        # Never-attempted recordings sort with a -1 key, i.e. last when reversed.
        return -1 if summary.last_attempt_at is None else summary.last_attempt_at

    summaries.sort(key=_recency, reverse=True)

    tally = dict.fromkeys(IngestionStatus, 0)
    for summary in summaries:
        tally[summary.status] += 1

    # The aggregate folds processed/expanded into PROCESSED, so an EXPANDED
    # tally means that fold regressed. This is a real raise rather than an
    # assert because `python -O` strips asserts, and the four n_* fields below
    # never read the EXPANDED bucket; a leak would silently undercount
    # n_processed.
    if tally[IngestionStatus.EXPANDED]:
        raise AssertionError(
            "aggregate_recording_ingestion must fold expanded → processed; "
            "an EXPANDED recording-level aggregate leaked into recent_ingestions counts"
        )

    page = summaries if limit is None else summaries[:limit]
    return RecentIngestions(
        rows=tuple(page),
        n_processed=tally[IngestionStatus.PROCESSED],
        n_failed=tally[IngestionStatus.FAILED],
        n_ingesting=tally[IngestionStatus.INGESTING],
        n_raw=tally[IngestionStatus.RAW],
    )
def active_jobs(*, profile: str = "r2", data: DataInterface | None = None) -> ActiveJobs:
    """Return the modalities currently being ingested.

    These are the rows with ``ingestion_status="ingesting"`` — the in-flight
    leases held by the Virgo ingestion daemon. ``profile`` is used to build a
    ``DataInterface`` only when ``data`` is not supplied.
    """
    source = DataInterface(profile=profile) if data is None else data
    # Scan just the ingesting modality rows; a full status="all" recording
    # query plus hydration would fetch far more than is needed here.
    leased_filter = {"ingestion_status": IngestionStatus.INGESTING.value}
    leased = tuple(_modality_view(row) for row in source.list_modalities(where=leased_filter))
    return ActiveJobs(ingesting=leased, n_ingesting=len(leased))