From 75d97317ac834d1538ece5c527d9d85605001ee6 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 29 Jan 2026 16:13:36 +0800 Subject: [PATCH 01/10] support format table for rest --- .../pypaimon/catalog/rest/rest_catalog.py | 36 +++- .../pypaimon/table/format/__init__.py | 37 +++++ .../format/format_batch_write_builder.py | 55 ++++++ .../table/format/format_commit_message.py | 26 +++ .../table/format/format_data_split.py | 30 ++++ .../table/format/format_read_builder.py | 71 ++++++++ .../pypaimon/table/format/format_table.py | 109 ++++++++++++ .../table/format/format_table_commit.py | 72 ++++++++ .../table/format/format_table_read.py | 149 +++++++++++++++++ .../table/format/format_table_scan.py | 115 +++++++++++++ .../table/format/format_table_write.py | 157 ++++++++++++++++++ 11 files changed, 855 insertions(+), 2 deletions(-) create mode 100644 paimon-python/pypaimon/table/format/__init__.py create mode 100644 paimon-python/pypaimon/table/format/format_batch_write_builder.py create mode 100644 paimon-python/pypaimon/table/format/format_commit_message.py create mode 100644 paimon-python/pypaimon/table/format/format_data_split.py create mode 100644 paimon-python/pypaimon/table/format/format_read_builder.py create mode 100644 paimon-python/pypaimon/table/format/format_table.py create mode 100644 paimon-python/pypaimon/table/format/format_table_commit.py create mode 100644 paimon-python/pypaimon/table/format/format_table_read.py create mode 100644 paimon-python/pypaimon/table/format/format_table_scan.py create mode 100644 paimon-python/pypaimon/table/format/format_table_write.py diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 41a3061fb93c..db9cad4a7202 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -42,6 +42,11 @@ from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import PartitionStatistics from pypaimon.table.file_store_table import FileStoreTable +from pypaimon.table.format.format_table import FormatTable, Format + + +# Table type value from Java TableType.FORMAT_TABLE +FORMAT_TABLE_TYPE = "format-table" class RESTCatalog(Catalog): @@ -180,7 +185,7 @@ def list_tables_paged( except ForbiddenException as e: raise DatabaseNoPermissionException(database_name) from e - def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable: + def get_table(self, identifier: Union[str, Identifier]): if not isinstance(identifier, Identifier): identifier = Identifier.from_string(identifier) return self.load_table( @@ -263,9 +268,12 @@ def load_table(self, internal_file_io: Callable[[str], Any], external_file_io: Callable[[str], Any], metadata_loader: Callable[[Identifier], TableMetadata], - ) -> FileStoreTable: + ): metadata = metadata_loader(identifier) schema = metadata.schema + table_type = schema.options.get(CoreOptions.TYPE.key(), "").strip().lower() + if table_type == FORMAT_TABLE_TYPE: + return self._create_format_table(identifier, metadata, internal_file_io, external_file_io) data_file_io = external_file_io if metadata.is_external else internal_file_io catalog_env = CatalogEnvironment( identifier=identifier, @@ -281,6 +289,30 @@ def load_table(self, catalog_env) return table + def _create_format_table(self, + identifier: Identifier, + metadata: TableMetadata, + internal_file_io: Callable[[str], Any], + external_file_io: Callable[[str], Any], + ) -> FormatTable: + schema = metadata.schema + location 
= schema.options.get(CoreOptions.PATH.key()) + if not location: + raise ValueError("Format table schema must have path option") + data_file_io = external_file_io if metadata.is_external else internal_file_io + file_io = data_file_io(location) + file_format = schema.options.get(CoreOptions.FILE_FORMAT.key(), "parquet") + fmt = Format.parse(file_format) + return FormatTable( + file_io=file_io, + identifier=identifier, + table_schema=schema, + location=location, + format=fmt, + options=dict(schema.options), + comment=schema.comment, + ) + @staticmethod def create(file_io: FileIO, table_path: str, diff --git a/paimon-python/pypaimon/table/format/__init__.py b/paimon-python/pypaimon/table/format/__init__.py new file mode 100644 index 000000000000..ab6995a69607 --- /dev/null +++ b/paimon-python/pypaimon/table/format/__init__.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pypaimon.table.format.format_data_split import FormatDataSplit +from pypaimon.table.format.format_table import FormatTable, Format +from pypaimon.table.format.format_read_builder import FormatReadBuilder +from pypaimon.table.format.format_table_scan import FormatTableScan +from pypaimon.table.format.format_table_read import FormatTableRead +from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder +from pypaimon.table.format.format_table_write import FormatTableWrite +from pypaimon.table.format.format_table_commit import FormatTableCommit +from pypaimon.table.format.format_commit_message import FormatTableCommitMessage + +__all__ = [ + "FormatDataSplit", + "FormatTable", + "Format", + "FormatReadBuilder", + "FormatTableScan", + "FormatTableRead", + "FormatBatchWriteBuilder", + "FormatTableWrite", + "FormatTableCommit", +] diff --git a/paimon-python/pypaimon/table/format/format_batch_write_builder.py b/paimon-python/pypaimon/table/format/format_batch_write_builder.py new file mode 100644 index 000000000000..7fa101e81329 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_batch_write_builder.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional + +from pypaimon.table.format.format_table import FormatTable +from pypaimon.table.format.format_table_commit import FormatTableCommit +from pypaimon.table.format.format_table_write import FormatTableWrite + + +class FormatBatchWriteBuilder: + def __init__(self, table: FormatTable): + self.table = table + self._overwrite = False + self._static_partition: Optional[dict] = None + + def overwrite(self, static_partition: Optional[dict] = None) -> "FormatBatchWriteBuilder": + self._overwrite = True + self._validate_static_partition(static_partition) + self._static_partition = static_partition if static_partition is not None else {} + return self + + def _validate_static_partition(self, static_partition: Optional[dict]) -> None: + if not static_partition: + return + if not self.table.partition_keys: + raise ValueError( + "Format table is not partitioned, static partition values are not allowed." + ) + for key in static_partition: + if key not in self.table.partition_keys: + raise ValueError(f"Unknown static partition column: {key}") + + def new_write(self) -> FormatTableWrite: + return FormatTableWrite(self.table, overwrite=self._overwrite) + + def new_commit(self) -> FormatTableCommit: + return FormatTableCommit( + table=self.table, + overwrite=self._overwrite, + static_partitions=self._static_partition, + ) diff --git a/paimon-python/pypaimon/table/format/format_commit_message.py b/paimon-python/pypaimon/table/format/format_commit_message.py new file mode 100644 index 000000000000..c9a253ef58f9 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_commit_message.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import List + + +@dataclass +class FormatTableCommitMessage: + written_paths: List[str] + + def is_empty(self) -> bool: + return not self.written_paths diff --git a/paimon-python/pypaimon/table/format/format_data_split.py b/paimon-python/pypaimon/table/format/format_data_split.py new file mode 100644 index 000000000000..a71c9622a832 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_data_split.py @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import Dict, Optional, Any + + +@dataclass(frozen=True)xi +class FormatDataSplit: + file_path: str + file_size: int + offset: int = 0 + length: Optional[int] = None # None means read whole file + partition: Optional[Dict[str, Any]] = None # partition column name -> value + + def data_path(self) -> str: + return self.file_path diff --git a/paimon-python/pypaimon/table/format/format_read_builder.py b/paimon-python/pypaimon/table/format/format_read_builder.py new file mode 100644 index 000000000000..9cd62272b0a5 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_read_builder.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
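A usage sketch of the read path this builder exposes, assuming a FormatTable already loaded through RESTCatalog.get_table (only names introduced in this patch are used; the chain mirrors the ReadBuilder of regular tables):

    read_builder = (
        table.new_read_builder()
        .with_projection(["a", "b"])          # keep a subset of columns
        .with_partition_filter({"dt": "10"})  # equality match on partition values
        .with_limit(100)                      # cap the total rows returned
    )
    splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_arrow(splits)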
+ +from typing import List, Optional + +from pypaimon.common.predicate import Predicate +from pypaimon.common.predicate_builder import PredicateBuilder +from pypaimon.schema.data_types import DataField +from pypaimon.table.format.format_table import FormatTable +from pypaimon.table.format.format_table_scan import FormatTableScan +from pypaimon.table.format.format_table_read import FormatTableRead + + +class FormatReadBuilder: + def __init__(self, table: FormatTable): + self.table = table + self._projection: Optional[List[str]] = None + self._limit: Optional[int] = None + self._partition_filter: Optional[dict] = None + + def with_filter(self, predicate: Predicate) -> "FormatReadBuilder": + # Format table supports partition filter only; data predicate applied in read + self._partition_filter = None # could extract partition from predicate + return self + + def with_projection(self, projection: List[str]) -> "FormatReadBuilder": + self._projection = projection + return self + + def with_limit(self, limit: int) -> "FormatReadBuilder": + self._limit = limit + return self + + def with_partition_filter(self, partition_spec: Optional[dict]) -> "FormatReadBuilder": + self._partition_filter = partition_spec + return self + + def new_scan(self) -> FormatTableScan: + return FormatTableScan( + self.table, + partition_filter=self._partition_filter, + limit=self._limit, + ) + + def new_read(self) -> FormatTableRead: + return FormatTableRead( + table=self.table, + projection=self._projection, + limit=self._limit, + ) + + def new_predicate_builder(self) -> PredicateBuilder: + return PredicateBuilder(self.read_type()) + + def read_type(self) -> List[DataField]: + if self._projection: + return [f for f in self.table.fields if f.name in self._projection] + return list(self.table.fields) diff --git a/paimon-python/pypaimon/table/format/format_table.py b/paimon-python/pypaimon/table/format/format_table.py new file mode 100644 index 000000000000..e7ad03dd16f9 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_table.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
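A note on the options that drive this module: the catalog routes a table here when its "type" option is "format-table", and Format.parse below turns the "file.format" option into the Format enum, case-insensitively, defaulting to parquet. A minimal sketch of that parsing behavior:

    options = {"type": "format-table", "file.format": "CSV"}
    fmt = Format.parse(options.get("file.format", "parquet"))
    assert fmt is Format.CSV
    # Unsupported values raise ValueError listing the supported formats:
    # Format.parse("avro") -> ValueError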
+ +from enum import Enum +from typing import Dict, List, Optional + +from pypaimon.common.file_io import FileIO +from pypaimon.common.identifier import Identifier +from pypaimon.common.options.core_options import CoreOptions +from pypaimon.common.options.options import Options + +PARTITION_DEFAULT_NAME_KEY = "partition.default-name" +PARTITION_DEFAULT_NAME_DEFAULT = "__DEFAULT_PARTITION__" +from pypaimon.schema.table_schema import TableSchema +from pypaimon.table.table import Table +from pypaimon.table.format.format_read_builder import FormatReadBuilder +from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder + + +class Format(str, Enum): + ORC = "orc" + PARQUET = "parquet" + CSV = "csv" + TEXT = "text" + JSON = "json" + + @classmethod + def parse(cls, file_format: str) -> "Format": + s = (file_format or "parquet").strip().upper() + try: + return cls[s] + except KeyError: + raise ValueError( + f"Format table unsupported file format: {file_format}. " + f"Supported: {[f.name for f in cls]}" + ) + + +class FormatTable(Table): + def __init__( + self, + file_io: FileIO, + identifier: Identifier, + table_schema: TableSchema, + location: str, + format: Format, + options: Optional[Dict[str, str]] = None, + comment: Optional[str] = None, + ): + self.file_io = file_io + self.identifier = identifier + self._table_schema = table_schema + self._location = location.rstrip("/") + self._format = format + self.options = options or dict(table_schema.options) + self.comment = comment + self.fields = table_schema.fields + self.field_names = [f.name for f in self.fields] + self.partition_keys = table_schema.partition_keys or [] + self.primary_keys: List[str] = [] # format table has no primary key + self._core_options = CoreOptions(Options(self.options)) + + def name(self) -> str: + return self.identifier.get_table_name() + + def full_name(self) -> str: + return self.identifier.get_full_name() + + @property + def table_schema(self) -> TableSchema: + return self._table_schema + + @table_schema.setter + def table_schema(self, value: TableSchema): + self._table_schema = value + + def location(self) -> str: + return self._location + + def format(self) -> Format: + return self._format + + def options(self) -> Dict[str, str]: + return self.options + + def default_part_name(self) -> str: + return self.options.get(PARTITION_DEFAULT_NAME_KEY, PARTITION_DEFAULT_NAME_DEFAULT) + + def new_read_builder(self) -> FormatReadBuilder: + return FormatReadBuilder(self) + + def new_batch_write_builder(self) -> FormatBatchWriteBuilder: + return FormatBatchWriteBuilder(self) + + def new_stream_write_builder(self): + raise NotImplementedError("Format table does not support stream write.") diff --git a/paimon-python/pypaimon/table/format/format_table_commit.py b/paimon-python/pypaimon/table/format/format_table_commit.py new file mode 100644 index 000000000000..5009e4bdc7da --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_table_commit.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional + +import pyarrow.fs as pafs + +from pypaimon.table.format.format_table import FormatTable +from pypaimon.table.format.format_table_scan import _is_data_file_name +from pypaimon.table.format.format_commit_message import FormatTableCommitMessage + + +def _delete_data_files_in_path(file_io, path: str) -> None: + try: + infos = file_io.list_status(path) + except Exception: + return + for info in infos: + if info.type == pafs.FileType.Directory: + _delete_data_files_in_path(file_io, info.path) + elif info.type == pafs.FileType.File: + name = info.path.split("/")[-1] if "/" in info.path else info.path + if _is_data_file_name(name): + try: + file_io.delete(info.path, False) + except Exception: + pass + + +class FormatTableCommit: + + def __init__( + self, + table: FormatTable, + overwrite: bool = False, + static_partitions: Optional[dict] = None, + ): + self.table = table + self.overwrite = overwrite + self.static_partitions = static_partitions or {} + self._committed = False + + def commit(self, commit_messages: List[FormatTableCommitMessage]) -> None: + if self._committed: + raise RuntimeError("FormatTableCommit supports only one commit.") + self._committed = True + return + + def abort(self, commit_messages: List[FormatTableCommitMessage]) -> None: + for msg in commit_messages: + for path in msg.written_paths: + try: + if self.table.file_io.exists(path): + self.table.file_io.delete(path, False) + except Exception: + pass + + def close(self) -> None: + pass diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py new file mode 100644 index 000000000000..0448c5041f4b --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_table_read.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
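The reader below loads each file fully into memory, decodes it with PyArrow, and then re-attaches partition values, which live in the directory path rather than in the file, as constant string columns. A self-contained sketch of that re-attachment step (synthetic data, mirroring the append_column call in _read_file_to_arrow):

    import pyarrow

    tbl = pyarrow.table({"a": [1, 2], "b": [10, 20]})
    partition_spec = {"dt": "2026-01-29"}  # illustrative partition value
    for k, v in partition_spec.items():
        tbl = tbl.append_column(
            k, pyarrow.array([v] * tbl.num_rows, type=pyarrow.string())
        )
    # tbl now has columns a, b, dt; every row carries dt == "2026-01-29"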
+ +from typing import Any, Dict, Iterator, List, Optional + +import pandas +import pyarrow + +from pypaimon.schema.data_types import PyarrowFieldParser +from pypaimon.table.format.format_data_split import FormatDataSplit +from pypaimon.table.format.format_table import FormatTable, Format + + +def _read_file_to_arrow( + file_io: Any, + split: FormatDataSplit, + fmt: Format, + partition_spec: Optional[Dict[str, str]], + read_fields: Optional[List[str]], +) -> pyarrow.Table: + path = split.data_path() + opts = {} + if fmt == Format.CSV: + opts = {"max_read_options": {"block_size": 1 << 20}} + try: + with file_io.new_input_stream(path) as stream: + data = stream.read() + except Exception as e: + raise RuntimeError(f"Failed to read {path}") from e + + if fmt == Format.PARQUET: + import io + tbl = pyarrow.parquet.read_table(io.BytesIO(data)) + elif fmt == Format.CSV: + tbl = pyarrow.csv.read_csv(pyarrow.BufferReader(data), **opts) + elif fmt == Format.JSON: + import json + text = data.decode("utf-8") if isinstance(data, bytes) else data + records = [] + for line in text.strip().split("\n"): + line = line.strip() + if line: + records.append(json.loads(line)) + if not records: + return pyarrow.table({}) + tbl = pyarrow.Table.from_pylist(records) + else: + raise ValueError(f"Format {fmt} read not implemented in Python") + + if partition_spec: + for k, v in partition_spec.items(): + tbl = tbl.append_column( + k, + pyarrow.array([v] * tbl.num_rows, type=pyarrow.string()), + ) + if read_fields: + col_order = [c for c in read_fields if c in tbl.column_names] + if col_order: + tbl = tbl.select(col_order) + + if read_fields and tbl.num_columns > 0: + existing = [c for c in read_fields if c in tbl.column_names] + if existing: + tbl = tbl.select(existing) + return tbl + + +class FormatTableRead: + + def __init__( + self, + table: FormatTable, + projection: Optional[List[str]] = None, + limit: Optional[int] = None, + ): + self.table = table + self.projection = projection + self.limit = limit + + def to_arrow( + self, + splits: List[FormatDataSplit], + ) -> pyarrow.Table: + read_fields = self.projection + fmt = self.table.format() + tables = [] + nrows = 0 + for split in splits: + t = _read_file_to_arrow( + self.table.file_io, + split, + fmt, + split.partition, + read_fields, + ) + if t.num_rows > 0: + tables.append(t) + nrows += t.num_rows + if self.limit is not None and nrows >= self.limit: + if nrows > self.limit: + excess = nrows - self.limit + last = tables[-1] + tables[-1] = last.slice(0, last.num_rows - excess) + break + if not tables: + fields = self.table.fields + if read_fields: + fields = [f for f in self.table.fields if f.name in read_fields] + schema = PyarrowFieldParser.from_paimon_schema(fields) + return pyarrow.Table.from_pydict( + {n: [] for n in schema.names}, + schema=schema, + ) + out = pyarrow.concat_tables(tables) + if self.limit is not None and out.num_rows > self.limit: + out = out.slice(0, self.limit) + return out + + def to_pandas(self, splits: List[FormatDataSplit]) -> pandas.DataFrame: + return self.to_arrow(splits).to_pandas() + + def to_iterator( + self, + splits: List[FormatDataSplit], + ) -> Iterator[Any]: + for split in splits: + t = _read_file_to_arrow( + self.table.file_io, + split, + self.table.format(), + split.partition, + self.projection, + ) + for batch in t.to_batches(): + for i in range(batch.num_rows): + yield batch.slice(i, 1) diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py new file 
mode 100644 index 000000000000..40ff8f072ac6 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List, Optional + +import pyarrow.fs as pafs + +from pypaimon.common.file_io import FileIO +from pypaimon.read.plan import Plan +from pypaimon.table.format.format_data_split import FormatDataSplit +from pypaimon.table.format.format_table import FormatTable + + +def _is_data_file_name(name: str) -> bool: + """Match Java FormatTableScan.isDataFileName: exclude hidden and metadata.""" + return name is not None and not name.startswith(".") and not name.startswith("_") + + +def _list_data_files_recursive( + file_io: FileIO, + path: str, + base_path: str, + partition_keys: List[str], + partition_only_value: bool, + rel_path_parts: Optional[List[str]] = None, +) -> List[FormatDataSplit]: + splits: List[FormatDataSplit] = [] + rel_path_parts = rel_path_parts or [] + try: + infos = file_io.list_status(path) + except Exception: + return splits + if not infos: + return splits + for info in infos: + name = info.path.split("/")[-1] if "/" in info.path else info.path + if info.type == pafs.FileType.Directory: + part_value = name + if not partition_only_value and "=" in name: + part_value = name.split("=", 1)[1] + child_parts = rel_path_parts + [part_value] + if len(child_parts) <= len(partition_keys): + sub_splits = _list_data_files_recursive( + file_io, + info.path, + base_path, + partition_keys, + partition_only_value, + child_parts, + ) + splits.extend(sub_splits) + elif info.type == pafs.FileType.File and _is_data_file_name(name): + size = getattr(info, "size", None) or 0 + part_spec: Optional[Dict[str, str]] = None + if partition_keys and len(rel_path_parts) >= len(partition_keys): + part_spec = dict(zip(partition_keys, rel_path_parts[: len(partition_keys)])) + splits.append( + FormatDataSplit( + file_path=info.path, + file_size=size if size is not None else 0, + partition=part_spec, + ) + ) + return splits + + +class FormatTableScan: + + def __init__( + self, + table: FormatTable, + partition_filter: Optional[Dict[str, str]] = None, + limit: Optional[int] = None, + ): + self.table = table + self.partition_filter = partition_filter # optional equality filter + self.limit = limit + + def plan(self) -> Plan: + partition_only_value = self.table.options.get( + "format-table.partition-path-only-value", "false" + ).lower() == "true" + splits = _list_data_files_recursive( + self.table.file_io, + self.table.location(), + self.table.location(), + self.table.partition_keys, + partition_only_value, + ) + if self.partition_filter: + filtered = [] + for s in splits: + if s.partition and all( + s.partition.get(k) == v for k, v in self.partition_filter.items() + ): + 
filtered.append(s) + splits = filtered + if self.limit is not None and self.limit <= 0: + splits = [] + elif self.limit is not None and len(splits) > self.limit: + splits = splits[: self.limit] + return Plan(_splits=splits) diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py new file mode 100644 index 000000000000..e3ae21d7b210 --- /dev/null +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -0,0 +1,157 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import uuid +from collections import defaultdict +from typing import List + +import pyarrow + +from pypaimon.schema.data_types import PyarrowFieldParser +from pypaimon.table.format.format_commit_message import FormatTableCommitMessage +from pypaimon.table.format.format_table import FormatTable, Format + + +def _partition_path(partition_spec: dict, partition_keys: List[str], only_value: bool) -> str: + parts = [] + for k in partition_keys: + v = partition_spec.get(k) + if v is None: + break + parts.append(str(v) if only_value else f"{k}={v}") + return "/".join(parts) + + +def _partition_from_row( + row: pyarrow.RecordBatch, + partition_keys: List[str], + row_index: int, +) -> tuple: + out = [] + for k in partition_keys: + col = row.column(row.schema.get_field_index(k)) + val = col[row_index] + if val is None or (hasattr(val, "as_py") and val.as_py() is None): + out.append(None) + else: + out.append(val.as_py() if hasattr(val, "as_py") else val) + return tuple(out) + + +class FormatTableWrite: + """Batch write for format table: writes Arrow/Pandas to partition dirs as files.""" + + def __init__(self, table: FormatTable, overwrite: bool = False): + self.table = table + self._overwrite = overwrite + self._written_paths: List[str] = [] + self._partition_only_value = ( + table.options.get("format-table.partition-path-only-value", "false").lower() == "true" + ) + self._file_format = table.format() + self._data_file_prefix = "data-" + self._suffix = {"parquet": ".parquet", "csv": ".csv", "json": ".json"}.get( + self._file_format.value, ".parquet" + ) + + def write_arrow(self, data: pyarrow.Table) -> None: + for batch in data.to_batches(): + self.write_arrow_batch(batch) + + def write_arrow_batch(self, data: pyarrow.RecordBatch) -> None: + partition_keys = self.table.partition_keys + if not partition_keys: + part_spec = {} + self._write_single_batch(data, part_spec) + return + # Group rows by partition + parts_to_indices = defaultdict(list) + for i in range(data.num_rows): + part = _partition_from_row(data, partition_keys, i) + parts_to_indices[part].append(i) + for part_tuple, indices in parts_to_indices.items(): + part_spec = dict(zip(partition_keys, part_tuple)) + sub = data.take(pyarrow.array(indices)) + 
self._write_single_batch(sub, part_spec) + + def write_pandas(self, df) -> None: + pa_schema = PyarrowFieldParser.from_paimon_schema(self.table.fields) + batch = pyarrow.RecordBatch.from_pandas(df, schema=pa_schema) + self.write_arrow_batch(batch) + + def _write_single_batch( + self, + data: pyarrow.RecordBatch, + partition_spec: dict, + ) -> None: + if data.num_rows == 0: + return + location = self.table.location() + partition_only_value = self._partition_only_value + part_path = _partition_path( + partition_spec, + self.table.partition_keys, + partition_only_value, + ) + if part_path: + dir_path = f"{location}/{part_path}" + else: + dir_path = location + if self._overwrite and self.table.file_io.exists(dir_path): + from pypaimon.table.format.format_table_commit import _delete_data_files_in_path + _delete_data_files_in_path(self.table.file_io, dir_path) + self.table.file_io.check_or_mkdirs(dir_path) + file_name = f"{self._data_file_prefix}{uuid.uuid4().hex}{self._suffix}" + path = f"{dir_path}/{file_name}" + + fmt = self._file_format + if fmt == Format.PARQUET: + buf = io.BytesIO() + pyarrow.parquet.write_table( + pyarrow.Table.from_batches([data]), + buf, + compression="zstd", + ) + raw = buf.getvalue() + elif fmt == Format.CSV: + buf = io.BytesIO() + pyarrow.csv.write_csv( + pyarrow.Table.from_batches([data]), + buf, + ) + raw = buf.getvalue() + elif fmt == Format.JSON: + tbl = pyarrow.Table.from_batches([data]) + import json + lines = [] + for i in range(tbl.num_rows): + row = {tbl.column_names[j]: tbl.column(j)[i].as_py() for j in range(tbl.num_columns)} + lines.append(json.dumps(row) + "\n") + raw = "".join(lines).encode("utf-8") + else: + raise ValueError(f"Format table write not implemented for {fmt}") + + with self.table.file_io.new_output_stream(path) as out: + out.write(raw) + + self._written_paths.append(path) + + def prepare_commit(self) -> List[FormatTableCommitMessage]: + return [FormatTableCommitMessage(written_paths=list(self._written_paths))] + + def close(self) -> None: + pass From 819b59e47dc8e899ec1bb02cc845c59e96ab5454 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 29 Jan 2026 23:53:19 +0800 Subject: [PATCH 02/10] fix test case failure --- .../table/format/format_data_split.py | 2 +- .../pypaimon/table/format/format_table.py | 8 ++++---- .../table/format/format_table_read.py | 19 +++++++++++++++++-- .../table/format/format_table_scan.py | 8 ++++++-- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/table/format/format_data_split.py b/paimon-python/pypaimon/table/format/format_data_split.py index a71c9622a832..b078e490d011 100644 --- a/paimon-python/pypaimon/table/format/format_data_split.py +++ b/paimon-python/pypaimon/table/format/format_data_split.py @@ -18,7 +18,7 @@ from typing import Dict, Optional, Any -@dataclass(frozen=True)xi +@dataclass(frozen=True) class FormatDataSplit: file_path: str file_size: int diff --git a/paimon-python/pypaimon/table/format/format_table.py b/paimon-python/pypaimon/table/format/format_table.py index e7ad03dd16f9..86204392d814 100644 --- a/paimon-python/pypaimon/table/format/format_table.py +++ b/paimon-python/pypaimon/table/format/format_table.py @@ -26,8 +26,6 @@ PARTITION_DEFAULT_NAME_DEFAULT = "__DEFAULT_PARTITION__" from pypaimon.schema.table_schema import TableSchema from pypaimon.table.table import Table -from pypaimon.table.format.format_read_builder import FormatReadBuilder -from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder class Format(str, Enum): 
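The hunk above and the one below appear to resolve a circular import rather than change behavior: format_read_builder.py and format_batch_write_builder.py import FormatTable at module scope, so format_table.py cannot import them back at module scope. Reduced to its shape, the cycle being removed is:

    # format_table.py (before this commit)
    from pypaimon.table.format.format_read_builder import FormatReadBuilder

    # format_read_builder.py
    from pypaimon.table.format.format_table import FormatTable

The next hunk defers the builder imports into new_read_builder and new_batch_write_builder, so each import runs only after both modules are fully initialized.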
@@ -99,10 +97,12 @@ def options(self) -> Dict[str, str]: def default_part_name(self) -> str: return self.options.get(PARTITION_DEFAULT_NAME_KEY, PARTITION_DEFAULT_NAME_DEFAULT) - def new_read_builder(self) -> FormatReadBuilder: + def new_read_builder(self): + from pypaimon.table.format.format_read_builder import FormatReadBuilder return FormatReadBuilder(self) - def new_batch_write_builder(self) -> FormatBatchWriteBuilder: + def new_batch_write_builder(self): + from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder return FormatBatchWriteBuilder(self) def new_stream_write_builder(self): diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py index 0448c5041f4b..5e136d8a234d 100644 --- a/paimon-python/pypaimon/table/format/format_table_read.py +++ b/paimon-python/pypaimon/table/format/format_table_read.py @@ -37,13 +37,28 @@ def _read_file_to_arrow( opts = {"max_read_options": {"block_size": 1 << 20}} try: with file_io.new_input_stream(path) as stream: - data = stream.read() + chunks = [] + while True: + chunk = stream.read() + if not chunk: + break + chunks.append(chunk if isinstance(chunk, bytes) else bytes(chunk)) + data = b"".join(chunks) except Exception as e: raise RuntimeError(f"Failed to read {path}") from e + if not data or len(data) == 0: + return pyarrow.table({}) + if fmt == Format.PARQUET: import io - tbl = pyarrow.parquet.read_table(io.BytesIO(data)) + data = bytes(data) if not isinstance(data, bytes) else data + if len(data) < 4 or data[:4] != b"PAR1": + return pyarrow.table({}) + try: + tbl = pyarrow.parquet.read_table(io.BytesIO(data)) + except pyarrow.ArrowInvalid: + return pyarrow.table({}) elif fmt == Format.CSV: tbl = pyarrow.csv.read_csv(pyarrow.BufferReader(data), **opts) elif fmt == Format.JSON: diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py index 40ff8f072ac6..df0f59ee1806 100644 --- a/paimon-python/pypaimon/table/format/format_table_scan.py +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -45,8 +45,12 @@ def _list_data_files_recursive( return splits if not infos: return splits + path_rstrip = path.rstrip("/") for info in infos: name = info.path.split("/")[-1] if "/" in info.path else info.path + full_path = f"{path_rstrip}/{name}" if path_rstrip else name + if info.path.startswith("/") or info.path.startswith("file:"): + full_path = info.path if info.type == pafs.FileType.Directory: part_value = name if not partition_only_value and "=" in name: @@ -55,7 +59,7 @@ def _list_data_files_recursive( if len(child_parts) <= len(partition_keys): sub_splits = _list_data_files_recursive( file_io, - info.path, + full_path, base_path, partition_keys, partition_only_value, @@ -69,7 +73,7 @@ def _list_data_files_recursive( part_spec = dict(zip(partition_keys, rel_path_parts[: len(partition_keys)])) splits.append( FormatDataSplit( - file_path=info.path, + file_path=full_path, file_size=size if size is not None else 0, partition=part_spec, ) From bf157a4cb1b7995926f8bc46776e2ffd6c456c8f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 01:04:36 +0800 Subject: [PATCH 03/10] support text write and add test case --- .../table/format/format_table_read.py | 27 +- .../table/format/format_table_write.py | 49 ++- .../tests/rest/rest_format_table_test.py | 301 ++++++++++++++++++ 3 files changed, 367 insertions(+), 10 deletions(-) create mode 100644 
paimon-python/pypaimon/tests/rest/rest_format_table_test.py diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py index 5e136d8a234d..1d1e09835b1f 100644 --- a/paimon-python/pypaimon/table/format/format_table_read.py +++ b/paimon-python/pypaimon/table/format/format_table_read.py @@ -60,7 +60,12 @@ def _read_file_to_arrow( except pyarrow.ArrowInvalid: return pyarrow.table({}) elif fmt == Format.CSV: - tbl = pyarrow.csv.read_csv(pyarrow.BufferReader(data), **opts) + if hasattr(pyarrow, "csv"): + tbl = pyarrow.csv.read_csv(pyarrow.BufferReader(data), **opts) + else: + import io + df = pandas.read_csv(io.BytesIO(data)) + tbl = pyarrow.Table.from_pandas(df) elif fmt == Format.JSON: import json text = data.decode("utf-8") if isinstance(data, bytes) else data @@ -72,6 +77,26 @@ def _read_file_to_arrow( if not records: return pyarrow.table({}) tbl = pyarrow.Table.from_pylist(records) + elif fmt == Format.ORC: + import io + data = bytes(data) if not isinstance(data, bytes) else data + if hasattr(pyarrow, "orc"): + try: + tbl = pyarrow.orc.read_table(io.BytesIO(data)) + except Exception: + return pyarrow.table({}) + else: + raise ValueError( + "Format table read for ORC requires PyArrow with ORC support (pyarrow.orc)" + ) + elif fmt == Format.TEXT: + text = data.decode("utf-8") if isinstance(data, bytes) else data + line_delimiter = "\n" + lines = text.rstrip(line_delimiter).split(line_delimiter) if text else [] + if not lines: + return pyarrow.table({}) + col_name = "value" if not read_fields else read_fields[0] + tbl = pyarrow.table({col_name: lines}) else: raise ValueError(f"Format {fmt} read not implemented in Python") diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py index e3ae21d7b210..bf8b8ddcac86 100644 --- a/paimon-python/pypaimon/table/format/format_table_write.py +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -64,9 +64,13 @@ def __init__(self, table: FormatTable, overwrite: bool = False): ) self._file_format = table.format() self._data_file_prefix = "data-" - self._suffix = {"parquet": ".parquet", "csv": ".csv", "json": ".json"}.get( - self._file_format.value, ".parquet" - ) + self._suffix = { + "parquet": ".parquet", + "csv": ".csv", + "json": ".json", + "orc": ".orc", + "text": ".txt", + }.get(self._file_format.value, ".parquet") def write_arrow(self, data: pyarrow.Table) -> None: for batch in data.to_batches(): @@ -128,12 +132,15 @@ def _write_single_batch( ) raw = buf.getvalue() elif fmt == Format.CSV: - buf = io.BytesIO() - pyarrow.csv.write_csv( - pyarrow.Table.from_batches([data]), - buf, - ) - raw = buf.getvalue() + tbl = pyarrow.Table.from_batches([data]) + if hasattr(pyarrow, "csv"): + buf = io.BytesIO() + pyarrow.csv.write_csv(tbl, buf) + raw = buf.getvalue() + else: + buf = io.StringIO() + tbl.to_pandas().to_csv(buf, index=False) + raw = buf.getvalue().encode("utf-8") elif fmt == Format.JSON: tbl = pyarrow.Table.from_batches([data]) import json @@ -142,6 +149,30 @@ def _write_single_batch( row = {tbl.column_names[j]: tbl.column(j)[i].as_py() for j in range(tbl.num_columns)} lines.append(json.dumps(row) + "\n") raw = "".join(lines).encode("utf-8") + elif fmt == Format.ORC: + tbl = pyarrow.Table.from_batches([data]) + if hasattr(pyarrow, "orc"): + buf = io.BytesIO() + pyarrow.orc.write_table(tbl, buf) + raw = buf.getvalue() + else: + raise ValueError( + "Format table write for ORC requires PyArrow with ORC 
support (pyarrow.orc)" + ) + elif fmt == Format.TEXT: + tbl = pyarrow.Table.from_batches([data]) + if tbl.num_columns != 1 or not pyarrow.types.is_string(tbl.schema.field(0).type): + raise ValueError( + "TEXT format only supports a single string column, " + f"got {tbl.num_columns} columns" + ) + line_delimiter = self.table.options.get("text.line-delimiter", "\n") + lines = [] + col = tbl.column(0) + for i in range(tbl.num_rows): + val = col[i] + lines.append((val.as_py() if val is not None else "") + line_delimiter) + raw = "".join(lines).encode("utf-8") else: raise ValueError(f"Format table write not implemented for {fmt}") diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py new file mode 100644 index 000000000000..0fea93dd436f --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py @@ -0,0 +1,301 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import unittest + +import pandas as pd +import pyarrow as pa +from parameterized import parameterized + +from pypaimon import Schema +from pypaimon.catalog.catalog_exception import TableNotExistException +from pypaimon.table.format import FormatTable +from pypaimon.tests.rest.rest_base_test import RESTBaseTest + + +def _format_table_read_write_formats(): + formats = [("parquet",), ("csv",), ("json",)] + if hasattr(pa, "orc"): + formats.append(("orc",)) + return formats + + +class RESTFormatTableTest(RESTBaseTest): + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_read_write(self, file_format): + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("c", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": file_format}, + ) + table_name = f"default.format_table_rw_{file_format}" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, FormatTable) + self.assertEqual(table.format().value, file_format) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + df = pd.DataFrame({ + "a": [10, 10], + "b": [1, 2], + "c": [1, 2], + }) + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + table_read = read_builder.new_read() + actual = table_read.to_pandas(splits).sort_values(by="b").reset_index(drop=True) + expected = pa.Table.from_pydict( + {"a": [10, 10], "b": [1, 2], "c": [1, 2]}, + schema=pa_schema, + 
).to_pandas() + for col in expected.columns: + if col in actual.columns and actual[col].dtype != expected[col].dtype: + actual[col] = actual[col].astype(expected[col].dtype) + pd.testing.assert_frame_equal(actual, expected) + + def test_format_table_text_read_write(self): + pa_schema = pa.schema([("value", pa.string())]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": "text"}, + ) + table_name = "default.format_table_rw_text" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, FormatTable) + self.assertEqual(table.format().value, "text") + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + df = pd.DataFrame({"value": ["hello", "world"]}) + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + table_read = read_builder.new_read() + actual = table_read.to_pandas(splits).sort_values(by="value").reset_index(drop=True) + expected = pd.DataFrame({"value": ["hello", "world"]}) + pd.testing.assert_frame_equal(actual, expected) + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_partitioned_overwrite(self, file_format): + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("c", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["c"], + options={"type": "format-table", "file.format": file_format}, + ) + table_name = f"default.format_table_partitioned_overwrite_{file_format}" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + write_builder = table.new_batch_write_builder() + tw = write_builder.new_write() + tc = write_builder.new_commit() + tw.write_pandas(pd.DataFrame({"a": [10, 10], "b": [10, 20], "c": [1, 1]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = table.new_batch_write_builder().overwrite({"c": 1}).new_write() + tc = table.new_batch_write_builder().overwrite({"c": 1}).new_commit() + tw.write_pandas(pd.DataFrame({"a": [12, 12], "b": [100, 200], "c": [1, 1]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + read_builder = table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + actual = read_builder.new_read().to_pandas(splits).sort_values(by="b") + self.assertEqual(len(actual), 2) + self.assertEqual(actual["b"].tolist(), [100, 200]) + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_partitioned_read_write(self, file_format): + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("dt", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["dt"], + options={"type": "format-table", "file.format": file_format}, + ) + table_name = f"default.format_table_partitioned_rw_{file_format}" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, FormatTable) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + 
tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [10, 10]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [11, 11]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + splits_all = rb.new_scan().plan().splits() + actual_all = rb.new_read().to_pandas(splits_all).sort_values(by="b") + self.assertEqual(len(actual_all), 4) + self.assertEqual(sorted(actual_all["b"].tolist()), [10, 20, 30, 40]) + + rb_dt10 = table.new_read_builder().with_partition_filter({"dt": "10"}) + splits_dt10 = rb_dt10.new_scan().plan().splits() + actual_dt10 = rb_dt10.new_read().to_pandas(splits_dt10).sort_values(by="b") + self.assertEqual(len(actual_dt10), 2) + self.assertEqual(actual_dt10["b"].tolist(), [10, 20]) + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_full_overwrite(self, file_format): + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": file_format}, + ) + table_name = f"default.format_table_full_overwrite_{file_format}" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = wb.overwrite().new_write() + tc = wb.overwrite().new_commit() + tw.write_pandas(pd.DataFrame({"a": [3], "b": [30]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + actual = rb.new_read().to_pandas(splits) + self.assertEqual(len(actual), 1) + self.assertEqual(actual["b"].tolist(), [30]) + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_split_read(self, file_format): + pa_schema = pa.schema([ + ("id", pa.int32()), + ("name", pa.string()), + ("score", pa.float64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + "type": "format-table", + "file.format": file_format, + "source.split.target-size": "54", + }, + ) + table_name = f"default.format_table_split_read_{file_format}" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + size = 50 + for i in range(0, size, 10): + batch = pd.DataFrame({ + "id": list(range(i, min(i + 10, size))), + "name": [f"User{j}" for j in range(i, min(i + 10, size))], + "score": [85.5 + (j % 15) for j in range(i, min(i + 10, size))], + }) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(batch) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + actual = rb.new_read().to_pandas(splits).sort_values(by="id") + self.assertEqual(len(actual), size) + self.assertEqual(actual["id"].tolist(), list(range(size))) + + @parameterized.expand(_format_table_read_write_formats()) + def test_format_table_catalog(self, file_format): + pa_schema = pa.schema([ + ("str", pa.string()), + ("int", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", 
"file.format": file_format}, + ) + table_name = f"default.format_table_catalog_{file_format}" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + self.assertIn(f"format_table_catalog_{file_format}", self.rest_catalog.list_tables("default")) + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, FormatTable) + + self.rest_catalog.drop_table(table_name, False) + with self.assertRaises(TableNotExistException): + self.rest_catalog.get_table(table_name) + + +if __name__ == "__main__": + unittest.main() From 55553005f692fc23f13b388fea2e3c8b0b04dff4 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 01:33:12 +0800 Subject: [PATCH 04/10] fix self.options typeerror and so on --- .../pypaimon/table/format/__init__.py | 1 - .../format/format_batch_write_builder.py | 6 +- .../pypaimon/table/format/format_table.py | 13 +--- .../table/format/format_table_commit.py | 12 +-- .../table/format/format_table_read.py | 17 ++++- .../table/format/format_table_scan.py | 2 +- .../table/format/format_table_write.py | 7 +- .../tests/rest/rest_format_table_test.py | 74 +++++++++++++++++++ 8 files changed, 98 insertions(+), 34 deletions(-) diff --git a/paimon-python/pypaimon/table/format/__init__.py b/paimon-python/pypaimon/table/format/__init__.py index ab6995a69607..228f165248ed 100644 --- a/paimon-python/pypaimon/table/format/__init__.py +++ b/paimon-python/pypaimon/table/format/__init__.py @@ -22,7 +22,6 @@ from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder from pypaimon.table.format.format_table_write import FormatTableWrite from pypaimon.table.format.format_table_commit import FormatTableCommit -from pypaimon.table.format.format_commit_message import FormatTableCommitMessage __all__ = [ "FormatDataSplit", diff --git a/paimon-python/pypaimon/table/format/format_batch_write_builder.py b/paimon-python/pypaimon/table/format/format_batch_write_builder.py index 7fa101e81329..77477b388ba0 100644 --- a/paimon-python/pypaimon/table/format/format_batch_write_builder.py +++ b/paimon-python/pypaimon/table/format/format_batch_write_builder.py @@ -48,8 +48,4 @@ def new_write(self) -> FormatTableWrite: return FormatTableWrite(self.table, overwrite=self._overwrite) def new_commit(self) -> FormatTableCommit: - return FormatTableCommit( - table=self.table, - overwrite=self._overwrite, - static_partitions=self._static_partition, - ) + return FormatTableCommit(table=self.table) diff --git a/paimon-python/pypaimon/table/format/format_table.py b/paimon-python/pypaimon/table/format/format_table.py index 86204392d814..564bd086255d 100644 --- a/paimon-python/pypaimon/table/format/format_table.py +++ b/paimon-python/pypaimon/table/format/format_table.py @@ -19,11 +19,6 @@ from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier -from pypaimon.common.options.core_options import CoreOptions -from pypaimon.common.options.options import Options - -PARTITION_DEFAULT_NAME_KEY = "partition.default-name" -PARTITION_DEFAULT_NAME_DEFAULT = "__DEFAULT_PARTITION__" from pypaimon.schema.table_schema import TableSchema from pypaimon.table.table import Table @@ -63,13 +58,12 @@ def __init__( self._table_schema = table_schema self._location = location.rstrip("/") self._format = format - self.options = options or dict(table_schema.options) + self._options = options or dict(table_schema.options) self.comment = comment self.fields = table_schema.fields self.field_names = [f.name for f in 
self.fields] self.partition_keys = table_schema.partition_keys or [] self.primary_keys: List[str] = [] # format table has no primary key - self._core_options = CoreOptions(Options(self.options)) def name(self) -> str: return self.identifier.get_table_name() @@ -92,10 +86,7 @@ def format(self) -> Format: return self._format def options(self) -> Dict[str, str]: - return self.options - - def default_part_name(self) -> str: - return self.options.get(PARTITION_DEFAULT_NAME_KEY, PARTITION_DEFAULT_NAME_DEFAULT) + return self._options def new_read_builder(self): from pypaimon.table.format.format_read_builder import FormatReadBuilder diff --git a/paimon-python/pypaimon/table/format/format_table_commit.py b/paimon-python/pypaimon/table/format/format_table_commit.py index 5009e4bdc7da..d869744590bd 100644 --- a/paimon-python/pypaimon/table/format/format_table_commit.py +++ b/paimon-python/pypaimon/table/format/format_table_commit.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Optional +from typing import List import pyarrow.fs as pafs @@ -41,16 +41,10 @@ def _delete_data_files_in_path(file_io, path: str) -> None: class FormatTableCommit: + """Commit for format table. Overwrite is applied in FormatTableWrite at write time.""" - def __init__( - self, - table: FormatTable, - overwrite: bool = False, - static_partitions: Optional[dict] = None, - ): + def __init__(self, table: FormatTable): self.table = table - self.overwrite = overwrite - self.static_partitions = static_partitions or {} self._committed = False def commit(self, commit_messages: List[FormatTableCommitMessage]) -> None: diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py index 1d1e09835b1f..b341e7fbbe35 100644 --- a/paimon-python/pypaimon/table/format/format_table_read.py +++ b/paimon-python/pypaimon/table/format/format_table_read.py @@ -32,9 +32,9 @@ def _read_file_to_arrow( read_fields: Optional[List[str]], ) -> pyarrow.Table: path = split.data_path() - opts = {} - if fmt == Format.CSV: - opts = {"max_read_options": {"block_size": 1 << 20}} + csv_read_options = None + if fmt == Format.CSV and hasattr(pyarrow, "csv"): + csv_read_options = pyarrow.csv.ReadOptions(block_size=1 << 20) try: with file_io.new_input_stream(path) as stream: chunks = [] @@ -61,7 +61,10 @@ def _read_file_to_arrow( return pyarrow.table({}) elif fmt == Format.CSV: if hasattr(pyarrow, "csv"): - tbl = pyarrow.csv.read_csv(pyarrow.BufferReader(data), **opts) + tbl = pyarrow.csv.read_csv( + pyarrow.BufferReader(data), + read_options=csv_read_options, + ) else: import io df = pandas.read_csv(io.BytesIO(data)) @@ -176,7 +179,10 @@ def to_iterator( self, splits: List[FormatDataSplit], ) -> Iterator[Any]: + n_yielded = 0 for split in splits: + if self.limit is not None and n_yielded >= self.limit: + break t = _read_file_to_arrow( self.table.file_io, split, @@ -186,4 +192,7 @@ def to_iterator( ) for batch in t.to_batches(): for i in range(batch.num_rows): + if self.limit is not None and n_yielded >= self.limit: + return yield batch.slice(i, 1) + n_yielded += 1 diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py index df0f59ee1806..39a600961c17 100644 --- a/paimon-python/pypaimon/table/format/format_table_scan.py +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -94,7 +94,7 @@ def __init__( self.limit = limit 
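# (Editor's illustration, not part of the patch: the limit handling added to
# to_iterator above caps the number of single-row batches yielded across all
# splits. Assuming a FormatTable `table` with at least three rows written:
#
#     rb = table.new_read_builder().with_limit(2)
#     splits = rb.new_scan().plan().splits()
#     rows = list(rb.new_read().to_iterator(splits))
#     assert len(rows) == 2  # iteration stops once the limit is reached
# )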
def plan(self) -> Plan: - partition_only_value = self.table.options.get( + partition_only_value = self.table.options().get( "format-table.partition-path-only-value", "false" ).lower() == "true" splits = _list_data_files_recursive( diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py index bf8b8ddcac86..b0b03fa822d7 100644 --- a/paimon-python/pypaimon/table/format/format_table_write.py +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -60,7 +60,7 @@ def __init__(self, table: FormatTable, overwrite: bool = False): self._overwrite = overwrite self._written_paths: List[str] = [] self._partition_only_value = ( - table.options.get("format-table.partition-path-only-value", "false").lower() == "true" + table.options().get("format-table.partition-path-only-value", "false").lower() == "true" ) self._file_format = table.format() self._data_file_prefix = "data-" @@ -166,12 +166,13 @@ def _write_single_batch( "TEXT format only supports a single string column, " f"got {tbl.num_columns} columns" ) - line_delimiter = self.table.options.get("text.line-delimiter", "\n") + line_delimiter = self.table.options().get("text.line-delimiter", "\n") lines = [] col = tbl.column(0) for i in range(tbl.num_rows): val = col[i] - lines.append((val.as_py() if val is not None else "") + line_delimiter) + py_val = val.as_py() if hasattr(val, "as_py") else val + lines.append(("" if py_val is None else str(py_val)) + line_delimiter) raw = "".join(lines).encode("utf-8") else: raise ValueError(f"Format table write not implemented for {fmt}") diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py index 0fea93dd436f..883ec30a4286 100644 --- a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py @@ -56,6 +56,9 @@ def test_format_table_read_write(self, file_format): table = self.rest_catalog.get_table(table_name) self.assertIsInstance(table, FormatTable) self.assertEqual(table.format().value, file_format) + opts = table.options() + self.assertIsInstance(opts, dict) + self.assertEqual(opts.get("file.format"), file_format) write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() @@ -98,6 +101,9 @@ def test_format_table_text_read_write(self): table = self.rest_catalog.get_table(table_name) self.assertIsInstance(table, FormatTable) self.assertEqual(table.format().value, "text") + opts = table.options() + self.assertIsInstance(opts, dict) + self.assertEqual(opts.get("file.format"), "text") write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() @@ -115,6 +121,74 @@ def test_format_table_text_read_write(self): expected = pd.DataFrame({"value": ["hello", "world"]}) pd.testing.assert_frame_equal(actual, expected) + def test_format_table_text_read_write_with_nulls(self): + """TEXT format: null string values are written as empty string and read back as empty.""" + pa_schema = pa.schema([("value", pa.string())]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": "text"}, + ) + table_name = "default.format_table_rw_text_nulls" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + write_builder = table.new_batch_write_builder() + table_write = 
write_builder.new_write() + table_commit = write_builder.new_commit() + df = pd.DataFrame({"value": ["hello", None, "world"]}) + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + table_read = read_builder.new_read() + actual = table_read.to_pandas(splits) + self.assertEqual(actual.shape[0], 3) + # Nulls are written as empty string; read back as "" + self.assertEqual(set(actual["value"].fillna("").astype(str)), {"", "hello", "world"}) + self.assertIn("", actual["value"].values) + + def test_format_table_read_with_limit_to_iterator(self): + """with_limit(N) must be respected by to_iterator (same as to_arrow/to_pandas).""" + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": "parquet"}, + ) + table_name = "default.format_table_limit_iterator" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + write_builder = table.new_batch_write_builder() + tw = write_builder.new_write() + tc = write_builder.new_commit() + tw.write_pandas(pd.DataFrame({"a": [1, 2, 3, 4], "b": [10, 20, 30, 40]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + splits = table.new_read_builder().new_scan().plan().splits() + limit = 2 + read_builder = table.new_read_builder().with_limit(limit) + table_read = read_builder.new_read() + + df = table_read.to_pandas(splits) + self.assertEqual(len(df), limit, "to_pandas must respect with_limit(2)") + + batches = list(table_read.to_iterator(splits)) + self.assertEqual(len(batches), limit, "to_iterator must respect with_limit(2)") + @parameterized.expand(_format_table_read_write_formats()) def test_format_table_partitioned_overwrite(self, file_format): pa_schema = pa.schema([ From 725ce8fddaf4bc11aae60736ec153f8d2e5c6914 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 02:04:46 +0800 Subject: [PATCH 05/10] support with_partition_filter --- .../pypaimon/read/push_down_utils.py | 19 +- .../format/format_batch_write_builder.py | 6 +- .../table/format/format_data_split.py | 4 +- .../table/format/format_read_builder.py | 13 +- .../table/format/format_table_write.py | 31 ++- .../tests/rest/rest_format_table_test.py | 201 ++++++++++++++++++ 6 files changed, 263 insertions(+), 11 deletions(-) diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py index f8123411490c..7ad7e53acccd 100644 --- a/paimon-python/pypaimon/read/push_down_utils.py +++ b/paimon-python/pypaimon/read/push_down_utils.py @@ -16,12 +16,29 @@ # limitations under the License. 
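# (Editor's sketch of the helper introduced below: it walks an AND-composed
# predicate and returns a complete partition spec only when every partition
# key is pinned by an equality. Builder method names follow the tests later
# in this series, and _split_and is assumed to return [p] for a simple,
# non-composed predicate.
#
#     pb = table.new_read_builder().new_predicate_builder()
#     extract_partition_spec_from_predicate(pb.equal("dt", 10), ["dt"])
#     # -> {"dt": "10"}
#     extract_partition_spec_from_predicate(pb.equal("a", 1), ["dt"])
#     # -> None: "a" is not a partition key, so no full spec exists
# )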
################################################################################ -from typing import Dict, List, Set +from typing import Dict, List, Optional, Set from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder +def extract_partition_spec_from_predicate( + predicate: Predicate, partition_keys: List[str] +) -> Optional[Dict[str, str]]: + if not predicate or not partition_keys: + return None + parts = _split_and(predicate) + spec: Dict[str, str] = {} + for p in parts: + if p.method != "equal" or p.field is None or p.literals is None or len(p.literals) != 1: + continue + if p.field in partition_keys: + spec[p.field] = str(p.literals[0]) + if set(spec.keys()) == set(partition_keys): + return spec + return None + + def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]): new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys) part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)} diff --git a/paimon-python/pypaimon/table/format/format_batch_write_builder.py b/paimon-python/pypaimon/table/format/format_batch_write_builder.py index 77477b388ba0..31d865020a13 100644 --- a/paimon-python/pypaimon/table/format/format_batch_write_builder.py +++ b/paimon-python/pypaimon/table/format/format_batch_write_builder.py @@ -45,7 +45,11 @@ def _validate_static_partition(self, static_partition: Optional[dict]) -> None: raise ValueError(f"Unknown static partition column: {key}") def new_write(self) -> FormatTableWrite: - return FormatTableWrite(self.table, overwrite=self._overwrite) + return FormatTableWrite( + self.table, + overwrite=self._overwrite, + static_partitions=self._static_partition, + ) def new_commit(self) -> FormatTableCommit: return FormatTableCommit(table=self.table) diff --git a/paimon-python/pypaimon/table/format/format_data_split.py b/paimon-python/pypaimon/table/format/format_data_split.py index b078e490d011..8536a18025d6 100644 --- a/paimon-python/pypaimon/table/format/format_data_split.py +++ b/paimon-python/pypaimon/table/format/format_data_split.py @@ -20,10 +20,10 @@ @dataclass(frozen=True) class FormatDataSplit: + """Split for format table: one file (or future: byte range) per split.""" + file_path: str file_size: int - offset: int = 0 - length: Optional[int] = None # None means read whole file partition: Optional[Dict[str, Any]] = None # partition column name -> value def data_path(self) -> str: diff --git a/paimon-python/pypaimon/table/format/format_read_builder.py b/paimon-python/pypaimon/table/format/format_read_builder.py index 9cd62272b0a5..60537d454381 100644 --- a/paimon-python/pypaimon/table/format/format_read_builder.py +++ b/paimon-python/pypaimon/table/format/format_read_builder.py @@ -18,6 +18,7 @@ from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder +from pypaimon.read.push_down_utils import extract_partition_spec_from_predicate from pypaimon.schema.data_types import DataField from pypaimon.table.format.format_table import FormatTable from pypaimon.table.format.format_table_scan import FormatTableScan @@ -32,8 +33,16 @@ def __init__(self, table: FormatTable): self._partition_filter: Optional[dict] = None def with_filter(self, predicate: Predicate) -> "FormatReadBuilder": - # Format table supports partition filter only; data predicate applied in read - self._partition_filter = None # could extract partition from predicate + """ + Store predicate and, when table has 
partition keys and no partition filter is set, + try to extract partition spec from predicate (AND of equality on partition columns) + and set partition filter for scan, aligned with Java FormatReadBuilder.withFilter. + Data predicate is not yet applied in read (FormatTableRead does not support filter). + """ + if self._partition_filter is None and self.table.partition_keys and predicate: + spec = extract_partition_spec_from_predicate(predicate, self.table.partition_keys) + if spec is not None: + self._partition_filter = spec return self def with_projection(self, projection: List[str]) -> "FormatReadBuilder": diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py index b0b03fa822d7..62978336a21e 100644 --- a/paimon-python/pypaimon/table/format/format_table_write.py +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -17,7 +17,7 @@ import io import uuid from collections import defaultdict -from typing import List +from typing import Dict, List, Optional import pyarrow @@ -55,10 +55,17 @@ def _partition_from_row( class FormatTableWrite: """Batch write for format table: writes Arrow/Pandas to partition dirs as files.""" - def __init__(self, table: FormatTable, overwrite: bool = False): + def __init__( + self, + table: FormatTable, + overwrite: bool = False, + static_partitions: Optional[Dict[str, str]] = None, + ): self.table = table self._overwrite = overwrite + self._static_partitions = static_partitions if static_partitions is not None else {} self._written_paths: List[str] = [] + self._overwritten_dirs: set = set() self._partition_only_value = ( table.options().get("format-table.partition-path-only-value", "false").lower() == "true" ) @@ -115,9 +122,19 @@ def _write_single_batch( dir_path = f"{location}/{part_path}" else: dir_path = location - if self._overwrite and self.table.file_io.exists(dir_path): - from pypaimon.table.format.format_table_commit import _delete_data_files_in_path - _delete_data_files_in_path(self.table.file_io, dir_path) + # When overwrite: clear each partition dir only once per write session (first write to it) + if self._overwrite and dir_path not in self._overwritten_dirs and self.table.file_io.exists(dir_path): + should_delete = ( + not self._static_partitions + or all( + str(partition_spec.get(k)) == str(v) + for k, v in self._static_partitions.items() + ) + ) + if should_delete: + from pypaimon.table.format.format_table_commit import _delete_data_files_in_path + _delete_data_files_in_path(self.table.file_io, dir_path) + self._overwritten_dirs.add(dir_path) self.table.file_io.check_or_mkdirs(dir_path) file_name = f"{self._data_file_prefix}{uuid.uuid4().hex}{self._suffix}" path = f"{dir_path}/{file_name}" @@ -161,6 +178,10 @@ def _write_single_batch( ) elif fmt == Format.TEXT: tbl = pyarrow.Table.from_batches([data]) + partition_keys = self.table.partition_keys + if partition_keys: + data_cols = [c for c in tbl.column_names if c not in partition_keys] + tbl = tbl.select(data_cols) if tbl.num_columns != 1 or not pyarrow.types.is_string(tbl.schema.field(0).type): raise ValueError( "TEXT format only supports a single string column, " diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py index 883ec30a4286..223c6efe18d6 100644 --- a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py @@ -153,6 +153,43 @@ def 
test_format_table_text_read_write_with_nulls(self): self.assertEqual(set(actual["value"].fillna("").astype(str)), {"", "hello", "world"}) self.assertIn("", actual["value"].values) + def test_format_table_text_partitioned_read_write(self): + """Partitioned TEXT table: partition columns are stripped before write; read back with partition.""" + pa_schema = pa.schema([ + ("value", pa.string()), + ("dt", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["dt"], + options={"type": "format-table", "file.format": "text"}, + ) + table_name = "default.format_table_rw_text_partitioned" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, FormatTable) + self.assertEqual(table.format().value, "text") + + write_builder = table.new_batch_write_builder() + tw = write_builder.new_write() + tc = write_builder.new_commit() + tw.write_pandas(pd.DataFrame({"value": ["a", "b"], "dt": [1, 1]})) + tw.write_pandas(pd.DataFrame({"value": ["c"], "dt": [2]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + read_builder = table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + actual = read_builder.new_read().to_pandas(splits).sort_values(by=["dt", "value"]).reset_index(drop=True) + self.assertEqual(actual.shape[0], 3) + self.assertEqual(actual["value"].tolist(), ["a", "b", "c"]) + self.assertEqual(actual["dt"].tolist(), [1, 1, 2]) + def test_format_table_read_with_limit_to_iterator(self): """with_limit(N) must be respected by to_iterator (same as to_arrow/to_pandas).""" pa_schema = pa.schema([ @@ -227,6 +264,93 @@ def test_format_table_partitioned_overwrite(self, file_format): self.assertEqual(len(actual), 2) self.assertEqual(actual["b"].tolist(), [100, 200]) + def test_format_table_overwrite_only_specified_partition(self): + """overwrite(static_partition) must only clear that partition; other partitions unchanged.""" + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("c", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["c"], + options={"type": "format-table", "file.format": "parquet"}, + ) + table_name = "default.format_table_overwrite_one_partition" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [10, 10], "b": [10, 20], "c": [1, 1]})) + tw.write_pandas(pd.DataFrame({"a": [30, 30], "b": [30, 40], "c": [2, 2]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = table.new_batch_write_builder().overwrite({"c": 1}).new_write() + tc = table.new_batch_write_builder().overwrite({"c": 1}).new_commit() + tw.write_pandas(pd.DataFrame({"a": [12, 12], "b": [100, 200], "c": [1, 1]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + actual = table.new_read_builder().new_read().to_pandas( + table.new_read_builder().new_scan().plan().splits() + ).sort_values(by=["c", "b"]) + self.assertEqual(len(actual), 4) + self.assertEqual(actual["b"].tolist(), [100, 200, 30, 40]) + self.assertEqual(actual["c"].tolist(), [1, 1, 2, 2]) + c1 = actual[actual["c"] == 1]["b"].tolist() + c2 = actual[actual["c"] == 2]["b"].tolist() + self.assertEqual(c1, 
[100, 200], "partition c=1 must be overwritten") + self.assertEqual(c2, [30, 40], "partition c=2 must be unchanged") + + def test_format_table_overwrite_multiple_batches_same_partition(self): + """Overwrite mode must clear partition dir only once; multiple batches same partition keep all data.""" + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={"type": "format-table", "file.format": "parquet"}, + ) + table_name = "default.format_table_overwrite_multi_batch" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = wb.overwrite().new_write() + tc = wb.overwrite().new_commit() + tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40]})) + tw.write_pandas(pd.DataFrame({"a": [5, 6], "b": [50, 60]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + actual = table.new_read_builder().new_read().to_pandas( + table.new_read_builder().new_scan().plan().splits() + ).sort_values(by="b") + self.assertEqual(len(actual), 4, "overwrite + 2 write_pandas same partition must keep all 4 rows") + self.assertEqual(actual["b"].tolist(), [30, 40, 50, 60]) + @parameterized.expand(_format_table_read_write_formats()) def test_format_table_partitioned_read_write(self, file_format): pa_schema = pa.schema([ @@ -272,6 +396,83 @@ def test_format_table_partitioned_read_write(self, file_format): self.assertEqual(len(actual_dt10), 2) self.assertEqual(actual_dt10["b"].tolist(), [10, 20]) + def test_format_table_with_filter_extracts_partition_like_java(self): + """with_filter(partition equality) extracts partition like Java; does not overwrite partition filter.""" + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("dt", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["dt"], + options={"type": "format-table", "file.format": "parquet"}, + ) + table_name = "default.format_table_with_filter_assert" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [10, 10]})) + tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [11, 11]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + predicate_eq_dt10 = table.new_read_builder().new_predicate_builder().equal("dt", 10) + splits_by_partition_filter = ( + table.new_read_builder().with_partition_filter({"dt": "10"}).new_scan().plan().splits() + ) + splits_by_with_filter = ( + table.new_read_builder().with_filter(predicate_eq_dt10).new_scan().plan().splits() + ) + self.assertEqual( + len(splits_by_with_filter), len(splits_by_partition_filter), + "with_filter(partition equality) must behave like with_partition_filter (Java-aligned)", + ) + actual_from_filter = ( + table.new_read_builder().with_filter(predicate_eq_dt10).new_read().to_pandas(splits_by_with_filter) + ) + self.assertEqual(len(actual_from_filter), 2) + self.assertEqual(actual_from_filter["b"].tolist(), [10, 20]) + + 
+        splits_partition_then_filter = (
+            table.new_read_builder()
+            .with_partition_filter({"dt": "10"})
+            .with_filter(predicate_eq_dt10)
+            .new_scan()
+            .plan()
+            .splits()
+        )
+        self.assertEqual(
+            len(splits_partition_then_filter), len(splits_by_partition_filter),
+            "with_filter must not overwrite a previously set partition filter",
+        )
+        actual = (
+            table.new_read_builder()
+            .with_partition_filter({"dt": "10"})
+            .with_filter(predicate_eq_dt10)
+            .new_read()
+            .to_pandas(splits_partition_then_filter)
+        )
+        self.assertEqual(len(actual), 2)
+        self.assertEqual(actual["b"].tolist(), [10, 20])
+
+        predicate_non_partition = table.new_read_builder().new_predicate_builder().equal("a", 1)
+        splits_no_filter = table.new_read_builder().new_scan().plan().splits()
+        splits_with_non_partition_predicate = (
+            table.new_read_builder().with_filter(predicate_non_partition).new_scan().plan().splits()
+        )
+        self.assertEqual(
+            len(splits_with_non_partition_predicate), len(splits_no_filter),
+            "with_filter(non-partition predicate) must not change scan when no partition spec extracted",
+        )
+
     @parameterized.expand(_format_table_read_write_formats())
     def test_format_table_full_overwrite(self, file_format):
         pa_schema = pa.schema([

From 06d4f42fbc007a9d121ea17e93d159cea2a44071 Mon Sep 17 00:00:00 2001
From: xiaohongbo
Date: Fri, 30 Jan 2026 02:10:30 +0800
Subject: [PATCH 06/10] remove redundant base_path parameter

---
 paimon-python/pypaimon/table/format/format_table_scan.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py
index 39a600961c17..86af77ed089f 100644
--- a/paimon-python/pypaimon/table/format/format_table_scan.py
+++ b/paimon-python/pypaimon/table/format/format_table_scan.py
@@ -32,7 +32,6 @@ def _is_data_file_name(name: str) -> bool:
 def _list_data_files_recursive(
     file_io: FileIO,
     path: str,
-    base_path: str,
     partition_keys: List[str],
     partition_only_value: bool,
     rel_path_parts: Optional[List[str]] = None,
@@ -60,7 +59,6 @@
             sub_splits = _list_data_files_recursive(
                 file_io,
                 full_path,
-                base_path,
                 partition_keys,
                 partition_only_value,
                 child_parts,
@@ -100,7 +98,6 @@ def plan(self) -> Plan:
         splits = _list_data_files_recursive(
             self.table.file_io,
             self.table.location(),
-            self.table.location(),
             self.table.partition_keys,
             partition_only_value,
         )

From 0c5a54637043a47a43132b40b19607a811d82057 Mon Sep 17 00:00:00 2001
From: xiaohongbo
Date: Fri, 30 Jan 2026 10:37:43 +0800
Subject: [PATCH 07/10] fix partition columns always being read as string

---
 .../table/format/format_table_read.py         | 55 +++++++++---
 .../table/format/format_table_scan.py         | 35 ++++++--
 .../table/format/format_table_write.py        | 86 +++++++++++++++----
 .../tests/rest/rest_format_table_test.py      | 43 ++++++++++
 4 files changed, 183 insertions(+), 36 deletions(-)

diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py
index b341e7fbbe35..51bc7a7d61c4 100644
--- a/paimon-python/pypaimon/table/format/format_table_read.py
+++ b/paimon-python/pypaimon/table/format/format_table_read.py
@@ -30,6 +30,7 @@ def _read_file_to_arrow(
     fmt: Format,
     partition_spec: Optional[Dict[str, str]],
     read_fields: Optional[List[str]],
+    partition_key_types: Optional[Dict[str, pyarrow.DataType]] = None,
 ) -> pyarrow.Table:
     path = split.data_path()
     csv_read_options = None
@@ -42,7 +43,9 @@ def _read_file_to_arrow(
                 chunk = stream.read()
                 if not chunk:
                     break
-                chunks.append(chunk if 
isinstance(chunk, bytes) else bytes(chunk)) + chunks.append( + chunk if isinstance(chunk, bytes) else bytes(chunk) + ) data = b"".join(chunks) except Exception as e: raise RuntimeError(f"Failed to read {path}") from e @@ -52,7 +55,9 @@ def _read_file_to_arrow( if fmt == Format.PARQUET: import io - data = bytes(data) if not isinstance(data, bytes) else data + data = ( + bytes(data) if not isinstance(data, bytes) else data + ) if len(data) < 4 or data[:4] != b"PAR1": return pyarrow.table({}) try: @@ -90,12 +95,15 @@ def _read_file_to_arrow( return pyarrow.table({}) else: raise ValueError( - "Format table read for ORC requires PyArrow with ORC support (pyarrow.orc)" + "Format table read for ORC requires PyArrow with ORC support " + "(pyarrow.orc)" ) elif fmt == Format.TEXT: text = data.decode("utf-8") if isinstance(data, bytes) else data line_delimiter = "\n" - lines = text.rstrip(line_delimiter).split(line_delimiter) if text else [] + lines = ( + text.rstrip(line_delimiter).split(line_delimiter) if text else [] + ) if not lines: return pyarrow.table({}) col_name = "value" if not read_fields else read_fields[0] @@ -105,14 +113,17 @@ def _read_file_to_arrow( if partition_spec: for k, v in partition_spec.items(): - tbl = tbl.append_column( - k, - pyarrow.array([v] * tbl.num_rows, type=pyarrow.string()), + if k in tbl.column_names: + continue + pa_type = ( + partition_key_types.get(k, pyarrow.string()) + if partition_key_types + else pyarrow.string() ) - if read_fields: - col_order = [c for c in read_fields if c in tbl.column_names] - if col_order: - tbl = tbl.select(col_order) + arr = pyarrow.array([v] * tbl.num_rows, type=pyarrow.string()) + if pa_type != pyarrow.string(): + arr = arr.cast(pa_type) + tbl = tbl.append_column(k, arr) if read_fields and tbl.num_columns > 0: existing = [c for c in read_fields if c in tbl.column_names] @@ -121,6 +132,20 @@ def _read_file_to_arrow( return tbl +def _partition_key_types( + table: FormatTable, +) -> Optional[Dict[str, pyarrow.DataType]]: + """Build partition column name -> PyArrow type from table schema.""" + if not table.partition_keys: + return None + result = {} + for f in table.fields: + if f.name in table.partition_keys: + pa_field = PyarrowFieldParser.from_paimon_field(f) + result[f.name] = pa_field.type + return result if result else None + + class FormatTableRead: def __init__( @@ -139,6 +164,7 @@ def to_arrow( ) -> pyarrow.Table: read_fields = self.projection fmt = self.table.format() + partition_key_types = _partition_key_types(self.table) tables = [] nrows = 0 for split in splits: @@ -148,6 +174,7 @@ def to_arrow( fmt, split.partition, read_fields, + partition_key_types, ) if t.num_rows > 0: tables.append(t) @@ -161,7 +188,9 @@ def to_arrow( if not tables: fields = self.table.fields if read_fields: - fields = [f for f in self.table.fields if f.name in read_fields] + fields = [ + f for f in self.table.fields if f.name in read_fields + ] schema = PyarrowFieldParser.from_paimon_schema(fields) return pyarrow.Table.from_pydict( {n: [] for n in schema.names}, @@ -179,6 +208,7 @@ def to_iterator( self, splits: List[FormatDataSplit], ) -> Iterator[Any]: + partition_key_types = _partition_key_types(self.table) n_yielded = 0 for split in splits: if self.limit is not None and n_yielded >= self.limit: @@ -189,6 +219,7 @@ def to_iterator( self.table.format(), split.partition, self.projection, + partition_key_types, ) for batch in t.to_batches(): for i in range(batch.num_rows): diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py 
b/paimon-python/pypaimon/table/format/format_table_scan.py index 86af77ed089f..85a3e8f0a5be 100644 --- a/paimon-python/pypaimon/table/format/format_table_scan.py +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -25,8 +25,21 @@ def _is_data_file_name(name: str) -> bool: - """Match Java FormatTableScan.isDataFileName: exclude hidden and metadata.""" - return name is not None and not name.startswith(".") and not name.startswith("_") + """Match Java FormatTableScan.isDataFileName: exclude hidden/metadata.""" + if name is None: + return False + return not name.startswith(".") and not name.startswith("_") + + +def _is_reserved_dir_name(name: str) -> bool: + """Skip metadata/reserved dirs (not treated as partition levels).""" + if not name: + return True + if name.startswith(".") or name.startswith("_"): + return True + if name.lower() in ("schema", "_schema"): + return True + return False def _list_data_files_recursive( @@ -36,6 +49,7 @@ def _list_data_files_recursive( partition_only_value: bool, rel_path_parts: Optional[List[str]] = None, ) -> List[FormatDataSplit]: + """List data files under path, building partition spec from dir names.""" splits: List[FormatDataSplit] = [] rel_path_parts = rel_path_parts or [] try: @@ -51,6 +65,8 @@ def _list_data_files_recursive( if info.path.startswith("/") or info.path.startswith("file:"): full_path = info.path if info.type == pafs.FileType.Directory: + if _is_reserved_dir_name(name): + continue part_value = name if not partition_only_value and "=" in name: part_value = name.split("=", 1)[1] @@ -68,7 +84,12 @@ def _list_data_files_recursive( size = getattr(info, "size", None) or 0 part_spec: Optional[Dict[str, str]] = None if partition_keys and len(rel_path_parts) >= len(partition_keys): - part_spec = dict(zip(partition_keys, rel_path_parts[: len(partition_keys)])) + part_spec = dict( + zip( + partition_keys, + rel_path_parts[: len(partition_keys)], + ) + ) splits.append( FormatDataSplit( file_path=full_path, @@ -104,9 +125,11 @@ def plan(self) -> Plan: if self.partition_filter: filtered = [] for s in splits: - if s.partition and all( - s.partition.get(k) == v for k, v in self.partition_filter.items() - ): + match = s.partition and all( + s.partition.get(k) == v + for k, v in self.partition_filter.items() + ) + if match: filtered.append(s) splits = filtered if self.limit is not None and self.limit <= 0: diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py index 62978336a21e..590597e1f273 100644 --- a/paimon-python/pypaimon/table/format/format_table_write.py +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -22,11 +22,18 @@ import pyarrow from pypaimon.schema.data_types import PyarrowFieldParser -from pypaimon.table.format.format_commit_message import FormatTableCommitMessage -from pypaimon.table.format.format_table import FormatTable, Format +from pypaimon.table.format.format_commit_message import ( + FormatTableCommitMessage, +) +from pypaimon.table.format.format_table import ( + Format, + FormatTable, +) -def _partition_path(partition_spec: dict, partition_keys: List[str], only_value: bool) -> str: +def _partition_path( + partition_spec: dict, partition_keys: List[str], only_value: bool +) -> str: parts = [] for k in partition_keys: v = partition_spec.get(k) @@ -36,6 +43,21 @@ def _partition_path(partition_spec: dict, partition_keys: List[str], only_value: return "/".join(parts) +def _validate_partition_columns( + partition_keys: List[str], + 
data: pyarrow.RecordBatch, +) -> None: + """Raise if partition key missing from data (wrong column indexing).""" + names = set(data.schema.names) if data.schema else set() + missing = [k for k in partition_keys if k not in names] + if missing: + raise ValueError( + f"Partition column(s) missing from input data: {missing}. " + f"Data columns: {list(names)}. " + "Ensure partition keys exist in the Arrow schema." + ) + + def _partition_from_row( row: pyarrow.RecordBatch, partition_keys: List[str], @@ -45,7 +67,10 @@ def _partition_from_row( for k in partition_keys: col = row.column(row.schema.get_field_index(k)) val = col[row_index] - if val is None or (hasattr(val, "as_py") and val.as_py() is None): + is_none = val is None or ( + hasattr(val, "as_py") and val.as_py() is None + ) + if is_none: out.append(None) else: out.append(val.as_py() if hasattr(val, "as_py") else val) @@ -53,7 +78,7 @@ def _partition_from_row( class FormatTableWrite: - """Batch write for format table: writes Arrow/Pandas to partition dirs as files.""" + """Batch write for format table: Arrow/Pandas to partition dirs.""" def __init__( self, @@ -63,12 +88,15 @@ def __init__( ): self.table = table self._overwrite = overwrite - self._static_partitions = static_partitions if static_partitions is not None else {} + self._static_partitions = ( + static_partitions if static_partitions is not None else {} + ) self._written_paths: List[str] = [] self._overwritten_dirs: set = set() - self._partition_only_value = ( - table.options().get("format-table.partition-path-only-value", "false").lower() == "true" + opt = table.options().get( + "format-table.partition-path-only-value", "false" ) + self._partition_only_value = opt.lower() == "true" self._file_format = table.format() self._data_file_prefix = "data-" self._suffix = { @@ -89,6 +117,7 @@ def write_arrow_batch(self, data: pyarrow.RecordBatch) -> None: part_spec = {} self._write_single_batch(data, part_spec) return + _validate_partition_columns(partition_keys, data) # Group rows by partition parts_to_indices = defaultdict(list) for i in range(data.num_rows): @@ -122,8 +151,13 @@ def _write_single_batch( dir_path = f"{location}/{part_path}" else: dir_path = location - # When overwrite: clear each partition dir only once per write session (first write to it) - if self._overwrite and dir_path not in self._overwritten_dirs and self.table.file_io.exists(dir_path): + # When overwrite: clear partition dir only once per write session + overwrite_this = ( + self._overwrite + and dir_path not in self._overwritten_dirs + and self.table.file_io.exists(dir_path) + ) + if overwrite_this: should_delete = ( not self._static_partitions or all( @@ -132,7 +166,9 @@ def _write_single_batch( ) ) if should_delete: - from pypaimon.table.format.format_table_commit import _delete_data_files_in_path + from pypaimon.table.format.format_table_commit import ( + _delete_data_files_in_path, + ) _delete_data_files_in_path(self.table.file_io, dir_path) self._overwritten_dirs.add(dir_path) self.table.file_io.check_or_mkdirs(dir_path) @@ -163,7 +199,10 @@ def _write_single_batch( import json lines = [] for i in range(tbl.num_rows): - row = {tbl.column_names[j]: tbl.column(j)[i].as_py() for j in range(tbl.num_columns)} + row = { + tbl.column_names[j]: tbl.column(j)[i].as_py() + for j in range(tbl.num_columns) + } lines.append(json.dumps(row) + "\n") raw = "".join(lines).encode("utf-8") elif fmt == Format.ORC: @@ -174,26 +213,33 @@ def _write_single_batch( raw = buf.getvalue() else: raise ValueError( - "Format table 
write for ORC requires PyArrow with ORC support (pyarrow.orc)" + "Format table write for ORC requires PyArrow with ORC " + "support (pyarrow.orc)" ) elif fmt == Format.TEXT: tbl = pyarrow.Table.from_batches([data]) partition_keys = self.table.partition_keys if partition_keys: - data_cols = [c for c in tbl.column_names if c not in partition_keys] + data_cols = [ + c for c in tbl.column_names if c not in partition_keys + ] tbl = tbl.select(data_cols) - if tbl.num_columns != 1 or not pyarrow.types.is_string(tbl.schema.field(0).type): + pa_f0 = tbl.schema.field(0).type + if tbl.num_columns != 1 or not pyarrow.types.is_string(pa_f0): raise ValueError( "TEXT format only supports a single string column, " f"got {tbl.num_columns} columns" ) - line_delimiter = self.table.options().get("text.line-delimiter", "\n") + line_delimiter = self.table.options().get( + "text.line-delimiter", "\n" + ) lines = [] col = tbl.column(0) for i in range(tbl.num_rows): val = col[i] py_val = val.as_py() if hasattr(val, "as_py") else val - lines.append(("" if py_val is None else str(py_val)) + line_delimiter) + line = "" if py_val is None else str(py_val) + lines.append(line + line_delimiter) raw = "".join(lines).encode("utf-8") else: raise ValueError(f"Format table write not implemented for {fmt}") @@ -204,7 +250,11 @@ def _write_single_batch( self._written_paths.append(path) def prepare_commit(self) -> List[FormatTableCommitMessage]: - return [FormatTableCommitMessage(written_paths=list(self._written_paths))] + return [ + FormatTableCommitMessage( + written_paths=list(self._written_paths) + ) + ] def close(self) -> None: pass diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py index 223c6efe18d6..bfb3a948f0b7 100644 --- a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py @@ -395,6 +395,49 @@ def test_format_table_partitioned_read_write(self, file_format): actual_dt10 = rb_dt10.new_read().to_pandas(splits_dt10).sort_values(by="b") self.assertEqual(len(actual_dt10), 2) self.assertEqual(actual_dt10["b"].tolist(), [10, 20]) + # Partition column must match schema type (int32), not string + self.assertEqual(actual_dt10["dt"].tolist(), [10, 10]) + self.assertEqual(actual_all["dt"].tolist(), [10, 10, 11, 11]) + + def test_format_table_partition_column_returns_schema_type(self): + """Partition columns must be returned with schema type (e.g. 
int32), not always string.""" + pa_schema = pa.schema([ + ("a", pa.int32()), + ("b", pa.int32()), + ("dt", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=["dt"], + options={"type": "format-table", "file.format": "parquet"}, + ) + table_name = "default.format_table_partition_schema_type" + try: + self.rest_catalog.drop_table(table_name, True) + except Exception: + pass + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [1, 1]})) + tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [2, 2]})) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder().with_partition_filter({"dt": "1"}) + splits = rb.new_scan().plan().splits() + actual = rb.new_read().to_pandas(splits).sort_values(by="b") + self.assertEqual(len(actual), 2) + self.assertEqual(actual["b"].tolist(), [10, 20]) + # Must be int list, not string list; fails if partition column is hardcoded as string + self.assertEqual(actual["dt"].tolist(), [1, 1]) + self.assertTrue( + actual["dt"].dtype in (pd.Int32Dtype(), "int32", "int64"), + "dt must be int type per schema, not string", + ) def test_format_table_with_filter_extracts_partition_like_java(self): """with_filter(partition equality) extracts partition like Java; does not overwrite partition filter.""" From 648f01a1c4b9b02b06bb358b34f83d402782d8f0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 10:42:05 +0800 Subject: [PATCH 08/10] clean code --- paimon-python/pypaimon/catalog/rest/rest_catalog.py | 1 - paimon-python/pypaimon/table/format/format_read_builder.py | 6 ------ paimon-python/pypaimon/table/format/format_table_scan.py | 3 --- paimon-python/pypaimon/tests/rest/rest_format_table_test.py | 6 ------ 4 files changed, 16 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index db9cad4a7202..425146fc6829 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -45,7 +45,6 @@ from pypaimon.table.format.format_table import FormatTable, Format -# Table type value from Java TableType.FORMAT_TABLE FORMAT_TABLE_TYPE = "format-table" diff --git a/paimon-python/pypaimon/table/format/format_read_builder.py b/paimon-python/pypaimon/table/format/format_read_builder.py index 60537d454381..6e777654874a 100644 --- a/paimon-python/pypaimon/table/format/format_read_builder.py +++ b/paimon-python/pypaimon/table/format/format_read_builder.py @@ -33,12 +33,6 @@ def __init__(self, table: FormatTable): self._partition_filter: Optional[dict] = None def with_filter(self, predicate: Predicate) -> "FormatReadBuilder": - """ - Store predicate and, when table has partition keys and no partition filter is set, - try to extract partition spec from predicate (AND of equality on partition columns) - and set partition filter for scan, aligned with Java FormatReadBuilder.withFilter. - Data predicate is not yet applied in read (FormatTableRead does not support filter). 
- """ if self._partition_filter is None and self.table.partition_keys and predicate: spec = extract_partition_spec_from_predicate(predicate, self.table.partition_keys) if spec is not None: diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py index 85a3e8f0a5be..2033083e0b70 100644 --- a/paimon-python/pypaimon/table/format/format_table_scan.py +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -25,14 +25,12 @@ def _is_data_file_name(name: str) -> bool: - """Match Java FormatTableScan.isDataFileName: exclude hidden/metadata.""" if name is None: return False return not name.startswith(".") and not name.startswith("_") def _is_reserved_dir_name(name: str) -> bool: - """Skip metadata/reserved dirs (not treated as partition levels).""" if not name: return True if name.startswith(".") or name.startswith("_"): @@ -49,7 +47,6 @@ def _list_data_files_recursive( partition_only_value: bool, rel_path_parts: Optional[List[str]] = None, ) -> List[FormatDataSplit]: - """List data files under path, building partition spec from dir names.""" splits: List[FormatDataSplit] = [] rel_path_parts = rel_path_parts or [] try: diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py index bfb3a948f0b7..d081ae6cd044 100644 --- a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py @@ -122,7 +122,6 @@ def test_format_table_text_read_write(self): pd.testing.assert_frame_equal(actual, expected) def test_format_table_text_read_write_with_nulls(self): - """TEXT format: null string values are written as empty string and read back as empty.""" pa_schema = pa.schema([("value", pa.string())]) schema = Schema.from_pyarrow_schema( pa_schema, @@ -154,7 +153,6 @@ def test_format_table_text_read_write_with_nulls(self): self.assertIn("", actual["value"].values) def test_format_table_text_partitioned_read_write(self): - """Partitioned TEXT table: partition columns are stripped before write; read back with partition.""" pa_schema = pa.schema([ ("value", pa.string()), ("dt", pa.int32()), @@ -191,7 +189,6 @@ def test_format_table_text_partitioned_read_write(self): self.assertEqual(actual["dt"].tolist(), [1, 1, 2]) def test_format_table_read_with_limit_to_iterator(self): - """with_limit(N) must be respected by to_iterator (same as to_arrow/to_pandas).""" pa_schema = pa.schema([ ("a", pa.int32()), ("b", pa.int32()), @@ -265,7 +262,6 @@ def test_format_table_partitioned_overwrite(self, file_format): self.assertEqual(actual["b"].tolist(), [100, 200]) def test_format_table_overwrite_only_specified_partition(self): - """overwrite(static_partition) must only clear that partition; other partitions unchanged.""" pa_schema = pa.schema([ ("a", pa.int32()), ("b", pa.int32()), @@ -312,7 +308,6 @@ def test_format_table_overwrite_only_specified_partition(self): self.assertEqual(c2, [30, 40], "partition c=2 must be unchanged") def test_format_table_overwrite_multiple_batches_same_partition(self): - """Overwrite mode must clear partition dir only once; multiple batches same partition keep all data.""" pa_schema = pa.schema([ ("a", pa.int32()), ("b", pa.int32()), @@ -440,7 +435,6 @@ def test_format_table_partition_column_returns_schema_type(self): ) def test_format_table_with_filter_extracts_partition_like_java(self): - """with_filter(partition equality) extracts partition like Java; does not overwrite partition 
filter.""" pa_schema = pa.schema([ ("a", pa.int32()), ("b", pa.int32()), From 66744869627b17787b092aa14f987e09e22a2ba4 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 11:17:34 +0800 Subject: [PATCH 09/10] fix limit --- .../table/format/format_read_builder.py | 16 ++++++--- .../table/format/format_table_read.py | 36 +++++++++++++++++-- .../table/format/format_table_scan.py | 4 +-- .../table/format/format_table_write.py | 11 ++---- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/paimon-python/pypaimon/table/format/format_read_builder.py b/paimon-python/pypaimon/table/format/format_read_builder.py index 6e777654874a..ea501d56f0b1 100644 --- a/paimon-python/pypaimon/table/format/format_read_builder.py +++ b/paimon-python/pypaimon/table/format/format_read_builder.py @@ -33,8 +33,15 @@ def __init__(self, table: FormatTable): self._partition_filter: Optional[dict] = None def with_filter(self, predicate: Predicate) -> "FormatReadBuilder": - if self._partition_filter is None and self.table.partition_keys and predicate: - spec = extract_partition_spec_from_predicate(predicate, self.table.partition_keys) + ok = ( + self._partition_filter is None + and self.table.partition_keys + and predicate + ) + if ok: + spec = extract_partition_spec_from_predicate( + predicate, self.table.partition_keys + ) if spec is not None: self._partition_filter = spec return self @@ -47,7 +54,9 @@ def with_limit(self, limit: int) -> "FormatReadBuilder": self._limit = limit return self - def with_partition_filter(self, partition_spec: Optional[dict]) -> "FormatReadBuilder": + def with_partition_filter( + self, partition_spec: Optional[dict] + ) -> "FormatReadBuilder": self._partition_filter = partition_spec return self @@ -55,7 +64,6 @@ def new_scan(self) -> FormatTableScan: return FormatTableScan( self.table, partition_filter=self._partition_filter, - limit=self._limit, ) def new_read(self) -> FormatTableRead: diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py index 51bc7a7d61c4..690f63ab3447 100644 --- a/paimon-python/pypaimon/table/format/format_table_read.py +++ b/paimon-python/pypaimon/table/format/format_table_read.py @@ -24,6 +24,20 @@ from pypaimon.table.format.format_table import FormatTable, Format +def _text_format_schema_column(table: FormatTable) -> Optional[str]: + """TEXT format: single data column name from schema (non-partition).""" + if table.format() != Format.TEXT or not table.fields: + return None + data_names = [ + f.name for f in table.fields if f.name not in table.partition_keys + ] + return ( + data_names[0] + if data_names + else (table.field_names[0] if table.field_names else None) + ) + + def _read_file_to_arrow( file_io: Any, split: FormatDataSplit, @@ -31,6 +45,7 @@ def _read_file_to_arrow( partition_spec: Optional[Dict[str, str]], read_fields: Optional[List[str]], partition_key_types: Optional[Dict[str, pyarrow.DataType]] = None, + text_column_name: Optional[str] = None, ) -> pyarrow.Table: path = split.data_path() csv_read_options = None @@ -106,7 +121,11 @@ def _read_file_to_arrow( ) if not lines: return pyarrow.table({}) - col_name = "value" if not read_fields else read_fields[0] + col_name = ( + read_fields[0] + if read_fields + else (text_column_name if text_column_name else "value") + ) tbl = pyarrow.table({col_name: lines}) else: raise ValueError(f"Format {fmt} read not implemented in Python") @@ -165,6 +184,11 @@ def to_arrow( read_fields = self.projection fmt = 
self.table.format() partition_key_types = _partition_key_types(self.table) + text_col = ( + _text_format_schema_column(self.table) + if fmt == Format.TEXT + else None + ) tables = [] nrows = 0 for split in splits: @@ -175,6 +199,7 @@ def to_arrow( split.partition, read_fields, partition_key_types, + text_column_name=text_col, ) if t.num_rows > 0: tables.append(t) @@ -209,6 +234,12 @@ def to_iterator( splits: List[FormatDataSplit], ) -> Iterator[Any]: partition_key_types = _partition_key_types(self.table) + fmt = self.table.format() + text_col = ( + _text_format_schema_column(self.table) + if fmt == Format.TEXT + else None + ) n_yielded = 0 for split in splits: if self.limit is not None and n_yielded >= self.limit: @@ -216,10 +247,11 @@ def to_iterator( t = _read_file_to_arrow( self.table.file_io, split, - self.table.format(), + fmt, split.partition, self.projection, partition_key_types, + text_column_name=text_col, ) for batch in t.to_batches(): for i in range(batch.num_rows): diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py index 2033083e0b70..2773c8c085e9 100644 --- a/paimon-python/pypaimon/table/format/format_table_scan.py +++ b/paimon-python/pypaimon/table/format/format_table_scan.py @@ -90,7 +90,7 @@ def _list_data_files_recursive( splits.append( FormatDataSplit( file_path=full_path, - file_size=size if size is not None else 0, + file_size=size, partition=part_spec, ) ) @@ -123,7 +123,7 @@ def plan(self) -> Plan: filtered = [] for s in splits: match = s.partition and all( - s.partition.get(k) == v + str(s.partition.get(k)) == str(v) for k, v in self.partition_filter.items() ) if match: diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py index 590597e1f273..eb45b718d5c5 100644 --- a/paimon-python/pypaimon/table/format/format_table_write.py +++ b/paimon-python/pypaimon/table/format/format_table_write.py @@ -176,16 +176,12 @@ def _write_single_batch( path = f"{dir_path}/{file_name}" fmt = self._file_format + tbl = pyarrow.Table.from_batches([data]) if fmt == Format.PARQUET: buf = io.BytesIO() - pyarrow.parquet.write_table( - pyarrow.Table.from_batches([data]), - buf, - compression="zstd", - ) + pyarrow.parquet.write_table(tbl, buf, compression="zstd") raw = buf.getvalue() elif fmt == Format.CSV: - tbl = pyarrow.Table.from_batches([data]) if hasattr(pyarrow, "csv"): buf = io.BytesIO() pyarrow.csv.write_csv(tbl, buf) @@ -195,7 +191,6 @@ def _write_single_batch( tbl.to_pandas().to_csv(buf, index=False) raw = buf.getvalue().encode("utf-8") elif fmt == Format.JSON: - tbl = pyarrow.Table.from_batches([data]) import json lines = [] for i in range(tbl.num_rows): @@ -206,7 +201,6 @@ def _write_single_batch( lines.append(json.dumps(row) + "\n") raw = "".join(lines).encode("utf-8") elif fmt == Format.ORC: - tbl = pyarrow.Table.from_batches([data]) if hasattr(pyarrow, "orc"): buf = io.BytesIO() pyarrow.orc.write_table(tbl, buf) @@ -217,7 +211,6 @@ def _write_single_batch( "support (pyarrow.orc)" ) elif fmt == Format.TEXT: - tbl = pyarrow.Table.from_batches([data]) partition_keys = self.table.partition_keys if partition_keys: data_cols = [ From 540f303b244ba5021bc7cd96c4408f298e2b8010 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 11:44:06 +0800 Subject: [PATCH 10/10] fix text format delimiter --- .../table/format/format_table_read.py | 30 ++++++++++++++----- .../table/format/format_table_scan.py | 6 ---- 2 files changed, 23 
insertions(+), 13 deletions(-)

diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py
index 690f63ab3447..11ce3faf5b29 100644
--- a/paimon-python/pypaimon/table/format/format_table_read.py
+++ b/paimon-python/pypaimon/table/format/format_table_read.py
@@ -46,6 +46,7 @@ def _read_file_to_arrow(
     read_fields: Optional[List[str]],
     partition_key_types: Optional[Dict[str, pyarrow.DataType]] = None,
     text_column_name: Optional[str] = None,
+    text_line_delimiter: str = "\n",
 ) -> pyarrow.Table:
     path = split.data_path()
     csv_read_options = None
@@ -115,17 +116,20 @@ def _read_file_to_arrow(
         )
     elif fmt == Format.TEXT:
         text = data.decode("utf-8") if isinstance(data, bytes) else data
-        line_delimiter = "\n"
-        lines = (
-            text.rstrip(line_delimiter).split(line_delimiter) if text else []
-        )
+        # Drop exactly one trailing delimiter: str.rstrip strips a character
+        # set, so it would also swallow trailing empty lines (e.g. null
+        # values written back as "").
+        if text.endswith(text_line_delimiter):
+            text = text[: -len(text_line_delimiter)]
+        lines = text.split(text_line_delimiter) if text else []
         if not lines:
             return pyarrow.table({})
-        col_name = (
-            read_fields[0]
-            if read_fields
-            else (text_column_name if text_column_name else "value")
-        )
+        part_keys = set(partition_spec.keys()) if partition_spec else set()
+        col_name = text_column_name if text_column_name else "value"
+        if read_fields:
+            for f in read_fields:
+                if f not in part_keys:
+                    col_name = f
+                    break
         tbl = pyarrow.table({col_name: lines})
     else:
         raise ValueError(f"Format {fmt} read not implemented in Python")
@@ -189,6 +193,11 @@ def to_arrow(
             if fmt == Format.TEXT
             else None
         )
+        text_delim = (
+            self.table.options().get("text.line-delimiter", "\n")
+            if fmt == Format.TEXT
+            else "\n"
+        )
         tables = []
         nrows = 0
         for split in splits:
@@ -200,6 +209,7 @@ def to_arrow(
                 read_fields,
                 partition_key_types,
                 text_column_name=text_col,
+                text_line_delimiter=text_delim,
             )
             if t.num_rows > 0:
                 tables.append(t)
@@ -240,6 +250,11 @@ def to_iterator(
             if fmt == Format.TEXT
             else None
        )
+        text_delim = (
+            self.table.options().get("text.line-delimiter", "\n")
+            if fmt == Format.TEXT
+            else "\n"
+        )
         n_yielded = 0
         for split in splits:
             if self.limit is not None and n_yielded >= self.limit:
@@ -252,6 +267,7 @@ def to_iterator(
                 self.projection,
                 partition_key_types,
                 text_column_name=text_col,
+                text_line_delimiter=text_delim,
             )
             for batch in t.to_batches():
                 for i in range(batch.num_rows):
diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py
index 2773c8c085e9..9ce2d9ea26ad 100644
--- a/paimon-python/pypaimon/table/format/format_table_scan.py
+++ b/paimon-python/pypaimon/table/format/format_table_scan.py
@@ -103,11 +103,9 @@ def __init__(
         self,
         table: FormatTable,
         partition_filter: Optional[Dict[str, str]] = None,
-        limit: Optional[int] = None,
     ):
         self.table = table
         self.partition_filter = partition_filter  # optional equality filter
-        self.limit = limit
 
     def plan(self) -> Plan:
         partition_only_value = self.table.options().get(
@@ -129,8 +127,4 @@ def plan(self) -> Plan:
             if match:
                 filtered.append(s)
             splits = filtered
-        if self.limit is not None and self.limit <= 0:
-            splits = []
-        elif self.limit is not None and len(splits) > self.limit:
-            splits = splits[: self.limit]
         return Plan(_splits=splits)
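
A minimal end-to-end sketch of the API this series adds, assuming an already
constructed RESTCatalog instance named `rest_catalog` (catalog setup is out of
scope here) and an import path for Schema guessed from the test modules above;
treat it as an illustration of the final state of the series, not as part of
the patches:

    import pandas as pd
    import pyarrow as pa
    from pypaimon import Schema  # import path assumed

    pa_schema = pa.schema([
        ("a", pa.int32()),
        ("b", pa.int32()),
        ("dt", pa.int32()),
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        partition_keys=["dt"],
        options={"type": "format-table", "file.format": "csv"},
    )
    rest_catalog.create_table("default.format_table_demo", schema, False)
    table = rest_catalog.get_table("default.format_table_demo")

    # Write one commit; partition dirs (dt=10/...) are derived per row.
    wb = table.new_batch_write_builder()
    tw, tc = wb.new_write(), wb.new_commit()
    tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [10, 10]}))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()

    # Read a single partition back, capped at one row.
    rb = table.new_read_builder().with_partition_filter({"dt": "10"}).with_limit(1)
    splits = rb.new_scan().plan().splits()
    df = rb.new_read().to_pandas(splits)
    assert len(df) == 1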