From e489ec0dce81a63ec6a8adc94423b36425ab8e20 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 4 Feb 2026 21:40:08 +0530 Subject: [PATCH 1/4] chore: Clarify rehash setting in hash utils --- datafusion/common/src/hash_utils.rs | 102 +++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 19c251c1a149a..f74ac7642211b 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -16,6 +16,23 @@ // under the License. //! Functionality used both on logical and physical plans +//! +//! ## About `rehash` +//! Many helpers in this module take a `rehash: bool` argument. +//! +//! Conceptually, `hashes_buffer` is an **accumulator** of per-row hash values. When hashing a +//! *single* column, the hasher should **initialize** each row's hash. When hashing *multiple* +//! columns (e.g. for partitioning or joins), subsequent columns should **mix** their value hash +//! into the existing row hash using [`combine_hashes`]. +//! +//! - `rehash = false`: initialize/overwrite the row hash for this column +//! - `rehash = true`: combine this column into an existing row hash +//! +//! [`create_hashes`] sets `rehash` to `false` for the first column and `true` for all following +//! columns, which avoids an unnecessary `combine_hashes` on the first column for performance. +//! +//! Note: some nested-type hashers currently always combine into `hashes_buffer` (see the note in +//! `hash_single_array`). This is intentional to preserve existing behavior. use ahash::RandomState; use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; @@ -400,6 +417,8 @@ fn update_hash_for_dict_key( dict_hashes: &[u64], dict_values: &dyn Array, idx: usize, + // `multi_col` is the historical name for what is now referred to as `rehash` elsewhere + // in this module: if true, combine into an existing per-row hash; if false, initialize. multi_col: bool, ) { if dict_values.is_valid(idx) { @@ -413,6 +432,10 @@ fn update_hash_for_dict_key( } /// Hash the values in a dictionary array +/// +/// Note: `multi_col` is equivalent to `rehash` used by other hashing helpers: +/// - `multi_col = false`: initialize/overwrite `hashes_buffer` for this column +/// - `multi_col = true`: combine into existing per-row hashes #[cfg(not(feature = "force_hash_collisions"))] fn hash_dictionary( array: &DictionaryArray, @@ -449,6 +472,14 @@ fn hash_struct_array( random_state: &RandomState, hashes_buffer: &mut [u64], ) -> Result<()> { + // This nested-type hasher currently always combines its computed struct-row hash + // into `hashes_buffer` (equivalent to `rehash=true`). This preserves existing + // behavior for single-column hashing of nested types. + // + // If we were to add a `rehash` flag here and make `rehash=false` overwrite the + // buffer, it would change the numeric hash values produced for standalone + // Struct columns. + let nulls = array.nulls(); let row_len = array.len(); @@ -477,6 +508,9 @@ fn hash_map_array( random_state: &RandomState, hashes_buffer: &mut [u64], ) -> Result<()> { + // This nested-type hasher currently always combines entry hashes into + // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + let nulls = array.nulls(); let offsets = array.offsets(); @@ -515,6 +549,9 @@ fn hash_list_array( where OffsetSize: OffsetSizeTrait, { + // This nested-type hasher currently always combines element hashes into + // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // In case values is sliced, hash only the bytes used by the offsets of this ListArray let first_offset = array.value_offsets().first().cloned().unwrap_or_default(); let last_offset = array.value_offsets().last().cloned().unwrap_or_default(); @@ -566,6 +603,9 @@ fn hash_list_view_array( where OffsetSize: OffsetSizeTrait, { + // This nested-type hasher currently always combines element hashes into + // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + let values = array.values(); let offsets = array.value_offsets(); let sizes = array.value_sizes(); @@ -602,6 +642,9 @@ fn hash_union_array( random_state: &RandomState, hashes_buffer: &mut [u64], ) -> Result<()> { + // This nested-type hasher currently always combines the chosen child hash into + // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + use std::collections::HashMap; let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -636,6 +679,9 @@ fn hash_fixed_list_array( random_state: &RandomState, hashes_buffer: &mut [u64], ) -> Result<()> { + // This nested-type hasher currently always combines element hashes into + // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + let values = array.values(); let value_length = array.value_length() as usize; let nulls = array.nulls(); @@ -760,6 +806,13 @@ fn hash_single_array( array => hash_dictionary(array, random_state, hashes_buffer, rehash)?, _ => unreachable!() } + // NOTE: The nested-type hashers below currently always *combine* their computed + // nested-value hash into `hashes_buffer` (i.e. they effectively behave as if + // `rehash=true`). This preserves existing hash values for these types. + // + // In other words, unlike primitive-like arrays, nested types do not currently + // "initialize" the buffer when they are the first/only column. + // Changing that would be a behavioral change for single-column hashing. DataType::Struct(_) => { let array = as_struct_array(array)?; hash_struct_array(array, random_state, hashes_buffer)?; @@ -872,7 +925,10 @@ where T: AsDynArray, { for (i, array) in arrays.into_iter().enumerate() { - // combine hashes with `combine_hashes` for all columns besides the first + // `hashes_buffer` is a per-row accumulator. + // + // First column: initialize hashes (no need to call `combine_hashes`) + // Subsequent columns: combine with existing per-row hash let rehash = i >= 1; hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?; } @@ -1194,6 +1250,50 @@ mod tests { assert_eq!(hashes[1], hashes[6]); // null vs empty list } + #[test] + #[cfg(not(feature = "force_hash_collisions"))] + fn create_multi_column_hash_with_list_array() -> Result<()> { + // Validate that nested types participate in multi-column hashing. + let data = vec![ + Some(vec![Some(0), Some(1), Some(2)]), + None, + Some(vec![Some(3), None, Some(5)]), + Some(vec![Some(3), None, Some(5)]), + None, + Some(vec![Some(0), Some(1), Some(2)]), + Some(vec![]), + ]; + let list_array: ArrayRef = + Arc::new(ListArray::from_iter_primitive::(data)); + let extra_col: ArrayRef = + Arc::new(Int32Array::from(vec![10, 11, 12, 12, 11, 10, 13])); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + + let mut one_col_hashes = vec![0; list_array.len()]; + create_hashes( + &[Arc::clone(&list_array)], + &random_state, + &mut one_col_hashes, + )?; + + let mut two_col_hashes = vec![0; list_array.len()]; + create_hashes( + &[Arc::clone(&list_array), Arc::clone(&extra_col)], + &random_state, + &mut two_col_hashes, + )?; + + assert_ne!(one_col_hashes, two_col_hashes); + + // Equalities from the underlying list content should still hold when adding a column + assert_eq!(two_col_hashes[0], two_col_hashes[5]); + assert_eq!(two_col_hashes[1], two_col_hashes[4]); + assert_eq!(two_col_hashes[2], two_col_hashes[3]); + + Ok(()) + } + #[test] #[cfg(not(feature = "force_hash_collisions"))] fn create_hashes_for_sliced_list_arrays() { From 1196cc085b85d43f3a886d3299cec0a423599553 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 5 Feb 2026 11:14:58 +0530 Subject: [PATCH 2/4] make the naming consistent and refactor --- datafusion/common/src/hash_utils.rs | 267 +++++++++++++++++++--------- 1 file changed, 184 insertions(+), 83 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index f74ac7642211b..24d914eb98626 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -31,9 +31,6 @@ //! [`create_hashes`] sets `rehash` to `false` for the first column and `true` for all following //! columns, which avoids an unnecessary `combine_hashes` on the first column for performance. //! -//! Note: some nested-type hashers currently always combine into `hashes_buffer` (see the note in -//! `hash_single_array`). This is intentional to preserve existing behavior. - use ahash::RandomState; use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; use arrow::array::*; @@ -60,6 +57,19 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 { hash.wrapping_mul(37).wrapping_add(r) } +/// Applies `row_hash` into `dst` according to `rehash` semantics. +/// +/// - `rehash = false`: initialize/overwrite `dst` +/// - `rehash = true`: combine `row_hash` into `dst` +#[inline] +fn apply_row_hash(dst: &mut u64, row_hash: u64, rehash: bool) { + if rehash { + *dst = combine_hashes(row_hash, *dst); + } else { + *dst = row_hash; + } +} + /// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements). /// The goal of this is to avoid unbounded memory growth that would appear as a memory leak. /// We allow temporary allocations beyond this size, but after use the buffer is truncated @@ -149,8 +159,8 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) { - if mul_col { +fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], rehash: bool) { + if rehash { hashes_buffer.iter_mut().for_each(|hash| { // stable hash for null value *hash = combine_hashes(random_state.hash_one(1), *hash); @@ -417,12 +427,10 @@ fn update_hash_for_dict_key( dict_hashes: &[u64], dict_values: &dyn Array, idx: usize, - // `multi_col` is the historical name for what is now referred to as `rehash` elsewhere - // in this module: if true, combine into an existing per-row hash; if false, initialize. - multi_col: bool, + rehash: bool, ) { if dict_values.is_valid(idx) { - if multi_col { + if rehash { *hash = combine_hashes(dict_hashes[idx], *hash); } else { *hash = dict_hashes[idx]; @@ -432,16 +440,12 @@ fn update_hash_for_dict_key( } /// Hash the values in a dictionary array -/// -/// Note: `multi_col` is equivalent to `rehash` used by other hashing helpers: -/// - `multi_col = false`: initialize/overwrite `hashes_buffer` for this column -/// - `multi_col = true`: combine into existing per-row hashes #[cfg(not(feature = "force_hash_collisions"))] fn hash_dictionary( array: &DictionaryArray, random_state: &RandomState, hashes_buffer: &mut [u64], - multi_col: bool, + rehash: bool, ) -> Result<()> { // Hash each dictionary value once, and then use that computed // hash for each key value to avoid a potentially expensive @@ -459,7 +463,7 @@ fn hash_dictionary( &dict_hashes, dict_values.as_ref(), idx, - multi_col, + rehash, ); } // no update for Null key } @@ -471,14 +475,12 @@ fn hash_struct_array( array: &StructArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> { - // This nested-type hasher currently always combines its computed struct-row hash - // into `hashes_buffer` (equivalent to `rehash=true`). This preserves existing - // behavior for single-column hashing of nested types. - // - // If we were to add a `rehash` flag here and make `rehash=false` overwrite the - // buffer, it would change the numeric hash values produced for standalone - // Struct columns. + // Hashing for nested types follows the same initialize-vs-combine convention as + // primitive types: + // - `rehash=false`: initialize `hashes_buffer` for this column + // - `rehash=true`: combine into existing per-row hashes let nulls = array.nulls(); let row_len = array.len(); @@ -494,8 +496,16 @@ fn hash_struct_array( create_hashes(array.columns(), random_state, &mut values_hashes)?; for i in valid_row_indices { - let hash = &mut hashes_buffer[i]; - *hash = combine_hashes(*hash, values_hashes[i]); + apply_row_hash(&mut hashes_buffer[i], values_hashes[i], rehash); + } + + // Hash null struct values consistently with other array types + if let Some(nulls) = array.nulls() { + for (i, hash) in hashes_buffer.iter_mut().enumerate().take(row_len) { + if !nulls.is_valid(i) { + hash_null(random_state, std::slice::from_mut(hash), rehash); + } + } } Ok(()) @@ -507,9 +517,9 @@ fn hash_map_array( array: &MapArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> { - // This nested-type hasher currently always combines entry hashes into - // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // Nested types follow the same initialize-vs-combine convention as other hashers. let nulls = array.nulls(); let offsets = array.offsets(); @@ -522,17 +532,39 @@ fn hash_map_array( if let Some(nulls) = nulls { for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { if nulls.is_valid(i) { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } + } else { + hash_null( + random_state, + std::slice::from_mut(&mut hashes_buffer[i]), + rehash, + ); } } } else { for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } } } @@ -545,12 +577,12 @@ fn hash_list_array( array: &GenericListArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> where OffsetSize: OffsetSizeTrait, { - // This nested-type hasher currently always combines element hashes into - // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // Nested types follow the same initialize-vs-combine convention as other hashers. // In case values is sliced, hash only the bytes used by the offsets of this ListArray let first_offset = array.value_offsets().first().cloned().unwrap_or_default(); @@ -569,12 +601,28 @@ where for (i, (start, stop)) in array.value_offsets().iter().tuple_windows().enumerate() { if array.is_valid(i) { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[(*start - first_offset).as_usize() - ..(*stop - first_offset).as_usize()] - { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in &values_hashes[(*start - first_offset).as_usize() + ..(*stop - first_offset).as_usize()] + { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[(*start - first_offset).as_usize() + ..(*stop - first_offset).as_usize()] + { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } + } else { + hash_null( + random_state, + std::slice::from_mut(&mut hashes_buffer[i]), + rehash, + ); } } } else { @@ -584,10 +632,20 @@ where .tuple_windows() .zip(hashes_buffer.iter_mut()) { - for values_hash in &values_hashes - [(*start - first_offset).as_usize()..(*stop - first_offset).as_usize()] - { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + for values_hash in &values_hashes[(*start - first_offset).as_usize() + ..(*stop - first_offset).as_usize()] + { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[(*start - first_offset).as_usize() + ..(*stop - first_offset).as_usize()] + { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(hash, row_hash, rehash); } } } @@ -599,12 +657,12 @@ fn hash_list_view_array( array: &GenericListViewArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> where OffsetSize: OffsetSizeTrait, { - // This nested-type hasher currently always combines element hashes into - // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // Nested types follow the same initialize-vs-combine convention as other hashers. let values = array.values(); let offsets = array.value_offsets(); @@ -615,21 +673,43 @@ where if let Some(nulls) = nulls { for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { if nulls.is_valid(i) { - let hash = &mut hashes_buffer[i]; let start = offset.as_usize(); let end = start + size.as_usize(); - for values_hash in &values_hashes[start..end] { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in &values_hashes[start..end] { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[start..end] { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } + } else { + hash_null( + random_state, + std::slice::from_mut(&mut hashes_buffer[i]), + rehash, + ); } } } else { for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { - let hash = &mut hashes_buffer[i]; let start = offset.as_usize(); let end = start + size.as_usize(); - for values_hash in &values_hashes[start..end] { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in &values_hashes[start..end] { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in &values_hashes[start..end] { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } } } @@ -641,9 +721,9 @@ fn hash_union_array( array: &UnionArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> { - // This nested-type hasher currently always combines the chosen child hash into - // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // Nested types follow the same initialize-vs-combine convention as other hashers. use std::collections::HashMap; @@ -667,7 +747,7 @@ fn hash_union_array( let child_offset = array.value_offset(i); let child_hash = child_hashes.get(&type_id).expect("invalid type_id"); - hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]); + apply_row_hash(&mut hashes_buffer[i], child_hash[child_offset], rehash); } Ok(()) @@ -678,9 +758,9 @@ fn hash_fixed_list_array( array: &FixedSizeListArray, random_state: &RandomState, hashes_buffer: &mut [u64], + rehash: bool, ) -> Result<()> { - // This nested-type hasher currently always combines element hashes into - // `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior. + // Nested types follow the same initialize-vs-combine convention as other hashers. let values = array.values(); let value_length = array.value_length() as usize; @@ -690,19 +770,47 @@ fn hash_fixed_list_array( if let Some(nulls) = nulls { for i in 0..array.len() { if nulls.is_valid(i) { - let hash = &mut hashes_buffer[i]; - for values_hash in - &values_hashes[i * value_length..(i + 1) * value_length] - { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in + &values_hashes[i * value_length..(i + 1) * value_length] + { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in + &values_hashes[i * value_length..(i + 1) * value_length] + { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } + } else { + hash_null( + random_state, + std::slice::from_mut(&mut hashes_buffer[i]), + rehash, + ); } } } else { for i in 0..array.len() { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] { - *hash = combine_hashes(*hash, *values_hash); + if rehash { + let hash = &mut hashes_buffer[i]; + for values_hash in + &values_hashes[i * value_length..(i + 1) * value_length] + { + *hash = combine_hashes(*values_hash, *hash); + } + } else { + let mut row_hash = 0u64; + for values_hash in + &values_hashes[i * value_length..(i + 1) * value_length] + { + row_hash = combine_hashes(*values_hash, row_hash); + } + apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); } } } @@ -806,44 +914,37 @@ fn hash_single_array( array => hash_dictionary(array, random_state, hashes_buffer, rehash)?, _ => unreachable!() } - // NOTE: The nested-type hashers below currently always *combine* their computed - // nested-value hash into `hashes_buffer` (i.e. they effectively behave as if - // `rehash=true`). This preserves existing hash values for these types. - // - // In other words, unlike primitive-like arrays, nested types do not currently - // "initialize" the buffer when they are the first/only column. - // Changing that would be a behavioral change for single-column hashing. DataType::Struct(_) => { let array = as_struct_array(array)?; - hash_struct_array(array, random_state, hashes_buffer)?; + hash_struct_array(array, random_state, hashes_buffer, rehash)?; } DataType::List(_) => { let array = as_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, random_state, hashes_buffer, rehash)?; } DataType::LargeList(_) => { let array = as_large_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, random_state, hashes_buffer, rehash)?; } DataType::ListView(_) => { let array = as_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, random_state, hashes_buffer, rehash)?; } DataType::LargeListView(_) => { let array = as_large_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, random_state, hashes_buffer, rehash)?; } DataType::Map(_, _) => { let array = as_map_array(array)?; - hash_map_array(array, random_state, hashes_buffer)?; + hash_map_array(array, random_state, hashes_buffer, rehash)?; } DataType::FixedSizeList(_,_) => { let array = as_fixed_size_list_array(array)?; - hash_fixed_list_array(array, random_state, hashes_buffer)?; + hash_fixed_list_array(array, random_state, hashes_buffer, rehash)?; } DataType::Union(_, _) => { let array = as_union_array(array)?; - hash_union_array(array, random_state, hashes_buffer)?; + hash_union_array(array, random_state, hashes_buffer, rehash)?; } DataType::RunEndEncoded(_, _) => downcast_run_array! { array => hash_run_array(array, random_state, hashes_buffer, rehash)?, @@ -1247,7 +1348,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); assert_eq!(hashes[2], hashes[3]); - assert_eq!(hashes[1], hashes[6]); // null vs empty list + assert_ne!(hashes[1], hashes[6]); // null vs empty list } #[test] @@ -1360,7 +1461,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); // same content [0, 1, 2] assert_eq!(hashes[1], hashes[4]); // both null assert_eq!(hashes[2], hashes[3]); // same content [3, null, 5] - assert_eq!(hashes[1], hashes[6]); // null vs empty list + assert_ne!(hashes[1], hashes[6]); // null vs empty list // Negative tests: different content should produce different hashes assert_ne!(hashes[0], hashes[2]); // [0, 1, 2] vs [3, null, 5] @@ -1410,7 +1511,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); // same content [0, 1, 2] assert_eq!(hashes[1], hashes[4]); // both null assert_eq!(hashes[2], hashes[3]); // same content [3, null, 5] - assert_eq!(hashes[1], hashes[6]); // null vs empty list + assert_ne!(hashes[1], hashes[6]); // null vs empty list // Negative tests: different content should produce different hashes assert_ne!(hashes[0], hashes[2]); // [0, 1, 2] vs [3, null, 5] @@ -1582,7 +1683,7 @@ mod tests { assert_ne!(hashes[0], hashes[3]); // different key assert_ne!(hashes[0], hashes[4]); // missing an entry assert_ne!(hashes[4], hashes[5]); // filled vs null value - assert_eq!(hashes[6], hashes[7]); // empty vs null map + assert_ne!(hashes[6], hashes[7]); // empty vs null map } #[test] From e6153c27732db0ed7661e1d5cba1cc42c79b48e3 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 5 Feb 2026 20:43:52 +0530 Subject: [PATCH 3/4] remove apply_row_hash refactoring --- datafusion/common/src/hash_utils.rs | 62 +++++++++++++++-------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 24d914eb98626..4046472253a01 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -57,19 +57,6 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 { hash.wrapping_mul(37).wrapping_add(r) } -/// Applies `row_hash` into `dst` according to `rehash` semantics. -/// -/// - `rehash = false`: initialize/overwrite `dst` -/// - `rehash = true`: combine `row_hash` into `dst` -#[inline] -fn apply_row_hash(dst: &mut u64, row_hash: u64, rehash: bool) { - if rehash { - *dst = combine_hashes(row_hash, *dst); - } else { - *dst = row_hash; - } -} - /// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements). /// The goal of this is to avoid unbounded memory growth that would appear as a memory leak. /// We allow temporary allocations beyond this size, but after use the buffer is truncated @@ -495,8 +482,14 @@ fn hash_struct_array( let mut values_hashes = vec![0u64; row_len]; create_hashes(array.columns(), random_state, &mut values_hashes)?; - for i in valid_row_indices { - apply_row_hash(&mut hashes_buffer[i], values_hashes[i], rehash); + if rehash { + for i in valid_row_indices { + hashes_buffer[i] = combine_hashes(values_hashes[i], hashes_buffer[i]); + } + } else { + for i in valid_row_indices { + hashes_buffer[i] = values_hashes[i]; + } } // Hash null struct values consistently with other array types @@ -542,7 +535,7 @@ fn hash_map_array( for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } else { hash_null( @@ -564,7 +557,7 @@ fn hash_map_array( for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } } @@ -615,7 +608,7 @@ where { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } else { hash_null( @@ -645,7 +638,7 @@ where { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(hash, row_hash, rehash); + *hash = row_hash; } } } @@ -685,7 +678,7 @@ where for values_hash in &values_hashes[start..end] { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } else { hash_null( @@ -709,7 +702,7 @@ where for values_hash in &values_hashes[start..end] { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } } @@ -741,13 +734,24 @@ fn hash_union_array( child_hashes.insert(type_id, child_hash_buffer); } - #[expect(clippy::needless_range_loop)] - for i in 0..array.len() { - let type_id = array.type_id(i); - let child_offset = array.value_offset(i); + if rehash { + #[expect(clippy::needless_range_loop)] + for i in 0..array.len() { + let type_id = array.type_id(i); + let child_offset = array.value_offset(i); - let child_hash = child_hashes.get(&type_id).expect("invalid type_id"); - apply_row_hash(&mut hashes_buffer[i], child_hash[child_offset], rehash); + let child_hash = child_hashes.get(&type_id).expect("invalid type_id"); + hashes_buffer[i] = combine_hashes(child_hash[child_offset], hashes_buffer[i]); + } + } else { + #[expect(clippy::needless_range_loop)] + for i in 0..array.len() { + let type_id = array.type_id(i); + let child_offset = array.value_offset(i); + + let child_hash = child_hashes.get(&type_id).expect("invalid type_id"); + hashes_buffer[i] = child_hash[child_offset]; + } } Ok(()) @@ -784,7 +788,7 @@ fn hash_fixed_list_array( { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } else { hash_null( @@ -810,7 +814,7 @@ fn hash_fixed_list_array( { row_hash = combine_hashes(*values_hash, row_hash); } - apply_row_hash(&mut hashes_buffer[i], row_hash, rehash); + hashes_buffer[i] = row_hash; } } } From 98a1fd9a1c773b2bc963d302c69bb9392259d0f5 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 6 Feb 2026 13:56:28 +0530 Subject: [PATCH 4/4] remove redundant comments and handle null cases --- datafusion/common/src/hash_utils.rs | 263 ++++++++++++++-------------- 1 file changed, 128 insertions(+), 135 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 4046472253a01..bb6587c0b976f 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -464,11 +464,6 @@ fn hash_struct_array( hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { - // Hashing for nested types follows the same initialize-vs-combine convention as - // primitive types: - // - `rehash=false`: initialize `hashes_buffer` for this column - // - `rehash=true`: combine into existing per-row hashes - let nulls = array.nulls(); let row_len = array.len(); @@ -492,15 +487,6 @@ fn hash_struct_array( } } - // Hash null struct values consistently with other array types - if let Some(nulls) = array.nulls() { - for (i, hash) in hashes_buffer.iter_mut().enumerate().take(row_len) { - if !nulls.is_valid(i) { - hash_null(random_state, std::slice::from_mut(hash), rehash); - } - } - } - Ok(()) } @@ -512,8 +498,6 @@ fn hash_map_array( hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { - // Nested types follow the same initialize-vs-combine convention as other hashers. - let nulls = array.nulls(); let offsets = array.offsets(); @@ -521,38 +505,35 @@ fn hash_map_array( let mut values_hashes = vec![0u64; array.entries().len()]; create_hashes(array.entries().columns(), random_state, &mut values_hashes)?; - // Combine the hashes for entries on each row with each other and previous hash for that row - if let Some(nulls) = nulls { - for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { - if nulls.is_valid(i) { - if rehash { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { - *hash = combine_hashes(*values_hash, *hash); - } - } else { + // Combine the hashes for entries on each row with each other. + // When `rehash=true`, combine the per-row map hash into the existing accumulator. + if rehash { + if let Some(nulls) = nulls { + for (i, (start, stop)) in + offsets.iter().zip(offsets.iter().skip(1)).enumerate() + { + if nulls.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); } - hashes_buffer[i] = row_hash; + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); } - } else { - hash_null( - random_state, - std::slice::from_mut(&mut hashes_buffer[i]), - rehash, - ); } - } - } else { - for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { - if rehash { - let hash = &mut hashes_buffer[i]; + } else { + for (i, (start, stop)) in + offsets.iter().zip(offsets.iter().skip(1)).enumerate() + { + let mut row_hash = 0u64; for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { - *hash = combine_hashes(*values_hash, *hash); + row_hash = combine_hashes(*values_hash, row_hash); } - } else { + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); + } + } + } else if let Some(nulls) = nulls { + for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { + if nulls.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); @@ -560,6 +541,14 @@ fn hash_map_array( hashes_buffer[i] = row_hash; } } + } else { + for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() { + let mut row_hash = 0u64; + for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] { + row_hash = combine_hashes(*values_hash, row_hash); + } + hashes_buffer[i] = row_hash; + } } Ok(()) @@ -575,8 +564,6 @@ fn hash_list_array( where OffsetSize: OffsetSizeTrait, { - // Nested types follow the same initialize-vs-combine convention as other hashers. - // In case values is sliced, hash only the bytes used by the offsets of this ListArray let first_offset = array.value_offsets().first().cloned().unwrap_or_default(); let last_offset = array.value_offsets().last().cloned().unwrap_or_default(); @@ -590,56 +577,66 @@ where &mut values_hashes, )?; - if array.null_count() > 0 { - for (i, (start, stop)) in array.value_offsets().iter().tuple_windows().enumerate() - { - if array.is_valid(i) { - if rehash { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[(*start - first_offset).as_usize() - ..(*stop - first_offset).as_usize()] - { - *hash = combine_hashes(*values_hash, *hash); - } - } else { + // Compute per-row list hash (fold element hashes), then either initialize or combine + // once per row depending on `rehash`. + if rehash { + if array.null_count() > 0 { + for (i, (start, stop)) in + array.value_offsets().iter().tuple_windows().enumerate() + { + if array.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[(*start - first_offset).as_usize() ..(*stop - first_offset).as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); } - hashes_buffer[i] = row_hash; + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); } - } else { - hash_null( - random_state, - std::slice::from_mut(&mut hashes_buffer[i]), - rehash, - ); } - } - } else { - for ((start, stop), hash) in array - .value_offsets() - .iter() - .tuple_windows() - .zip(hashes_buffer.iter_mut()) - { - if rehash { + } else { + for ((start, stop), hash) in array + .value_offsets() + .iter() + .tuple_windows() + .zip(hashes_buffer.iter_mut()) + { + let mut row_hash = 0u64; for values_hash in &values_hashes[(*start - first_offset).as_usize() ..(*stop - first_offset).as_usize()] { - *hash = combine_hashes(*values_hash, *hash); + row_hash = combine_hashes(*values_hash, row_hash); } - } else { + *hash = combine_hashes(row_hash, *hash); + } + } + } else if array.null_count() > 0 { + for (i, (start, stop)) in array.value_offsets().iter().tuple_windows().enumerate() + { + if array.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[(*start - first_offset).as_usize() ..(*stop - first_offset).as_usize()] { row_hash = combine_hashes(*values_hash, row_hash); } - *hash = row_hash; + hashes_buffer[i] = row_hash; + } + } + } else { + for ((start, stop), hash) in array + .value_offsets() + .iter() + .tuple_windows() + .zip(hashes_buffer.iter_mut()) + { + let mut row_hash = 0u64; + for values_hash in &values_hashes + [(*start - first_offset).as_usize()..(*stop - first_offset).as_usize()] + { + row_hash = combine_hashes(*values_hash, row_hash); } + *hash = row_hash; } } Ok(()) @@ -655,49 +652,41 @@ fn hash_list_view_array( where OffsetSize: OffsetSizeTrait, { - // Nested types follow the same initialize-vs-combine convention as other hashers. - let values = array.values(); let offsets = array.value_offsets(); let sizes = array.value_sizes(); let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; create_hashes([values], random_state, &mut values_hashes)?; - if let Some(nulls) = nulls { - for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { - if nulls.is_valid(i) { - let start = offset.as_usize(); - let end = start + size.as_usize(); - if rehash { - let hash = &mut hashes_buffer[i]; - for values_hash in &values_hashes[start..end] { - *hash = combine_hashes(*values_hash, *hash); - } - } else { + if rehash { + if let Some(nulls) = nulls { + for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { + if nulls.is_valid(i) { + let start = offset.as_usize(); + let end = start + size.as_usize(); let mut row_hash = 0u64; for values_hash in &values_hashes[start..end] { row_hash = combine_hashes(*values_hash, row_hash); } - hashes_buffer[i] = row_hash; + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); } - } else { - hash_null( - random_state, - std::slice::from_mut(&mut hashes_buffer[i]), - rehash, - ); } - } - } else { - for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { - let start = offset.as_usize(); - let end = start + size.as_usize(); - if rehash { - let hash = &mut hashes_buffer[i]; + } else { + for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { + let start = offset.as_usize(); + let end = start + size.as_usize(); + let mut row_hash = 0u64; for values_hash in &values_hashes[start..end] { - *hash = combine_hashes(*values_hash, *hash); + row_hash = combine_hashes(*values_hash, row_hash); } - } else { + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); + } + } + } else if let Some(nulls) = nulls { + for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { + if nulls.is_valid(i) { + let start = offset.as_usize(); + let end = start + size.as_usize(); let mut row_hash = 0u64; for values_hash in &values_hashes[start..end] { row_hash = combine_hashes(*values_hash, row_hash); @@ -705,6 +694,16 @@ where hashes_buffer[i] = row_hash; } } + } else { + for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { + let start = offset.as_usize(); + let end = start + size.as_usize(); + let mut row_hash = 0u64; + for values_hash in &values_hashes[start..end] { + row_hash = combine_hashes(*values_hash, row_hash); + } + hashes_buffer[i] = row_hash; + } } Ok(()) } @@ -716,8 +715,6 @@ fn hash_union_array( hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { - // Nested types follow the same initialize-vs-combine convention as other hashers. - use std::collections::HashMap; let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -764,50 +761,38 @@ fn hash_fixed_list_array( hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { - // Nested types follow the same initialize-vs-combine convention as other hashers. - let values = array.values(); let value_length = array.value_length() as usize; let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; create_hashes([values], random_state, &mut values_hashes)?; - if let Some(nulls) = nulls { - for i in 0..array.len() { - if nulls.is_valid(i) { - if rehash { - let hash = &mut hashes_buffer[i]; - for values_hash in - &values_hashes[i * value_length..(i + 1) * value_length] - { - *hash = combine_hashes(*values_hash, *hash); - } - } else { + if rehash { + if let Some(nulls) = nulls { + for i in 0..array.len() { + if nulls.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] { row_hash = combine_hashes(*values_hash, row_hash); } - hashes_buffer[i] = row_hash; + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); } - } else { - hash_null( - random_state, - std::slice::from_mut(&mut hashes_buffer[i]), - rehash, - ); } - } - } else { - for i in 0..array.len() { - if rehash { - let hash = &mut hashes_buffer[i]; + } else { + for i in 0..array.len() { + let mut row_hash = 0u64; for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] { - *hash = combine_hashes(*values_hash, *hash); + row_hash = combine_hashes(*values_hash, row_hash); } - } else { + hashes_buffer[i] = combine_hashes(row_hash, hashes_buffer[i]); + } + } + } else if let Some(nulls) = nulls { + for i in 0..array.len() { + if nulls.is_valid(i) { let mut row_hash = 0u64; for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] @@ -817,6 +802,14 @@ fn hash_fixed_list_array( hashes_buffer[i] = row_hash; } } + } else { + for i in 0..array.len() { + let mut row_hash = 0u64; + for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] { + row_hash = combine_hashes(*values_hash, row_hash); + } + hashes_buffer[i] = row_hash; + } } Ok(()) } @@ -1352,7 +1345,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); assert_eq!(hashes[2], hashes[3]); - assert_ne!(hashes[1], hashes[6]); // null vs empty list + assert_eq!(hashes[1], hashes[6]); // null vs empty list } #[test] @@ -1465,7 +1458,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); // same content [0, 1, 2] assert_eq!(hashes[1], hashes[4]); // both null assert_eq!(hashes[2], hashes[3]); // same content [3, null, 5] - assert_ne!(hashes[1], hashes[6]); // null vs empty list + assert_eq!(hashes[1], hashes[6]); // null vs empty list // Negative tests: different content should produce different hashes assert_ne!(hashes[0], hashes[2]); // [0, 1, 2] vs [3, null, 5] @@ -1515,7 +1508,7 @@ mod tests { assert_eq!(hashes[0], hashes[5]); // same content [0, 1, 2] assert_eq!(hashes[1], hashes[4]); // both null assert_eq!(hashes[2], hashes[3]); // same content [3, null, 5] - assert_ne!(hashes[1], hashes[6]); // null vs empty list + assert_eq!(hashes[1], hashes[6]); // null vs empty list // Negative tests: different content should produce different hashes assert_ne!(hashes[0], hashes[2]); // [0, 1, 2] vs [3, null, 5] @@ -1687,7 +1680,7 @@ mod tests { assert_ne!(hashes[0], hashes[3]); // different key assert_ne!(hashes[0], hashes[4]); // missing an entry assert_ne!(hashes[4], hashes[5]); // filled vs null value - assert_ne!(hashes[6], hashes[7]); // empty vs null map + assert_eq!(hashes[6], hashes[7]); // empty vs null map } #[test]