# Source code for ursa.store.base

"""Public surface of the Ursa object-store layer.

`ObjectStore` is the single abstraction Ursa code calls into for blob IO.
Backends live under `ursa.store.backends`; the Pydantic config layer in
`ursa.store.config` and the factory in `ursa.store.factory` decide which
backend a given role binds to.

The wrapper hides the configured prefix from every caller: keys passed in
and returned out are always relative to the store's prefix, which is set
once on construction inside the underlying obstore handle. `raw_obstore()`
exposes the prefix-scoped obstore handle for Lance/Zarr backends that
consume it natively.
"""

from __future__ import annotations

from contextlib import AbstractContextManager
from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    BinaryIO,
    Iterator,
    Literal,
    Mapping,
    Protocol,
    runtime_checkable,
)

if TYPE_CHECKING:
    import obstore.store as _obstore_store


# Explicit public API of this module (PEP 8): the store protocol, its
# metadata record, and the full exception hierarchy callers may catch.
__all__ = [
    "ObjectMeta",
    "ObjectStore",
    "ObjectStoreError",
    "ObjectNotFound",
    "ObjectExists",
    "ETagMismatch",
    "InvalidMetadataError",
    "ObjectAccessDenied",
]


[docs] @dataclass(frozen=True, slots=True) class ObjectMeta: """Metadata for one object in a store. `key` is always relative to the store's configured prefix — callers do not see the prefix at any boundary. `sha256` is lifted from `x-amz-meta-sha256` user metadata when present; absent on objects written without it. """ key: str size: int etag: str | None last_modified: datetime sha256: str | None = None
class ObjectStoreError(RuntimeError):
    """Base class for all Ursa object-store errors."""


class ObjectNotFound(ObjectStoreError):
    """Raised by `head`, `get`, `get_range`, `open`, `delete`, `copy`
    (src) when the key does not exist in the store."""


class ObjectExists(ObjectStoreError):
    """Raised when `if_none_match` precondition fails (the object
    already exists, or its ETag matched)."""


class ETagMismatch(ObjectStoreError):
    """Raised when `if_match` precondition fails (the object's ETag does
    not match the expected value — concurrent write detected)."""


class InvalidMetadataError(ObjectStoreError):
    """Raised by `put` when `extra_metadata` violates S3/R2
    user-metadata constraints (key charset, total header size).
    Validated locally before any network call."""


class ObjectAccessDenied(ObjectStoreError):
    """Raised when the underlying R2/S3 service rejects the request as
    unauthorized (HTTP 401/403). Common cause: writing through a
    read-only credential, or a credential whose policy does not cover
    the target prefix."""
@runtime_checkable
class ObjectStore(Protocol):
    """Generic blob-IO surface used by all Ursa code.

    Sync-primary; an async sibling will land in a follow-up issue when a
    real caller (e.g. ENG-893's prefetcher) needs it.

    Prefix semantics: the configured prefix is invisible to callers. All
    keys are relative to it; `raw_obstore()` returns a handle that is
    *also* prefix-scoped, so Lance/Zarr backings consuming the handle
    write under the same logical namespace as wrapper-owned objects.
    """

    # ---- reads -----------------------------------------------------------

    def get(self, key: str) -> bytes:
        """Return the full object as bytes.

        For multi-MB objects use `open()` (streaming) or `get_range()`
        (random access).
        """
        ...

    def get_range(self, key: str, *, start: int, length: int) -> bytes:
        """Return `length` bytes from byte offset `start`."""
        ...

    def open(self, key: str) -> AbstractContextManager[BinaryIO]:
        """Return a context manager over a forward-only `BinaryIO` view.

        The reader does **not** support `seek()`. Use `get_range()` for
        random access.
        """
        ...

    # ---- writes ----------------------------------------------------------

    def put(
        self,
        key: str,
        data: bytes | BinaryIO,
        *,
        content_type: str | None = None,
        sha256: str | None = None,
        extra_metadata: Mapping[str, str] | None = None,
        if_none_match: Literal["*"] | None = None,
        if_match: str | None = None,
    ) -> ObjectMeta:
        """Write `data` to `key`.

        Conditional writes (mutually exclusive):

        - `if_none_match="*"` — fail with `ObjectExists` if the key
          already exists. Create-if-not-exists semantics.
        - `if_match=etag` — fail with `ETagMismatch` if the current
          object's ETag is not `etag`. Compare-and-swap updates.

        `extra_metadata` keys are validated locally against the S3/R2
        user-metadata charset (`[a-z0-9_-]+`) and total header size.
        """
        ...

    def copy(
        self,
        src: str,
        dst: str,
        *,
        overwrite: bool = False,
    ) -> ObjectMeta:
        """Server-side copy from `src` to `dst` (no host round-trip).

        `overwrite=False` (default) raises `ObjectExists` if `dst`
        already exists. ETag-conditional copy is not supported by the
        underlying obstore API; if you need CAS semantics, do a `get` +
        `put(if_match=...)` instead.
        """
        ...

    # ---- discovery -------------------------------------------------------

    def head(self, key: str) -> ObjectMeta:
        """Return metadata for `key`. Raises `ObjectNotFound` if absent."""
        ...

    def exists(self, key: str) -> bool:
        """`True` if `key` exists in the store, `False` otherwise."""
        ...

    def list(self, prefix: str = "") -> Iterator[ObjectMeta]:
        """Recursively yield objects whose keys start with `prefix`.

        Returned `ObjectMeta.key` values are relative to the store's
        configured prefix.
        """
        ...

    def list_prefixes(self, prefix: str = "") -> Iterator[str]:
        """Yield one-level common prefixes under `prefix` (delimiter '/').

        Returns `str` rather than `ObjectMeta` because prefix entries
        have no size / ETag / last-modified.
        """
        ...

    # ---- mutation --------------------------------------------------------

    def delete(self, key: str) -> None:
        """Delete `key`. Raises `ObjectNotFound` if absent."""
        ...

    # ---- escape hatch ----------------------------------------------------

    def raw_obstore(self) -> _obstore_store.ObjectStore:
        """Return the underlying, already-prefix-scoped obstore handle.

        For Lance/Zarr backends only — those libraries accept obstore
        handles natively. Common callers should use the wrapped surface
        above.
        """
        ...

    def lance_connection(self) -> tuple[str, dict[str, str]]:
        """Return ``(uri, storage_options)`` for ``lancedb.connect()``.

        Returns
        -------
        (uri, storage_options)
            ``uri`` is the lancedb-compatible root for this store
            (filesystem path for local, ``s3://<bucket>/<prefix>`` for
            R2, with no trailing slash). Callers append their own
            subpath if they want to nest data under a sub-namespace —
            ``Catalog`` appends ``/catalog`` for example.

            ``storage_options`` carries everything lancedb needs beyond
            the URI scheme. For R2 this MUST include ``endpoint=``
            (without it the S3 client targets AWS and fails opaquely),
            plus ``access_key_id``, ``secret_access_key``, and
            ``region``. Local stores return an empty dict. The returned
            dict is a fresh copy on each call; mutations are not
            reflected back into the store.
        """
        ...