# Source code for ursa.store.backends.r2
"""Cloudflare R2 backend, S3-protocol via obstore.
Credentials (and bucket) are pulled from constellation-utils based on
the `creds` field of `R2StoreConfig`. The cred-name selector is the
single source of truth for which bucket the store binds to:
- ``assets_rw`` -> ``secrets.r2_assets_rw()`` -> ``constellation-assets`` RW
- ``assets_ro`` -> ``secrets.r2_assets_ro()`` -> ``constellation-assets`` RO
- ``raw_rw`` -> ``secrets.r2_raw_rw()`` -> ``constellation-data`` RW
- ``raw_ro`` -> ``secrets.r2_raw_ro()`` -> ``constellation-data`` RO
The configured `prefix` is pushed into the obstore handle at
construction so `raw_obstore()` returns a prefix-correct handle for
Lance/Zarr backends to consume natively.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
from constellation_utils import secrets
from constellation_utils.secrets.models import R2Secrets
from obstore.store import S3Store
from ursa.store.backends._obstore import ObstoreBackend
from ursa.store.base import ObjectStore
from ursa.store.config import R2Creds, R2StoreConfig
# Dispatch table: cred-name -> the constellation-utils accessor that
# returns bucket+credentials for that name. Keys are the `R2Creds`
# selector values; each value is a zero-argument callable returning an
# `R2Secrets`. `build_r2_store` indexes this table with `cfg.creds` and
# calls the accessor it finds.
#
# lru_cache inside each accessor keeps repeat lookups free; we don't add
# a second layer of caching here.
_CREDS_MAP: Final[dict[R2Creds, Callable[[], R2Secrets]]] = {
"raw_rw": secrets.r2_raw_rw,
"raw_ro": secrets.r2_raw_ro,
"assets_rw": secrets.r2_assets_rw,
"assets_ro": secrets.r2_assets_ro,
}
[docs]
def build_r2_store(
    cfg: R2StoreConfig,
    *,
    role: str,
    allow_http: bool = False,
) -> ObjectStore:
    """Build the `ObjectStore` for `role` on top of Cloudflare R2.

    The accessor selected by `cfg.creds` supplies bucket, endpoint, key
    pair and region. Those values feed two things: the obstore `S3Store`
    handle, and the lancedb-shaped connection info (URI plus storage
    options) handed to `ObstoreBackend`.

    R2 needs path-style addressing, so `virtual_hosted_style_request` is
    always passed as `False`. The region is whatever the resolved secrets
    carry (Cloudflare's S3 protocol uses `"auto"`).

    Args:
        cfg: Store configuration; `cfg.creds` picks the accessor and
            `cfg.prefix` is the key prefix bound into the store.
        role: Role label forwarded unchanged to `ObstoreBackend`.
        allow_http: Only set to `True` in tests against a local HTTP
            endpoint (e.g. MinIO). Production R2 always uses HTTPS.

    Returns:
        An `ObstoreBackend` wrapping the configured `S3Store`.

    Raises:
        KeyError: If `cfg.creds` has no entry in `_CREDS_MAP`.
    """
    load_secrets = _CREDS_MAP[cfg.creds]
    r2 = load_secrets()

    s3_handle = S3Store(
        bucket=r2.bucket,
        # An empty prefix is passed as None rather than "".
        prefix=cfg.prefix or None,
        endpoint=r2.endpoint,
        access_key_id=r2.access_key_id,
        secret_access_key=r2.secret_access_key,
        region=r2.region,
        virtual_hosted_style_request=False,
        allow_http=allow_http,
    )

    # Lance connection URI: bucket + configured prefix. Leading slashes on
    # the prefix are dropped so the URI never contains "//", and trailing
    # slashes are trimmed so a bare bucket yields "s3://<bucket>".
    # Consumers nesting under a sub-namespace (e.g. Catalog adding
    # "catalog/") append the suffix themselves.
    uri_prefix = (cfg.prefix or "").lstrip("/")
    uri = f"s3://{r2.bucket}/{uri_prefix}".rstrip("/")

    # `endpoint` is required for R2 — without it the underlying S3 client
    # targets AWS and fails opaquely.
    storage_opts: dict[str, str] = {
        "endpoint": r2.endpoint,
        "access_key_id": r2.access_key_id,
        "secret_access_key": r2.secret_access_key,
        "region": r2.region,
    }
    if allow_http:
        # Mirror the obstore handle's flag for MinIO-style local test
        # endpoints. lancedb's storage_options is dict[str, str], so the
        # bool must be stringified.
        storage_opts["allow_http"] = "true"

    return ObstoreBackend(
        s3_handle,
        role=role,
        backend="r2",
        prefix=cfg.prefix,
        lance_uri=uri,
        lance_storage_options=storage_opts,
    )