Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.format.format_table import FormatTable, Format


# Value of the table "type" option that load_table compares against (after
# stripping and lower-casing the option) to route loading to _create_format_table.
FORMAT_TABLE_TYPE = "format-table"


class RESTCatalog(Catalog):
Expand Down Expand Up @@ -180,7 +184,7 @@ def list_tables_paged(
except ForbiddenException as e:
raise DatabaseNoPermissionException(database_name) from e

def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
def get_table(self, identifier: Union[str, Identifier]):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
return self.load_table(
Expand Down Expand Up @@ -263,9 +267,12 @@ def load_table(self,
internal_file_io: Callable[[str], Any],
external_file_io: Callable[[str], Any],
metadata_loader: Callable[[Identifier], TableMetadata],
) -> FileStoreTable:
):
metadata = metadata_loader(identifier)
schema = metadata.schema
table_type = schema.options.get(CoreOptions.TYPE.key(), "").strip().lower()
if table_type == FORMAT_TABLE_TYPE:
return self._create_format_table(identifier, metadata, internal_file_io, external_file_io)
data_file_io = external_file_io if metadata.is_external else internal_file_io
catalog_env = CatalogEnvironment(
identifier=identifier,
Expand All @@ -281,6 +288,30 @@ def load_table(self,
catalog_env)
return table

def _create_format_table(self,
                         identifier: Identifier,
                         metadata: TableMetadata,
                         internal_file_io: Callable[[str], Any],
                         external_file_io: Callable[[str], Any],
                         ) -> FormatTable:
    """Build a ``FormatTable`` from already-loaded table metadata.

    The table location comes from the schema's path option and the file
    format from its file-format option (``"parquet"`` when that option is
    absent).

    Args:
        identifier: Identifier of the table being loaded.
        metadata: Loaded table metadata; its schema supplies the options
            and the comment passed to the table.
        internal_file_io: Factory invoked with the location when
            ``metadata.is_external`` is falsy.
        external_file_io: Factory invoked with the location when
            ``metadata.is_external`` is truthy.

    Returns:
        The constructed ``FormatTable``.

    Raises:
        ValueError: If the schema options contain no (or an empty) path.
    """
    table_schema = metadata.schema

    table_location = table_schema.options.get(CoreOptions.PATH.key())
    if not table_location:
        raise ValueError("Format table schema must have path option")

    # Same internal/external selection rule that load_table applies.
    if metadata.is_external:
        io_factory = external_file_io
    else:
        io_factory = internal_file_io
    table_file_io = io_factory(table_location)

    format_name = table_schema.options.get(CoreOptions.FILE_FORMAT.key(), "parquet")
    parsed_format = Format.parse(format_name)

    return FormatTable(
        file_io=table_file_io,
        identifier=identifier,
        table_schema=table_schema,
        location=table_location,
        format=parsed_format,
        # Pass a copy so the table does not share the schema's options mapping.
        options=dict(table_schema.options),
        comment=table_schema.comment,
    )

@staticmethod
def create(file_io: FileIO,
table_path: str,
Expand Down
19 changes: 18 additions & 1 deletion paimon-python/pypaimon/read/push_down_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,29 @@
# limitations under the License.
################################################################################

from typing import Dict, List, Set
from typing import Dict, List, Optional, Set

from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder


def extract_partition_spec_from_predicate(
    predicate: Predicate, partition_keys: List[str]
) -> Optional[Dict[str, str]]:
    """Derive a complete partition spec from the equality terms of a predicate.

    The predicate is broken into terms with ``_split_and``. Each term whose
    method is ``"equal"``, that names a field and carries exactly one
    literal, contributes ``field -> str(literal)`` when the field is a
    partition key. If the same key appears more than once, the last term
    wins.

    Args:
        predicate: Predicate to inspect; a falsy value yields ``None``.
        partition_keys: Names of the partition columns; an empty list yields
            ``None``.

    Returns:
        A dict mapping every partition key to its stringified literal, or
        ``None`` when at least one partition key has no usable equality term.
    """
    if not predicate or not partition_keys:
        return None

    collected: Dict[str, str] = {}
    for term in _split_and(predicate):
        is_single_equality = (
            term.method == "equal"
            and term.field is not None
            and term.literals is not None
            and len(term.literals) == 1
        )
        if is_single_equality and term.field in partition_keys:
            collected[term.field] = str(term.literals[0])

    # Only partition keys are ever recorded, so the spec is complete exactly
    # when every partition key has an entry.
    if all(key in collected for key in partition_keys):
        return collected
    return None


def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]):
new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys)
part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)}
Expand Down
36 changes: 36 additions & 0 deletions paimon-python/pypaimon/table/format/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pypaimon.table.format.format_data_split import FormatDataSplit
from pypaimon.table.format.format_table import FormatTable, Format
from pypaimon.table.format.format_read_builder import FormatReadBuilder
from pypaimon.table.format.format_table_scan import FormatTableScan
from pypaimon.table.format.format_table_read import FormatTableRead
from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder
from pypaimon.table.format.format_table_write import FormatTableWrite
from pypaimon.table.format.format_table_commit import FormatTableCommit

