fix: Use binary(16) for UUID type to ensure Spark compatibility #2881
Changes from all commits
```diff
@@ -1500,7 +1500,7 @@ def test_table_write_schema_with_valid_upcast(
             pa.field("list", pa.list_(pa.int64()), nullable=False),
             pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
             pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
-            pa.field("uuid", pa.uuid(), nullable=True),
+            pa.field("uuid", pa.binary(16), nullable=True),
         )
     )
 )
```
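For orientation (not part of the diff): pa.binary(16) is Arrow's plain fixed-width 16-byte binary type, while pa.uuid(), available in recent PyArrow releases, is an extension type over the same 16-byte storage that also carries extension metadata; per this PR's description, that metadata is what Java Arrow and Python Arrow disagree on. A minimal sketch:

```python
import pyarrow as pa

# Plain 16-byte fixed-width binary: no extension metadata attached.
plain = pa.binary(16)

# The UUID extension type (recent PyArrow): same fixed-width 16-byte storage,
# but annotated with extension metadata that Java and Python Arrow represent differently.
ext = pa.uuid()

assert ext.storage_type == plain  # storage is identical; only the annotation differs
```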
```diff
@@ -2138,7 +2138,7 @@ def test_uuid_partitioning(session_catalog: Catalog, spark: SparkSession, transf
     tbl.append(arr_table)

     lhs = [r[0] for r in spark.table(identifier).collect()]
-    rhs = [str(u.as_py()) for u in tbl.scan().to_arrow()["uuid"].combine_chunks()]
+    rhs = [str(uuid.UUID(bytes=u.as_py())) for u in tbl.scan().to_arrow()["uuid"].combine_chunks()]
     assert lhs == rhs
```
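The rewritten assertion converts the stored 16-byte values back to canonical UUID strings before comparing with Spark's output. A minimal, self-contained illustration of that round-trip (not part of the diff):

```python
import uuid

u = uuid.UUID("11111111-1111-1111-1111-111111111111")

raw = u.bytes                      # the 16 raw bytes a binary(16) Arrow column stores
assert len(raw) == 16
assert uuid.UUID(bytes=raw) == u   # reconstruct the UUID from the stored bytes
assert str(uuid.UUID(bytes=raw)) == "11111111-1111-1111-1111-111111111111"
```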
```diff
@@ -2530,3 +2530,63 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
     assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
         "Expected next_row_id to be incremented by the number of added rows"
     )
+
+
+@pytest.mark.integration
+def test_write_uuid_in_pyiceberg_and_scan(session_catalog: Catalog, spark: SparkSession) -> None:
+    """Test UUID compatibility between PyIceberg and Spark.
+
+    UUIDs must be written as binary(16) for Spark compatibility since Java Arrow
+    metadata differs from Python Arrow metadata for UUID types.
+    """
+    identifier = "default.test_write_uuid_in_pyiceberg_and_scan"
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))
+
+    test_data_with_null = {
+        "uuid_col": [
+            uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+            None,
+            uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+        ]
+    }
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    table = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
+
+    arrow_table = pa.table(test_data_with_null, schema=schema.as_arrow())
+
+    # Write with pyarrow
+    table.append(arrow_table)
+
+    # Write with pyspark
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ("22222222-2222-2222-2222-222222222222")
+        """
+    )
+    df = spark.table(identifier)
+
+    table.refresh()
+
+    assert df.count() == 4
+    assert len(table.scan().to_arrow()) == 4
+
+    result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
+    assert result.count() == 1
```
Contributor:
I ran this test on the current main branch with 1.10.1 and this is the stacktrace. This is different from the stacktrace in #2007.

Contributor:
I also downloaded the 2 data files.

Collaborator (Author):
The stacktrace has changed because of the fix that I made in the Java implementation. This PR (apache/iceberg#14027) has more details about the problem, and in issue #2372 I explain the problem from the PyIceberg side and why we are changing back to binary(16).

Contributor:
The Parquet files look wrong, and I'm not sure what happened there. UUID should annotate

Collaborator (Author):
The pa.binary(16) change fixed the comparison issue but broke Parquet spec compliance by removing the UUID logical type annotation. We can get back to UUID in the visitor and raise an exception with a better message when the user tries to filter a UUID column, since PyArrow does not support filtering.
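To make the idea in the previous comment concrete, a hypothetical guard of roughly this shape could surface a clearer error when a row filter touches a UUID column; the helper name and placement are illustrative, not the actual PyIceberg code:

```python
from pyiceberg.schema import Schema
from pyiceberg.types import UUIDType


def reject_uuid_filters(schema: Schema, filter_column_names: list[str]) -> None:
    """Hypothetical helper: fail fast with a readable message instead of an opaque PyArrow error."""
    for name in filter_column_names:
        field = schema.find_field(name)
        if isinstance(field.field_type, UUIDType):
            raise NotImplementedError(
                f"Cannot apply a row filter to UUID column {name!r}: "
                "PyArrow does not support filtering on UUID-typed columns"
            )
```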
The remainder of the new test:

```diff
+    result = df.where("uuid_col = '22222222-2222-2222-2222-222222222222'")
+    assert result.count() == 1
+
+    result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("00000000-0000-0000-0000-000000000000").bytes)).to_arrow()
+    assert len(result) == 1
+
+    result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("22222222-2222-2222-2222-222222222222").bytes)).to_arrow()
+    assert len(result) == 1
```
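Assuming the behaviour this PR introduces, a small sketch of what the mapping means for callers building Arrow data for a UUID column (the printed type is the expected result under that assumption, not verified output):

```python
import uuid

import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, UUIDType

schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))

# With this change, the UUID field is expected to convert to fixed-width 16-byte binary
# rather than the pa.uuid() extension type.
print(schema.as_arrow().field("uuid_col").type)

# Callers therefore supply raw 16-byte values (uuid.UUID(...).bytes) for UUID columns.
pa.table({"uuid_col": pa.array([uuid.uuid4().bytes, None], type=pa.binary(16))})
```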
Not sure if this is the right fix. We explicitly changed binary(16) to uuid in this PR: https://github.com/apache/iceberg-python/pull/2007/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR687. The current change reverts it.
I believe that my other comment clarifies this change.
Which comment? I would prefer to keep it UUID and fix this on the Java side. Most implementations don't really look at the Arrow/Parquet/etc. logical annotations; they read the uuid (which is a fixed[16] with a UUID label on it) and cast it to a type that's compatible with the query engine. Spark has shown to be problematic because it doesn't have a native UUID type, but it handles it internally as a string.
I was referring to this comment #2881 (comment), which points to the #2372 issue where I explain the problem. In that issue your suggestion was to change to fixed[16].
@kevinjqliu added the error we're getting now in the other comment. The problem is filtering a Parquet file with UUID, not reading it.
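A rough sketch of the read-vs-filter distinction being described, assuming a recent PyArrow that provides the pa.uuid() extension type; the file path and data are made up for illustration:

```python
import uuid

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Build a small table whose column uses the arrow.uuid extension type.
storage = pa.array([uuid.uuid4().bytes for _ in range(3)], type=pa.binary(16))
uuid_col = pa.ExtensionArray.from_storage(pa.uuid(), storage)
pq.write_table(pa.table({"uuid_col": uuid_col}), "/tmp/uuids.parquet")

dataset = ds.dataset("/tmp/uuids.parquet")
dataset.to_table()  # reading the UUID column back works

# Filtering on the UUID-typed column is the problematic path: depending on the
# PyArrow version, this raises because the comparison kernel does not cover the
# extension type.
dataset.to_table(filter=ds.field("uuid_col") == storage[0].as_py())
```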
I updated the PR body to add the filtering error.
Ah, I see. I think that's reasonable.
Let's add a comment in the code, referring to #2372, explaining why we use binary(16) rather than uuid().
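A sketch of what such an in-code note might look like; the class and method names are assumptions standing in for PyIceberg's schema-to-Arrow conversion visitor, not a verbatim excerpt of the change:

```python
import pyarrow as pa

from pyiceberg.types import UUIDType


class _ConvertToArrowSchemaSketch:  # illustrative stand-in for the real visitor
    def visit_uuid(self, _: UUIDType) -> pa.DataType:
        # Use binary(16) instead of pa.uuid(): Java Arrow and Python Arrow attach
        # different extension metadata to UUID columns, which breaks Spark
        # interoperability. See https://github.com/apache/iceberg-python/issues/2372.
        return pa.binary(16)
```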
@Fokko done in a7bdfca