# Source code for ursa.store.backends.r2

"""Cloudflare R2 backend, S3-protocol via obstore.

Credentials (and bucket) are pulled from constellation-utils based on
the `creds` field of `R2StoreConfig`. The cred-name selector is the
single source of truth for which bucket the store binds to:

- ``assets_rw`` -> ``secrets.r2_assets_rw()`` -> ``constellation-assets`` RW
- ``assets_ro`` -> ``secrets.r2_assets_ro()`` -> ``constellation-assets`` RO
- ``raw_rw``    -> ``secrets.r2_raw_rw()``    -> ``constellation-data``   RW
- ``raw_ro``    -> ``secrets.r2_raw_ro()``    -> ``constellation-data``   RO

The configured `prefix` is pushed into the obstore handle at
construction so `raw_obstore()` returns a prefix-correct handle for
Lance/Zarr backends to consume natively.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import Final

from constellation_utils import secrets
from constellation_utils.secrets.models import R2Secrets
from obstore.store import S3Store

from ursa.store.backends._obstore import ObstoreBackend
from ursa.store.base import ObjectStore
from ursa.store.config import R2Creds, R2StoreConfig

# Dispatch table: each `R2Creds` selector resolves to the zero-argument
# constellation-utils accessor that yields the matching bucket and
# credentials. No extra cache is layered on top here — per the upstream
# contract the accessors already memoise their results (lru_cache), so
# calling one repeatedly is cheap.
_CREDS_MAP: Final[dict[R2Creds, Callable[[], R2Secrets]]] = dict(
    raw_rw=secrets.r2_raw_rw,
    raw_ro=secrets.r2_raw_ro,
    assets_rw=secrets.r2_assets_rw,
    assets_ro=secrets.r2_assets_ro,
)


def build_r2_store(
    cfg: R2StoreConfig,
    *,
    role: str,
    allow_http: bool = False,
) -> ObjectStore:
    """Build the `ObjectStore` for ``role`` on top of a Cloudflare R2 bucket.

    The bucket, endpoint, keys and region all come from the secrets
    accessor selected by ``cfg.creds``; this function hard-codes only
    path-style addressing (``virtual_hosted_style_request=False``), which
    R2 requires. The region is passed through from the credentials as-is
    (Cloudflare's S3 protocol documents it as ``"auto"``).

    Besides the obstore handle, the returned backend carries a
    lancedb-style connection description (URI + storage options) that
    points at the same bucket/prefix.

    Args:
        cfg: R2 store configuration; ``cfg.creds`` picks the credential
            set and ``cfg.prefix`` scopes the store inside the bucket.
        role: Role label forwarded unchanged to `ObstoreBackend`.
        allow_http: Permit plain-HTTP endpoints. Intended only for tests
            against a local endpoint (e.g. MinIO); production R2 always
            uses HTTPS.

    Returns:
        An `ObstoreBackend` wrapping the configured `S3Store`.

    Raises:
        KeyError: If ``cfg.creds`` is not a key of ``_CREDS_MAP``.
    """
    r2 = _CREDS_MAP[cfg.creds]()

    # An empty prefix is handed to obstore as None rather than "".
    s3_handle = S3Store(
        bucket=r2.bucket,
        prefix=cfg.prefix or None,
        endpoint=r2.endpoint,
        access_key_id=r2.access_key_id,
        secret_access_key=r2.secret_access_key,
        region=r2.region,
        virtual_hosted_style_request=False,
        allow_http=allow_http,
    )

    # lancedb-shaped connection: the URI is "bucket + configured prefix".
    # Consumers that nest under a sub-namespace (e.g. Catalog adding
    # "catalog/") are expected to append their own suffix. Leading slashes
    # on the prefix are dropped, and the final rstrip removes the trailing
    # "/" left behind when there is no prefix at all.
    if cfg.prefix:
        uri_tail = cfg.prefix.lstrip("/")
    else:
        uri_tail = ""
    connection_uri = f"s3://{r2.bucket}/{uri_tail}".rstrip("/")

    # The endpoint must be spelled out for R2: without it the underlying
    # S3 client targets AWS and fails opaquely.
    connection_options: dict[str, str] = dict(
        endpoint=r2.endpoint,
        access_key_id=r2.access_key_id,
        secret_access_key=r2.secret_access_key,
        region=r2.region,
    )
    if allow_http:
        # Mirror the obstore flag for MinIO-style local test endpoints.
        # lancedb's storage_options is dict[str, str], hence the string.
        connection_options.update(allow_http="true")

    return ObstoreBackend(
        s3_handle,
        role=role,
        backend="r2",
        prefix=cfg.prefix,
        lance_uri=connection_uri,
        lance_storage_options=connection_options,
    )