__all__ = [
"FormatDataSplit",
"FormatTable",
"Format",
"FormatReadBuilder",
"FormatTableScan",
"FormatTableRead",
"FormatBatchWriteBuilder",
"FormatTableWrite",
"FormatTableCommit",
]
55 changes: 55 additions & 0 deletions paimon-python/pypaimon/table/format/format_batch_write_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from pypaimon.table.format.format_table import FormatTable
from pypaimon.table.format.format_table_commit import FormatTableCommit
from pypaimon.table.format.format_table_write import FormatTableWrite


class FormatBatchWriteBuilder:
    """Builder that configures batch writes against a format table.

    Holds the target table plus the overwrite settings, and hands them to the
    ``FormatTableWrite`` / ``FormatTableCommit`` objects it creates.
    """

    def __init__(self, table: FormatTable):
        """Create a builder for ``table`` with overwrite mode disabled."""
        self.table = table
        self._overwrite = False
        self._static_partition: Optional[dict] = None

    def overwrite(self, static_partition: Optional[dict] = None) -> "FormatBatchWriteBuilder":
        """Switch the builder to overwrite mode.

        Args:
            static_partition: Optional mapping of partition column -> value
                restricting the overwrite. ``None`` is stored as an empty
                dict.

        Returns:
            This builder, for chaining.

        Raises:
            ValueError: If ``static_partition`` is non-empty and the table is
                not partitioned, or if it names a column that is not a
                partition key. The builder state is left unchanged in that
                case.
        """
        # Validate BEFORE mutating: a rejected spec must not leave the builder
        # in overwrite mode with no static partition recorded.
        self._validate_static_partition(static_partition)
        self._overwrite = True
        # Store a copy so later changes to the caller's dict cannot slip
        # unvalidated keys into the builder.
        self._static_partition = dict(static_partition) if static_partition is not None else {}
        return self

    def _validate_static_partition(self, static_partition: Optional[dict]) -> None:
        """Raise ``ValueError`` unless every key of ``static_partition`` is a partition key.

        An empty or ``None`` spec is always accepted.
        """
        if not static_partition:
            return
        partition_keys = self.table.partition_keys
        if not partition_keys:
            raise ValueError(
                "Format table is not partitioned, static partition values are not allowed."
            )
        for key in static_partition:
            if key not in partition_keys:
                raise ValueError(f"Unknown static partition column: {key}")

    def new_write(self) -> FormatTableWrite:
        """Create a ``FormatTableWrite`` carrying the current overwrite settings."""
        return FormatTableWrite(
            self.table,
            overwrite=self._overwrite,
            static_partitions=self._static_partition,
        )

    def new_commit(self) -> FormatTableCommit:
        """Create a ``FormatTableCommit`` for the builder's table."""
        return FormatTableCommit(table=self.table)
26 changes: 26 additions & 0 deletions paimon-python/pypaimon/table/format/format_commit_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import List


