Source code for ursa.catalog._filters

"""Compile a filter spec into a lancedb ``where=`` SQL clause.

M2 supports the small set of predicates ``ursa.query`` (ENG-889) needs:

* equality on a top-level column: ``{"col": "value"}``
* set membership on a top-level column: ``{"col": ["a", "b"]}``
* half-open range on a top-level column: ``{"col": ("between", lo, hi)}``
* raw SQL passthrough: ``where="col = 'x' AND ..."``

Filters that target a key inside a ``MetadataDict`` column
(``"metadata.session": "morning"``) raise ``NotImplementedError`` —
metadata is JSON-blobbed in M2, so pushdown filtering is impossible
without the M3 promotion to Lance ``MapType`` + hot-key columns
(ENG-1066).

DataFusion (Lance's underlying SQL engine) does *not* honor C-style
backslash escapes in string literals. Only single quotes inside string
values need escaping, and the convention is to double them
(``'`` → ``''``). Backslashes pass through unchanged.
"""

from __future__ import annotations

import math
from collections.abc import Mapping
from datetime import datetime, timezone
from typing import Any

__all__ = ["compile_where", "MetadataPushdownNotImplemented"]


class MetadataPushdownNotImplemented(NotImplementedError):
    """Signal that a ``"metadata.<key>"`` filter reached ``compile_where``.

    Pushdown on metadata keys is not available yet; the work is tracked by
    ENG-1066 (Lance MapType + hot-key promotion).
    """
def compile_where(filters: str | Mapping[str, Any] | None) -> str | None:
    """Build the DataFusion SQL ``WHERE`` text for a filter spec.

    A string is treated as raw SQL and handed back untouched. A mapping is
    compiled one entry at a time, in iteration order, and the resulting
    clauses are joined with ``AND``. ``None``, the empty string and the
    empty mapping all yield ``None``, so the result can be forwarded
    directly to ``LanceTable.search().where(...)``.

    Raises:
        TypeError: If ``filters`` is not a ``str``, a ``Mapping`` or ``None``.
        MetadataPushdownNotImplemented: If a mapping key contains a ``"."``.
    """
    if filters is None:
        return None
    if isinstance(filters, str):
        # Raw SQL passthrough; an empty string means "no filter".
        return filters if filters else None
    if not isinstance(filters, Mapping):
        raise TypeError(f"filters must be str | Mapping | None, got {type(filters).__name__}")
    if not filters:
        return None

    def _render(column: str, spec: Any) -> str:
        # Dotted names address a key inside a metadata column, which cannot
        # be pushed down yet.
        if "." in column:
            raise MetadataPushdownNotImplemented(
                f"metadata-key filter {column!r} requires Lance MapType pushdown; "
                f"tracked by ENG-1066. Workaround: scan() and filter in Python."
            )
        return _compile_one(column, spec)

    # The list comprehension evaluates entries strictly left to right, so the
    # first offending entry is the one that raises.
    return " AND ".join([_render(column, spec) for column, spec in filters.items()])
[docs] def _compile_one(col: str, value: Any) -> str: if isinstance(value, tuple) and len(value) == 3 and value[0] == "between": _, lo, hi = value return f"{col} >= {_lit(lo)} AND {col} < {_lit(hi)}" if isinstance(value, list): if not value: # Empty IN-list is invalid SQL across engines; emit a tautology # that selects no rows so the call site doesn't have to special-case. return "1 = 0" rendered = ", ".join(_lit(v) for v in value) return f"{col} IN ({rendered})" return f"{col} = {_lit(value)}"
[docs] def _lit(value: Any) -> str: """Render a Python value as a DataFusion SQL literal.""" if value is None: return "NULL" if isinstance(value, bool): # bool is a subclass of int; check it first. return "TRUE" if value else "FALSE" if isinstance(value, (int, float)): return repr(value) if isinstance(value, datetime): if value.tzinfo is None: raise ValueError(f"datetime literal must be timezone-aware: {value!r}") # DataFusion's TIMESTAMP literal grammar accepts ISO 8601 strings. # Normalize to UTC for consistency with the catalog's UTC convention. utc = value.astimezone(timezone.utc) return f"TIMESTAMP '{utc.isoformat()}'" if isinstance(value, str): escaped = value.replace("'", "''") return f"'{escaped}'" raise TypeError(f"unsupported filter value type: {type(value).__name__} ({value!r})")