"""obstore-backed `ObjectStore` implementation.
One class wraps any obstore handle (S3Store for R2, LocalStore, future
GCSStore/AzureStore). Responsibilities:
- Translate obstore types into Ursa types (`ObjectMeta`).
- Translate obstore exceptions into Ursa errors.
- Validate `extra_metadata` against S3/R2 user-metadata constraints
*before* any network call so callers see typed errors rather than
opaque obstore tracebacks.
- Emit OTel spans on each public op (low-cardinality attrs only — never
the caller-supplied `key`).
The wrapper does **no** path composition: prefix scoping lives in the
obstore handle (`S3Store(prefix=...)`, `LocalStore(prefix=...)`). This
keeps `raw_obstore()` correct for Lance/Zarr backends that bypass the
wrapper.
"""
from __future__ import annotations
import functools
import io
import re
from collections.abc import Iterator, Mapping
from contextlib import AbstractContextManager, contextmanager
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
BinaryIO,
Callable,
Final,
Literal,
TypeVar,
cast,
)
import obstore
from obstore.exceptions import (
AlreadyExistsError,
NotFoundError,
PermissionDeniedError,
PreconditionError,
UnauthenticatedError,
)
from opentelemetry import trace
from ursa.store.base import (
ETagMismatch,
InvalidMetadataError,
ObjectAccessDenied,
ObjectExists,
ObjectMeta,
ObjectNotFound,
)
if TYPE_CHECKING:
import obstore.store as _obstore_store
__all__ = ["ObstoreBackend"]

# Shared tracer for all store spans; span names are "ursa.store.<op>".
_TRACER = trace.get_tracer("ursa.store")

# Preserves the wrapped callable's signature through the `_trace` decorator.
_F = TypeVar("_F", bound=Callable[..., Any])

# S3/R2 user metadata key constraints. obstore passes attribute keys
# straight through to the backend, which fails opaquely if the key is
# uppercase or contains characters S3 rejects. Pre-validate here so
# callers see a clear local error.
_METADATA_KEY_RE: Final = re.compile(r"^[a-z0-9_-]+$")
# Total byte budget across all user-metadata keys+values (matches S3's
# documented 2 KB user-defined metadata limit).
_METADATA_TOTAL_BYTES_LIMIT: Final = 2048

# obstore attribute keys that aren't user metadata.
_RESERVED_ATTRIBUTES: Final = frozenset(
    {
        "Content-Disposition",
        "Content-Encoding",
        "Content-Language",
        "Content-Type",
        "Cache-Control",
    }
)
def _trace(op: str) -> Callable[[_F], _F]:
    """Decorate a public method with an OTel span named ``ursa.store.<op>``.

    Only low-cardinality attributes are attached (role, backend, op).
    The caller-supplied object key is deliberately *not* recorded:
    unbounded-cardinality values don't belong in span tags.
    """

    def apply(func: _F) -> _F:
        @functools.wraps(func)
        def traced(self: "ObstoreBackend", *args: Any, **kwargs: Any) -> Any:
            with _TRACER.start_as_current_span(f"ursa.store.{op}") as span:
                tags = {
                    "ursa.store.role": self._role,
                    "ursa.store.backend": self._backend,
                    "ursa.store.op": op,
                }
                for tag_name, tag_value in tags.items():
                    span.set_attribute(tag_name, tag_value)
                return func(self, *args, **kwargs)

        return cast(_F, traced)

    return apply
[docs]
def _build_attributes(
*,
content_type: str | None,
sha256: str | None,
extra_metadata: Mapping[str, str] | None,
) -> dict[str, str] | None:
"""Compose obstore `Attributes` (a dict) from the wrapper's split args."""
attrs: dict[str, str] = {}
if content_type is not None:
attrs["Content-Type"] = content_type
if sha256 is not None:
attrs["sha256"] = sha256
if extra_metadata:
for k, v in extra_metadata.items():
if k in _RESERVED_ATTRIBUTES:
# The caller probably meant to pass this via its dedicated
# field; reject rather than silently override.
raise InvalidMetadataError(
f"extra_metadata key {k!r} collides with a reserved "
f"attribute; pass via its dedicated field instead"
)
attrs[k] = v
return attrs or None
@contextmanager
def _translate_errors() -> Iterator[None]:
    """Convert obstore/OS exceptions raised in the body into Ursa errors.

    Mapping (anything else propagates untouched):
      - NotFoundError / FileNotFoundError  -> ObjectNotFound
      - AlreadyExistsError                 -> ObjectExists
      - PreconditionError                  -> ETagMismatch
      - PermissionDeniedError / UnauthenticatedError -> ObjectAccessDenied

    The original exception is preserved as ``__cause__`` via ``raise ... from``.
    """
    try:
        yield
    except (NotFoundError, FileNotFoundError) as exc:
        raise ObjectNotFound(str(exc)) from exc
    except AlreadyExistsError as exc:
        raise ObjectExists(str(exc)) from exc
    except PreconditionError as exc:
        raise ETagMismatch(str(exc)) from exc
    except (PermissionDeniedError, UnauthenticatedError) as exc:
        raise ObjectAccessDenied(str(exc)) from exc