@dataclass
class FormatTableCommitMessage:
    """Commit message for a format table, holding a list of written paths."""

    # Paths recorded in this message.
    written_paths: List[str]

    def is_empty(self) -> bool:
        """Return ``True`` when ``written_paths`` is empty (or otherwise falsy)."""
        has_paths = bool(self.written_paths)
        return not has_paths
30 changes: 30 additions & 0 deletions paimon-python/pypaimon/table/format/format_data_split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Dict, Optional, Any


@dataclass(frozen=True)
class FormatDataSplit:
    """Unit of read work for a format table.

    A split currently describes one file; per the original note, byte ranges
    within a file are a possible future extension.
    """

    # Path of the file covered by this split.
    file_path: str
    # Size of that file (presumably in bytes; confirm with the code that builds splits).
    file_size: int
    # Partition column name -> value; None when no partition values are attached.
    partition: Optional[Dict[str, Any]] = None

    def data_path(self) -> str:
        """Return the path of the data to read, which is ``file_path``."""
        path = self.file_path
        return path
82 changes: 82 additions & 0 deletions paimon-python/pypaimon/table/format/format_read_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional

from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read.push_down_utils import extract_partition_spec_from_predicate
from pypaimon.schema.data_types import DataField
from pypaimon.table.format.format_table import FormatTable
from pypaimon.table.format.format_table_scan import FormatTableScan
from pypaimon.table.format.format_table_read import FormatTableRead


class FormatReadBuilder:
    """Fluent builder for scans and reads over a format table.

    Collects an optional projection, limit and partition filter, then passes
    them to the ``FormatTableScan`` / ``FormatTableRead`` objects it creates.
    """

    def __init__(self, table: FormatTable):
        """Create a builder for ``table`` with no projection, limit or partition filter."""
        self.table = table
        self._projection: Optional[List[str]] = None
        self._limit: Optional[int] = None
        self._partition_filter: Optional[dict] = None

    def with_filter(self, predicate: Predicate) -> "FormatReadBuilder":
        """Try to derive a partition filter from ``predicate``.

        Only a partition spec is extracted; the predicate itself is not
        stored by this builder. Nothing happens when a partition filter is
        already set, the table has no partition keys, or ``predicate`` is
        falsy.

        Returns:
            This builder, for chaining.
        """
        if self._partition_filter is not None:
            # An existing partition filter is never replaced here.
            return self
        if not self.table.partition_keys or not predicate:
            return self

        derived = extract_partition_spec_from_predicate(
            predicate, self.table.partition_keys
        )
        if derived is not None:
            self._partition_filter = derived
        return self

    def with_projection(self, projection: List[str]) -> "FormatReadBuilder":
        """Set the list of column names to read and return this builder."""
        self._projection = projection
        return self

    def with_limit(self, limit: int) -> "FormatReadBuilder":
        """Set the limit passed on to the read and return this builder."""
        self._limit = limit
        return self

    def with_partition_filter(
        self, partition_spec: Optional[dict]
    ) -> "FormatReadBuilder":
        """Set (or, with ``None``, clear) the partition filter and return this builder."""
        self._partition_filter = partition_spec
        return self

    def new_scan(self) -> FormatTableScan:
        """Create a ``FormatTableScan`` carrying the current partition filter."""
        return FormatTableScan(
            self.table,
            partition_filter=self._partition_filter,
        )

    def new_read(self) -> FormatTableRead:
        """Create a ``FormatTableRead`` carrying the current projection and limit."""
        return FormatTableRead(
            table=self.table,
            projection=self._projection,
            limit=self._limit,
        )

    def new_predicate_builder(self) -> PredicateBuilder:
        """Create a ``PredicateBuilder`` over the fields returned by ``read_type``."""
        return PredicateBuilder(self.read_type())

    def read_type(self) -> List[DataField]:
        """Return the fields this builder will read.

        Without a projection (``None`` or empty) this is a new list of all
        table fields. With a projection it is the table fields whose name is
        in the projection, kept in table-field order rather than projection
        order.
        """
        selected = self._projection
        if not selected:
            return list(self.table.fields)
        return [field for field in self.table.fields if field.name in selected]
Loading