fix: Use table schema rather than file schema when projecting properties #20174
```diff
@@ -158,6 +158,9 @@ pub struct FileScanConfig {
     /// DataFusion may reorder file processing for optimization without affecting correctness.
     pub preserve_order: bool,
     /// All equivalent lexicographical orderings that describe the schema.
+    ///
+    /// Note these orderings are with respect to the projected schema (i.e., after
+    /// applying [`FileSource::projection`]).
     pub output_ordering: Vec<LexOrdering>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
```
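To make the new doc comment concrete: with a file schema `[a, b, c]` and a projection `[2, 0]`, the projected schema is `[c, a]`, so an ordering on `c` then `a` must use projected indices `0` and `1`, not the file-schema indices `2` and `0`. Below is a minimal sketch of building such an ordering; the import paths are assumptions (they vary across DataFusion releases), and it mirrors the `sort_expr` test helper added later in this PR.

```rust
// Minimal sketch, assuming these import paths (they differ between
// DataFusion versions); not part of the PR itself.
use std::sync::Arc;

use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::Column;

/// Ordering "c ASC, a ASC" expressed against the projected schema [c, a]:
/// the column indices are positions in the *projected* schema.
fn projected_output_ordering() -> Vec<PhysicalSortExpr> {
    vec![
        // "c" is column 0 of the projected schema [c, a]
        PhysicalSortExpr::new_default(Arc::new(Column::new("c", 0))),
        // "a" is column 1 of the projected schema
        PhysicalSortExpr::new_default(Arc::new(Column::new("a", 1))),
    ]
}
```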
```diff
@@ -685,11 +688,10 @@ impl DataSource for FileScanConfig {
     fn eq_properties(&self) -> EquivalenceProperties {
         let schema = self.file_source.table_schema().table_schema();
-        let mut eq_properties = EquivalenceProperties::new_with_orderings(
-            Arc::clone(schema),
-            self.output_ordering.clone(),
-        )
-        .with_constraints(self.constraints.clone());
+        let orderings = project_orderings(&self.output_ordering, schema);
```
Contributor (author): this is the fix (to call `project_orderings`).
Contributor (author): After some more thought, I am not sure this is correct 🤔 I need to do some more research into when the projection is applied.
```diff
+        let mut eq_properties =
+            EquivalenceProperties::new_with_orderings(Arc::clone(schema), orderings)
+                .with_constraints(self.constraints.clone());

         if let Some(filter) = self.file_source.filter() {
             // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
```
```diff
@@ -1402,13 +1404,13 @@ mod tests {
     use super::*;
     use crate::TableSchema;
-    use crate::test_util::col;
+    use crate::test_util::{col, mock_source};
     use crate::{
         generate_test_files, test_util::MockSource, tests::aggr_test_schema,
         verify_sort_integrity,
     };

-    use arrow::datatypes::Field;
+    use arrow::datatypes::{Field, Schema, TimeUnit};
     use datafusion_common::stats::Precision;
     use datafusion_common::{ColumnStatistics, internal_err};
     use datafusion_expr::{Operator, SortExpr};
```
```diff
@@ -1957,22 +1959,103 @@ mod tests {
         }
     }

+    #[test]
+    fn equivalence_properties_projection_reorders_schema() {
+        // This test ensures `project_orderings` is applied even when there are no
+        // partition columns: a projection reorders the schema, and the output ordering
+        // is specified in projected schema indices.
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int64, false),
+            Field::new("c", DataType::Utf8, true),
+        ]));
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+        let table_schema = TableSchema::from(file_schema);
+
+        let file_source = mock_source(table_schema);
+
+        let config = FileScanConfigBuilder::new(object_store_url.clone(), file_source)
+            .with_projection_indices(Some(vec![2, 0]))
+            .unwrap()
+            // Indices are in the projected schema: [c, a] -> [0, 1].
+            .with_output_ordering(vec![[sort_expr("c", 0), sort_expr("a", 1)].into()])
+            .build();
+
+        let eq_properties = config.eq_properties();
+        let ordering = eq_properties
+            .output_ordering()
+            .expect("expected output ordering");
+
+        let first_col = ordering[0].expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(first_col.name(), "c");
+        assert_eq!(first_col.index(), 0);
+
+        let second_col = ordering[1].expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(second_col.name(), "a");
+        assert_eq!(second_col.index(), 1);
+    }
+
+    #[test]
+    fn equivalence_properties_reindex_output_ordering_for_partition_cols() {
+        // This test exercises `project_orderings` via `eq_properties()` when a projection
+        // reorders columns and includes a partition column.
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("f", DataType::Float64, true),
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]));
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+
+        let table_schema = TableSchema::new(
+            file_schema,
+            vec![Arc::new(Field::new(
+                "tag1",
+                wrap_partition_type_in_dict(DataType::Utf8),
+                true,
+            ))],
+        );
+
+        let file_source = mock_source(table_schema);
+
+        let config = FileScanConfigBuilder::new(object_store_url.clone(), file_source)
+            .with_projection_indices(Some(vec![2, 0, 1]))
+            .unwrap()
+            // Output ordering is defined on unprojected (base) column indices. We expect
+            // `project_orderings` to remap these to the projected schema indices.
+            .with_output_ordering(vec![
+                [sort_expr("tag1", 0), sort_expr("time", 1)].into(),
+            ])
+            .build();
+
+        let eq_properties = config.eq_properties();
+        let ordering = eq_properties
+            .output_ordering()
+            .expect("expected output ordering");
+
+        let first_col = ordering[0].expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(first_col.name(), "tag1");
+        assert_eq!(first_col.index(), 0);
+
+        let second_col = ordering[1].expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(second_col.name(), "time");
+        assert_eq!(second_col.index(), 2);
+    }
+
     #[test]
     fn test_file_scan_config_builder_defaults() {
         let file_schema = aggr_test_schema();
         let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

-        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
+        let table_schema = TableSchema::from(Arc::clone(&file_schema));
```
Contributor (author): This is a drive-by cleanup to reduce test duplication (Codex actually copied a lot of this extra bloat into the new tests, so instead I went and cleaned it up at the source). I can break it into a new PR if you prefer.
```diff
-        let file_source: Arc<dyn FileSource> =
-            Arc::new(MockSource::new(table_schema.clone()));
+        let file_source = mock_source(table_schema);

         // Create a builder with only required parameters and build without any additional configurations
-        let config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            Arc::clone(&file_source),
-        )
-        .build();
+        let config =
+            FileScanConfigBuilder::new(object_store_url.clone(), file_source).build();

         // Verify default values
         assert_eq!(config.object_store_url, object_store_url);
```
```diff
@@ -2025,24 +2108,21 @@ mod tests {
         let file = PartitionedFile::new("test_file.parquet", 100);

         let table_schema = TableSchema::new(
-            Arc::clone(&schema),
+            schema.clone(),
             partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
         );

-        let file_source: Arc<dyn FileSource> =
-            Arc::new(MockSource::new(table_schema.clone()));
+        let file_source = mock_source(table_schema);

         // Create a config with non-default values
-        let original_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            Arc::clone(&file_source),
-        )
-        .with_projection_indices(Some(vec![0, 2]))
-        .unwrap()
-        .with_limit(Some(10))
-        .with_file(file.clone())
-        .with_constraints(Constraints::default())
-        .build();
+        let original_config =
+            FileScanConfigBuilder::new(object_store_url.clone(), file_source)
+                .with_projection_indices(Some(vec![0, 2]))
+                .unwrap()
+                .with_limit(Some(10))
+                .with_file(file.clone())
+                .with_constraints(Constraints::default())
+                .build();

         // Create a new builder from the config
         let new_builder = FileScanConfigBuilder::from(original_config);
```
```diff
@@ -2470,4 +2550,9 @@ mod tests {

         Ok(())
     }
+
+    /// Returns a PhysicalSortExpr for the given column name and index.
+    fn sort_expr(name: &str, index: usize) -> PhysicalSortExpr {
+        PhysicalSortExpr::new_default(Arc::new(Column::new(name, index)))
+    }
 }
```
Review comment: I also noticed the equivalence properties are computed on demand each time during planning, which can be quite costly. I'll see if I can find some way to avoid doing so as a follow-on.
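One possible direction for that follow-on, sketched under the assumption that the equivalence properties are immutable once the config is built: memoize the result with `std::sync::OnceLock` so repeated planning calls reuse it. The `ScanConfig` and `EqProperties` types below are hypothetical stand-ins, not DataFusion's structs, and this is not the actual follow-up.

```rust
// Hedged sketch: cache an expensive, immutable computation so repeated
// planning calls reuse it. `ScanConfig`/`EqProperties` are hypothetical.
use std::sync::OnceLock;

#[derive(Debug)]
struct EqProperties {
    orderings: Vec<String>, // placeholder for the real ordering data
}

struct ScanConfig {
    output_ordering: Vec<String>,
    eq_properties_cache: OnceLock<EqProperties>,
}

impl ScanConfig {
    /// Computes the equivalence properties at most once; later calls
    /// during planning return the cached value.
    fn eq_properties(&self) -> &EqProperties {
        self.eq_properties_cache.get_or_init(|| EqProperties {
            // Stands in for the real (expensive) derivation.
            orderings: self.output_ordering.clone(),
        })
    }
}

fn main() {
    let config = ScanConfig {
        output_ordering: vec!["c ASC".to_string(), "a ASC".to_string()],
        eq_properties_cache: OnceLock::new(),
    };
    // The second call hits the cache instead of recomputing.
    let first = config.eq_properties() as *const EqProperties;
    let second = config.eq_properties() as *const EqProperties;
    assert_eq!(first, second);
}
```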