Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 106 additions & 44 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,33 +392,22 @@ fn hash_generic_byte_view_array<T: ByteViewType>(
}
}

/// Helper function to update hash for a dictionary key if the value is valid
#[cfg(not(feature = "force_hash_collisions"))]
#[inline]
fn update_hash_for_dict_key(
hash: &mut u64,
dict_hashes: &[u64],
dict_values: &dyn Array,
idx: usize,
multi_col: bool,
) {
if dict_values.is_valid(idx) {
if multi_col {
*hash = combine_hashes(dict_hashes[idx], *hash);
} else {
*hash = dict_hashes[idx];
}
}
// no update for invalid dictionary value
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
/// Hash dictionary array with compile-time specialization for null handling.
///
/// Uses const generics to eliminate runtim branching in the hot loop:
/// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys
/// - `HAS_NULL_VALUES`: Whether to check for null dictionary values
/// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false)
#[inline(never)]
fn hash_dictionary_inner<
K: ArrowDictionaryKeyType,
const HAS_NULL_KEYS: bool,
const HAS_NULL_VALUES: bool,
const MULTI_COL: bool,
>(
array: &DictionaryArray<K>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
Expand All @@ -427,22 +416,93 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes([dict_values], random_state, &mut dict_hashes)?;

// combine hash for each index in values
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
if HAS_NULL_KEYS {
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
let idx = key.as_usize();
if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
if MULTI_COL {
*hash = combine_hashes(dict_hashes[idx], *hash);
} else {
*hash = dict_hashes[idx];
}
}
}
}
} else {
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
// SAFETY: Keys are guaranteed to be non-null because we checked nucll_count = 0
let key = key.unwrap();
let idx = key.as_usize();
update_hash_for_dict_key(
hash,
&dict_hashes,
dict_values.as_ref(),
idx,
multi_col,
);
} // no update for Null key
if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
if MULTI_COL {
*hash = combine_hashes(dict_hashes[idx], *hash);
} else {
*hash = dict_hashes[idx];
}
}
}
}
Ok(())
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
let has_null_keys = array.keys().null_count() != 0;
let has_null_values = array.values().null_count() != 0;

// Dispatcher based on null presence and multi-column mode
// Should reduce branching within hot loops
match (has_null_keys, has_null_values, multi_col) {
(false, false, false) => hash_dictionary_inner::<K, false, false, false>(
array,
random_state,
hashes_buffer,
),
(false, false, true) => hash_dictionary_inner::<K, false, false, true>(
array,
random_state,
hashes_buffer,
),
(false, true, false) => hash_dictionary_inner::<K, false, true, false>(
array,
random_state,
hashes_buffer,
),
(false, true, true) => hash_dictionary_inner::<K, false, true, true>(
array,
random_state,
hashes_buffer,
),
(true, false, false) => hash_dictionary_inner::<K, true, false, false>(
array,
random_state,
hashes_buffer,
),
(true, false, true) => hash_dictionary_inner::<K, true, false, true>(
array,
random_state,
hashes_buffer,
),
(true, true, false) => hash_dictionary_inner::<K, true, true, false>(
array,
random_state,
hashes_buffer,
),
(true, true, true) => hash_dictionary_inner::<K, true, true, true>(
array,
random_state,
hashes_buffer,
),
}
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
Expand All @@ -452,19 +512,21 @@ fn hash_struct_array(
let nulls = array.nulls();
let row_len = array.len();

let valid_row_indices: Vec<usize> = if let Some(nulls) = nulls {
nulls.valid_indices().collect()
} else {
(0..row_len).collect()
};

// Create hashes for each row that combines the hashes over all the column at that row.
let mut values_hashes = vec![0u64; row_len];
create_hashes(array.columns(), random_state, &mut values_hashes)?;

for i in valid_row_indices {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
// Separate paths to avoid allocating Vec when there are no nulls
if let Some(nulls) = nulls {
for i in nulls.valid_indices() {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
}
} else {
for i in 0..row_len {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
}
}

Ok(())
Expand Down