diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fe78c0e5262a..d47f6363bac8 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -158,6 +158,9 @@ pub struct FileScanConfig { /// DataFusion may reorder file processing for optimization without affecting correctness. pub preserve_order: bool, /// All equivalent lexicographical orderings that describe the schema. + /// + /// Note these orderings are with respect to projected schema (i.e., after + /// applying [`FileSource::projection`]`). pub output_ordering: Vec, /// File compression type pub file_compression_type: FileCompressionType, @@ -685,11 +688,10 @@ impl DataSource for FileScanConfig { fn eq_properties(&self) -> EquivalenceProperties { let schema = self.file_source.table_schema().table_schema(); - let mut eq_properties = EquivalenceProperties::new_with_orderings( - Arc::clone(schema), - self.output_ordering.clone(), - ) - .with_constraints(self.constraints.clone()); + let orderings = project_orderings(&self.output_ordering, schema); + let mut eq_properties = + EquivalenceProperties::new_with_orderings(Arc::clone(schema), orderings) + .with_constraints(self.constraints.clone()); if let Some(filter) = self.file_source.filter() { // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with. @@ -1402,13 +1404,13 @@ mod tests { use super::*; use crate::TableSchema; - use crate::test_util::col; + use crate::test_util::{col, mock_source}; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; - use arrow::datatypes::Field; + use arrow::datatypes::{Field, Schema, TimeUnit}; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, internal_err}; use datafusion_expr::{Operator, SortExpr}; @@ -1957,22 +1959,103 @@ mod tests { } } + #[test] + fn equivalence_properties_projection_reorders_schema() { + // This test ensures `project_orderings` is applied even when there are no + // partition columns: a projection reorders the schema, and the output ordering + // is specified in projected schema indices. + let file_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int64, false), + Field::new("c", DataType::Utf8, true), + ])); + let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); + let table_schema = TableSchema::from(file_schema); + + let file_source = mock_source(table_schema); + + let config = FileScanConfigBuilder::new(object_store_url.clone(), file_source) + .with_projection_indices(Some(vec![2, 0])) + .unwrap() + // Indices are in the projected schema: [c, a] -> [0, 1]. + .with_output_ordering(vec![[sort_expr("c", 0), sort_expr("a", 1)].into()]) + .build(); + + let eq_properties = config.eq_properties(); + let ordering = eq_properties + .output_ordering() + .expect("expected output ordering"); + + let first_col = ordering[0].expr.as_any().downcast_ref::().unwrap(); + assert_eq!(first_col.name(), "c"); + assert_eq!(first_col.index(), 0); + + let second_col = ordering[1].expr.as_any().downcast_ref::().unwrap(); + assert_eq!(second_col.name(), "a"); + assert_eq!(second_col.index(), 1); + } + + #[test] + fn equivalence_properties_reindex_output_ordering_for_partition_cols() { + // This test exercises `project_orderings` via `eq_properties()` when a projection + // reorders columns and includes a partition column. + let file_schema = Arc::new(Schema::new(vec![ + Field::new("f", DataType::Float64, true), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); + let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); + + let table_schema = TableSchema::new( + file_schema, + vec![Arc::new(Field::new( + "tag1", + wrap_partition_type_in_dict(DataType::Utf8), + true, + ))], + ); + + let file_source = mock_source(table_schema); + + let config = FileScanConfigBuilder::new(object_store_url.clone(), file_source) + .with_projection_indices(Some(vec![2, 0, 1])) + .unwrap() + // Output ordering is defined on unprojected (base) column indices. We expect + // `project_orderings` to remap these to the projected schema indices. + .with_output_ordering(vec![ + [sort_expr("tag1", 0), sort_expr("time", 1)].into(), + ]) + .build(); + + let eq_properties = config.eq_properties(); + let ordering = eq_properties + .output_ordering() + .expect("expected output ordering"); + + let first_col = ordering[0].expr.as_any().downcast_ref::().unwrap(); + assert_eq!(first_col.name(), "tag1"); + assert_eq!(first_col.index(), 0); + + let second_col = ordering[1].expr.as_any().downcast_ref::().unwrap(); + assert_eq!(second_col.name(), "time"); + assert_eq!(second_col.index(), 2); + } + #[test] fn test_file_scan_config_builder_defaults() { let file_schema = aggr_test_schema(); let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(Arc::clone(&file_schema)); - let file_source: Arc = - Arc::new(MockSource::new(table_schema.clone())); + let file_source = mock_source(table_schema); // Create a builder with only required parameters and build without any additional configurations - let config = FileScanConfigBuilder::new( - object_store_url.clone(), - Arc::clone(&file_source), - ) - .build(); + let config = + FileScanConfigBuilder::new(object_store_url.clone(), file_source).build(); // Verify default values assert_eq!(config.object_store_url, object_store_url); @@ -2025,24 +2108,21 @@ mod tests { let file = PartitionedFile::new("test_file.parquet", 100); let table_schema = TableSchema::new( - Arc::clone(&schema), + schema.clone(), partition_cols.iter().map(|f| Arc::new(f.clone())).collect(), ); - let file_source: Arc = - Arc::new(MockSource::new(table_schema.clone())); + let file_source = mock_source(table_schema); // Create a config with non-default values - let original_config = FileScanConfigBuilder::new( - object_store_url.clone(), - Arc::clone(&file_source), - ) - .with_projection_indices(Some(vec![0, 2])) - .unwrap() - .with_limit(Some(10)) - .with_file(file.clone()) - .with_constraints(Constraints::default()) - .build(); + let original_config = + FileScanConfigBuilder::new(object_store_url.clone(), file_source) + .with_projection_indices(Some(vec![0, 2])) + .unwrap() + .with_limit(Some(10)) + .with_file(file.clone()) + .with_constraints(Constraints::default()) + .build(); // Create a new builder from the config let new_builder = FileScanConfigBuilder::from(original_config); @@ -2470,4 +2550,9 @@ mod tests { Ok(()) } + + /// Returns a PhysicalSortExpr for the given column name and index. + fn sort_expr(name: &str, index: usize) -> PhysicalSortExpr { + PhysicalSortExpr::new_default(Arc::new(Column::new(name, index))) + } } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index c8d5dd54cb8a..6cc226fcf394 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -16,7 +16,8 @@ // under the License. use crate::{ - file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, + TableSchema, file::FileSource, file_scan_config::FileScanConfig, + file_stream::FileOpener, }; use std::sync::Arc; @@ -32,14 +33,13 @@ use object_store::ObjectStore; pub(crate) struct MockSource { metrics: ExecutionPlanMetricsSet, filter: Option>, - table_schema: crate::table_schema::TableSchema, + table_schema: TableSchema, projection: crate::projection::SplitProjection, } impl Default for MockSource { fn default() -> Self { - let table_schema = - crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); + let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]); Self { metrics: ExecutionPlanMetricsSet::new(), filter: None, @@ -50,7 +50,7 @@ impl Default for MockSource { } impl MockSource { - pub fn new(table_schema: impl Into) -> Self { + pub fn new(table_schema: impl Into) -> Self { let table_schema = table_schema.into(); Self { metrics: ExecutionPlanMetricsSet::new(), @@ -66,6 +66,11 @@ impl MockSource { } } +/// Convenience method to create a MockSource as a dynamic FileSource +pub(crate) fn mock_source(table_schema: impl Into) -> Arc { + Arc::new(MockSource::new(table_schema)) +} + impl FileSource for MockSource { fn create_file_opener( &self, @@ -96,7 +101,7 @@ impl FileSource for MockSource { "mock" } - fn table_schema(&self) -> &crate::table_schema::TableSchema { + fn table_schema(&self) -> &TableSchema { &self.table_schema }