class ObstoreBackend:
    """`ObjectStore` implementation over an obstore handle.

    The handle is constructed already-prefix-scoped by the per-backend
    factories (`backends/r2.py`, `backends/local.py`); this class does
    no path composition.

    NOTE(review): `list` and `list_prefixes` are generator methods, so the
    `_trace` span around them covers only generator *construction*, not the
    iteration that performs the actual I/O — confirm that is the intended
    span semantics.
    """

    def __init__(
        self,
        inner: "_obstore_store.ObjectStore",
        *,
        role: str,
        backend: str,
        prefix: str = "",
        supports_attributes: bool = True,
        supports_etag_match: bool = True,
        lance_uri: str = "",
        lance_storage_options: Mapping[str, str] | None = None,
    ) -> None:
        """`supports_attributes` / `supports_etag_match` are backend
        capability flags. obstore's `LocalStore` raises
        `NotImplementedError` when attributes or `UpdateVersion` modes
        are passed; those backends opt out by setting these to `False`,
        and `put` raises a typed `InvalidMetadataError` with a clear
        message rather than letting the obstore error leak through.

        `prefix` is the same value pushed into the obstore handle. The
        wrapper records it to strip from result paths because obstore's
        S3Store returns prefixed paths from `head` (while LocalStore
        returns them relative). Stripping here makes the contract
        consistent.
        """
        self._inner = inner
        self._role = role
        self._backend = backend
        self._prefix = prefix
        self._supports_attributes = supports_attributes
        self._supports_etag_match = supports_etag_match
        self._lance_uri = lance_uri
        # Defensive copy: the factory's dict must not be shared/mutated.
        self._lance_storage_options: dict[str, str] = dict(lance_storage_options or {})

    def _strip_prefix(self, path: str) -> str:
        # Normalize backend paths to prefix-relative (S3Store includes the
        # configured prefix in results; LocalStore does not).
        if self._prefix and path.startswith(self._prefix):
            return path[len(self._prefix) :]
        return path

    # ---- reads -----------------------------------------------------------

    @_trace("get")
    def get(self, key: str) -> bytes:
        """Read the entire object at `key` into memory."""
        with _translate_errors():
            result = obstore.get(self._inner, key)
            return bytes(result.bytes())

    @_trace("get_range")
    def get_range(self, key: str, *, start: int, length: int) -> bytes:
        """Read `length` bytes of `key` starting at byte offset `start`."""
        with _translate_errors():
            data = obstore.get_range(self._inner, key, start=start, length=length)
            return bytes(data)

    @_trace("open")
    def open(self, key: str) -> AbstractContextManager[BinaryIO]:
        """Return a context manager yielding a forward-only binary stream.

        Errors (e.g. missing key) surface when the context is entered,
        not here.
        """
        return _OpenReader(self._inner, key)

    # ---- writes ----------------------------------------------------------

    @_trace("put")
    def put(
        self,
        key: str,
        data: bytes | BinaryIO,
        *,
        content_type: str | None = None,
        sha256: str | None = None,
        extra_metadata: Mapping[str, str] | None = None,
        if_none_match: Literal["*"] | None = None,
        if_match: str | None = None,
    ) -> ObjectMeta:
        """Write `data` to `key`, optionally with attributes and preconditions.

        Raises:
            ValueError: if both `if_none_match` and `if_match` are given.
            InvalidMetadataError: invalid `extra_metadata`, or metadata /
                conditional writes on a backend that lacks the capability.
            ObjectExists: `if_none_match="*"` and the object exists.
            ETagMismatch: `if_match` set and the object is missing or has
                a different ETag.
        """
        if if_none_match is not None and if_match is not None:
            raise ValueError("if_none_match and if_match are mutually exclusive")
        _validate_extra_metadata(extra_metadata)
        attributes = _build_attributes(
            content_type=content_type,
            sha256=sha256,
            extra_metadata=extra_metadata,
        )
        if attributes is not None and not self._supports_attributes:
            raise InvalidMetadataError(
                f"backend {self._backend!r} does not support object "
                f"attributes (content_type / sha256 / extra_metadata). "
                f"Use a cloud backend (e.g. r2) if metadata round-trip is required."
            )
        if if_match is not None and not self._supports_etag_match:
            raise InvalidMetadataError(
                f"backend {self._backend!r} does not support ETag-conditional "
                f"writes (if_match). Use a cloud backend (e.g. r2) for CAS."
            )
        # S3-compatible stores (including MinIO) do not reliably honour
        # obstore's native mode="create" / mode={"e_tag": ...} preconditions
        # — the PUT can silently succeed without raising AlreadyExistsError /
        # PreconditionError. Mirror the approach used by `copy(overwrite=False)`:
        # enforce preconditions with an explicit HEAD round-trip before writing.
        # Small TOCTOU window; acceptable for single-writer ingestion workloads.
        with _translate_errors():
            if if_none_match == "*":
                try:
                    obstore.head(self._inner, key)
                except (NotFoundError, FileNotFoundError):
                    pass
                else:
                    raise ObjectExists(f"object already exists: {key}")
            elif if_match is not None:
                try:
                    existing = obstore.head(self._inner, key)
                except (NotFoundError, FileNotFoundError) as exc:
                    raise ETagMismatch(f"object does not exist: {key}") from exc
                # Compare unquoted forms; backends differ on whether the
                # ETag carries surrounding double quotes.
                actual = (existing.get("e_tag") or "").strip('"')
                expected = if_match.strip('"')
                if actual != expected:
                    raise ETagMismatch(
                        f"ETag mismatch for {key!r}: "
                        f"expected {if_match!r}, got {existing.get('e_tag')!r}"
                    )
            # obstore.put returns only {e_tag, version}; HEAD after to get full
            # ObjectMeta (size + last_modified). One extra round-trip; consistent
            # across backends.
            obstore.put(self._inner, key, data, attributes=attributes)
            head = obstore.head(self._inner, key)
            return _meta_from_obstore(head, strip_prefix=self._prefix, sha256=sha256)

    @_trace("copy")
    def copy(self, src: str, dst: str, *, overwrite: bool = False) -> ObjectMeta:
        """Server-side copy `src` -> `dst`; returns the destination's metadata.

        S3 (and therefore R2) does not implement copy-if-not-exists
        natively — `obstore.copy(..., overwrite=False)` raises
        `NotSupportedError`. Emulate at the wrapper: HEAD the
        destination first, raise `ObjectExists` if present, then
        always do an overwriting copy. There's a small TOCTOU window
        — acceptable for the M1 use case (ingestion, no concurrent
        writers); a follow-up can wire in the native primitive on
        backends that support it.
        """
        with _translate_errors():
            if not overwrite:
                try:
                    obstore.head(self._inner, dst)
                except (NotFoundError, FileNotFoundError):
                    pass
                else:
                    raise ObjectExists(f"copy destination already exists: {dst}")
            obstore.copy(self._inner, src, dst, overwrite=True)
            head = obstore.head(self._inner, dst)
            return _meta_from_obstore(head, strip_prefix=self._prefix)

    # ---- discovery -------------------------------------------------------

    @_trace("head")
    def head(self, key: str) -> ObjectMeta:
        """Return metadata for `key`; raises `ObjectNotFound` if absent."""
        with _translate_errors():
            head = obstore.head(self._inner, key)
        return _meta_from_obstore(head, strip_prefix=self._prefix)

    @_trace("exists")
    def exists(self, key: str) -> bool:
        """Return True iff `key` exists (other errors still propagate)."""
        # Call obstore.head directly (rather than self.head) so we don't
        # produce a nested `ursa.store.head` span inside every
        # `ursa.store.exists` span.
        try:
            with _translate_errors():
                obstore.head(self._inner, key)
        except ObjectNotFound:
            return False
        return True

    @_trace("list")
    def list(self, prefix: str = "") -> Iterator[ObjectMeta]:
        """Lazily yield metadata for every object under `prefix`."""
        with _translate_errors():
            stream = obstore.list(self._inner, prefix or None)
            for chunk in stream:
                for raw in chunk:
                    yield _meta_from_obstore(raw, strip_prefix=self._prefix)

    @_trace("list_prefixes")
    def list_prefixes(self, prefix: str = "") -> Iterator[str]:
        """Yield the immediate "directory" prefixes under `prefix`."""
        with _translate_errors():
            result = obstore.list_with_delimiter(self._inner, prefix or None)
            for cp in result["common_prefixes"]:
                # Strip the configured prefix (S3Store includes it; Local
                # doesn't), then normalize trailing '/' for consistency.
                cp = self._strip_prefix(cp)
                yield cp if cp.endswith("/") else cp + "/"

    # ---- mutation --------------------------------------------------------

    @_trace("delete")
    def delete(self, key: str) -> None:
        """Delete `key`; raises `ObjectNotFound` if it does not exist."""
        with _translate_errors():
            # obstore.delete on Local raises FileNotFoundError on missing;
            # _translate_errors maps it to ObjectNotFound. On S3, delete
            # is idempotent — head first to give consistent semantics
            # across backends.
            obstore.head(self._inner, key)
            obstore.delete(self._inner, key)

    # ---- escape hatch ----------------------------------------------------

    def raw_obstore(self) -> "_obstore_store.ObjectStore":
        """Return the underlying (already prefix-scoped) obstore handle."""
        return self._inner

    def lance_connection(self) -> tuple[str, dict[str, str]]:
        """Return ``(uri, storage_options)`` for opening a Lance connection.

        A *fresh* ``storage_options`` dict is returned on each call so
        callers can mutate it without affecting other consumers.

        Raises:
            NotImplementedError: the backend factory did not configure
                Lance support (no ``lance_uri`` was provided).
        """
        if not self._lance_uri:
            # Fixed: the error message previously also described the
            # success-path return contract; that documentation now lives
            # in the docstring where it belongs.
            raise NotImplementedError(
                f"backend {self._backend!r} does not support lancedb connections. "
                f"The backend factory (e.g. ursa.store.backends.{self._backend}.build_*_store) "
                f"must construct ObstoreBackend with lance_uri=... and "
                f"lance_storage_options=... arguments."
            )
        return self._lance_uri, dict(self._lance_storage_options)
