diff --git a/datafusion/common/benches/with_hashes.rs b/datafusion/common/benches/with_hashes.rs
index 8154c20df88f3..c938afc4c2f2f 100644
--- a/datafusion/common/benches/with_hashes.rs
+++ b/datafusion/common/benches/with_hashes.rs
@@ -19,11 +19,14 @@
 use ahash::RandomState;
 use arrow::array::{
-    Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray,
-    NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, StringViewArray, make_array,
+    Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray, Int32Array,
+    Int64Array, ListArray, MapArray, NullBufferBuilder, OffsetSizeTrait, PrimitiveArray,
+    StringViewArray, StructArray, UnionArray, make_array,
+};
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{
+    ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type, UnionFields,
 };
-use arrow::buffer::NullBuffer;
-use arrow::datatypes::{ArrowDictionaryKeyType, Int32Type, Int64Type};
 use criterion::{Bencher, Criterion, criterion_group, criterion_main};
 use datafusion_common::hash_utils::with_hashes;
 use rand::Rng;
@@ -37,6 +40,8 @@ const BATCH_SIZE: usize = 8192;
 struct BenchData {
     name: &'static str,
     array: ArrayRef,
+    /// Union arrays can't have null bitmasks added
+    supports_nulls: bool,
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
@@ -47,50 +52,83 @@ fn criterion_benchmark(c: &mut Criterion) {
         BenchData {
             name: "int64",
             array: primitive_array::<Int64Type>(BATCH_SIZE),
+            supports_nulls: true,
         },
         BenchData {
             name: "utf8",
             array: pool.string_array::<i32>(BATCH_SIZE),
+            supports_nulls: true,
         },
         BenchData {
             name: "large_utf8",
             array: pool.string_array::<i64>(BATCH_SIZE),
+            supports_nulls: true,
         },
         BenchData {
             name: "utf8_view",
             array: pool.string_view_array(BATCH_SIZE),
+            supports_nulls: true,
         },
         BenchData {
             name: "utf8_view (small)",
             array: small_pool.string_view_array(BATCH_SIZE),
+            supports_nulls: true,
         },
         BenchData {
             name: "dictionary_utf8_int32",
             array: pool.dictionary_array::<Int32Type>(BATCH_SIZE),
+            supports_nulls: true,
+        },
+        BenchData {
+            name: "list_array",
+            array: list_array(BATCH_SIZE),
+            supports_nulls: true,
+        },
+        BenchData {
+            name: "map_array",
+            array: map_array(BATCH_SIZE),
+            supports_nulls: true,
+        },
+        BenchData {
+            name: "sparse_union",
+            array: sparse_union_array(BATCH_SIZE),
+            supports_nulls: false,
+        },
+        BenchData {
+            name: "dense_union",
+            array: dense_union_array(BATCH_SIZE),
+            supports_nulls: false,
         },
     ];
 
-    for BenchData { name, array } in cases {
-        // with_hash has different code paths for single vs multiple arrays and nulls vs no nulls
-        let nullable_array = add_nulls(&array);
+    for BenchData {
+        name,
+        array,
+        supports_nulls,
+    } in cases
+    {
         c.bench_function(&format!("{name}: single, no nulls"), |b| {
             do_hash_test(b, std::slice::from_ref(&array));
         });
-        c.bench_function(&format!("{name}: single, nulls"), |b| {
-            do_hash_test(b, std::slice::from_ref(&nullable_array));
-        });
         c.bench_function(&format!("{name}: multiple, no nulls"), |b| {
             let arrays = vec![array.clone(), array.clone(), array.clone()];
             do_hash_test(b, &arrays);
         });
-        c.bench_function(&format!("{name}: multiple, nulls"), |b| {
-            let arrays = vec![
-                nullable_array.clone(),
-                nullable_array.clone(),
-                nullable_array.clone(),
-            ];
-            do_hash_test(b, &arrays);
-        });
+        // Union arrays can't have null bitmasks
+        if supports_nulls {
+            let nullable_array = add_nulls(&array);
+            c.bench_function(&format!("{name}: single, nulls"), |b| {
+                do_hash_test(b, std::slice::from_ref(&nullable_array));
+            });
+            c.bench_function(&format!("{name}: multiple, nulls"), |b| {
+                let arrays = vec![
+                    nullable_array.clone(),
+                    nullable_array.clone(),
+                    nullable_array.clone(),
+                ];
+                do_hash_test(b, &arrays);
+            });
+        }
     }
 }
 
@@ -205,5 +243,221 @@ where
     Arc::new(array)
 }
 
-criterion_group!(benches, criterion_benchmark);
+/// Benchmark sliced arrays to demonstrate the optimization: when an array is
+/// sliced, the underlying buffer may be much larger than what the slice
+/// references, and the optimization avoids hashing the unreferenced elements.
+fn sliced_array_benchmark(c: &mut Criterion) {
+    // Test with different total_size / slice_size ratios; the larger the ratio,
+    // the smaller the referenced fraction and the bigger the potential savings.
+    let slice_ratios = [10, 5, 2]; // slice is 1/10, 1/5, 1/2 of the total
+
+    for ratio in slice_ratios {
+        let total_rows = BATCH_SIZE * ratio;
+        let slice_offset = BATCH_SIZE * (ratio / 2); // Take from middle
+        let slice_len = BATCH_SIZE;
+
+        // Sliced ListArray
+        {
+            let full_array = list_array(total_rows);
+            let sliced: ArrayRef = Arc::new(
+                full_array
+                    .as_any()
+                    .downcast_ref::<ListArray>()
+                    .unwrap()
+                    .slice(slice_offset, slice_len),
+            );
+            c.bench_function(
+                &format!("list_array_sliced: 1/{ratio} of {total_rows} rows"),
+                |b| {
+                    do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+                },
+            );
+        }
+
+        // Sliced MapArray
+        {
+            let full_array = map_array(total_rows);
+            let sliced: ArrayRef = Arc::new(
+                full_array
+                    .as_any()
+                    .downcast_ref::<MapArray>()
+                    .unwrap()
+                    .slice(slice_offset, slice_len),
+            );
+            c.bench_function(
+                &format!("map_array_sliced: 1/{ratio} of {total_rows} rows"),
+                |b| {
+                    do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+                },
+            );
+        }
+
+        // Sliced Sparse UnionArray
+        {
+            let full_array = sparse_union_array(total_rows);
+            let sliced: ArrayRef = Arc::new(
+                full_array
+                    .as_any()
+                    .downcast_ref::<UnionArray>()
+                    .unwrap()
+                    .slice(slice_offset, slice_len),
+            );
+            c.bench_function(
+                &format!("sparse_union_sliced: 1/{ratio} of {total_rows} rows"),
+                |b| {
+                    do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+                },
+            );
+        }
+    }
+}
+
+fn do_hash_test_with_len(b: &mut Bencher, arrays: &[ArrayRef], expected_len: usize) {
+    let state = RandomState::new();
+    b.iter(|| {
+        with_hashes(arrays, &state, |hashes| {
+            assert_eq!(hashes.len(), expected_len);
+            Ok(())
+        })
+        .unwrap();
+    });
+}
+
+fn list_array(num_rows: usize) -> ArrayRef {
+    let mut rng = make_rng();
+    let elements_per_row = 5;
+    let total_elements = num_rows * elements_per_row;
+
+    let values: Int64Array = (0..total_elements)
+        .map(|_| Some(rng.random::<i64>()))
+        .collect();
+    let offsets: Vec<i32> = (0..=num_rows)
+        .map(|i| (i * elements_per_row) as i32)
+        .collect();
+
+    Arc::new(ListArray::new(
+        Arc::new(Field::new("item", DataType::Int64, true)),
+        OffsetBuffer::new(ScalarBuffer::from(offsets)),
+        Arc::new(values),
+        None,
+    ))
+}
+
+fn map_array(num_rows: usize) -> ArrayRef {
+    let mut rng = make_rng();
+    let entries_per_row = 5;
+    let total_entries = num_rows * entries_per_row;
+
+    let keys: Int32Array = (0..total_entries)
+        .map(|_| Some(rng.random::<i32>()))
+        .collect();
+    let values: Int64Array = (0..total_entries)
+        .map(|_| Some(rng.random::<i64>()))
+        .collect();
+    let offsets: Vec<i32> = (0..=num_rows)
+        .map(|i| (i * entries_per_row) as i32)
+        .collect();
+
+    let entries = StructArray::try_new(
+        Fields::from(vec![
+            Field::new("keys", DataType::Int32, false),
+            Field::new("values", DataType::Int64, true),
+        ]),
+        vec![Arc::new(keys), Arc::new(values)],
+        None,
+    )
+    .unwrap();
+
+    Arc::new(MapArray::new(
+        Arc::new(Field::new(
+            "entries",
+            DataType::Struct(Fields::from(vec![
+                Field::new("keys", DataType::Int32, false),
+                Field::new("values", DataType::Int64, true),
+            ])),
+            false,
+        )),
+        OffsetBuffer::new(ScalarBuffer::from(offsets)),
+        entries,
+        None,
+        false,
+    ))
+}
+
+fn sparse_union_array(num_rows: usize) -> ArrayRef {
+    let mut rng = make_rng();
+    let num_types = 5;
+
+    let type_ids: Vec<i8> = (0..num_rows)
+        .map(|_| rng.random_range(0..num_types) as i8)
+        .collect();
+    let (fields, children): (Vec<_>, Vec<_>) = (0..num_types)
+        .map(|i| {
+            (
+                (
+                    i as i8,
+                    Arc::new(Field::new(format!("f{i}"), DataType::Int64, true)),
+                ),
+                primitive_array::<Int64Type>(num_rows),
+            )
+        })
+        .unzip();
+
+    Arc::new(
+        UnionArray::try_new(
+            UnionFields::from_iter(fields),
+            ScalarBuffer::from(type_ids),
+            None,
+            children,
+        )
+        .unwrap(),
+    )
+}
+
+fn dense_union_array(num_rows: usize) -> ArrayRef {
+    let mut rng = make_rng();
+    let num_types = 5;
+    let type_ids: Vec<i8> = (0..num_rows)
+        .map(|_| rng.random_range(0..num_types) as i8)
+        .collect();
+
+    let mut type_counts = vec![0i32; num_types];
+    for &tid in &type_ids {
+        type_counts[tid as usize] += 1;
+    }
+
+    let mut current_offsets = vec![0i32; num_types];
+    let offsets: Vec<i32> = type_ids
+        .iter()
+        .map(|&tid| {
+            let offset = current_offsets[tid as usize];
+            current_offsets[tid as usize] += 1;
+            offset
+        })
+        .collect();
+
+    let (fields, children): (Vec<_>, Vec<_>) = (0..num_types)
+        .map(|i| {
+            (
+                (
+                    i as i8,
+                    Arc::new(Field::new(format!("f{i}"), DataType::Int64, true)),
+                ),
+                primitive_array::<Int64Type>(type_counts[i] as usize),
+            )
+        })
+        .unzip();
+
+    Arc::new(
+        UnionArray::try_new(
+            UnionFields::from_iter(fields),
+            ScalarBuffer::from(type_ids),
+            Some(ScalarBuffer::from(offsets)),
+            children,
+        )
+        .unwrap(),
+    )
+}
+
+criterion_group!(benches, criterion_benchmark, sliced_array_benchmark);
 criterion_main!(benches);
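The hash_utils.rs changes below build on a property of arrow-rs nested arrays: slicing only narrows the offsets (and null) window, while the child values buffer keeps its full length, so just the offsets.first()..offsets.last() range of the child is actually referenced. A minimal standalone sketch of that behavior, not part of this patch, using a ListArray for brevity:

use std::sync::Arc;

use arrow::array::{Array, Int64Array, ListArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};

fn main() {
    // A ListArray with 4 rows of 2 values each (8 child values total).
    let values = Int64Array::from((0..8).collect::<Vec<i64>>());
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 4, 6, 8]));
    let field = Arc::new(Field::new("item", DataType::Int64, true));
    let list = ListArray::new(field, offsets, Arc::new(values), None);

    // Slicing moves only the offsets window; the child values buffer is untouched.
    let sliced = list.slice(1, 2);
    assert_eq!(sliced.len(), 2);
    assert_eq!(sliced.values().len(), 8); // still holds all 8 child values

    // Only offsets.first()..offsets.last() of the child is referenced, which is
    // what the hash_map_array change below exploits to skip the rest.
    let sliced_offsets = sliced.offsets();
    assert_eq!(*sliced_offsets.first().unwrap(), 2);
    assert_eq!(*sliced_offsets.last().unwrap(), 6);
}

The same holds for the entries of a MapArray, which is what the hash_map_array change uses; the union changes address the related problem that sparse union children are full length but only partly referenced.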
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 19c251c1a149a..fb96610e7cf0c 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -20,10 +20,12 @@
 use ahash::RandomState;
 use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
 use arrow::array::*;
+use arrow::compute::take;
 use arrow::datatypes::*;
 #[cfg(not(feature = "force_hash_collisions"))]
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use itertools::Itertools;
+use std::collections::HashMap;
 
 #[cfg(not(feature = "force_hash_collisions"))]
 use crate::cast::{
@@ -481,15 +483,29 @@ fn hash_map_array(
     let offsets = array.offsets();
 
     // Create hashes for each entry in each row
-    let mut values_hashes = vec![0u64; array.entries().len()];
-    create_hashes(array.entries().columns(), random_state, &mut values_hashes)?;
+    let first_offset = offsets.first().copied().unwrap_or_default() as usize;
+    let last_offset = offsets.last().copied().unwrap_or_default() as usize;
+    let entries_len = last_offset - first_offset;
+
+    // Only hash the entries that are actually referenced
+    let mut values_hashes = vec![0u64; entries_len];
+    let entries = array.entries();
+    let sliced_columns: Vec<ArrayRef> = entries
+        .columns()
+        .iter()
+        .map(|col| col.slice(first_offset, entries_len))
+        .collect();
+    create_hashes(&sliced_columns, random_state, &mut values_hashes)?;
 
     // Combine the hashes for entries on each row with each other and previous hash for that row
+    // Adjust indices by first_offset since values_hashes is sliced starting from first_offset
     if let Some(nulls) = nulls {
         for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
             if nulls.is_valid(i) {
                 let hash = &mut hashes_buffer[i];
-                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+                for values_hash in &values_hashes
+                    [start.as_usize() - first_offset..stop.as_usize() - first_offset]
+                {
                     *hash = combine_hashes(*hash, *values_hash);
                 }
             }
@@ -497,7 +513,9 @@ fn hash_map_array(
     } else {
         for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
             let hash = &mut hashes_buffer[i];
-            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+            for values_hash in &values_hashes
+                [start.as_usize() - first_offset..stop.as_usize() - first_offset]
+            {
                 *hash = combine_hashes(*hash, *values_hash);
             }
         }
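To make the first_offset adjustment concrete: values_hashes is now computed only for the referenced entry range, so each row's offsets have to be shifted back into that local index space. A minimal sketch with made-up numbers, not part of this patch:

fn main() {
    // Offsets of a sliced MapArray whose entries occupy positions 6..10 of the
    // full (unsliced) entries array.
    let offsets: [usize; 3] = [6, 8, 10];
    let first_offset = offsets[0];

    // values_hashes was computed only for entries 6..10, so index 0 of this
    // buffer corresponds to entry 6 of the full entries array.
    let values_hashes: [u64; 4] = [11, 22, 33, 44];

    // Row 0 of the slice covers entries 6..8, i.e. values_hashes[0..2].
    let (start, stop) = (offsets[0], offsets[1]);
    assert_eq!(
        &values_hashes[start - first_offset..stop - first_offset],
        &[11u64, 22]
    );
}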
+#[cfg(not(feature = "force_hash_collisions"))] +fn hash_sparse_union_array( + array: &UnionArray, + union_fields: &UnionFields, + random_state: &RandomState, + hashes_buffer: &mut [u64], +) -> Result<()> { + use std::collections::HashMap; + + // For 1-2 types, the take/scatter overhead isn't worth it. + // Fall back to the default approach (same as dense union). + if union_fields.len() <= 2 { + return hash_union_array_default( + array, + union_fields, + random_state, + hashes_buffer, + ); + } + + let type_ids = array.type_ids(); + + // Group indices by type_id + let mut indices_by_type: HashMap> = HashMap::new(); + for (i, &type_id) in type_ids.iter().enumerate() { + indices_by_type.entry(type_id).or_default().push(i as u32); + } + + // For each type, extract only the needed elements, hash them, and scatter back + for (type_id, _field) in union_fields.iter() { + if let Some(indices) = indices_by_type.get(&type_id) { + if indices.is_empty() { + continue; + } + + let child = array.child(type_id); + let indices_array = UInt32Array::from(indices.clone()); + + // Extract only the elements we need using take() + let filtered = take(child.as_ref(), &indices_array, None)?; + + // Hash the filtered array + let mut filtered_hashes = vec![0u64; filtered.len()]; + create_hashes([&filtered], random_state, &mut filtered_hashes)?; + + // Scatter hashes back to correct positions + for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) { + hashes_buffer[idx as usize] = + combine_hashes(hashes_buffer[idx as usize], *hash); + } + } + } + + Ok(()) +} + #[cfg(not(feature = "force_hash_collisions"))] fn hash_fixed_list_array( array: &FixedSizeListArray,