From 7e6456773caa7736279b83c242c2b64f5e39af10 Mon Sep 17 00:00:00 2001
From: Jonas Irgens Kylling
Date: Tue, 3 Feb 2026 09:40:45 +0100
Subject: [PATCH] Expose virtual columns from the Arrow Parquet reader in
 datasource-parquet

It would be useful to expose the virtual columns added to the Arrow Parquet
reader in https://github.com/apache/arrow-rs/pull/8715 through the
datasource-parquet `ParquetSource`. Engines could then combine DataFusion's
partition value machinery with the virtual columns. I had a go at it in this
PR, but hit some rough edges. This is closer to an issue than a PR, but it is
easier to explain with code.

The virtual columns we added are difficult to integrate cleanly today. They
are part of the physical schema of the Parquet reader, but they cannot
currently be projected. We need additional handling to avoid predicate
pushdown for virtual columns, to build the correct projection mask, and to
build the correct stream schema. See the changes to `opener.rs` in this PR.

One alternative would be to change the arrow-rs implementation so that these
workarounds become unnecessary. The only change to `opener.rs` would then be
`.with_virtual_columns(virtual_columns.to_vec())?` (and maybe even that could
be avoided? See the discussion below). What would be the best way forward
here?

It is redundant that the user needs to specify both
`Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber)`
and add the column in a special way to the reader options with
`.with_virtual_columns(virtual_columns.to_vec())?`. Once the `RowNumber`
extension type is attached, we already know the column is virtual. All users
of `TableSchema`/`ParquetSource` must know that a schema is built out of
three parts: the physical Parquet columns, the virtual columns, and the
partition columns (see the usage sketch appended after the diff). From the
user's perspective, they would just like to supply a schema.

One alternative is to indicate the column kind solely through extension
types, so that the user only supplies a schema. That is, an extension type
would mark a column as a partition column or a virtual column, instead of the
user supplying this information piecemeal. This may have a performance
impact, since we would likely need to scan the schema for such extension type
columns during planning, which could be problematic for large schemas.

Signed-off-by: Jonas Irgens Kylling
---
 datafusion/datasource-parquet/src/opener.rs | 697 +++++++++++++++++++-
 datafusion/datasource/src/table_schema.rs   | 160 ++++-
 2 files changed, 833 insertions(+), 24 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f87a30265a17b..6f605d7203bf8 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,7 +24,7 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, SchemaBuilder};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
@@ -213,6 +213,7 @@ impl FileOpener for ParquetOpener {
             self.projection
                 .project_schema(self.table_schema.table_schema())?,
         );
+        let virtual_columns = Arc::clone(self.table_schema.virtual_columns());

         // Build a combined map for replacing column references with literal values.
// This includes: @@ -348,7 +349,10 @@ impl FileOpener for ParquetOpener { // unnecessary I/O. We decide later if it is needed to evaluate the // pruning predicates. Thus default to not requesting it from the // underlying reader. - let mut options = ArrowReaderOptions::new().with_page_index(false); + let mut options = ArrowReaderOptions::new() + .with_page_index(false) + .with_virtual_columns(virtual_columns.to_vec())?; + #[cfg(feature = "parquet_encryption")] if let Some(fd_val) = file_decryption_properties { options = options.with_file_decryption_properties(Arc::clone(&fd_val)); @@ -461,9 +465,18 @@ impl FileOpener for ParquetOpener { // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { + // Predicate pushdown (or rather, projection) is not supported for virtual columns, + // so we must remove the virtual columns from the schema used for building the row filter + let mut schema_builder = + SchemaBuilder::from(physical_file_schema.as_ref()); + for i in 0..virtual_columns.len() { + schema_builder.remove(physical_file_schema.fields().len() - i - 1); + } + let pushdown_schema = Arc::new(schema_builder.finish()); + let row_filter = row_filter::build_row_filter( &predicate, - &physical_file_schema, + &pushdown_schema, builder.metadata(), reorder_predicates, &file_metrics, @@ -601,8 +614,18 @@ impl FileOpener for ParquetOpener { // metrics from the arrow reader itself let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + // The projection mask should only include physical parquet columns. We know that + // projection column indices > the number of root parquet columns must be virtual or + // partition columns, since these are after the physical columns in the table schema. + let parquet_num_columns = + builder.parquet_schema().root_schema().get_fields().len(); let indices = projection.column_indices(); - let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + let parquet_indices: Vec = indices + .iter() + .filter(|&idx| idx < &parquet_num_columns) + .copied() + .collect(); + let mask = ProjectionMask::roots(builder.parquet_schema(), parquet_indices); let stream = builder .with_projection(mask) @@ -610,13 +633,17 @@ impl FileOpener for ParquetOpener { .with_metrics(arrow_reader_metrics.clone()) .build()?; + // The reader's stream.schema() doesn't include virtual columns, so we add them. + let mut schema_builder = SchemaBuilder::from(stream.schema().as_ref()); + schema_builder.extend(virtual_columns.iter().cloned()); + let stream_schema = Arc::new(schema_builder.finish()); + let files_ranges_pruned_statistics = file_metrics.files_ranges_pruned_statistics.clone(); let predicate_cache_inner_records = file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - let stream_schema = Arc::clone(stream.schema()); // Check if we need to replace the schema to handle things like differing nullability or metadata. // See note below about file vs. output schema. 
let replace_schema = !stream_schema.eq(&output_schema); @@ -1018,7 +1045,8 @@ mod test { use super::{ConstantColumns, constant_columns_from_stats}; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::array::{Array, AsArray}; + use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch, @@ -1038,7 +1066,7 @@ mod test { use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; use object_store::{ObjectStore, memory::InMemory, path::Path}; - use parquet::arrow::ArrowWriter; + use parquet::arrow::{ArrowWriter, RowNumber}; use parquet::file::properties::WriterProperties; /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. @@ -1159,16 +1187,16 @@ mod test { let table_schema = self.table_schema.expect( "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", ); - let file_schema = Arc::clone(table_schema.file_schema()); + let full_schema = table_schema.table_schema(); let projection = if let Some(projection) = self.projection { projection } else if let Some(indices) = self.projection_indices { - ProjectionExprs::from_indices(&indices, &file_schema) + // Use table_schema (which includes virtual columns) for projection + ProjectionExprs::from_indices(&indices, full_schema) } else { - // Default: project all columns - let all_indices: Vec = (0..file_schema.fields().len()).collect(); - ProjectionExprs::from_indices(&all_indices, &file_schema) + let all_indices: Vec = (0..full_schema.fields().len()).collect(); + ProjectionExprs::from_indices(&all_indices, full_schema) }; ParquetOpener { @@ -2004,4 +2032,649 @@ mod test { "Reverse scan with non-contiguous row groups should correctly map RowSelection" ); } + + /// Options for reading parquet files in tests + #[derive(Default)] + struct ReadOptions { + projection: Option>, + partition_values: Option>, + predicate: Option>, + } + + /// Writes a batch to parquet and reads it back with the given options. + /// Returns a single RecordBatch. 
+ async fn read_parquet( + batch: arrow::record_batch::RecordBatch, + table_schema: TableSchema, + options: ReadOptions, + ) -> arrow::record_batch::RecordBatch { + let store = Arc::new(InMemory::new()) as Arc; + + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + + let mut file = PartitionedFile::new("test.parquet".to_string(), data_size as u64); + if let Some(partition_values) = options.partition_values { + file.partition_values = partition_values; + } + + let mut builder = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema.clone()); + if let Some(projection) = options.projection { + builder = builder.with_projection_indices(&projection); + } + if let Some(predicate) = options.predicate { + builder = builder + .with_predicate(predicate) + .with_pushdown_filters(true); + } + let opener = builder.build(); + let mut stream = opener.open(file).unwrap().await.unwrap(); + + let mut batches = vec![]; + while let Some(Ok(batch)) = stream.next().await { + batches.push(batch); + } + assert_eq!(batches.len(), 1, "Expected exactly one batch"); + batches.into_iter().next().unwrap() + } + + #[tokio::test] + async fn test_virtual_columns() { + let parquet_data = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let table_schema = TableSchema::new_with_virtual_columns( + Arc::clone(&parquet_data.schema()), + vec![row_number_field], + vec![], + ); + let batch = + read_parquet(parquet_data, table_schema, ReadOptions::default()).await; + + let output_schema = batch.schema(); + assert_eq!( + output_schema.fields().len(), + 2, + "Output should have 2 columns (a and row_index)" + ); + assert_eq!(output_schema.field(0).name(), "a"); + assert_eq!(output_schema.field(1).name(), "row_index"); + + let a_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(a_values, vec![1, 2, 3]); + + let row_index_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + } + + #[tokio::test] + async fn test_virtual_columns_with_projections() { + let parquet_data = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let table_schema = TableSchema::new_with_virtual_columns( + Arc::clone(&parquet_data.schema()), + vec![row_number_field], + vec![], + ); + + // Project only the virtual column (index 1) + let batch = read_parquet( + parquet_data, + table_schema, + ReadOptions { + projection: Some(vec![1]), + ..Default::default() + }, + ) + .await; + + let output_schema = batch.schema(); + assert_eq!( + output_schema.fields().len(), + 1, + "Output should have 1 column (row_index)" + ); + assert_eq!(output_schema.field(0).name(), "row_index"); + + let row_index_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + } + + #[tokio::test] + async fn test_virtual_columns_with_partition_columns() { + let parquet_data = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let partition_col = 
Arc::new(Field::new("region", DataType::Utf8, false)); + let table_schema = TableSchema::new_with_virtual_columns( + Arc::clone(&parquet_data.schema()), + vec![row_number_field], + vec![partition_col], + ); + + // Project all columns: file column (0), virtual column (1), partition column (2) + let batch = read_parquet( + parquet_data, + table_schema, + ReadOptions { + partition_values: Some(vec![ScalarValue::Utf8(Some( + "europe".to_string(), + ))]), + ..Default::default() + }, + ) + .await; + + let output_schema = batch.schema(); + assert_eq!( + output_schema.fields().len(), + 3, + "Output should have 3 columns (a, row_index, region)" + ); + assert_eq!(output_schema.field(0).name(), "a"); + assert_eq!(output_schema.field(1).name(), "row_index"); + assert_eq!(output_schema.field(2).name(), "region"); + + let a_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(a_values, vec![1, 2, 3], "File column 'a' values"); + + let row_index_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!( + row_index_values, + vec![0, 1, 2], + "Virtual column 'row_index' values" + ); + + let region_values = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap().to_string()) + .collect::>(); + assert_eq!( + region_values, + vec!["europe", "europe", "europe"], + "Partition column 'region' values" + ); + } + + #[tokio::test] + async fn test_partition_and_virtual_columns_only() { + let parquet_data = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let partition_col = Arc::new(Field::new("year", DataType::Int32, false)); + let table_schema = TableSchema::new_with_virtual_columns( + Arc::clone(&parquet_data.schema()), + vec![row_number_field], + vec![partition_col], + ); + + let batch = read_parquet( + parquet_data, + table_schema, + ReadOptions { + projection: Some(vec![1, 2]), + partition_values: Some(vec![ScalarValue::Int32(Some(2026))]), + ..Default::default() + }, + ) + .await; + + let output_schema = batch.schema(); + assert_eq!( + output_schema.fields().len(), + 2, + "Output should have 2 columns (row_index, year)" + ); + assert_eq!(output_schema.field(0).name(), "row_index"); + assert_eq!(output_schema.field(1).name(), "year"); + + let row_index_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!( + row_index_values, + vec![0, 1, 2], + "Virtual column 'row_index' values" + ); + + let year_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!( + year_values, + vec![2026, 2026, 2026], + "Partition column 'year' values" + ); + } + + #[tokio::test] + async fn test_nested_schema_projections() { + use arrow::array::{ArrayRef, Int32Array, StructArray}; + // Create nested schema: a: {b: int32, c: {d: int32, e: int32}} + let inner_struct_fields = vec![ + Field::new("d", DataType::Int32, false), + Field::new("e", DataType::Int32, false), + ]; + let inner_struct_type = DataType::Struct(inner_struct_fields.clone().into()); + + let outer_struct_fields = vec![ + Field::new("b", DataType::Int32, false), + Field::new("c", inner_struct_type.clone(), false), + ]; + let outer_struct_type = DataType::Struct(outer_struct_fields.clone().into()); + + let a_struct = { + let d_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 
30])); + let e_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 300])); + let c_struct = StructArray::from(vec![ + (Arc::new(inner_struct_fields[0].clone()), d_array), + (Arc::new(inner_struct_fields[1].clone()), e_array), + ]); + let b_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let c_array: ArrayRef = Arc::new(c_struct); + Arc::new(StructArray::from(vec![ + (Arc::new(outer_struct_fields[0].clone()), b_array), + (Arc::new(outer_struct_fields[1].clone()), c_array), + ])) + }; + + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + + // File schema: x: int32, a: {b: int32, c: {d: int32, e: int32}}, y: int32 + // Table schema: x, a, y, row_index (indices 0, 1, 2, 3) + let file_schema = Arc::new(Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("a", outer_struct_type.clone(), false), + Field::new("y", DataType::Int32, false), + ])); + + let x_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 300])); + let y_array: ArrayRef = Arc::new(Int32Array::from(vec![1000, 2000, 3000])); + + let parquet_data = arrow::record_batch::RecordBatch::try_new( + file_schema.clone(), + vec![x_array, a_struct, y_array], + ) + .unwrap(); + + let table_schema = TableSchema::new_with_virtual_columns( + file_schema, + vec![row_number_field], + vec![], + ); + + // Test 1: Read all columns including row_index + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions::default(), + ) + .await; + + assert_eq!(batch.schema().fields().len(), 4); + assert_eq!(batch.schema().field(0).name(), "x"); + assert_eq!(batch.schema().field(1).name(), "a"); + assert_eq!(batch.schema().field(2).name(), "y"); + assert_eq!(batch.schema().field(3).name(), "row_index"); + + let row_index_values = batch + .column(3) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + + let a_col = batch.column(1).as_struct(); + let b_values = a_col + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(b_values, vec![1, 2, 3]); + } + + // Test 2: Project nested struct and row_index only + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + projection: Some(vec![1, 3]), + ..Default::default() + }, + ) + .await; + + assert_eq!(batch.schema().fields().len(), 2); + assert_eq!(batch.schema().field(0).name(), "a"); + assert_eq!(batch.schema().field(1).name(), "row_index"); + + let row_index_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + + let a_col = batch.column(0).as_struct(); + let c_col = a_col.column(1).as_struct(); + let d_values = c_col + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(d_values, vec![10, 20, 30]); + } + + // Test 3: Project only primitive columns with row_index (skip nested struct) + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + projection: Some(vec![0, 2, 3]), // x, y, row_index - skip 'a' + ..Default::default() + }, + ) + .await; + + assert_eq!(batch.schema().fields().len(), 3); + assert_eq!(batch.schema().field(0).name(), "x"); + assert_eq!(batch.schema().field(1).name(), "y"); + assert_eq!(batch.schema().field(2).name(), "row_index"); + + let x_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + let 
y_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + let row_index_values = batch + .column(2) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(x_values, vec![100, 200, 300]); + assert_eq!(y_values, vec![1000, 2000, 3000]); + assert_eq!(row_index_values, vec![0, 1, 2]); + } + + // Test 4: Project only the nested column (without row_index) + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + projection: Some(vec![1]), + ..Default::default() + }, + ) + .await; + + assert_eq!(batch.schema().fields().len(), 1); + assert_eq!(batch.schema().field(0).name(), "a"); + + let a_col = batch.column(0).as_struct(); + let b_values = a_col + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(b_values, vec![1, 2, 3]); + } + + // Test 5: Project columns in different order with row_index + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + projection: Some(vec![3, 2, 0, 1]), // row_index, y, x, a (reordered) + ..Default::default() + }, + ) + .await; + + assert_eq!(batch.schema().fields().len(), 4); + assert_eq!(batch.schema().field(0).name(), "row_index"); + assert_eq!(batch.schema().field(1).name(), "y"); + assert_eq!(batch.schema().field(2).name(), "x"); + assert_eq!(batch.schema().field(3).name(), "a"); + + let row_index_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + let y_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + let x_values = batch + .column(2) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + assert_eq!(y_values, vec![1000, 2000, 3000]); + assert_eq!(x_values, vec![100, 200, 300]); + + let a_col = batch.column(3).as_struct(); + let b_values = a_col + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(b_values, vec![1, 2, 3]); + } + + // Test 6: Project only row_index + { + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + projection: Some(vec![3]), + ..Default::default() + }, + ) + .await; + + assert_eq!(batch.schema().fields().len(), 1); + assert_eq!(batch.schema().field(0).name(), "row_index"); + + let row_index_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2]); + } + } + + #[tokio::test] + async fn test_predicate_with_virtual_columns() { + let parquet_data = record_batch!(( + "a", + Int32, + vec![Some(10), Some(20), Some(30), Some(40), Some(50)] + )) + .unwrap(); + + let row_number_field = Arc::new( + Field::new("row_index", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let table_schema = TableSchema::new_with_virtual_columns( + parquet_data.schema(), + vec![row_number_field], + vec![], + ); + + // Test 1: Filter on file column (a > 20) with virtual column in schema + { + let expr = col("a").gt(lit(20)); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + predicate: Some(predicate), + ..Default::default() + }, + ) + .await; + + let a_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(a_values, vec![30, 40, 50]); + + let row_index_values = batch + .column(1) + .as_primitive::() + 
.into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![2, 3, 4]); + } + + // Test 2: Filter on virtual column does not have predicate pushdown + { + let expr = col("row_index").eq(lit(2)); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + predicate: Some(predicate), + projection: Some(vec![0, 1]), // a and row_index + ..Default::default() + }, + ) + .await; + + let a_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(a_values, vec![10, 20, 30, 40, 50]); + + let row_index_values = batch + .column(1) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1, 2, 3, 4]); + } + + // Test 3: Project only virtual column with predicate on file column + { + let expr = col("a").lt(lit(30)); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let batch = read_parquet( + parquet_data.clone(), + table_schema.clone(), + ReadOptions { + predicate: Some(predicate), + projection: Some(vec![1]), // Only row_index + ..Default::default() + }, + ) + .await; + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "row_index"); + + let row_index_values = batch + .column(0) + .as_primitive::() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(row_index_values, vec![0, 1]); + } + } } diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index a45cdbaaea076..ce895bef014b9 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Helper struct to manage table schemas with partition columns +//! Helper struct to manage table schemas with partition columns and virtual columns use arrow::datatypes::{FieldRef, SchemaBuilder, SchemaRef}; use std::sync::Arc; @@ -23,10 +23,12 @@ use std::sync::Arc; /// Helper to hold table schema information for partitioned data sources. /// /// When reading partitioned data (such as Hive-style partitioning), a table's schema -/// consists of two parts: +/// consists of multiple parts: /// 1. **File schema**: The schema of the actual data files on disk /// 2. **Partition columns**: Columns that are encoded in the directory structure, /// not stored in the files themselves +/// 3. **Virtual columns**: Columns computed during read (e.g., row numbers), +/// not stored in the files /// /// # Example: Partitioned Table /// @@ -38,14 +40,16 @@ use std::sync::Arc; /// /// In this case: /// - **File schema**: The schema of `data.parquet` files (e.g., `[user_id, amount]`) +/// - **Virtual columns**: Computed columns like `row_index` for row numbers /// - **Partition columns**: `[date, region]` extracted from the directory path -/// - **Table schema**: The full schema combining both (e.g., `[user_id, amount, date, region]`) +/// - **Table schema**: The full schema combining all (e.g., `[user_id, amount, row_index, date, region]`) /// /// # When to Use /// /// Use `TableSchema` when: /// - Reading partitioned data sources (Parquet, CSV, etc. 
with Hive-style partitioning) /// - You need to efficiently access different schema representations without reconstructing them +/// - You need virtual columns like row numbers computed during read /// - You want to avoid repeatedly concatenating file and partition schemas /// /// For non-partitioned data or when working with a single schema representation, @@ -57,12 +61,19 @@ use std::sync::Arc; /// to any representation without repeated allocations or reconstructions. #[derive(Debug, Clone)] pub struct TableSchema { - /// The schema of the data files themselves, without partition columns. + /// The schema of the data files themselves, without virtual columns or partition columns. /// /// For example, if your Parquet files contain `[user_id, amount]`, /// this field holds that schema. file_schema: SchemaRef, + /// Virtual columns that are computed during read. + /// + /// These columns are not stored in the data files but are generated during + /// query execution. Examples include row index (0-based) using parquet's + /// RowNumber extension type. + virtual_columns: Arc>, + /// Columns that are derived from the directory structure (partitioning scheme). /// /// For Hive-style partitioning like `/date=2025-10-10/region=us-west/`, @@ -72,10 +83,11 @@ pub struct TableSchema { /// row during query execution based on the file's location. table_partition_cols: Arc>, - /// The complete table schema: file_schema columns followed by partition columns. + /// The complete table schema: file_schema columns followed by virtual columns + /// and partition columns. /// - /// This is pre-computed during construction by concatenating `file_schema` - /// and `table_partition_cols`, so it can be returned as a cheap reference. + /// This is pre-computed during construction by concatenating `file_schema`, + /// `virtual_columns`, and `table_partition_cols`, so it can be returned as a cheap reference. table_schema: SchemaRef, } @@ -121,12 +133,63 @@ impl TableSchema { builder.extend(table_partition_cols.iter().cloned()); Self { file_schema, + virtual_columns: Arc::new(vec![]), + table_partition_cols: Arc::new(table_partition_cols), + table_schema: Arc::new(builder.finish()), + } + } + + /// Create a new TableSchema from a file schema and virtual columns. + /// + /// The table schema is automatically computed by appending the virtual columns + /// to the file schema. 
+    ///
+    /// # Arguments
+    ///
+    /// * `file_schema` - Schema of the data files (without virtual columns)
+    /// * `virtual_columns` - Virtual columns to append to each row
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # use datafusion_datasource::TableSchema;
+    /// let file_schema = Arc::new(Schema::new(vec![
+    ///     Field::new("user_id", DataType::Int64, false),
+    /// ]));
+    ///
+    /// let virtual_cols = vec![
+    ///     Arc::new(Field::new("row_index", DataType::Int64, false)),
+    /// ];
+    ///
+    /// let partition_cols = vec![
+    ///     Arc::new(Field::new("date", DataType::Utf8, false)),
+    ///     Arc::new(Field::new("region", DataType::Utf8, false)),
+    /// ];
+    ///
+    /// let table_schema = TableSchema::new_with_virtual_columns(file_schema, virtual_cols, partition_cols);
+    ///
+    /// // Table schema will have 4 columns: user_id, row_index, date, region
+    /// assert_eq!(table_schema.table_schema().fields().len(), 4);
+    /// ```
+    pub fn new_with_virtual_columns(
+        file_schema: SchemaRef,
+        virtual_columns: Vec<FieldRef>,
+        table_partition_cols: Vec<FieldRef>,
+    ) -> Self {
+        let mut builder = SchemaBuilder::from(file_schema.as_ref());
+        builder.extend(virtual_columns.iter().cloned());
+        builder.extend(table_partition_cols.iter().cloned());
+        Self {
+            file_schema,
+            virtual_columns: Arc::new(virtual_columns),
             table_partition_cols: Arc::new(table_partition_cols),
             table_schema: Arc::new(builder.finish()),
         }
     }

-    /// Create a new TableSchema with no partition columns.
+    /// Create a new TableSchema with no partition columns or virtual columns.
     ///
     /// You should prefer calling [`TableSchema::new`] if you have partition columns at
     /// construction time since it avoids re-computing the table schema.
@@ -149,19 +212,51 @@ impl TableSchema {
             );
             table_partition_cols.extend(partition_cols);
         }
+        self.recompute_table_schema();
+        self
+    }
+
+    /// Add virtual columns to an existing TableSchema, returning a new instance.
+    ///
+    /// Virtual columns are computed during read (e.g., row numbers) and are not
+    /// stored in the data files.
+    pub fn with_virtual_columns(mut self, virtual_cols: Vec<FieldRef>) -> Self {
+        if self.virtual_columns.is_empty() {
+            self.virtual_columns = Arc::new(virtual_cols);
+        } else {
+            // Append to existing virtual columns
+            let virtual_columns = Arc::get_mut(&mut self.virtual_columns).expect(
+                "Expected to be the sole owner of virtual_columns since this function accepts mut self",
+            );
+            virtual_columns.extend(virtual_cols);
+        }
+        self.recompute_table_schema();
+        self
+    }
+
+    /// Recompute the table schema from the file schema, virtual columns, and partition columns.
+    fn recompute_table_schema(&mut self) {
         let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+        builder.extend(self.virtual_columns.iter().cloned());
         builder.extend(self.table_partition_cols.iter().cloned());
         self.table_schema = Arc::new(builder.finish());
-        self
     }

-    /// Get the file schema (without partition columns).
+    /// Get the file schema (without virtual columns or partition columns).
     ///
     /// This is the schema of the actual data files on disk.
     pub fn file_schema(&self) -> &SchemaRef {
         &self.file_schema
     }

+    /// Get the virtual columns.
+    ///
+    /// These are columns computed during read (e.g., row index) that
+    /// will be appended to each row during query execution.
+    pub fn virtual_columns(&self) -> &Arc<Vec<FieldRef>> {
+        &self.virtual_columns
+    }
+
+    /// Get the table partition columns.
/// /// These are the columns derived from the directory structure that @@ -170,10 +265,10 @@ impl TableSchema { &self.table_partition_cols } - /// Get the full table schema (file schema + partition columns). + /// Get the full table schema (file schema + virtual columns + partition columns). /// /// This is the complete schema that will be seen by queries, combining - /// both the columns from the files and the partition columns. + /// all columns from files, virtual, and partition columns. pub fn table_schema(&self) -> &SchemaRef { &self.table_schema } @@ -276,4 +371,45 @@ mod tests { &expected_schema ); } + + #[test] + fn test_virtual_columns() { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + ])); + + let virtual_cols = + vec![Arc::new(Field::new("row_index", DataType::Int64, false))]; + + let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))]; + + let table_schema = TableSchema::new_with_virtual_columns( + file_schema.clone(), + virtual_cols.clone(), + partition_cols.clone(), + ); + + // Verify file schema + assert_eq!(table_schema.file_schema().as_ref(), file_schema.as_ref()); + + // Verify virtual columns + assert_eq!(table_schema.virtual_columns().len(), 1); + assert_eq!(table_schema.virtual_columns()[0].name(), "row_index"); + + // Verify partition columns + assert_eq!(table_schema.table_partition_cols().len(), 1); + assert_eq!(table_schema.table_partition_cols()[0].name(), "date"); + + // Verify full table schema has all columns in correct order: + // file_schema + virtual_columns + partition_columns + let expected_fields = vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + Field::new("row_index", DataType::Int64, false), + Field::new("date", DataType::Utf8, false), + ]; + let expected_schema = Schema::new(expected_fields); + assert_eq!(table_schema.table_schema().as_ref(), &expected_schema); + } }
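Usage sketch (referenced from the description above): a minimal example of how an engine would describe a table with this patch applied. It only uses pieces that appear in the diff (`TableSchema::new_with_virtual_columns`, the `RowNumber` extension type from `parquet::arrow`, and the file/virtual/partition column ordering); the concrete field names and types are illustrative.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::TableSchema;
use parquet::arrow::RowNumber;

fn example_table_schema() -> TableSchema {
    // Physical columns stored in the Parquet files (illustrative names).
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Int64, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    // Virtual column: marked with the RowNumber extension type and computed
    // by the Arrow Parquet reader rather than read from the file.
    let virtual_cols = vec![Arc::new(
        Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber),
    )];

    // Partition column encoded in the directory structure.
    let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))];

    // The combined table schema is ordered [user_id, amount, row_index, date]:
    // file columns, then virtual columns, then partition columns.
    TableSchema::new_with_virtual_columns(file_schema, virtual_cols, partition_cols)
}
```

The combined schema produced here is what `ParquetOpener` projects against: projection indices at or past the number of file columns resolve to the virtual and partition columns and are excluded from the Parquet projection mask.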