"""Public surface of the Ursa object-store layer.

`ObjectStore` is the single abstraction Ursa code calls into for blob IO.
Backends live under `ursa.store.backends`; the Pydantic config layer in
`ursa.store.config` and the factory in `ursa.store.factory` decide which
backend a given role binds to.

The wrapper hides the configured prefix from every caller: keys passed in
and returned out are always relative to the store's prefix, which is set
once on construction inside the underlying obstore handle. `raw_obstore()`
exposes the prefix-scoped obstore handle for Lance/Zarr backends that
consume it natively; `lance_connection()` instead returns a
`(uri, storage_options)` pair for `lancedb.connect()`.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from dataclasses import dataclass
from datetime import datetime
from typing import (
TYPE_CHECKING,
BinaryIO,
Iterator,
Literal,
Mapping,
Protocol,
runtime_checkable,
)
if TYPE_CHECKING:
import obstore.store as _obstore_store
# Public names of this module, i.e. what `from <module> import *` exports.
# Every entry must be bound somewhere in this module: besides the protocol
# and exceptions defined here, `ObjectMeta` and `InvalidMetadataError` are
# expected to be defined further down. A listed name that is missing makes
# a star-import raise AttributeError.
__all__ = [
    "ObjectMeta",
    "ObjectStore",
    "ObjectStoreError",
    "ObjectNotFound",
    "ObjectExists",
    "ETagMismatch",
    "InvalidMetadataError",
    "ObjectAccessDenied",
]
class ObjectStoreError(RuntimeError):
    """Root of the Ursa object-store exception hierarchy.

    The store-specific exceptions in this module derive from it, so a
    single ``except ObjectStoreError`` handles all of them. It is itself a
    `RuntimeError`.
    """
class ObjectNotFound(ObjectStoreError):
    """The requested key is absent from the store.

    Raised by `head`, `get`, `get_range`, `open` and `delete` when the
    key does not exist, and by `copy` when its *source* key is missing.
    """
class ObjectExists(ObjectStoreError):
    """A create-only operation found its target key already present.

    Raised by:

    - `put(..., if_none_match="*")` when the key already exists
      (create-if-not-exists semantics);
    - `copy(..., overwrite=False)` (the default) when the destination
      key already exists.
    """
class ETagMismatch(ObjectStoreError):
    """A compare-and-swap write was rejected.

    Raised by `put(..., if_match=etag)` when the stored object's current
    ETag differs from `etag`. This typically means another writer changed
    the object after it was read, i.e. a concurrent write was detected.
    """
class ObjectAccessDenied(ObjectStoreError):
    """The remote R2/S3 service refused the request (HTTP 401 or 403).

    Typical causes are writing through a read-only credential, or using a
    credential whose policy does not cover the target prefix.
    """
@runtime_checkable
class ObjectStore(Protocol):
    """Generic blob-IO surface used by all Ursa code.

    Sync-primary; an async sibling will land in a follow-up issue when a
    real caller (e.g. ENG-893's prefetcher) needs it.

    Prefix semantics: the configured prefix is invisible to callers. All
    keys are relative to it; `raw_obstore()` returns a handle that is
    *also* prefix-scoped, so Lance/Zarr backends consuming the handle
    write under the same logical namespace as wrapper-owned objects.

    Errors: implementations signal failures with `ObjectStoreError`
    subclasses. Calls against a remote R2/S3 backend may additionally
    raise `ObjectAccessDenied` when the credential is rejected.

    Because the protocol is `runtime_checkable`,
    ``isinstance(obj, ObjectStore)`` only verifies that every method below
    is present on ``obj``; it does not check signatures or behavior.
    """

    # ---- reads -----------------------------------------------------------

    def get(self, key: str) -> bytes:
        """Return the full object at `key` as bytes.

        Raises `ObjectNotFound` if `key` does not exist. For multi-MB
        objects use `open()` (streaming) or `get_range()` (random access)
        instead of buffering the whole object.
        """
        ...

    def get_range(self, key: str, *, start: int, length: int) -> bytes:
        """Return `length` bytes of `key`, starting at byte offset `start`.

        Raises `ObjectNotFound` if `key` does not exist.
        """
        ...

    def open(self, key: str) -> AbstractContextManager[BinaryIO]:
        """Return a context manager over a forward-only `BinaryIO` view.

        The reader does **not** support `seek()`; use `get_range()` for
        random access. Raises `ObjectNotFound` if `key` does not exist.
        """
        ...

    # ---- writes ----------------------------------------------------------

    def put(
        self,
        key: str,
        data: bytes | BinaryIO,
        *,
        content_type: str | None = None,
        sha256: str | None = None,
        extra_metadata: Mapping[str, str] | None = None,
        if_none_match: Literal["*"] | None = None,
        if_match: str | None = None,
    ) -> ObjectMeta:
        """Write `data` to `key` and return the stored object's metadata.

        `data` is either an in-memory ``bytes`` payload or a binary stream.
        How `content_type` and `sha256` are used is left to the
        implementation.

        Conditional writes (mutually exclusive):

        - `if_none_match="*"` — fail with `ObjectExists` if the key
          already exists. Create-if-not-exists semantics.
        - `if_match=etag` — fail with `ETagMismatch` if the current
          object's ETag is not `etag`. Compare-and-swap updates.

        `extra_metadata` keys are validated locally against the S3/R2
        user-metadata charset (`[a-z0-9_-]+`) and total header size.
        NOTE(review): a validation failure presumably raises the exported
        `InvalidMetadataError`; confirm against the implementations.
        """
        ...

    def copy(
        self,
        src: str,
        dst: str,
        *,
        overwrite: bool = False,
    ) -> ObjectMeta:
        """Server-side copy from `src` to `dst` (no host round-trip).

        Raises `ObjectNotFound` if `src` does not exist.
        `overwrite=False` (default) raises `ObjectExists` if `dst`
        already exists. ETag-conditional copy is not supported by the
        underlying obstore API; if you need CAS semantics, do a
        `get` + `put(if_match=...)` instead.
        """
        ...

    # ---- discovery -------------------------------------------------------

    def head(self, key: str) -> ObjectMeta:
        """Return metadata for `key`. Raises `ObjectNotFound` if absent."""
        ...

    def exists(self, key: str) -> bool:
        """`True` if `key` exists in the store, `False` otherwise."""
        ...

    def list(self, prefix: str = "") -> Iterator[ObjectMeta]:
        """Recursively yield objects whose keys start with `prefix`.

        Returned `ObjectMeta.key` values are relative to the store's
        configured prefix.
        """
        ...

    def list_prefixes(self, prefix: str = "") -> Iterator[str]:
        """Yield one-level common prefixes under `prefix` (delimiter '/').

        Returns `str` rather than `ObjectMeta` because prefix entries have
        no size / ETag / last-modified.
        """
        ...

    # ---- mutation --------------------------------------------------------

    def delete(self, key: str) -> None:
        """Delete `key`. Raises `ObjectNotFound` if absent."""
        ...

    # ---- escape hatch ----------------------------------------------------

    def raw_obstore(self) -> _obstore_store.ObjectStore:
        """Return the underlying, already-prefix-scoped obstore handle.

        For Lance/Zarr backends only — those libraries accept obstore
        handles natively. Ordinary callers should use the wrapped surface
        above.
        """
        ...

    def lance_connection(self) -> tuple[str, dict[str, str]]:
        """Return ``(uri, storage_options)`` for ``lancedb.connect()``.

        Returns
        -------
        (uri, storage_options)
            ``uri`` is the lancedb-compatible root for this store
            (filesystem path for local, ``s3://<bucket>/<prefix>`` for
            R2, with no trailing slash). Callers append their own
            subpath if they want to nest data under a sub-namespace —
            ``Catalog`` appends ``/catalog`` for example.

            ``storage_options`` carries everything lancedb needs beyond
            the URI scheme. For R2 this MUST include ``endpoint=``
            (without it the S3 client targets AWS and fails opaquely),
            plus ``access_key_id``, ``secret_access_key``, and
            ``region``. Local stores return an empty dict.

            The returned dict is a fresh copy on each call; mutations
            are not reflected back into the store.
        """
        ...