[docs]
class _OpenReader(AbstractContextManager[BinaryIO]):
"""Forward-only streaming reader returned by `ObjectStore.open()`.
Implemented over `obstore.get(...).stream()` rather than
`obstore.open_reader` because the latter has a known prefix-handling
bug on S3Store: when the store is constructed with `prefix=`, the
reader double-applies the prefix at request time. `get().stream()`
is consistent across backends.
"""
def __init__(self, inner: "_obstore_store.ObjectStore", key: str) -> None:
self._inner = inner
self._key = key
self._iter: Any = None
[docs]
def __enter__(self) -> BinaryIO:
with _translate_errors():
result = obstore.get(self._inner, self._key)
self._iter = result.stream()
return cast(BinaryIO, _NoSeekStreamReader(self._iter))
[docs]
def __exit__(self, *exc: object) -> None:
# The underlying stream cleans up on its own when iteration
# completes or the iterator is garbage-collected; nothing
# explicit to close.
self._iter = None
[docs]
class _NoSeekStreamReader(io.RawIOBase):
"""Adapt an obstore byte-chunk iterator into a forward-only `BinaryIO`.
Explicitly `seekable() == False`; `seek()` raises so callers can't
silently get wrong bytes.
"""
def __init__(self, chunks: Any) -> None:
super().__init__()
self._chunks = iter(chunks)
self._buf = bytearray()
self._exhausted = False
[docs]
def readable(self) -> bool:
return True
[docs]
def seekable(self) -> bool:
return False
[docs]
def seek(self, offset: int, whence: int = 0) -> int:
raise io.UnsupportedOperation(
"ObjectStore.open() returns a forward-only stream; "
"use ObjectStore.get_range(...) for random access"
)
[docs]
def read(self, size: int = -1) -> bytes:
if size < 0:
# Drain the rest of the stream.
while not self._exhausted:
self._pull_chunk()
out = bytes(self._buf)
self._buf.clear()
return out
while len(self._buf) < size and not self._exhausted:
self._pull_chunk()
out = bytes(self._buf[:size])
del self._buf[:size]
return out
[docs]
def _pull_chunk(self) -> None:
try:
chunk = next(self._chunks)
except StopIteration:
self._exhausted = True
return
self._buf.extend(bytes(chunk))
[docs]
def readinto(self, buffer: Any) -> int:
data = self.read(len(buffer))
n = len(data)
buffer[:n] = data
return n