Source code for ursa.store.backends._obstore

"""obstore-backed `ObjectStore` implementation.

One class wraps any obstore handle (S3Store for R2, LocalStore, future
GCSStore/AzureStore). Responsibilities:

- Translate obstore types into Ursa types (`ObjectMeta`).
- Translate obstore exceptions into Ursa errors.
- Validate `extra_metadata` against S3/R2 user-metadata constraints
  *before* any network call so callers see typed errors rather than
  opaque obstore tracebacks.
- Emit OTel spans on each public op (low-cardinality attrs only — never
  the caller-supplied `key`).

The wrapper does **no** path composition: prefix scoping lives in the
obstore handle (`S3Store(prefix=...)`, `LocalStore(prefix=...)`). This
keeps `raw_obstore()` correct for Lance/Zarr backends that bypass the
wrapper.
"""

from __future__ import annotations

import functools
import io
import re
from collections.abc import Iterator, Mapping
from contextlib import AbstractContextManager, contextmanager
from datetime import datetime, timezone
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Final,
    Literal,
    TypeVar,
    cast,
)

import obstore
from obstore.exceptions import (
    AlreadyExistsError,
    NotFoundError,
    PermissionDeniedError,
    PreconditionError,
    UnauthenticatedError,
)
from opentelemetry import trace

from ursa.store.base import (
    ETagMismatch,
    InvalidMetadataError,
    ObjectAccessDenied,
    ObjectExists,
    ObjectMeta,
    ObjectNotFound,
)

if TYPE_CHECKING:
    import obstore.store as _obstore_store


__all__ = ["ObstoreBackend"]


_TRACER = trace.get_tracer("ursa.store")
_F = TypeVar("_F", bound=Callable[..., Any])

# S3/R2 user metadata key constraints. obstore passes attribute keys
# straight through to the backend, which fails opaquely if the key is
# uppercase or contains characters S3 rejects. Pre-validate here so
# callers see a clear local error.
_METADATA_KEY_RE: Final = re.compile(r"^[a-z0-9_-]+$")
_METADATA_TOTAL_BYTES_LIMIT: Final = 2048

# obstore attribute keys that aren't user metadata.
_RESERVED_ATTRIBUTES: Final = frozenset(
    {
        "Content-Disposition",
        "Content-Encoding",
        "Content-Language",
        "Content-Type",
        "Cache-Control",
    }
)


def _trace(op: str) -> Callable[[_F], _F]:
    """Wrap a public method with an OTel span.

    Span attributes are intentionally low-cardinality: `ursa.store.role`,
    `ursa.store.backend`, `ursa.store.op`. Keys are *not* included —
    they're unbounded-cardinality and don't belong in span tags.
    """

    def decorator(func: _F) -> _F:
        @functools.wraps(func)
        def wrapper(self: "ObstoreBackend", *args: Any, **kwargs: Any) -> Any:
            with _TRACER.start_as_current_span(f"ursa.store.{op}") as span:
                span.set_attribute("ursa.store.role", self._role)
                span.set_attribute("ursa.store.backend", self._backend)
                span.set_attribute("ursa.store.op", op)
                return func(self, *args, **kwargs)

        return cast(_F, wrapper)

    return decorator
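

# Example of what a decorated call emits (attribute values are illustrative):
#
#     store.get("runs/1/model.bin")
#     # span name: "ursa.store.get"
#     # attrs:     ursa.store.role="artifacts", ursa.store.backend="r2",
#     #            ursa.store.op="get"  (the caller's key is never recorded)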


def _validate_extra_metadata(extra: Mapping[str, str] | None) -> None:
    """Validate `extra_metadata` against S3/R2 user-metadata constraints.

    Raises `InvalidMetadataError` (with a specific reason) if any key
    fails the charset rule or if the combined header size exceeds 2 KiB.
    """
    if not extra:
        return
    total = 0
    for k, v in extra.items():
        if not isinstance(k, str) or not _METADATA_KEY_RE.fullmatch(k):
            raise InvalidMetadataError(
                f"extra_metadata key {k!r} is invalid: must match [a-z0-9_-]+"
            )
        if not isinstance(v, str):
            raise InvalidMetadataError(
                f"extra_metadata[{k!r}] must be str, got {type(v).__name__}"
            )
        # `x-amz-meta-` prefix is added by S3 on the wire; account for it.
        total += len(k) + len(v) + len("x-amz-meta-")
    if total > _METADATA_TOTAL_BYTES_LIMIT:
        raise InvalidMetadataError(
            f"extra_metadata total size {total} bytes exceeds limit "
            f"{_METADATA_TOTAL_BYTES_LIMIT}"
        )
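

# Behaviour sketch (inputs are illustrative):
#
#     _validate_extra_metadata({"run-id": "42"})   # OK
#     _validate_extra_metadata({"Run-Id": "42"})   # InvalidMetadataError: charset
#     _validate_extra_metadata({"k": "v" * 2100})  # InvalidMetadataError: > 2 KiB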


def _build_attributes(
    *,
    content_type: str | None,
    sha256: str | None,
    extra_metadata: Mapping[str, str] | None,
) -> dict[str, str] | None:
    """Compose obstore `Attributes` (a dict) from the wrapper's split args."""
    attrs: dict[str, str] = {}
    if content_type is not None:
        attrs["Content-Type"] = content_type
    if sha256 is not None:
        attrs["sha256"] = sha256
    if extra_metadata:
        for k, v in extra_metadata.items():
            if k in _RESERVED_ATTRIBUTES:
                # The caller probably meant to pass this via its dedicated
                # field; reject rather than silently override.
                raise InvalidMetadataError(
                    f"extra_metadata key {k!r} collides with a reserved "
                    f"attribute; pass via its dedicated field instead"
                )
            attrs[k] = v
    return attrs or None


def _meta_from_obstore(
    raw: Mapping[str, Any],
    *,
    strip_prefix: str = "",
    sha256: str | None = None,
) -> ObjectMeta:
    """Translate obstore's ObjectMeta TypedDict into ours.

    `path` (obstore) -> `key` (ours, prefix-stripped). `last_modified`
    is normalized to UTC. `sha256` defaults to `None` because obstore's
    list/head do not surface user metadata; callers that want sha256
    from a read must use `get` and inspect the result's attributes.
    """
    last_modified = raw["last_modified"]
    if isinstance(last_modified, datetime) and last_modified.tzinfo is None:
        last_modified = last_modified.replace(tzinfo=timezone.utc)
    path = raw["path"]
    if strip_prefix and path.startswith(strip_prefix):
        path = path[len(strip_prefix) :]
    return ObjectMeta(
        key=path,
        size=int(raw["size"]),
        etag=raw.get("e_tag"),
        last_modified=last_modified,
        sha256=sha256,
    )
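

# Example translation (shapes are illustrative of what obstore's list/head
# return; assumes `ObjectMeta` exposes its fields as attributes):
#
#     raw = {
#         "path": "tenants/acme/runs/1/model.bin",
#         "size": 1024,
#         "e_tag": '"abc123"',
#         "last_modified": datetime(2024, 1, 1),  # naive -> coerced to UTC
#     }
#     meta = _meta_from_obstore(raw, strip_prefix="tenants/acme/")
#     assert meta.key == "runs/1/model.bin"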


@contextmanager
def _translate_errors() -> Iterator[None]:
    """Map obstore exceptions to Ursa errors."""
    try:
        yield
    except (NotFoundError, FileNotFoundError) as exc:
        raise ObjectNotFound(str(exc)) from exc
    except AlreadyExistsError as exc:
        raise ObjectExists(str(exc)) from exc
    except PreconditionError as exc:
        raise ETagMismatch(str(exc)) from exc
    except (PermissionDeniedError, UnauthenticatedError) as exc:
        raise ObjectAccessDenied(str(exc)) from exc


class ObstoreBackend:
    """`ObjectStore` implementation over an obstore handle.

    The handle is constructed already-prefix-scoped by the per-backend
    factories (`backends/r2.py`, `backends/local.py`); this class does
    no path composition.
    """

    def __init__(
        self,
        inner: "_obstore_store.ObjectStore",
        *,
        role: str,
        backend: str,
        prefix: str = "",
        supports_attributes: bool = True,
        supports_etag_match: bool = True,
        lance_uri: str = "",
        lance_storage_options: Mapping[str, str] | None = None,
    ) -> None:
        """`supports_attributes` / `supports_etag_match` are backend
        capability flags. obstore's `LocalStore` raises
        `NotImplementedError` when attributes or `UpdateVersion` modes
        are passed; those backends opt out by setting these to `False`,
        and `put` raises a typed `InvalidMetadataError` with a clear
        message rather than letting the obstore error leak through.

        `prefix` is the same value pushed into the obstore handle. The
        wrapper records it to strip from result paths because obstore's
        S3Store returns prefixed paths from `head` (while LocalStore
        returns them relative). Stripping here makes the contract
        consistent.
        """
        self._inner = inner
        self._role = role
        self._backend = backend
        self._prefix = prefix
        self._supports_attributes = supports_attributes
        self._supports_etag_match = supports_etag_match
        self._lance_uri = lance_uri
        self._lance_storage_options: dict[str, str] = dict(lance_storage_options or {})

    def _strip_prefix(self, path: str) -> str:
        if self._prefix and path.startswith(self._prefix):
            return path[len(self._prefix) :]
        return path

    # ---- reads -----------------------------------------------------------
[docs] @_trace("get") def get(self, key: str) -> bytes: with _translate_errors(): result = obstore.get(self._inner, key) return bytes(result.bytes())
[docs] @_trace("get_range") def get_range(self, key: str, *, start: int, length: int) -> bytes: with _translate_errors(): data = obstore.get_range(self._inner, key, start=start, length=length) return bytes(data)
[docs] @_trace("open") def open(self, key: str) -> AbstractContextManager[BinaryIO]: return _OpenReader(self._inner, key)

    # ---- writes ----------------------------------------------------------
[docs] @_trace("put") def put( self, key: str, data: bytes | BinaryIO, *, content_type: str | None = None, sha256: str | None = None, extra_metadata: Mapping[str, str] | None = None, if_none_match: Literal["*"] | None = None, if_match: str | None = None, ) -> ObjectMeta: if if_none_match is not None and if_match is not None: raise ValueError("if_none_match and if_match are mutually exclusive") _validate_extra_metadata(extra_metadata) attributes = _build_attributes( content_type=content_type, sha256=sha256, extra_metadata=extra_metadata, ) if attributes is not None and not self._supports_attributes: raise InvalidMetadataError( f"backend {self._backend!r} does not support object " f"attributes (content_type / sha256 / extra_metadata). " f"Use a cloud backend (e.g. r2) if metadata round-trip is required." ) if if_match is not None and not self._supports_etag_match: raise InvalidMetadataError( f"backend {self._backend!r} does not support ETag-conditional " f"writes (if_match). Use a cloud backend (e.g. r2) for CAS." ) # S3-compatible stores (including MinIO) do not reliably honour # obstore's native mode="create" / mode={"e_tag": ...} preconditions # — the PUT can silently succeed without raising AlreadyExistsError / # PreconditionError. Mirror the approach used by `copy(overwrite=False)`: # enforce preconditions with an explicit HEAD round-trip before writing. # Small TOCTOU window; acceptable for single-writer ingestion workloads. with _translate_errors(): if if_none_match == "*": try: obstore.head(self._inner, key) except (NotFoundError, FileNotFoundError): pass else: raise ObjectExists(f"object already exists: {key}") elif if_match is not None: try: existing = obstore.head(self._inner, key) except (NotFoundError, FileNotFoundError) as exc: raise ETagMismatch(f"object does not exist: {key}") from exc actual = (existing.get("e_tag") or "").strip('"') expected = if_match.strip('"') if actual != expected: raise ETagMismatch( f"ETag mismatch for {key!r}: " f"expected {if_match!r}, got {existing.get('e_tag')!r}" ) # obstore.put returns only {e_tag, version}; HEAD after to get full # ObjectMeta (size + last_modified). One extra round-trip; consistent # across backends. obstore.put(self._inner, key, data, attributes=attributes) head = obstore.head(self._inner, key) return _meta_from_obstore(head, strip_prefix=self._prefix, sha256=sha256)
[docs] @_trace("copy") def copy(self, src: str, dst: str, *, overwrite: bool = False) -> ObjectMeta: # S3 (and therefore R2) does not implement copy-if-not-exists # natively — `obstore.copy(..., overwrite=False)` raises # `NotSupportedError`. Emulate at the wrapper: HEAD the # destination first, raise `ObjectExists` if present, then # always do an overwriting copy. There's a small TOCTOU window # — acceptable for the M1 use case (ingestion, no concurrent # writers); a follow-up can wire in the native primitive on # backends that support it. with _translate_errors(): if not overwrite: try: obstore.head(self._inner, dst) except (NotFoundError, FileNotFoundError): pass else: raise ObjectExists(f"copy destination already exists: {dst}") obstore.copy(self._inner, src, dst, overwrite=True) head = obstore.head(self._inner, dst) return _meta_from_obstore(head, strip_prefix=self._prefix)

    # ---- discovery -------------------------------------------------------
[docs] @_trace("head") def head(self, key: str) -> ObjectMeta: with _translate_errors(): head = obstore.head(self._inner, key) return _meta_from_obstore(head, strip_prefix=self._prefix)
[docs] @_trace("exists") def exists(self, key: str) -> bool: # Call obstore.head directly (rather than self.head) so we don't # produce a nested `ursa.store.head` span inside every # `ursa.store.exists` span. try: with _translate_errors(): obstore.head(self._inner, key) except ObjectNotFound: return False return True
[docs] @_trace("list") def list(self, prefix: str = "") -> Iterator[ObjectMeta]: with _translate_errors(): stream = obstore.list(self._inner, prefix or None) for chunk in stream: for raw in chunk: yield _meta_from_obstore(raw, strip_prefix=self._prefix)
[docs] @_trace("list_prefixes") def list_prefixes(self, prefix: str = "") -> Iterator[str]: with _translate_errors(): result = obstore.list_with_delimiter(self._inner, prefix or None) for cp in result["common_prefixes"]: # Strip the configured prefix (S3Store includes it; Local # doesn't), then normalize trailing '/' for consistency. cp = self._strip_prefix(cp) yield cp if cp.endswith("/") else cp + "/"

    # ---- mutation --------------------------------------------------------
[docs] @_trace("delete") def delete(self, key: str) -> None: with _translate_errors(): # obstore.delete on Local raises FileNotFoundError on missing; # _translate_errors maps it to ObjectNotFound. On S3, delete # is idempotent — head first to give consistent semantics # across backends. obstore.head(self._inner, key) obstore.delete(self._inner, key)

    # ---- escape hatch ----------------------------------------------------
    def raw_obstore(self) -> "_obstore_store.ObjectStore":
        return self._inner

    def lance_connection(self) -> tuple[str, dict[str, str]]:
        if not self._lance_uri:
            raise NotImplementedError(
                f"backend {self._backend!r} does not support lancedb connections. "
                f"The backend factory (e.g. ursa.store.backends.{self._backend}.build_*_store) "
                f"must construct ObstoreBackend with lance_uri=... and "
                f"lance_storage_options=... arguments."
            )
        # Return a fresh dict on each call so callers can mutate it without
        # affecting other consumers.
        return self._lance_uri, dict(self._lance_storage_options)
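

# Illustrative consumer of `lance_connection()` (a sketch: it assumes the
# backend factory supplied lance_uri / lance_storage_options, and that the
# installed lancedb version accepts `storage_options` in `lancedb.connect`):
#
#     uri, storage_options = store.lance_connection()
#     db = lancedb.connect(uri, storage_options=storage_options)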


class _OpenReader(AbstractContextManager[BinaryIO]):
    """Forward-only streaming reader returned by `ObjectStore.open()`.

    Implemented over `obstore.get(...).stream()` rather than
    `obstore.open_reader` because the latter has a known prefix-handling
    bug on S3Store: when the store is constructed with `prefix=`, the
    reader double-applies the prefix at request time. `get().stream()`
    is consistent across backends.
    """

    def __init__(self, inner: "_obstore_store.ObjectStore", key: str) -> None:
        self._inner = inner
        self._key = key
        self._iter: Any = None

    def __enter__(self) -> BinaryIO:
        with _translate_errors():
            result = obstore.get(self._inner, self._key)
            self._iter = result.stream()
        return cast(BinaryIO, _NoSeekStreamReader(self._iter))

    def __exit__(self, *exc: object) -> None:
        # The underlying stream cleans up on its own when iteration
        # completes or the iterator is garbage-collected; nothing
        # explicit to close.
        self._iter = None


class _NoSeekStreamReader(io.RawIOBase):
    """Adapt an obstore byte-chunk iterator into a forward-only `BinaryIO`.

    Explicitly `seekable() == False`; `seek()` raises so callers can't
    silently get wrong bytes.
    """

    def __init__(self, chunks: Any) -> None:
        super().__init__()
        self._chunks = iter(chunks)
        self._buf = bytearray()
        self._exhausted = False

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def seek(self, offset: int, whence: int = 0) -> int:
        raise io.UnsupportedOperation(
            "ObjectStore.open() returns a forward-only stream; "
            "use ObjectStore.get_range(...) for random access"
        )

    def read(self, size: int = -1) -> bytes:
        if size < 0:
            # Drain the rest of the stream.
            while not self._exhausted:
                self._pull_chunk()
            out = bytes(self._buf)
            self._buf.clear()
            return out
        while len(self._buf) < size and not self._exhausted:
            self._pull_chunk()
        out = bytes(self._buf[:size])
        del self._buf[:size]
        return out

    def _pull_chunk(self) -> None:
        try:
            chunk = next(self._chunks)
        except StopIteration:
            self._exhausted = True
            return
        self._buf.extend(bytes(chunk))

    def readinto(self, buffer: Any) -> int:
        data = self.read(len(buffer))
        n = len(data)
        buffer[:n] = data
        return n
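

# Illustrative streaming read through the wrapper (the stream is forward-only;
# `seek()` raises, so random access must go through `get_range`). `process` is
# a hypothetical callback:
#
#     with store.open("runs/1/trace.log") as f:
#         while chunk := f.read(1 << 20):  # 1 MiB at a time
#             process(chunk)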