diff --git a/mkdocs/docs/geospatial.md b/mkdocs/docs/geospatial.md
index f7b8433b49..30aac5d383 100644
--- a/mkdocs/docs/geospatial.md
+++ b/mkdocs/docs/geospatial.md
@@ -84,11 +84,11 @@ point_wkb = bytes.fromhex("0101000000000000000000000000000000000000")
 
 1. **WKB/WKT Conversion**: Converting between WKB bytes and WKT strings requires external libraries (like Shapely). PyIceberg does not include this conversion to avoid heavy dependencies.
 
-2. **Spatial Predicates**: Spatial filtering (e.g., ST_Contains, ST_Intersects) is not yet supported for query pushdown.
+2. **Spatial Predicate Execution**: Spatial predicate APIs (`st-contains`, `st-intersects`, `st-within`, `st-overlaps`) are available in expression trees and binding. Row-level execution and metrics/pushdown evaluation are not implemented yet.
 
-3. **Bounds Metrics**: Geometry/geography columns do not currently contribute to data file bounds metrics.
+3. **Without geoarrow-pyarrow**: When the `geoarrow-pyarrow` package is not installed, geometry and geography columns are stored as binary without GeoArrow extension type metadata. The Iceberg schema preserves type information, but other tools reading the Parquet files directly may not recognize them as spatial types. Install with `pip install pyiceberg[geoarrow]` for full GeoArrow support.
 
-4. **Without geoarrow-pyarrow**: When the `geoarrow-pyarrow` package is not installed, geometry and geography columns are stored as binary without GeoArrow extension type metadata. The Iceberg schema preserves type information, but other tools reading the Parquet files directly may not recognize them as spatial types. Install with `pip install pyiceberg[geoarrow]` for full GeoArrow support.
+4. **GeoArrow planar ambiguity**: In GeoArrow metadata, `geometry` and `geography(..., 'planar')` can be encoded identically (no explicit edge metadata). PyIceberg resolves this ambiguity at the Arrow/Parquet schema-compatibility boundary by treating the two as compatible when the CRS matches, while keeping core schema compatibility strict elsewhere.
 
 ## Format Version
diff --git a/mkdocs/docs/plans/Geospatial_PR_How_To_Review.md b/mkdocs/docs/plans/Geospatial_PR_How_To_Review.md
new file mode 100644
index 0000000000..cdde383564
--- /dev/null
+++ b/mkdocs/docs/plans/Geospatial_PR_How_To_Review.md
@@ -0,0 +1,75 @@
+# How To Review: Geospatial Compatibility, Metrics, and Expressions PR
+
+## Goal
+
+This PR is large because it spans expression APIs, Arrow/Parquet conversion, metrics generation, and documentation.
+Recommended strategy: review in focused passes by concern, not in file order.
+
+## Recommended Review Order
+
+1. **Core geospatial utility correctness**
+    - `pyiceberg/utils/geospatial.py`
+    - `tests/utils/test_geospatial.py`
+    - Focus on envelope extraction, antimeridian behavior, and bound encoding formats.
+
+2. **Metrics integration and write/import paths**
+    - `pyiceberg/io/pyarrow.py`
+    - `tests/io/test_pyarrow_stats.py`
+    - Focus on:
+        - geospatial bounds derived from row WKB values
+        - skipping Parquet binary min/max for geospatial columns
+        - partition inference not using geospatial envelope bounds
+
+3. **GeoArrow compatibility and ambiguity boundary**
+    - `pyiceberg/schema.py`
+    - `pyiceberg/io/pyarrow.py`
+    - `tests/io/test_pyarrow.py`
+    - Confirm:
+        - planar equivalence enabled only at the Arrow/Parquet boundary
+        - spherical mismatch still fails
+        - fallback warning when the GeoArrow dependency is absent
+
+4. **Spatial expression surface area**
+    - `pyiceberg/expressions/__init__.py`
+    - `pyiceberg/expressions/visitors.py`
+    - `tests/expressions/test_spatial_predicates.py`
+    - Confirm:
+        - bind-time type checks (geometry/geography only)
+        - visitor plumbing is complete
+        - conservative evaluator behavior is explicit and documented
+
+5. **User-facing docs**
+    - `mkdocs/docs/geospatial.md`
+    - Check that the limitations and behavior notes match the implementation.
+
+## High-Risk Areas To Inspect Closely
+
+1. **Boundary scope leakage**
+    - Ensure the planar-equivalence relaxation is not enabled globally.
+
+2. **Envelope semantics**
+    - Geography antimeridian cases (`xmin > xmax`) are expected and intentional.
+
+3. **Metrics correctness**
+    - Geospatial bounds are serialized envelopes, not raw value min/max.
+
+4. **Conservative evaluator behavior**
+    - Spatial predicates should not accidentally become strict in metrics/manifest evaluators.
+
+## Quick Validation Commands
+
+```bash
+uv run --extra hive --extra bigquery python -m pytest tests/utils/test_geospatial.py -q
+uv run --extra hive --extra bigquery python -m pytest tests/io/test_pyarrow_stats.py -k "geospatial or planar_geography_schema or partition_inference_skips_geospatial_bounds" -q
+uv run --extra hive --extra bigquery python -m pytest tests/io/test_pyarrow.py -k "geoarrow or planar_geography_geometry_equivalence or spherical_geography_geometry_equivalence or logs_warning_once" -q
+uv run --extra hive --extra bigquery python -m pytest tests/expressions/test_spatial_predicates.py tests/expressions/test_visitors.py -k "spatial or translate_column_names" -q
+```
+
+## Review Outcome Checklist
+
+1. Geometry/geography bounds are present and correctly encoded for write/import paths.
+2. `geometry` vs `geography(planar)` is equivalent only at the Arrow/Parquet compatibility boundary, and only with CRS equality.
+3. `geography(spherical)` remains incompatible with `geometry`.
+4. Spatial predicates are correctly modeled and bound; execution and pushdown remain intentionally unimplemented.
+5. A missing GeoArrow dependency degrades gracefully with an explicit warning.
+6. Docs match the implemented behavior and limitations.
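Reviewer aside (not part of the patch): a minimal sketch of the expression surface that review pass 4 asks you to confirm, assuming the `STIntersects` class and its `bind`/`geometry` members introduced in `pyiceberg/expressions/__init__.py` below, plus PyIceberg's existing `Schema`/`NestedField`/`GeometryType` APIs. The `geom` field name, field id, and point literal are illustrative only.

```python
import struct

from pyiceberg.expressions import STIntersects
from pyiceberg.schema import Schema
from pyiceberg.types import GeometryType, NestedField

# A schema with a single optional geometry column (hypothetical field).
schema = Schema(NestedField(field_id=1, name="geom", field_type=GeometryType("OGC:CRS84"), required=False))

# WKB for POINT (0 0): byte order (1 = little-endian), geometry type (1 = point), then x and y doubles.
point_wkb = struct.pack("<BIdd", 1, 1, 0.0, 0.0)

# Unbound predicates carry raw WKB bytes; binding type-checks the term
# and succeeds only against geometry/geography fields.
predicate = STIntersects("geom", point_wkb)
bound = predicate.bind(schema)

assert bound.geometry == point_wkb  # the bound form keeps the WKB payload
negated = ~bound                    # negation wraps the predicate in Not(...)
```

Binding a spatial predicate against a non-spatial column raises `TypeError`, which is the bind-time check called out in review pass 4.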
diff --git a/pyiceberg/expressions/__init__.py b/pyiceberg/expressions/__init__.py index 3910a146c7..019200e0de 100644 --- a/pyiceberg/expressions/__init__.py +++ b/pyiceberg/expressions/__init__.py @@ -17,10 +17,11 @@ from __future__ import annotations +import builtins from abc import ABC, abstractmethod from collections.abc import Callable, Iterable, Sequence from functools import cached_property -from typing import Any, TypeAlias +from typing import Any, TypeAlias, cast from typing import Literal as TypingLiteral from pydantic import ConfigDict, Field, SerializeAsAny, model_validator @@ -29,7 +30,7 @@ from pyiceberg.expressions.literals import AboveMax, BelowMin, Literal, literal from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel, L, LiteralValue, StructProtocol -from pyiceberg.types import DoubleType, FloatType, NestedField +from pyiceberg.types import DoubleType, FloatType, GeographyType, GeometryType, NestedField from pyiceberg.utils.singleton import Singleton @@ -48,6 +49,16 @@ def _to_literal(value: L | Literal[L]) -> Literal[L]: return literal(value) +def _to_bytes(value: bytes | bytearray | memoryview) -> bytes: + if isinstance(value, bytes): + return value + if isinstance(value, bytearray): + return bytes(value) + if isinstance(value, memoryview): + return value.tobytes() + raise TypeError(f"Expected bytes-like value, got {type(value)}") + + class BooleanExpression(IcebergBaseModel, ABC): """An expression that evaluates to a boolean.""" @@ -109,6 +120,14 @@ def handle_primitive_type(cls, v: Any, handler: ValidatorFunctionWrapHandler) -> return StartsWith(**v) elif field_type == "not-starts-with": return NotStartsWith(**v) + elif field_type == "st-contains": + return STContains(**v) + elif field_type == "st-intersects": + return STIntersects(**v) + elif field_type == "st-within": + return STWithin(**v) + elif field_type == "st-overlaps": + return STOverlaps(**v) # Set elif field_type == "in": @@ -1106,3 +1125,169 @@ def __invert__(self) -> StartsWith: @property def as_bound(self) -> type[BoundNotStartsWith]: # type: ignore return BoundNotStartsWith + + +class SpatialPredicate(UnboundPredicate, ABC): + type: TypingLiteral["st-contains", "st-intersects", "st-within", "st-overlaps"] = Field(alias="type") + term: UnboundTerm + value: bytes = Field() + model_config = ConfigDict(populate_by_name=True, frozen=True, arbitrary_types_allowed=True) + + def __init__( + self, + term: str | UnboundTerm, + geometry: bytes | bytearray | memoryview | None = None, + **kwargs: Any, + ) -> None: + if geometry is None and "value" in kwargs: + geometry = kwargs["value"] + if geometry is None: + raise TypeError("Spatial predicates require WKB bytes") + + super().__init__(term=_to_unbound_term(term), value=_to_bytes(geometry)) + + @property + def geometry(self) -> bytes: + return self.value + + def bind(self, schema: Schema, case_sensitive: bool = True) -> BoundSpatialPredicate: + bound_term = self.term.bind(schema, case_sensitive) + if not isinstance(bound_term.ref().field.field_type, (GeometryType, GeographyType)): + raise TypeError(f"Spatial predicates can only be bound against geometry/geography fields: {bound_term.ref().field}") + bound_cls = cast(Any, self.as_bound) + return bound_cls(bound_term, self.geometry) + + def __eq__(self, other: Any) -> bool: + """Return whether two spatial predicates are equivalent.""" + if isinstance(other, self.__class__): + return self.term == other.term and self.geometry == other.geometry + return False + + 
def __str__(self) -> str: + """Return a human-readable representation.""" + return f"{str(self.__class__.__name__)}(term={repr(self.term)}, geometry={self.geometry!r})" + + def __repr__(self) -> str: + """Return the debug representation.""" + return f"{str(self.__class__.__name__)}(term={repr(self.term)}, geometry={self.geometry!r})" + + @property + @abstractmethod + def as_bound(self) -> builtins.type[BoundSpatialPredicate]: ... + + +class BoundSpatialPredicate(BoundPredicate, ABC): + value: bytes = Field() + + def __init__(self, term: BoundTerm, geometry: bytes | bytearray | memoryview): + super().__init__(term=term, value=_to_bytes(geometry)) + + @property + def geometry(self) -> bytes: + return self.value + + def __eq__(self, other: Any) -> bool: + """Return whether two bound spatial predicates are equivalent.""" + if isinstance(other, self.__class__): + return self.term == other.term and self.geometry == other.geometry + return False + + def __str__(self) -> str: + """Return a human-readable representation.""" + return f"{self.__class__.__name__}(term={str(self.term)}, geometry={self.geometry!r})" + + def __repr__(self) -> str: + """Return the debug representation.""" + return f"{str(self.__class__.__name__)}(term={repr(self.term)}, geometry={self.geometry!r})" + + @property + @abstractmethod + def as_unbound(self) -> type[SpatialPredicate]: ... + + +class BoundSTContains(BoundSpatialPredicate): + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_unbound(self) -> type[STContains]: + return STContains + + +class BoundSTIntersects(BoundSpatialPredicate): + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_unbound(self) -> type[STIntersects]: + return STIntersects + + +class BoundSTWithin(BoundSpatialPredicate): + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_unbound(self) -> type[STWithin]: + return STWithin + + +class BoundSTOverlaps(BoundSpatialPredicate): + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_unbound(self) -> type[STOverlaps]: + return STOverlaps + + +class STContains(SpatialPredicate): + type: TypingLiteral["st-contains"] = Field(default="st-contains", alias="type") + + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_bound(self) -> builtins.type[BoundSTContains]: + return BoundSTContains + + +class STIntersects(SpatialPredicate): + type: TypingLiteral["st-intersects"] = Field(default="st-intersects", alias="type") + + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_bound(self) -> builtins.type[BoundSTIntersects]: + return BoundSTIntersects + + +class STWithin(SpatialPredicate): + type: TypingLiteral["st-within"] = Field(default="st-within", alias="type") + + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + @property + def as_bound(self) -> builtins.type[BoundSTWithin]: + return BoundSTWithin + + +class STOverlaps(SpatialPredicate): + type: TypingLiteral["st-overlaps"] = Field(default="st-overlaps", alias="type") + + def __invert__(self) -> BooleanExpression: + """Return the negated expression.""" + return Not(child=self) + + 
@property + def as_bound(self) -> builtins.type[BoundSTOverlaps]: + return BoundSTOverlaps diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 0beb0f3df0..ea87f634a6 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -47,7 +47,12 @@ BoundNotStartsWith, BoundPredicate, BoundSetPredicate, + BoundSpatialPredicate, BoundStartsWith, + BoundSTContains, + BoundSTIntersects, + BoundSTOverlaps, + BoundSTWithin, BoundTerm, BoundUnaryPredicate, Not, @@ -326,6 +331,18 @@ def visit_starts_with(self, term: BoundTerm, literal: LiteralValue) -> T: def visit_not_starts_with(self, term: BoundTerm, literal: LiteralValue) -> T: """Visit bound NotStartsWith predicate.""" + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> T: + raise NotImplementedError(f"{self.__class__.__name__} does not implement st-contains") + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> T: + raise NotImplementedError(f"{self.__class__.__name__} does not implement st-intersects") + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> T: + raise NotImplementedError(f"{self.__class__.__name__} does not implement st-within") + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> T: + raise NotImplementedError(f"{self.__class__.__name__} does not implement st-overlaps") + def visit_unbound_predicate(self, predicate: UnboundPredicate) -> T: """Visit an unbound predicate. @@ -421,6 +438,26 @@ def _(expr: BoundNotStartsWith, visitor: BoundBooleanExpressionVisitor[T]) -> T: return visitor.visit_not_starts_with(term=expr.term, literal=expr.literal) +@visit_bound_predicate.register(BoundSTContains) +def _(expr: BoundSTContains, visitor: BoundBooleanExpressionVisitor[T]) -> T: + return visitor.visit_st_contains(term=expr.term, geometry=expr.geometry) + + +@visit_bound_predicate.register(BoundSTIntersects) +def _(expr: BoundSTIntersects, visitor: BoundBooleanExpressionVisitor[T]) -> T: + return visitor.visit_st_intersects(term=expr.term, geometry=expr.geometry) + + +@visit_bound_predicate.register(BoundSTWithin) +def _(expr: BoundSTWithin, visitor: BoundBooleanExpressionVisitor[T]) -> T: + return visitor.visit_st_within(term=expr.term, geometry=expr.geometry) + + +@visit_bound_predicate.register(BoundSTOverlaps) +def _(expr: BoundSTOverlaps, visitor: BoundBooleanExpressionVisitor[T]) -> T: + return visitor.visit_st_overlaps(term=expr.term, geometry=expr.geometry) + + def rewrite_not(expr: BooleanExpression) -> BooleanExpression: return visit(expr, _RewriteNotVisitor()) @@ -514,6 +551,18 @@ def visit_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: def visit_not_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: return not self.visit_starts_with(term, literal) + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> bool: + raise NotImplementedError("st-contains row-level evaluation is not implemented") + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> bool: + raise NotImplementedError("st-intersects row-level evaluation is not implemented") + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> bool: + raise NotImplementedError("st-within row-level evaluation is not implemented") + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> bool: + raise NotImplementedError("st-overlaps row-level evaluation is not implemented") + def visit_true(self) -> bool: return True @@ -762,6 +811,18 @@ def visit_not_starts_with(self, term: 
BoundTerm, literal: LiteralValue) -> bool: return ROWS_MIGHT_MATCH + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + def visit_true(self) -> bool: return ROWS_MIGHT_MATCH @@ -905,6 +966,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate) -> BooleanExpression: pred = predicate.as_unbound(field.name, predicate.literal) elif isinstance(predicate, BoundSetPredicate): pred = predicate.as_unbound(field.name, predicate.literals) + elif isinstance(predicate, BoundSpatialPredicate): + raise NotImplementedError("Spatial predicate translation is not supported when source columns are missing") else: raise ValueError(f"Unsupported predicate: {predicate}") @@ -926,6 +989,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate) -> BooleanExpression: return predicate.as_unbound(file_column_name, predicate.literal) elif isinstance(predicate, BoundSetPredicate): return predicate.as_unbound(file_column_name, predicate.literals) + elif isinstance(predicate, BoundSpatialPredicate): + return predicate.as_unbound(file_column_name, predicate.geometry) else: raise ValueError(f"Unsupported predicate: {predicate}") @@ -1065,6 +1130,18 @@ def visit_starts_with(self, term: BoundTerm, literal: LiteralValue) -> list[tupl def visit_not_starts_with(self, term: BoundTerm, literal: LiteralValue) -> list[tuple[str, str, Any]]: return [] + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> list[tuple[str, str, Any]]: + return [] + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> list[tuple[str, str, Any]]: + return [] + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> list[tuple[str, str, Any]]: + return [] + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> list[tuple[str, str, Any]]: + return [] + def visit_true(self) -> list[tuple[str, str, Any]]: return [] # Not supported @@ -1153,6 +1230,18 @@ def _is_nan(self, val: Any) -> bool: # In the case of None or other non-numeric types return False + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_MATCH + class _InclusiveMetricsEvaluator(_MetricsEvaluator): struct: StructType @@ -1739,6 +1828,18 @@ def visit_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: def visit_not_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: return ROWS_MIGHT_NOT_MATCH + def visit_st_contains(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_NOT_MATCH + + def visit_st_intersects(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_NOT_MATCH + + def visit_st_within(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_NOT_MATCH + + def visit_st_overlaps(self, term: BoundTerm, geometry: bytes) -> bool: + return ROWS_MIGHT_NOT_MATCH + def _get_field(self, field_id: int) -> NestedField: field = self.struct.field(field_id=field_id) if field is None: 
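Reviewer aside (not part of the patch): the visitor hooks above deliberately leave row-level spatial evaluation unimplemented, and the metrics/manifest evaluators answer ROWS_MIGHT_MATCH / ROWS_MIGHT_NOT_MATCH so spatial predicates never prune rows or files. For context only, here is a sketch of what future row-level `st-intersects`/`st-contains` evaluation could delegate to if Shapely (an optional dependency this PR intentionally avoids) were wired in. The helper names are hypothetical, and Shapely is planar only; geography with spherical edges would need a spherical-aware library.

```python
from shapely import wkb  # optional dependency; deliberately not required by this PR


def st_intersects(row_wkb: bytes, query_wkb: bytes) -> bool:
    # Decode both WKB payloads and delegate to Shapely's intersection test.
    return wkb.loads(row_wkb).intersects(wkb.loads(query_wkb))


def st_contains(row_wkb: bytes, query_wkb: bytes) -> bool:
    # ST_Contains(row, query): true when the row geometry contains the query geometry.
    return wkb.loads(row_wkb).contains(wkb.loads(query_wkb))
```

Until execution like this exists, the conservative answers above are the correct behavior: a spatial filter must not silently drop data it cannot evaluate.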
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0dfc5eb55a..6b870b6139 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -30,6 +30,7 @@ import functools import importlib import itertools +import json import logging import operator import os @@ -180,6 +181,12 @@ from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.decimal import unscaled_to_decimal +from pyiceberg.utils.geospatial import ( + GeometryEnvelope, + extract_envelope_from_wkb, + merge_envelopes, + serialize_geospatial_bound, +) from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -207,6 +214,14 @@ T = TypeVar("T") +@lru_cache(maxsize=1) +def _warn_geoarrow_unavailable() -> None: + logger.warning( + "geoarrow-pyarrow is not installed; falling back to binary for geometry/geography columns. " + "Install pyiceberg with the geoarrow extra to preserve GeoArrow metadata in Parquet." + ) + + @lru_cache def _cached_resolve_s3_region(bucket: str) -> str | None: from pyarrow.fs import resolve_s3_region @@ -812,6 +827,7 @@ def visit_geometry(self, geometry_type: GeometryType) -> pa.DataType: return ga.wkb().with_crs(geometry_type.crs) except ImportError: + _warn_geoarrow_unavailable() return pa.large_binary() def visit_geography(self, geography_type: GeographyType) -> pa.DataType: @@ -830,6 +846,7 @@ def visit_geography(self, geography_type: GeographyType) -> pa.DataType: # "planar" is the default edge type in GeoArrow, no need to set explicitly return wkb_type except ImportError: + _warn_geoarrow_unavailable() return pa.large_binary() @@ -1341,6 +1358,51 @@ def _get_field_id(field: pa.Field) -> int | None: return None +def _geoarrow_wkb_to_iceberg(primitive: pa.DataType) -> PrimitiveType | None: + if not isinstance(primitive, pa.ExtensionType) or primitive.extension_name != "geoarrow.wkb": + return None + + # Default CRS in the Iceberg spec for both geometry and geography. + crs = "OGC:CRS84" + + # Avoid conversions that may require optional CRS dependencies. + primitive_crs = getattr(primitive, "crs", None) + raw_crs = getattr(primitive_crs, "_crs", None) + if isinstance(raw_crs, str) and raw_crs: + crs = raw_crs + elif isinstance(primitive_crs, str) and primitive_crs: + crs = primitive_crs + + edges: str | None = None + try: + serialized = primitive.__arrow_ext_serialize__() + if serialized: + payload = json.loads(serialized.decode("utf-8")) + if isinstance(payload, dict): + if isinstance(payload.get("crs"), str) and payload["crs"]: + crs = payload["crs"] + if isinstance(payload.get("edges"), str): + edges = payload["edges"].lower() + except (AttributeError, UnicodeDecodeError, json.JSONDecodeError): + pass + + if edges is None: + edge_type = getattr(primitive, "edge_type", None) + edge_name = getattr(edge_type, "name", None) + if isinstance(edge_name, str) and edge_name.lower() == "spherical": + edges = edge_name.lower() + + if edges == "spherical": + return GeographyType(crs, "spherical") + if edges == "planar": + return GeographyType(crs, "planar") + + # GeoArrow WKB without explicit edge semantics maps best to geometry. + # This is ambiguous with geography(planar); compatibility for that case is handled + # explicitly at the Arrow/Parquet schema-compatibility boundary. 
+ return GeometryType(crs) + + class _HasIds(PyArrowSchemaVisitor[bool]): def schema(self, schema: pa.Schema, struct_result: bool) -> bool: return struct_result @@ -1466,6 +1528,8 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: return TimestamptzType() elif primitive.tz is None: return TimestampType() + elif geo_type := _geoarrow_wkb_to_iceberg(primitive): + return geo_type elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive) or pa.types.is_binary_view(primitive): return BinaryType() @@ -2248,6 +2312,58 @@ def max_as_bytes(self) -> bytes | None: return self.serialize(self.current_max) +class GeospatialStatsAggregator: + primitive_type: PrimitiveType + current_min: Any + current_max: Any + + def __init__(self, iceberg_type: PrimitiveType) -> None: + if not isinstance(iceberg_type, (GeometryType, GeographyType)): + raise ValueError(f"Expected GeometryType or GeographyType, got {iceberg_type}") + self.primitive_type = iceberg_type + self.current_min = None + self.current_max = None + self._envelope: GeometryEnvelope | None = None + + def update_from_wkb(self, val: bytes | bytearray | memoryview | None) -> None: + if val is None: + return + + envelope = extract_envelope_from_wkb(bytes(val), isinstance(self.primitive_type, GeographyType)) + if envelope is None: + return + + if self._envelope is None: + self._envelope = envelope + else: + self._envelope = merge_envelopes( + self._envelope, + envelope, + is_geography=isinstance(self.primitive_type, GeographyType), + ) + + self.current_min = self._envelope.to_min_bound() + self.current_max = self._envelope.to_max_bound() + + def update_min(self, val: Any | None) -> None: + if isinstance(val, (bytes, bytearray, memoryview)): + self.update_from_wkb(val) + + def update_max(self, val: Any | None) -> None: + if isinstance(val, (bytes, bytearray, memoryview)): + self.update_from_wkb(val) + + def min_as_bytes(self) -> bytes | None: + if self._envelope is None: + return None + return serialize_geospatial_bound(self._envelope.to_min_bound()) + + def max_as_bytes(self) -> bytes | None: + if self._envelope is None: + return None + return serialize_geospatial_bound(self._envelope.to_max_bound()) + + DEFAULT_TRUNCATION_LENGTH = 16 TRUNCATION_EXPR = r"^truncate\((\d+)\)$" @@ -2480,7 +2596,7 @@ class DataFileStatistics: value_counts: dict[int, int] null_value_counts: dict[int, int] nan_value_counts: dict[int, int] - column_aggregates: dict[int, StatsAggregator] + column_aggregates: dict[int, StatsAggregator | GeospatialStatsAggregator] split_offsets: list[int] def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any: @@ -2488,6 +2604,11 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A return None source_field = schema.find_field(partition_field.source_id) + if isinstance(source_field.field_type, (GeometryType, GeographyType)): + # Geospatial lower/upper bounds encode envelope extrema, not original values, + # so they cannot be used to infer a partition value. 
+ return None + iceberg_transform = partition_field.transform if not iceberg_transform.preserves_order: @@ -2546,6 +2667,78 @@ def to_serialized_dict(self) -> dict[str, Any]: } +def _iter_wkb_values(column: pa.Array | ChunkedArray) -> Iterator[bytes]: + chunks = column.chunks if isinstance(column, ChunkedArray) else [column] + for chunk in chunks: + if isinstance(chunk, pa.ExtensionArray): + chunk = chunk.storage + + for scalar in chunk: + if not scalar.is_valid: + continue + + value = scalar.as_py() + if isinstance(value, bytes): + yield value + elif isinstance(value, bytearray): + yield bytes(value) + elif isinstance(value, memoryview): + yield value.tobytes() + elif hasattr(value, "to_wkb"): + yield bytes(value.to_wkb()) + else: + raise ValueError(f"Expected a bytes-like WKB value, got {type(value)}") + + +def geospatial_column_aggregates_from_arrow_table( + arrow_table: pa.Table, stats_columns: dict[int, StatisticsCollector] +) -> dict[int, GeospatialStatsAggregator]: + geospatial_aggregates: dict[int, GeospatialStatsAggregator] = {} + + for field_id, stats_col in stats_columns.items(): + if stats_col.mode.type in (MetricModeTypes.NONE, MetricModeTypes.COUNTS): + continue + + if not isinstance(stats_col.iceberg_type, (GeometryType, GeographyType)): + continue + + column = _get_field_from_arrow_table(arrow_table, stats_col.column_name) + aggregator = GeospatialStatsAggregator(stats_col.iceberg_type) + + try: + for value in _iter_wkb_values(column): + aggregator.update_from_wkb(value) + except ValueError as exc: + logger.warning("Skipping geospatial bounds for column %s: %s", stats_col.column_name, exc) + continue + + if aggregator.min_as_bytes() is not None: + geospatial_aggregates[field_id] = aggregator + + return geospatial_aggregates + + +def geospatial_column_aggregates_from_parquet_file( + input_file: InputFile, stats_columns: dict[int, StatisticsCollector] +) -> dict[int, GeospatialStatsAggregator]: + geospatial_stats_columns = { + field_id: stats_col + for field_id, stats_col in stats_columns.items() + if stats_col.mode.type not in (MetricModeTypes.NONE, MetricModeTypes.COUNTS) + and isinstance(stats_col.iceberg_type, (GeometryType, GeographyType)) + } + if not geospatial_stats_columns: + return {} + + with input_file.open() as input_stream: + arrow_table = pq.read_table( + input_stream, + columns=[stats_col.column_name for stats_col in geospatial_stats_columns.values()], + ) + + return geospatial_column_aggregates_from_arrow_table(arrow_table, geospatial_stats_columns) + + def data_file_statistics_from_parquet_metadata( parquet_metadata: pq.FileMetaData, stats_columns: dict[int, StatisticsCollector], @@ -2617,6 +2810,11 @@ def data_file_statistics_from_parquet_metadata( if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS): continue + if isinstance(stats_col.iceberg_type, (GeometryType, GeographyType)): + # Geospatial metrics bounds are computed from row values (WKB parsing), + # not Parquet binary min/max statistics. 
+ continue + if field_id not in col_aggs: try: col_aggs[field_id] = StatsAggregator( @@ -2660,7 +2858,7 @@ def data_file_statistics_from_parquet_metadata( value_counts=value_counts, null_value_counts=null_value_counts, nan_value_counts=nan_value_counts, - column_aggregates=col_aggs, + column_aggregates=cast(dict[int, StatsAggregator | GeospatialStatsAggregator], col_aggs), split_offsets=split_offsets, ) @@ -2707,11 +2905,13 @@ def write_parquet(task: WriteTask) -> DataFile: fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs ) as writer: writer.write(arrow_table, row_group_size=row_group_size) + stats_columns = compute_statistics_plan(file_schema, table_metadata.properties) statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(file_schema, table_metadata.properties), + stats_columns=stats_columns, parquet_column_mapping=parquet_path_to_id_mapping(file_schema), ) + statistics.column_aggregates.update(geospatial_column_aggregates_from_arrow_table(arrow_table, stats_columns)) data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, @@ -2785,7 +2985,7 @@ def _check_pyarrow_schema_compatible( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. " "Update the schema first (hint, use union_by_name)." ) from e - _check_schema_compatible(requested_schema, provided_schema) + _check_schema_compatible(requested_schema, provided_schema, allow_planar_geospatial_equivalence=True) def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: @@ -2803,12 +3003,14 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa schema = table_metadata.schema() _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version) + stats_columns = compute_statistics_plan(schema, table_metadata.properties) statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=parquet_metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), + stats_columns=stats_columns, parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + statistics.column_aggregates.update(geospatial_column_aggregates_from_parquet_file(input_file, stats_columns)) data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index fd60eb8f94..2b3abd54e0 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -1723,7 +1723,9 @@ def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType: raise ResolveError(f"Cannot promote {file_type} to {read_type}") -def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None: +def _check_schema_compatible( + requested_schema: Schema, provided_schema: Schema, allow_planar_geospatial_equivalence: bool = False +) -> None: """ Check if the `provided_schema` is compatible with `requested_schema`. @@ -1737,23 +1739,43 @@ def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) Raises: ValueError: If the schemas are not compatible. 
""" - pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema)) + pre_order_visit( + requested_schema, + _SchemaCompatibilityVisitor( + provided_schema=provided_schema, + allow_planar_geospatial_equivalence=allow_planar_geospatial_equivalence, + ), + ) class _SchemaCompatibilityVisitor(PreOrderSchemaVisitor[bool]): provided_schema: Schema + allow_planar_geospatial_equivalence: bool - def __init__(self, provided_schema: Schema): + def __init__(self, provided_schema: Schema, allow_planar_geospatial_equivalence: bool = False): from rich.console import Console from rich.table import Table as RichTable self.provided_schema = provided_schema + self.allow_planar_geospatial_equivalence = allow_planar_geospatial_equivalence self.rich_table = RichTable(show_header=True, header_style="bold") self.rich_table.add_column("") self.rich_table.add_column("Table field") self.rich_table.add_column("Dataframe field") self.console = Console(record=True) + def _is_planar_geospatial_equivalent(self, lhs: IcebergType, rhs: IcebergType) -> bool: + if not self.allow_planar_geospatial_equivalence: + return False + + if isinstance(lhs, GeometryType) and isinstance(rhs, GeographyType): + return rhs.algorithm == "planar" and lhs.crs == rhs.crs + + if isinstance(lhs, GeographyType) and isinstance(rhs, GeometryType): + return lhs.algorithm == "planar" and lhs.crs == rhs.crs + + return False + def _is_field_compatible(self, lhs: NestedField) -> bool: # Validate nullability first. # An optional field can be missing in the provided schema @@ -1776,6 +1798,9 @@ def _is_field_compatible(self, lhs: NestedField) -> bool: if lhs.field_type == rhs.field_type: self.rich_table.add_row("✅", str(lhs), str(rhs)) return True + elif self._is_planar_geospatial_equivalent(lhs.field_type, rhs.field_type): + self.rich_table.add_row("✅", str(lhs), str(rhs)) + return True # We only check that the parent node is also of the same type. # We check the type of the child nodes when we traverse them later. elif any( diff --git a/pyiceberg/utils/geospatial.py b/pyiceberg/utils/geospatial.py new file mode 100644 index 0000000000..baf4044329 --- /dev/null +++ b/pyiceberg/utils/geospatial.py @@ -0,0 +1,437 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import math +import struct +from dataclasses import dataclass + +_WKB_POINT = 1 +_WKB_LINESTRING = 2 +_WKB_POLYGON = 3 +_WKB_MULTIPOINT = 4 +_WKB_MULTILINESTRING = 5 +_WKB_MULTIPOLYGON = 6 +_WKB_GEOMETRYCOLLECTION = 7 + +_EWKB_Z_FLAG = 0x80000000 +_EWKB_M_FLAG = 0x40000000 +_EWKB_SRID_FLAG = 0x20000000 + + +@dataclass(frozen=True) +class GeospatialBound: + x: float + y: float + z: float | None = None + m: float | None = None + + @property + def has_z(self) -> bool: + return self.z is not None + + @property + def has_m(self) -> bool: + return self.m is not None + + +@dataclass(frozen=True) +class GeometryEnvelope: + x_min: float + y_min: float + z_min: float | None + m_min: float | None + x_max: float + y_max: float + z_max: float | None + m_max: float | None + + def to_min_bound(self) -> GeospatialBound: + return GeospatialBound(x=self.x_min, y=self.y_min, z=self.z_min, m=self.m_min) + + def to_max_bound(self) -> GeospatialBound: + return GeospatialBound(x=self.x_max, y=self.y_max, z=self.z_max, m=self.m_max) + + +def serialize_geospatial_bound(bound: GeospatialBound) -> bytes: + if bound.z is None and bound.m is None: + return struct.pack("