ursa.status¶
Read-only ursa.status.* namespace — catalog observability.
The per-package *.status.* observability convention: a read-only namespace
returning Pydantic models with to_dataframe() / to_markdown(). This is the
single per-recording ingestion-status contract consumed by Cepheus’s /data
page and the constellation-status-api HTTP wrapper — the recording-level
aggregation lives here so consumers don’t each re-derive the precedence rules.
Built over the catalog’s ingestion-lifecycle columns (ingestion_status +
ingestion_error / ingestion_attempts / last_attempt_at + the durable
metadata["dead_letter"] flag). Read-only: no writes, no side effects.
Module Contents¶
Classes¶
In-flight ingestion work — modalities currently leased ( |
|
One modality’s ingestion state (a flattened view of its |
|
Result of :func: |
|
Recording-level ingestion status, aggregated across all its modalities. |
Functions¶
Modalities currently being ingested ( |
|
Aggregate a mixed-status |
|
Per-recording ingestion status, most-recently-attempted first. |
API¶
- class ursa.status.ActiveJobs(/, **data: typing.Any)[source]¶
Bases:
pydantic.BaseModelIn-flight ingestion work — modalities currently leased (
ingesting).Initialization
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.- model_config¶
‘ConfigDict(…)’
- ingesting: tuple[ursa.status.ModalityIngestion, ...]¶
None
- n_ingesting: int¶
None
- class ursa.status.ModalityIngestion(/, **data: typing.Any)[source]¶
Bases:
pydantic.BaseModelOne modality’s ingestion state (a flattened view of its
ModalityRow).Initialization
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.- model_config¶
‘ConfigDict(…)’
- modality: str¶
None
- worker_id: str¶
None
- status: ursa.catalog.IngestionStatus¶
None
- error: str | None¶
None
- attempts: int¶
None
- last_attempt_at: int | None¶
None
- class ursa.status.RecentIngestions(/, **data: typing.Any)[source]¶
Bases:
pydantic.BaseModelResult of :func:
recent_ingestions.rowsis the (limited) recent slice; then_*counts are over the full post-sinceset (pre-limit).Initialization
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.- model_config¶
‘ConfigDict(…)’
- rows: tuple[ursa.status.RecordingIngestion, ...]¶
None
- n_processed: int¶
None
- n_failed: int¶
None
- n_ingesting: int¶
None
- n_raw: int¶
None
- class ursa.status.RecordingIngestion(/, **data: typing.Any)[source]¶
Bases:
pydantic.BaseModelRecording-level ingestion status, aggregated across all its modalities.
Initialization
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.- model_config¶
‘ConfigDict(…)’
- recording_hash: str¶
None
- status: ursa.catalog.IngestionStatus¶
None
- dead_letter: bool¶
None
- error: str | None¶
None
- started_at: int¶
None
- last_attempt_at: int | None¶
None
- modalities: tuple[ursa.status.ModalityIngestion, ...]¶
None
- ursa.status.active_jobs(*, profile: str = 'r2', data: ursa.data_interface.DataInterface | None = None) ursa.status.ActiveJobs[source]¶
Modalities currently being ingested (
ingestion_status="ingesting") — the in-flight leases held by the Virgo ingestion daemon.
- ursa.status.aggregate_recording_ingestion(qr: ursa._query_types.QueryResult) ursa.status.RecordingIngestion[source]¶
Aggregate a mixed-status
QueryResult(fromquery(status="all")) into a single recording-level ingestion status.Precedence:
failed>ingesting>raw>processed— the recording readsfailedif any worker failed.dead_letteris true if any failed modality carries the durablemetadata["dead_letter"]flag;erroris the first failed modality’s message. A recording with no modality rows aggregates toraw(registered but never ingested) — neverprocessed.expandedrows (combined modalities split into per-stream children) rank alongsideprocessedand fold toprocessedin the aggregate, so the recording-levelstatusis neverexpanded(Cepheus only knows the four pre-existing statuses). They are also omitted from the per-modalitymodalitiestuple — the data lives in their children (themselvesprocessedrows in the same result).
- ursa.status.recent_ingestions(*, profile: str = 'r2', since: int | None = None, limit: int | None = 50, data: ursa.data_interface.DataInterface | None = None) ursa.status.RecentIngestions[source]¶
Per-recording ingestion status, most-recently-attempted first.
Queries the catalog once with
status="all"and aggregates each recording via :func:aggregate_recording_ingestion.since(epoch-ns) keeps only recordings whoselast_attempt_atis set and>= since— this drops never-attemptedrawrows and any pre-migrationprocessedrows whoselast_attempt_atwas backfilled toNULLbymigrate_modalities_ingestion_state.limittruncatesrows(passNonefor the full list, as Cepheus’s recordings page does). Then_*counts cover the full post-sinceset (the same windowed scope asrows, not just the limit-truncated slice).