From 80f4df6558f02f604ef035729a6d17ea829f3998 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 3 Apr 2026 14:44:23 +0800 Subject: [PATCH 1/9] Add BuildHasher variants for hash_utils (cherry picked from commit 02b972d32ba5e8c462587026d1529d932bc0555d) --- datafusion/common/src/hash_utils.rs | 453 +++++++++++++++++++--------- 1 file changed, 309 insertions(+), 144 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index fcc2e919b6cc2..e5d8b580f9766 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -106,6 +106,32 @@ where I: IntoIterator, T: AsDynArray, F: FnOnce(&[u64]) -> Result, +{ + with_hashes_impl(arrays, &DefaultHashStrategy { random_state }, callback) +} + +/// Creates hashes for the given arrays using a thread-local buffer and a custom hash builder, +/// then calls the provided callback with an immutable reference to the computed hashes. +pub fn with_hashes_with_hasher( + arrays: I, + hash_builder: &S, + callback: F, +) -> Result +where + I: IntoIterator, + T: AsDynArray, + F: FnOnce(&[u64]) -> Result, + S: BuildHasher, +{ + with_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, callback) +} + +fn with_hashes_impl(arrays: I, hash_strategy: &H, callback: F) -> Result +where + I: IntoIterator, + T: AsDynArray, + F: FnOnce(&[u64]) -> Result, + H: HashStrategy + ?Sized, { // Peek at the first array to determine buffer size without fully collecting let mut iter = arrays.into_iter().peekable(); @@ -125,7 +151,7 @@ where buffer.resize(required_size, 0); // Create hashes in the buffer - this consumes the iterator - create_hashes(iter, random_state, &mut buffer[..required_size])?; + create_hashes_impl(iter, hash_strategy, &mut buffer[..required_size])?; // Execute the callback with an immutable slice let result = callback(&buffer[..required_size])?; @@ -141,27 +167,31 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) { +fn hash_null( + hash_strategy: &H, + hashes_buffer: &'_ mut [u64], + mul_col: bool, +) { if mul_col { hashes_buffer.iter_mut().for_each(|hash| { // stable hash for null value - *hash = combine_hashes(random_state.hash_one(1), *hash); + *hash = combine_hashes(hash_strategy.hash_null(), *hash); }) } else { hashes_buffer.iter_mut().for_each(|hash| { - *hash = random_state.hash_one(1); + *hash = hash_strategy.hash_null(); }) } } pub trait HashValue { - fn hash_one(&self, state: &RandomState) -> u64; + fn hash_one(&self, state: &S) -> u64; /// Write this value into an existing hasher (same data as `hash_one`). fn hash_write(&self, hasher: &mut impl Hasher); } impl HashValue for &T { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { T::hash_one(self, state) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -172,7 +202,7 @@ impl HashValue for &T { macro_rules! hash_value { ($($t:ty),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { state.hash_one(self) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -187,7 +217,7 @@ hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float_value { ($(($t:ty, $i:ty)),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -210,17 +240,72 @@ fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { ) } +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] +trait HashStrategy { + fn hash_null(&self) -> u64; + fn hash_one(&self, value: &T) -> u64; + fn rehash(&self, value: &T, existing_hash: u64) -> u64; +} + +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] +struct DefaultHashStrategy<'a> { + random_state: &'a RandomState, +} + +impl HashStrategy for DefaultHashStrategy<'_> { + fn hash_null(&self) -> u64 { + self.random_state.hash_one(1) + } + + fn hash_one(&self, value: &T) -> u64 { + value.hash_one(self.random_state) + } + + fn rehash(&self, value: &T, existing_hash: u64) -> u64 { + #[cfg(not(feature = "force_hash_collisions"))] + { + let mut hasher = seeded_state(existing_hash).build_hasher(); + value.hash_write(&mut hasher); + hasher.finish() + } + #[cfg(feature = "force_hash_collisions")] + { + combine_hashes(value.hash_one(self.random_state), existing_hash) + } + } +} + +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] +struct CustomHashStrategy<'a, S> { + hash_builder: &'a S, +} + +impl HashStrategy for CustomHashStrategy<'_, S> { + fn hash_null(&self) -> u64 { + self.hash_builder.hash_one(1) + } + + fn hash_one(&self, value: &T) -> u64 { + value.hash_one(self.hash_builder) + } + + fn rehash(&self, value: &T, existing_hash: u64) -> u64 { + combine_hashes(value.hash_one(self.hash_builder), existing_hash) + } +} + /// Builds hash values of PrimitiveArray and writes them into `hashes_buffer` /// If `rehash==true` this folds the existing hash into the hasher state /// and hashes only the new value (avoiding a separate combine step). #[cfg(not(feature = "force_hash_collisions"))] -fn hash_array_primitive( +fn hash_array_primitive( array: &PrimitiveArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool, ) where T: ArrowPrimitiveType, + H: HashStrategy + ?Sized, { assert_eq!( hashes_buffer.len(), @@ -231,26 +316,22 @@ fn hash_array_primitive( if array.null_count() == 0 { if rehash { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - let mut hasher = seeded_state(*hash).build_hasher(); - value.hash_write(&mut hasher); - *hash = hasher.finish(); + *hash = hash_strategy.rehash(&value, *hash); } } else { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = value.hash_one(random_state); + *hash = hash_strategy.hash_one(&value); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - let mut hasher = seeded_state(hashes_buffer[i]).build_hasher(); - value.hash_write(&mut hasher); - hashes_buffer[i] = hasher.finish(); + hashes_buffer[i] = hash_strategy.rehash(&value, hashes_buffer[i]); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one(random_state); + hashes_buffer[i] = hash_strategy.hash_one(&value); } } } @@ -259,14 +340,11 @@ fn hash_array_primitive( /// If `rehash==true` this combines the previous hash value in the buffer /// with the new hash using `combine_hashes` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_array( - array: &T, - random_state: &RandomState, - hashes_buffer: &mut [u64], - rehash: bool, -) where +fn hash_array(array: &T, hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool) +where T: ArrayAccessor, T::Item: HashValue, + H: HashStrategy + ?Sized, { assert_eq!( hashes_buffer.len(), @@ -278,24 +356,23 @@ fn hash_array( if rehash { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = combine_hashes(value.hash_one(random_state), *hash); + *hash = hash_strategy.rehash(&value, *hash); } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = value.hash_one(random_state); + *hash = hash_strategy.hash_one(&value); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = - combine_hashes(value.hash_one(random_state), hashes_buffer[i]); + hashes_buffer[i] = hash_strategy.rehash(&value, hashes_buffer[i]); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one(random_state); + hashes_buffer[i] = hash_strategy.hash_one(&value); } } } @@ -311,12 +388,13 @@ fn hash_array( #[inline(never)] fn hash_string_view_array_inner< T: ByteViewType, + H: HashStrategy + ?Sized, const HAS_NULLS: bool, const HAS_BUFFERS: bool, const REHASH: bool, >( array: &GenericByteViewArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) { assert_eq!( @@ -345,22 +423,18 @@ fn hash_string_view_array_inner< // all views are inlined, no need to access external buffers if !HAS_BUFFERS || view_len <= 12 { if REHASH { - let mut hasher = seeded_state(*hash).build_hasher(); - v.hash_write(&mut hasher); - *hash = hasher.finish(); + *hash = hash_strategy.rehash(&v, *hash); } else { - *hash = v.hash_one(random_state); + *hash = hash_strategy.hash_one(&v); } continue; } // view is not inlined, so we need to hash the bytes as well let value = view_bytes(view_len, v); if REHASH { - let mut hasher = seeded_state(*hash).build_hasher(); - value.hash_write(&mut hasher); - *hash = hasher.finish(); + *hash = hash_strategy.rehash(&value, *hash); } else { - *hash = value.hash_one(random_state); + *hash = hash_strategy.hash_one(&value); } } } @@ -369,9 +443,9 @@ fn hash_string_view_array_inner< /// If `rehash==true` this combines the previous hash value in the buffer /// with the new hash using `combine_hashes` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_generic_byte_view_array( +fn hash_generic_byte_view_array( array: &GenericByteViewArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool, ) { @@ -385,44 +459,42 @@ fn hash_generic_byte_view_array( // don't call the inner function as Rust seems better able to inline this simpler code (2-3% faster) (false, false, false) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = view.hash_one(random_state); + *hash = hash_strategy.hash_one(&view); } } (false, false, true) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - let mut hasher = seeded_state(*hash).build_hasher(); - view.hash_write(&mut hasher); - *hash = hasher.finish(); + *hash = hash_strategy.rehash(&view, *hash); } } - (false, true, false) => hash_string_view_array_inner::( + (false, true, false) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (false, true, true) => hash_string_view_array_inner::( + (false, true, true) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, false, false) => hash_string_view_array_inner::( + (true, false, false) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, false, true) => hash_string_view_array_inner::( + (true, false, true) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, true, false) => hash_string_view_array_inner::( + (true, true, false) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, true, true) => hash_string_view_array_inner::( + (true, true, true) => hash_string_view_array_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), } @@ -438,12 +510,13 @@ fn hash_generic_byte_view_array( #[inline(never)] fn hash_dictionary_inner< K: ArrowDictionaryKeyType, + H: HashStrategy + ?Sized, const HAS_NULL_KEYS: bool, const HAS_NULL_VALUES: bool, const MULTI_COL: bool, >( array: &DictionaryArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { // Hash each dictionary value once, and then use that computed @@ -451,7 +524,7 @@ fn hash_dictionary_inner< // redundant hashing for large dictionary elements (e.g. strings) let dict_values = array.values(); let mut dict_hashes = vec![0; dict_values.len()]; - create_hashes([dict_values], random_state, &mut dict_hashes)?; + create_hashes_impl([dict_values], hash_strategy, &mut dict_hashes)?; if HAS_NULL_KEYS { for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) { @@ -483,9 +556,9 @@ fn hash_dictionary_inner< /// Hash the values in a dictionary array #[cfg(not(feature = "force_hash_collisions"))] -fn hash_dictionary( +fn hash_dictionary( array: &DictionaryArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], multi_col: bool, ) -> Result<()> { @@ -495,53 +568,53 @@ fn hash_dictionary( // Dispatcher based on null presence and multi-column mode // Should reduce branching within hot loops match (has_null_keys, has_null_values, multi_col) { - (false, false, false) => hash_dictionary_inner::( + (false, false, false) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (false, false, true) => hash_dictionary_inner::( + (false, false, true) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (false, true, false) => hash_dictionary_inner::( + (false, true, false) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (false, true, true) => hash_dictionary_inner::( + (false, true, true) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, false, false) => hash_dictionary_inner::( + (true, false, false) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, false, true) => hash_dictionary_inner::( + (true, false, true) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, true, false) => hash_dictionary_inner::( + (true, true, false) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), - (true, true, true) => hash_dictionary_inner::( + (true, true, true) => hash_dictionary_inner::( array, - random_state, + hash_strategy, hashes_buffer, ), } } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_struct_array( +fn hash_struct_array( array: &StructArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -549,7 +622,7 @@ fn hash_struct_array( // Create hashes for each row that combines the hashes over all the column at that row. let mut values_hashes = vec![0u64; row_len]; - create_hashes(array.columns(), random_state, &mut values_hashes)?; + create_hashes_impl(array.columns(), hash_strategy, &mut values_hashes)?; // Separate paths to avoid allocating Vec when there are no nulls if let Some(nulls) = nulls { @@ -569,9 +642,9 @@ fn hash_struct_array( // only adding this `cfg` b/c this function is only used with this `cfg` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_map_array( +fn hash_map_array( array: &MapArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -590,7 +663,7 @@ fn hash_map_array( .iter() .map(|col| col.slice(first_offset, entries_len)) .collect(); - create_hashes(&sliced_columns, random_state, &mut values_hashes)?; + create_hashes_impl(&sliced_columns, hash_strategy, &mut values_hashes)?; // Combine the hashes for entries on each row with each other and previous hash for that row // Adjust indices by first_offset since values_hashes is sliced starting from first_offset @@ -620,24 +693,25 @@ fn hash_map_array( } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_list_array( +fn hash_list_array( array: &GenericListArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> where OffsetSize: OffsetSizeTrait, + H: HashStrategy + ?Sized, { // In case values is sliced, hash only the bytes used by the offsets of this ListArray let first_offset = array.value_offsets().first().cloned().unwrap_or_default(); let last_offset = array.value_offsets().last().cloned().unwrap_or_default(); let value_bytes_len = (last_offset - first_offset).as_usize(); let mut values_hashes = vec![0u64; value_bytes_len]; - create_hashes( + create_hashes_impl( [array .values() .slice(first_offset.as_usize(), value_bytes_len)], - random_state, + hash_strategy, &mut values_hashes, )?; @@ -671,20 +745,21 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_list_view_array( +fn hash_list_view_array( array: &GenericListViewArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> where OffsetSize: OffsetSizeTrait, + H: HashStrategy + ?Sized, { let values = array.values(); let offsets = array.value_offsets(); let sizes = array.value_sizes(); let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes([values], random_state, &mut values_hashes)?; + create_hashes_impl([values], hash_strategy, &mut values_hashes)?; if let Some(nulls) = nulls { for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { if nulls.is_valid(i) { @@ -710,9 +785,9 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_union_array( +fn hash_union_array( array: &UnionArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -722,12 +797,12 @@ fn hash_union_array( if array.is_dense() { // Dense union: children only contain values of their type, so they're already compact. // Use the default hashing approach which is efficient for dense unions. - hash_union_array_default(array, union_fields, random_state, hashes_buffer) + hash_union_array_default(array, union_fields, hash_strategy, hashes_buffer) } else { // Sparse union: each child has the same length as the union array. // Optimization: only hash the elements that are actually referenced by type_ids, // instead of hashing all K*N elements (where K = num types, N = array length). - hash_sparse_union_array(array, union_fields, random_state, hashes_buffer) + hash_sparse_union_array(array, union_fields, hash_strategy, hashes_buffer) } } @@ -741,10 +816,10 @@ fn hash_union_array( /// `hash_sparse_union_array` is more efficient, but for 1-2 types or dense unions, /// this simpler approach is preferred. #[cfg(not(feature = "force_hash_collisions"))] -fn hash_union_array_default( +fn hash_union_array_default( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { let mut child_hashes: HashMap> = @@ -754,7 +829,7 @@ fn hash_union_array_default( for (type_id, _field) in union_fields.iter() { let child = array.child(type_id); let mut child_hash_buffer = vec![0; child.len()]; - create_hashes([child], random_state, &mut child_hash_buffer)?; + create_hashes_impl([child], hash_strategy, &mut child_hash_buffer)?; child_hashes.insert(type_id, child_hash_buffer); } @@ -782,10 +857,10 @@ fn hash_union_array_default( /// For 1-2 types, the overhead of take/scatter outweighs the benefit, so we use /// the default approach of hashing all children (same as dense unions). #[cfg(not(feature = "force_hash_collisions"))] -fn hash_sparse_union_array( +fn hash_sparse_union_array( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { use std::collections::HashMap; @@ -796,7 +871,7 @@ fn hash_sparse_union_array( return hash_union_array_default( array, union_fields, - random_state, + hash_strategy, hashes_buffer, ); } @@ -824,7 +899,7 @@ fn hash_sparse_union_array( // Hash the filtered array let mut filtered_hashes = vec![0u64; filtered.len()]; - create_hashes([&filtered], random_state, &mut filtered_hashes)?; + create_hashes_impl([&filtered], hash_strategy, &mut filtered_hashes)?; // Scatter hashes back to correct positions for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) { @@ -838,16 +913,16 @@ fn hash_sparse_union_array( } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_fixed_list_array( +fn hash_fixed_list_array( array: &FixedSizeListArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { let values = array.values(); let value_length = array.value_length() as usize; let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes([values], random_state, &mut values_hashes)?; + create_hashes_impl([values], hash_strategy, &mut values_hashes)?; if let Some(nulls) = nulls { for i in 0..array.len() { if nulls.is_valid(i) { @@ -875,11 +950,12 @@ fn hash_fixed_list_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_run_array_inner< R: RunEndIndexType, + H: HashStrategy + ?Sized, const HAS_NULL_VALUES: bool, const REHASH: bool, >( array: &RunArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], ) -> Result<()> { // We find the relevant runs that cover potentially sliced arrays, so we can only hash those @@ -906,9 +982,9 @@ fn hash_run_array_inner< end_physical_index - start_physical_index, ); let mut values_hashes = vec![0u64; sliced_values.len()]; - create_hashes( + create_hashes_impl( std::slice::from_ref(&sliced_values), - random_state, + hash_strategy, &mut values_hashes, )?; @@ -944,26 +1020,28 @@ fn hash_run_array_inner< } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_run_array( +fn hash_run_array( array: &RunArray, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { let has_null_values = array.values().null_count() != 0; match (has_null_values, rehash) { - (false, false) => { - hash_run_array_inner::(array, random_state, hashes_buffer) - } + (false, false) => hash_run_array_inner::( + array, + hash_strategy, + hashes_buffer, + ), (false, true) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, hash_strategy, hashes_buffer) } (true, false) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, hash_strategy, hashes_buffer) } (true, true) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, hash_strategy, hashes_buffer) } } } @@ -971,64 +1049,64 @@ fn hash_run_array( /// Internal helper function that hashes a single array and either initializes or combines /// the hash values in the buffer. #[cfg(not(feature = "force_hash_collisions"))] -fn hash_single_array( +fn hash_single_array( array: &dyn Array, - random_state: &RandomState, + hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { downcast_primitive_array! { - array => hash_array_primitive(array, random_state, hashes_buffer, rehash), - DataType::Null => hash_null(random_state, hashes_buffer, rehash), - DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash), - DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash), - DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), - DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash), - DataType::Binary => hash_array(&as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), - DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), - DataType::LargeBinary => hash_array(&as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), + array => hash_array_primitive(array, hash_strategy, hashes_buffer, rehash), + DataType::Null => hash_null(hash_strategy, hashes_buffer, rehash), + DataType::Boolean => hash_array(&as_boolean_array(array)?, hash_strategy, hashes_buffer, rehash), + DataType::Utf8 => hash_array(&as_string_array(array)?, hash_strategy, hashes_buffer, rehash), + DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, hash_strategy, hashes_buffer, rehash), + DataType::LargeUtf8 => hash_array(&as_largestring_array(array), hash_strategy, hashes_buffer, rehash), + DataType::Binary => hash_array(&as_generic_binary_array::(array)?, hash_strategy, hashes_buffer, rehash), + DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, hash_strategy, hashes_buffer, rehash), + DataType::LargeBinary => hash_array(&as_generic_binary_array::(array)?, hash_strategy, hashes_buffer, rehash), DataType::FixedSizeBinary(_) => { let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); - hash_array(&array, random_state, hashes_buffer, rehash) + hash_array(&array, hash_strategy, hashes_buffer, rehash) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => hash_dictionary(array, random_state, hashes_buffer, rehash)?, + array => hash_dictionary(array, hash_strategy, hashes_buffer, rehash)?, _ => unreachable!() } DataType::Struct(_) => { let array = as_struct_array(array)?; - hash_struct_array(array, random_state, hashes_buffer)?; + hash_struct_array(array, hash_strategy, hashes_buffer)?; } DataType::List(_) => { let array = as_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, hash_strategy, hashes_buffer)?; } DataType::LargeList(_) => { let array = as_large_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, hash_strategy, hashes_buffer)?; } DataType::ListView(_) => { let array = as_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, hash_strategy, hashes_buffer)?; } DataType::LargeListView(_) => { let array = as_large_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, hash_strategy, hashes_buffer)?; } DataType::Map(_, _) => { let array = as_map_array(array)?; - hash_map_array(array, random_state, hashes_buffer)?; + hash_map_array(array, hash_strategy, hashes_buffer)?; } DataType::FixedSizeList(_,_) => { let array = as_fixed_size_list_array(array)?; - hash_fixed_list_array(array, random_state, hashes_buffer)?; + hash_fixed_list_array(array, hash_strategy, hashes_buffer)?; } DataType::Union(_, _) => { let array = as_union_array(array)?; - hash_union_array(array, random_state, hashes_buffer)?; + hash_union_array(array, hash_strategy, hashes_buffer)?; } DataType::RunEndEncoded(_, _) => downcast_run_array! { - array => hash_run_array(array, random_state, hashes_buffer, rehash)?, + array => hash_run_array(array, hash_strategy, hashes_buffer, rehash)?, _ => unreachable!() } _ => { @@ -1044,9 +1122,9 @@ fn hash_single_array( /// Test version of `hash_single_array` that forces all hashes to collide to zero. #[cfg(feature = "force_hash_collisions")] -fn hash_single_array( +fn hash_single_array( _array: &dyn Array, - _random_state: &RandomState, + _hash_strategy: &H, hashes_buffer: &mut [u64], _rehash: bool, ) -> Result<()> { @@ -1093,6 +1171,24 @@ impl AsDynArray for &ArrayRef { } } +fn create_hashes_impl<'a, I, T, H>( + arrays: I, + hash_strategy: &H, + hashes_buffer: &'a mut [u64], +) -> Result<&'a mut [u64]> +where + I: IntoIterator, + T: AsDynArray, + H: HashStrategy + ?Sized, +{ + for (i, array) in arrays.into_iter().enumerate() { + // combine hashes with `combine_hashes` for all columns besides the first + let rehash = i >= 1; + hash_single_array(array.as_dyn_array(), hash_strategy, hashes_buffer, rehash)?; + } + Ok(hashes_buffer) +} + /// Creates hash values for every row, based on the values in the columns. /// /// The number of rows to hash is determined by `hashes_buffer.len()`. @@ -1106,16 +1202,29 @@ where I: IntoIterator, T: AsDynArray, { - for (i, array) in arrays.into_iter().enumerate() { - // combine hashes with `combine_hashes` for all columns besides the first - let rehash = i >= 1; - hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?; - } - Ok(hashes_buffer) + create_hashes_impl(arrays, &DefaultHashStrategy { random_state }, hashes_buffer) +} + +/// Creates hash values for every row using a caller-provided hash builder. +/// +/// The number of rows to hash is determined by `hashes_buffer.len()`. +/// `hashes_buffer` should be pre-sized appropriately. +pub fn create_hashes_with_hasher<'a, I, T, S>( + arrays: I, + hash_builder: &S, + hashes_buffer: &'a mut [u64], +) -> Result<&'a mut [u64]> +where + I: IntoIterator, + T: AsDynArray, + S: BuildHasher, +{ + create_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, hashes_buffer) } #[cfg(test)] mod tests { + use std::hash::{BuildHasherDefault, Hasher}; use std::sync::Arc; use arrow::array::*; @@ -1124,6 +1233,21 @@ mod tests { use super::*; + #[derive(Default)] + struct TestHasher(u64); + + impl Hasher for TestHasher { + fn finish(&self) -> u64 { + self.0 + } + + fn write(&mut self, bytes: &[u8]) { + for byte in bytes { + self.0 = self.0.wrapping_mul(37).wrapping_add(u64::from(*byte)); + } + } + } + #[test] fn create_hashes_for_decimal_array() -> Result<()> { let array = vec![1, 2, 3, 4] @@ -1890,6 +2014,47 @@ mod tests { ); } + #[cfg(not(feature = "force_hash_collisions"))] + #[test] + fn test_create_hashes_with_custom_hasher() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 1, 4])); + let hash_builder = BuildHasherDefault::::default(); + + let mut custom_hashes = vec![0; array.len()]; + create_hashes_with_hasher([&array], &hash_builder, &mut custom_hashes).unwrap(); + + let random_state = RandomState::with_seed(0); + let mut default_hashes = vec![0; array.len()]; + create_hashes([&array], &random_state, &mut default_hashes).unwrap(); + + assert_eq!(custom_hashes[0], custom_hashes[2]); + assert_ne!(custom_hashes[0], custom_hashes[1]); + assert_ne!(custom_hashes, default_hashes); + } + + #[test] + fn test_with_hashes_with_custom_hasher() { + let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"])); + let hash_builder = BuildHasherDefault::::default(); + + let mut expected_hashes = vec![0; int_array.len()]; + create_hashes_with_hasher( + [&int_array, &str_array], + &hash_builder, + &mut expected_hashes, + ) + .unwrap(); + + let actual_hashes = + with_hashes_with_hasher([&int_array, &str_array], &hash_builder, |hashes| { + Ok(hashes.to_vec()) + }) + .unwrap(); + + assert_eq!(actual_hashes, expected_hashes); + } + #[test] #[cfg(not(feature = "force_hash_collisions"))] fn create_hashes_for_sparse_union_arrays() { From 6a1cc02bc5ccb9a8bd2da8e5fc301d59a3cd0157 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 7 Apr 2026 12:16:08 +0800 Subject: [PATCH 2/9] Keep hash_array rehash behavior --- datafusion/common/src/hash_utils.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index e5d8b580f9766..345242ae947dc 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -356,7 +356,7 @@ where if rehash { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = hash_strategy.rehash(&value, *hash); + *hash = combine_hashes(hash_strategy.hash_one(&value), *hash); } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { @@ -367,7 +367,8 @@ where } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = hash_strategy.rehash(&value, hashes_buffer[i]); + hashes_buffer[i] = + combine_hashes(hash_strategy.hash_one(&value), hashes_buffer[i]); } } else { for i in array.nulls().unwrap().valid_indices() { From 1dec571c7b62635d957bf0ff69e1fbcc694ab898 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 8 Apr 2026 15:48:42 +0800 Subject: [PATCH 3/9] perf: inline hash_utils wrappers --- datafusion/common/src/hash_utils.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 345242ae947dc..b6b7a373fd60c 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -97,6 +97,7 @@ thread_local! { /// Ok(hashes.len()) /// })?; /// ``` +#[inline(always)] pub fn with_hashes( arrays: I, random_state: &RandomState, @@ -112,6 +113,7 @@ where /// Creates hashes for the given arrays using a thread-local buffer and a custom hash builder, /// then calls the provided callback with an immutable reference to the computed hashes. +#[inline(always)] pub fn with_hashes_with_hasher( arrays: I, hash_builder: &S, @@ -126,6 +128,7 @@ where with_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, callback) } +#[inline(always)] fn with_hashes_impl(arrays: I, hash_strategy: &H, callback: F) -> Result where I: IntoIterator, @@ -240,27 +243,31 @@ fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { ) } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] trait HashStrategy { + #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn hash_null(&self) -> u64; + #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn hash_one(&self, value: &T) -> u64; + #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn rehash(&self, value: &T, existing_hash: u64) -> u64; } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] struct DefaultHashStrategy<'a> { random_state: &'a RandomState, } impl HashStrategy for DefaultHashStrategy<'_> { + #[inline(always)] fn hash_null(&self) -> u64 { self.random_state.hash_one(1) } + #[inline(always)] fn hash_one(&self, value: &T) -> u64 { value.hash_one(self.random_state) } + #[inline(always)] fn rehash(&self, value: &T, existing_hash: u64) -> u64 { #[cfg(not(feature = "force_hash_collisions"))] { @@ -275,20 +282,22 @@ impl HashStrategy for DefaultHashStrategy<'_> { } } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] struct CustomHashStrategy<'a, S> { hash_builder: &'a S, } impl HashStrategy for CustomHashStrategy<'_, S> { + #[inline(always)] fn hash_null(&self) -> u64 { self.hash_builder.hash_one(1) } + #[inline(always)] fn hash_one(&self, value: &T) -> u64 { value.hash_one(self.hash_builder) } + #[inline(always)] fn rehash(&self, value: &T, existing_hash: u64) -> u64 { combine_hashes(value.hash_one(self.hash_builder), existing_hash) } @@ -1172,6 +1181,7 @@ impl AsDynArray for &ArrayRef { } } +#[inline(always)] fn create_hashes_impl<'a, I, T, H>( arrays: I, hash_strategy: &H, @@ -1194,6 +1204,7 @@ where /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately. +#[inline(always)] pub fn create_hashes<'a, I, T>( arrays: I, random_state: &RandomState, @@ -1210,6 +1221,7 @@ where /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately. +#[inline(always)] pub fn create_hashes_with_hasher<'a, I, T, S>( arrays: I, hash_builder: &S, From 12821fc508e96b598b3c0ad1b9832bb027ada6ed Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 8 Apr 2026 16:24:29 +0800 Subject: [PATCH 4/9] Revert "perf: inline hash_utils wrappers" This reverts commit 1dec571c7b62635d957bf0ff69e1fbcc694ab898. --- datafusion/common/src/hash_utils.rs | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index b6b7a373fd60c..345242ae947dc 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -97,7 +97,6 @@ thread_local! { /// Ok(hashes.len()) /// })?; /// ``` -#[inline(always)] pub fn with_hashes( arrays: I, random_state: &RandomState, @@ -113,7 +112,6 @@ where /// Creates hashes for the given arrays using a thread-local buffer and a custom hash builder, /// then calls the provided callback with an immutable reference to the computed hashes. -#[inline(always)] pub fn with_hashes_with_hasher( arrays: I, hash_builder: &S, @@ -128,7 +126,6 @@ where with_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, callback) } -#[inline(always)] fn with_hashes_impl(arrays: I, hash_strategy: &H, callback: F) -> Result where I: IntoIterator, @@ -243,31 +240,27 @@ fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { ) } +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] trait HashStrategy { - #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn hash_null(&self) -> u64; - #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn hash_one(&self, value: &T) -> u64; - #[cfg_attr(feature = "force_hash_collisions", expect(dead_code))] fn rehash(&self, value: &T, existing_hash: u64) -> u64; } +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] struct DefaultHashStrategy<'a> { random_state: &'a RandomState, } impl HashStrategy for DefaultHashStrategy<'_> { - #[inline(always)] fn hash_null(&self) -> u64 { self.random_state.hash_one(1) } - #[inline(always)] fn hash_one(&self, value: &T) -> u64 { value.hash_one(self.random_state) } - #[inline(always)] fn rehash(&self, value: &T, existing_hash: u64) -> u64 { #[cfg(not(feature = "force_hash_collisions"))] { @@ -282,22 +275,20 @@ impl HashStrategy for DefaultHashStrategy<'_> { } } +#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] struct CustomHashStrategy<'a, S> { hash_builder: &'a S, } impl HashStrategy for CustomHashStrategy<'_, S> { - #[inline(always)] fn hash_null(&self) -> u64 { self.hash_builder.hash_one(1) } - #[inline(always)] fn hash_one(&self, value: &T) -> u64 { value.hash_one(self.hash_builder) } - #[inline(always)] fn rehash(&self, value: &T, existing_hash: u64) -> u64 { combine_hashes(value.hash_one(self.hash_builder), existing_hash) } @@ -1181,7 +1172,6 @@ impl AsDynArray for &ArrayRef { } } -#[inline(always)] fn create_hashes_impl<'a, I, T, H>( arrays: I, hash_strategy: &H, @@ -1204,7 +1194,6 @@ where /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately. -#[inline(always)] pub fn create_hashes<'a, I, T>( arrays: I, random_state: &RandomState, @@ -1221,7 +1210,6 @@ where /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately. -#[inline(always)] pub fn create_hashes_with_hasher<'a, I, T, S>( arrays: I, hash_builder: &S, From c022fb45432fdacc7be39e0795696614c115fb64 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 9 Apr 2026 14:56:58 +0800 Subject: [PATCH 5/9] datafusion-common: split hash_utils hasher paths Keep the default RandomState path concrete while adding dedicated BuildHasher entry points and tests. Also wire the parquet feature through object_store/tokio so datafusion-common all-features clippy builds cleanly. --- datafusion/common/Cargo.toml | 4 +- datafusion/common/src/hash_utils.rs | 768 +++++++++++++++++++++------- 2 files changed, 576 insertions(+), 196 deletions(-) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index dbb81a4824c0d..abe2ff718d095 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -49,7 +49,7 @@ parquet_encryption = [ ] force_hash_collisions = [] recursive_protection = ["dep:recursive"] -parquet = ["dep:parquet"] +parquet = ["dep:parquet", "object_store"] sql = ["sqlparser"] [[bench]] @@ -77,7 +77,7 @@ indexmap = { workspace = true } itertools = { workspace = true } libc = "0.2.180" log = { workspace = true } -object_store = { workspace = true, optional = true } +object_store = { workspace = true, optional = true, features = ["tokio"] } parquet = { workspace = true, optional = true, default-features = true } recursive = { workspace = true, optional = true } sqlparser = { workspace = true, optional = true } diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 345242ae947dc..f4bbbf7656784 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -107,11 +107,37 @@ where T: AsDynArray, F: FnOnce(&[u64]) -> Result, { - with_hashes_impl(arrays, &DefaultHashStrategy { random_state }, callback) + // Keep the default RandomState path monomorphic: routing it through the + // generic custom-hasher abstraction regressed the CI aarch64 with_hashes benchmark. + let mut iter = arrays.into_iter().peekable(); + + let required_size = match iter.peek() { + Some(arr) => arr.as_dyn_array().len(), + None => return _internal_err!("with_hashes requires at least one array"), + }; + + HASH_BUFFER.try_with(|cell| { + let mut buffer = cell.try_borrow_mut() + .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?; + + buffer.clear(); + buffer.resize(required_size, 0); + + create_hashes(iter, random_state, &mut buffer[..required_size])?; + + let result = callback(&buffer[..required_size])?; + + if buffer.capacity() > MAX_BUFFER_SIZE { + buffer.truncate(MAX_BUFFER_SIZE); + buffer.shrink_to_fit(); + } + + Ok(result) + }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))? } -/// Creates hashes for the given arrays using a thread-local buffer and a custom hash builder, -/// then calls the provided callback with an immutable reference to the computed hashes. +/// Creates hashes for the given arrays using a thread-local buffer and a custom +/// hash builder, then calls the provided callback with the computed hashes. pub fn with_hashes_with_hasher( arrays: I, hash_builder: &S, @@ -123,20 +149,22 @@ where F: FnOnce(&[u64]) -> Result, S: BuildHasher, { - with_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, callback) + with_hashes_with_hasher_impl(arrays, hash_builder, callback) } -fn with_hashes_impl(arrays: I, hash_strategy: &H, callback: F) -> Result +fn with_hashes_with_hasher_impl( + arrays: I, + hash_builder: &S, + callback: F, +) -> Result where I: IntoIterator, T: AsDynArray, F: FnOnce(&[u64]) -> Result, - H: HashStrategy + ?Sized, + S: BuildHasher, { - // Peek at the first array to determine buffer size without fully collecting let mut iter = arrays.into_iter().peekable(); - // Get the required size from the first array let required_size = match iter.peek() { Some(arr) => arr.as_dyn_array().len(), None => return _internal_err!("with_hashes requires at least one array"), @@ -146,17 +174,17 @@ where let mut buffer = cell.try_borrow_mut() .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?; - // Ensure buffer has sufficient length, clearing old values buffer.clear(); buffer.resize(required_size, 0); - // Create hashes in the buffer - this consumes the iterator - create_hashes_impl(iter, hash_strategy, &mut buffer[..required_size])?; + create_hashes_with_hasher_impl( + iter, + hash_builder, + &mut buffer[..required_size], + )?; - // Execute the callback with an immutable slice let result = callback(&buffer[..required_size])?; - // Cleanup: truncate if buffer grew too large if buffer.capacity() > MAX_BUFFER_SIZE { buffer.truncate(MAX_BUFFER_SIZE); buffer.shrink_to_fit(); @@ -167,19 +195,15 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_null( - hash_strategy: &H, - hashes_buffer: &'_ mut [u64], - mul_col: bool, -) { +fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) { if mul_col { hashes_buffer.iter_mut().for_each(|hash| { // stable hash for null value - *hash = combine_hashes(hash_strategy.hash_null(), *hash); + *hash = combine_hashes(random_state.hash_one(1), *hash); }) } else { hashes_buffer.iter_mut().for_each(|hash| { - *hash = hash_strategy.hash_null(); + *hash = random_state.hash_one(1); }) } } @@ -240,57 +264,46 @@ fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { ) } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] -trait HashStrategy { - fn hash_null(&self) -> u64; - fn hash_one(&self, value: &T) -> u64; - fn rehash(&self, value: &T, existing_hash: u64) -> u64; +#[cfg(not(feature = "force_hash_collisions"))] +// Only recursive child hashing is shared; leaf hot loops stay split so the +// default RandomState path keeps its concrete codegen. +trait ChildHashing { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray; } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] -struct DefaultHashStrategy<'a> { +#[cfg(not(feature = "force_hash_collisions"))] +struct RandomStateChildHashing<'a> { random_state: &'a RandomState, } -impl HashStrategy for DefaultHashStrategy<'_> { - fn hash_null(&self) -> u64 { - self.random_state.hash_one(1) - } - - fn hash_one(&self, value: &T) -> u64 { - value.hash_one(self.random_state) - } - - fn rehash(&self, value: &T, existing_hash: u64) -> u64 { - #[cfg(not(feature = "force_hash_collisions"))] - { - let mut hasher = seeded_state(existing_hash).build_hasher(); - value.hash_write(&mut hasher); - hasher.finish() - } - #[cfg(feature = "force_hash_collisions")] - { - combine_hashes(value.hash_one(self.random_state), existing_hash) - } +#[cfg(not(feature = "force_hash_collisions"))] +impl ChildHashing for RandomStateChildHashing<'_> { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray, + { + create_hashes(arrays, self.random_state, hashes_buffer).map(|_| ()) } } -#[cfg_attr(feature = "force_hash_collisions", allow(dead_code))] -struct CustomHashStrategy<'a, S> { +#[cfg(not(feature = "force_hash_collisions"))] +struct BuildHasherChildHashing<'a, S> { hash_builder: &'a S, } -impl HashStrategy for CustomHashStrategy<'_, S> { - fn hash_null(&self) -> u64 { - self.hash_builder.hash_one(1) - } - - fn hash_one(&self, value: &T) -> u64 { - value.hash_one(self.hash_builder) - } - - fn rehash(&self, value: &T, existing_hash: u64) -> u64 { - combine_hashes(value.hash_one(self.hash_builder), existing_hash) +#[cfg(not(feature = "force_hash_collisions"))] +impl ChildHashing for BuildHasherChildHashing<'_, S> { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray, + { + create_hashes_with_hasher_impl(arrays, self.hash_builder, hashes_buffer) + .map(|_| ()) } } @@ -298,14 +311,13 @@ impl HashStrategy for CustomHashStrategy<'_, S> { /// If `rehash==true` this folds the existing hash into the hasher state /// and hashes only the new value (avoiding a separate combine step). #[cfg(not(feature = "force_hash_collisions"))] -fn hash_array_primitive( +fn hash_array_primitive( array: &PrimitiveArray, - hash_strategy: &H, + random_state: &RandomState, hashes_buffer: &mut [u64], rehash: bool, ) where T: ArrowPrimitiveType, - H: HashStrategy + ?Sized, { assert_eq!( hashes_buffer.len(), @@ -316,22 +328,26 @@ fn hash_array_primitive( if array.null_count() == 0 { if rehash { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = hash_strategy.rehash(&value, *hash); + let mut hasher = seeded_state(*hash).build_hasher(); + value.hash_write(&mut hasher); + *hash = hasher.finish(); } } else { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = hash_strategy.hash_one(&value); + *hash = value.hash_one(random_state); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = hash_strategy.rehash(&value, hashes_buffer[i]); + let mut hasher = seeded_state(hashes_buffer[i]).build_hasher(); + value.hash_write(&mut hasher); + hashes_buffer[i] = hasher.finish(); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = hash_strategy.hash_one(&value); + hashes_buffer[i] = value.hash_one(random_state); } } } @@ -340,11 +356,14 @@ fn hash_array_primitive( /// If `rehash==true` this combines the previous hash value in the buffer /// with the new hash using `combine_hashes` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_array(array: &T, hash_strategy: &H, hashes_buffer: &mut [u64], rehash: bool) -where +fn hash_array( + array: &T, + random_state: &RandomState, + hashes_buffer: &mut [u64], + rehash: bool, +) where T: ArrayAccessor, T::Item: HashValue, - H: HashStrategy + ?Sized, { assert_eq!( hashes_buffer.len(), @@ -356,24 +375,24 @@ where if rehash { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = combine_hashes(hash_strategy.hash_one(&value), *hash); + *hash = combine_hashes(value.hash_one(random_state), *hash); } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = hash_strategy.hash_one(&value); + *hash = value.hash_one(random_state); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; hashes_buffer[i] = - combine_hashes(hash_strategy.hash_one(&value), hashes_buffer[i]); + combine_hashes(value.hash_one(random_state), hashes_buffer[i]); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = hash_strategy.hash_one(&value); + hashes_buffer[i] = value.hash_one(random_state); } } } @@ -389,13 +408,12 @@ where #[inline(never)] fn hash_string_view_array_inner< T: ByteViewType, - H: HashStrategy + ?Sized, const HAS_NULLS: bool, const HAS_BUFFERS: bool, const REHASH: bool, >( array: &GenericByteViewArray, - hash_strategy: &H, + random_state: &RandomState, hashes_buffer: &mut [u64], ) { assert_eq!( @@ -424,18 +442,22 @@ fn hash_string_view_array_inner< // all views are inlined, no need to access external buffers if !HAS_BUFFERS || view_len <= 12 { if REHASH { - *hash = hash_strategy.rehash(&v, *hash); + let mut hasher = seeded_state(*hash).build_hasher(); + v.hash_write(&mut hasher); + *hash = hasher.finish(); } else { - *hash = hash_strategy.hash_one(&v); + *hash = v.hash_one(random_state); } continue; } // view is not inlined, so we need to hash the bytes as well let value = view_bytes(view_len, v); if REHASH { - *hash = hash_strategy.rehash(&value, *hash); + let mut hasher = seeded_state(*hash).build_hasher(); + value.hash_write(&mut hasher); + *hash = hasher.finish(); } else { - *hash = hash_strategy.hash_one(&value); + *hash = value.hash_one(random_state); } } } @@ -444,9 +466,9 @@ fn hash_string_view_array_inner< /// If `rehash==true` this combines the previous hash value in the buffer /// with the new hash using `combine_hashes` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_generic_byte_view_array( +fn hash_generic_byte_view_array( array: &GenericByteViewArray, - hash_strategy: &H, + random_state: &RandomState, hashes_buffer: &mut [u64], rehash: bool, ) { @@ -460,47 +482,270 @@ fn hash_generic_byte_view_array( // don't call the inner function as Rust seems better able to inline this simpler code (2-3% faster) (false, false, false) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = hash_strategy.hash_one(&view); + *hash = view.hash_one(random_state); } } (false, false, true) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = hash_strategy.rehash(&view, *hash); + let mut hasher = seeded_state(*hash).build_hasher(); + view.hash_write(&mut hasher); + *hash = hasher.finish(); } } - (false, true, false) => hash_string_view_array_inner::( + (false, true, false) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), - (false, true, true) => hash_string_view_array_inner::( + (false, true, true) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), - (true, false, false) => hash_string_view_array_inner::( + (true, false, false) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), - (true, false, true) => hash_string_view_array_inner::( + (true, false, true) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), - (true, true, false) => hash_string_view_array_inner::( + (true, true, false) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), - (true, true, true) => hash_string_view_array_inner::( + (true, true, true) => hash_string_view_array_inner::( array, - hash_strategy, + random_state, hashes_buffer, ), } } +#[cfg(not(feature = "force_hash_collisions"))] +// The custom BuildHasher path intentionally mirrors the leaf helpers so the +// default path does not route its tight loops through generic abstractions. +fn hash_null_with_hasher( + hash_builder: &S, + hashes_buffer: &mut [u64], + mul_col: bool, +) { + if mul_col { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = combine_hashes(hash_builder.hash_one(1), *hash); + }) + } else { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = hash_builder.hash_one(1); + }) + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_primitive_with_hasher( + array: &PrimitiveArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrowPrimitiveType, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = combine_hashes(value.hash_one(hash_builder), *hash); + } + } else { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = value.hash_one(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = + combine_hashes(value.hash_one(hash_builder), hashes_buffer[i]); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_with_hasher( + array: &T, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrayAccessor, + T::Item: HashValue, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = combine_hashes(value.hash_one(hash_builder), *hash); + } + } else { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = value.hash_one(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = + combine_hashes(value.hash_one(hash_builder), hashes_buffer[i]); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +#[inline(never)] +fn hash_string_view_array_inner_with_hasher< + T: ByteViewType, + S: BuildHasher, + const HAS_NULLS: bool, + const HAS_BUFFERS: bool, + const REHASH: bool, +>( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], +) { + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + let buffers = array.data_buffers(); + let view_bytes = |view_len: u32, view: u128| { + let view = ByteView::from(view); + let offset = view.offset as usize; + unsafe { + let data = buffers.get_unchecked(view.buffer_index as usize); + data.get_unchecked(offset..offset + view_len as usize) + } + }; + + let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter()); + for (i, (hash, &v)) in hashes_and_views.enumerate() { + if HAS_NULLS && array.is_null(i) { + continue; + } + let view_len = v as u32; + if !HAS_BUFFERS || view_len <= 12 { + if REHASH { + *hash = combine_hashes(v.hash_one(hash_builder), *hash); + } else { + *hash = v.hash_one(hash_builder); + } + continue; + } + let value = view_bytes(view_len, v); + if REHASH { + *hash = combine_hashes(value.hash_one(hash_builder), *hash); + } else { + *hash = value.hash_one(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_generic_byte_view_array_with_hasher( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) { + match ( + array.null_count() != 0, + !array.data_buffers().is_empty(), + rehash, + ) { + (false, false, false) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = view.hash_one(hash_builder); + } + } + (false, false, true) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = combine_hashes(view.hash_one(hash_builder), *hash); + } + } + (false, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (false, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + } +} + /// Hash dictionary array with compile-time specialization for null handling. /// /// Uses const generics to eliminate runtim branching in the hot loop: @@ -511,13 +756,13 @@ fn hash_generic_byte_view_array( #[inline(never)] fn hash_dictionary_inner< K: ArrowDictionaryKeyType, - H: HashStrategy + ?Sized, + C: ChildHashing + ?Sized, const HAS_NULL_KEYS: bool, const HAS_NULL_VALUES: bool, const MULTI_COL: bool, >( array: &DictionaryArray, - hash_strategy: &H, + child_hashing: &C, hashes_buffer: &mut [u64], ) -> Result<()> { // Hash each dictionary value once, and then use that computed @@ -525,7 +770,7 @@ fn hash_dictionary_inner< // redundant hashing for large dictionary elements (e.g. strings) let dict_values = array.values(); let mut dict_hashes = vec![0; dict_values.len()]; - create_hashes_impl([dict_values], hash_strategy, &mut dict_hashes)?; + child_hashing.create_hashes([dict_values], &mut dict_hashes)?; if HAS_NULL_KEYS { for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) { @@ -557,9 +802,9 @@ fn hash_dictionary_inner< /// Hash the values in a dictionary array #[cfg(not(feature = "force_hash_collisions"))] -fn hash_dictionary( +fn hash_dictionary( array: &DictionaryArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], multi_col: bool, ) -> Result<()> { @@ -569,53 +814,53 @@ fn hash_dictionary( // Dispatcher based on null presence and multi-column mode // Should reduce branching within hot loops match (has_null_keys, has_null_values, multi_col) { - (false, false, false) => hash_dictionary_inner::( + (false, false, false) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (false, false, true) => hash_dictionary_inner::( + (false, false, true) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (false, true, false) => hash_dictionary_inner::( + (false, true, false) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (false, true, true) => hash_dictionary_inner::( + (false, true, true) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (true, false, false) => hash_dictionary_inner::( + (true, false, false) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (true, false, true) => hash_dictionary_inner::( + (true, false, true) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (true, true, false) => hash_dictionary_inner::( + (true, true, false) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), - (true, true, true) => hash_dictionary_inner::( + (true, true, true) => hash_dictionary_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), } } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_struct_array( +fn hash_struct_array( array: &StructArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -623,7 +868,7 @@ fn hash_struct_array( // Create hashes for each row that combines the hashes over all the column at that row. let mut values_hashes = vec![0u64; row_len]; - create_hashes_impl(array.columns(), hash_strategy, &mut values_hashes)?; + child_hashing.create_hashes(array.columns(), &mut values_hashes)?; // Separate paths to avoid allocating Vec when there are no nulls if let Some(nulls) = nulls { @@ -643,9 +888,9 @@ fn hash_struct_array( // only adding this `cfg` b/c this function is only used with this `cfg` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_map_array( +fn hash_map_array( array: &MapArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -664,7 +909,7 @@ fn hash_map_array( .iter() .map(|col| col.slice(first_offset, entries_len)) .collect(); - create_hashes_impl(&sliced_columns, hash_strategy, &mut values_hashes)?; + child_hashing.create_hashes(&sliced_columns, &mut values_hashes)?; // Combine the hashes for entries on each row with each other and previous hash for that row // Adjust indices by first_offset since values_hashes is sliced starting from first_offset @@ -694,25 +939,23 @@ fn hash_map_array( } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_list_array( +fn hash_list_array( array: &GenericListArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> where OffsetSize: OffsetSizeTrait, - H: HashStrategy + ?Sized, { // In case values is sliced, hash only the bytes used by the offsets of this ListArray let first_offset = array.value_offsets().first().cloned().unwrap_or_default(); let last_offset = array.value_offsets().last().cloned().unwrap_or_default(); let value_bytes_len = (last_offset - first_offset).as_usize(); let mut values_hashes = vec![0u64; value_bytes_len]; - create_hashes_impl( + child_hashing.create_hashes( [array .values() .slice(first_offset.as_usize(), value_bytes_len)], - hash_strategy, &mut values_hashes, )?; @@ -746,21 +989,20 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_list_view_array( +fn hash_list_view_array( array: &GenericListViewArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> where OffsetSize: OffsetSizeTrait, - H: HashStrategy + ?Sized, { let values = array.values(); let offsets = array.value_offsets(); let sizes = array.value_sizes(); let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes_impl([values], hash_strategy, &mut values_hashes)?; + child_hashing.create_hashes([values], &mut values_hashes)?; if let Some(nulls) = nulls { for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { if nulls.is_valid(i) { @@ -786,9 +1028,9 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_union_array( +fn hash_union_array( array: &UnionArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -798,12 +1040,12 @@ fn hash_union_array( if array.is_dense() { // Dense union: children only contain values of their type, so they're already compact. // Use the default hashing approach which is efficient for dense unions. - hash_union_array_default(array, union_fields, hash_strategy, hashes_buffer) + hash_union_array_default(array, union_fields, child_hashing, hashes_buffer) } else { // Sparse union: each child has the same length as the union array. // Optimization: only hash the elements that are actually referenced by type_ids, // instead of hashing all K*N elements (where K = num types, N = array length). - hash_sparse_union_array(array, union_fields, hash_strategy, hashes_buffer) + hash_sparse_union_array(array, union_fields, child_hashing, hashes_buffer) } } @@ -817,10 +1059,10 @@ fn hash_union_array( /// `hash_sparse_union_array` is more efficient, but for 1-2 types or dense unions, /// this simpler approach is preferred. #[cfg(not(feature = "force_hash_collisions"))] -fn hash_union_array_default( +fn hash_union_array_default( array: &UnionArray, union_fields: &UnionFields, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let mut child_hashes: HashMap> = @@ -830,7 +1072,7 @@ fn hash_union_array_default( for (type_id, _field) in union_fields.iter() { let child = array.child(type_id); let mut child_hash_buffer = vec![0; child.len()]; - create_hashes_impl([child], hash_strategy, &mut child_hash_buffer)?; + child_hashing.create_hashes([child], &mut child_hash_buffer)?; child_hashes.insert(type_id, child_hash_buffer); } @@ -858,10 +1100,10 @@ fn hash_union_array_default( /// For 1-2 types, the overhead of take/scatter outweighs the benefit, so we use /// the default approach of hashing all children (same as dense unions). #[cfg(not(feature = "force_hash_collisions"))] -fn hash_sparse_union_array( +fn hash_sparse_union_array( array: &UnionArray, union_fields: &UnionFields, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { use std::collections::HashMap; @@ -872,7 +1114,7 @@ fn hash_sparse_union_array( return hash_union_array_default( array, union_fields, - hash_strategy, + child_hashing, hashes_buffer, ); } @@ -900,7 +1142,7 @@ fn hash_sparse_union_array( // Hash the filtered array let mut filtered_hashes = vec![0u64; filtered.len()]; - create_hashes_impl([&filtered], hash_strategy, &mut filtered_hashes)?; + child_hashing.create_hashes([&filtered], &mut filtered_hashes)?; // Scatter hashes back to correct positions for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) { @@ -914,16 +1156,16 @@ fn hash_sparse_union_array( } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_fixed_list_array( +fn hash_fixed_list_array( array: &FixedSizeListArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let values = array.values(); let value_length = array.value_length() as usize; let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes_impl([values], hash_strategy, &mut values_hashes)?; + child_hashing.create_hashes([values], &mut values_hashes)?; if let Some(nulls) = nulls { for i in 0..array.len() { if nulls.is_valid(i) { @@ -951,12 +1193,12 @@ fn hash_fixed_list_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_run_array_inner< R: RunEndIndexType, - H: HashStrategy + ?Sized, + C: ChildHashing + ?Sized, const HAS_NULL_VALUES: bool, const REHASH: bool, >( array: &RunArray, - hash_strategy: &H, + child_hashing: &C, hashes_buffer: &mut [u64], ) -> Result<()> { // We find the relevant runs that cover potentially sliced arrays, so we can only hash those @@ -983,11 +1225,8 @@ fn hash_run_array_inner< end_physical_index - start_physical_index, ); let mut values_hashes = vec![0u64; sliced_values.len()]; - create_hashes_impl( - std::slice::from_ref(&sliced_values), - hash_strategy, - &mut values_hashes, - )?; + child_hashing + .create_hashes(std::slice::from_ref(&sliced_values), &mut values_hashes)?; let mut start_in_slice = 0; for (adjusted_physical_index, &absolute_run_end) in run_ends_values @@ -1021,28 +1260,28 @@ fn hash_run_array_inner< } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_run_array( +fn hash_run_array( array: &RunArray, - hash_strategy: &H, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { let has_null_values = array.values().null_count() != 0; match (has_null_values, rehash) { - (false, false) => hash_run_array_inner::( + (false, false) => hash_run_array_inner::( array, - hash_strategy, + child_hashing, hashes_buffer, ), (false, true) => { - hash_run_array_inner::(array, hash_strategy, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } (true, false) => { - hash_run_array_inner::(array, hash_strategy, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } (true, true) => { - hash_run_array_inner::(array, hash_strategy, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } } } @@ -1050,64 +1289,68 @@ fn hash_run_array( /// Internal helper function that hashes a single array and either initializes or combines /// the hash values in the buffer. #[cfg(not(feature = "force_hash_collisions"))] -fn hash_single_array( +fn hash_single_array( array: &dyn Array, - hash_strategy: &H, + random_state: &RandomState, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { + // Nested types recurse through ChildHashing; primitive/string/binary cases + // stay on the dedicated RandomState fast path above. + let child_hashing = RandomStateChildHashing { random_state }; + downcast_primitive_array! { - array => hash_array_primitive(array, hash_strategy, hashes_buffer, rehash), - DataType::Null => hash_null(hash_strategy, hashes_buffer, rehash), - DataType::Boolean => hash_array(&as_boolean_array(array)?, hash_strategy, hashes_buffer, rehash), - DataType::Utf8 => hash_array(&as_string_array(array)?, hash_strategy, hashes_buffer, rehash), - DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, hash_strategy, hashes_buffer, rehash), - DataType::LargeUtf8 => hash_array(&as_largestring_array(array), hash_strategy, hashes_buffer, rehash), - DataType::Binary => hash_array(&as_generic_binary_array::(array)?, hash_strategy, hashes_buffer, rehash), - DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, hash_strategy, hashes_buffer, rehash), - DataType::LargeBinary => hash_array(&as_generic_binary_array::(array)?, hash_strategy, hashes_buffer, rehash), + array => hash_array_primitive(array, random_state, hashes_buffer, rehash), + DataType::Null => hash_null(random_state, hashes_buffer, rehash), + DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash), + DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash), + DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), + DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash), + DataType::Binary => hash_array(&as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), + DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), + DataType::LargeBinary => hash_array(&as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), DataType::FixedSizeBinary(_) => { let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); - hash_array(&array, hash_strategy, hashes_buffer, rehash) + hash_array(&array, random_state, hashes_buffer, rehash) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => hash_dictionary(array, hash_strategy, hashes_buffer, rehash)?, + array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, _ => unreachable!() } DataType::Struct(_) => { let array = as_struct_array(array)?; - hash_struct_array(array, hash_strategy, hashes_buffer)?; + hash_struct_array(array, &child_hashing, hashes_buffer)?; } DataType::List(_) => { let array = as_list_array(array)?; - hash_list_array(array, hash_strategy, hashes_buffer)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; } DataType::LargeList(_) => { let array = as_large_list_array(array)?; - hash_list_array(array, hash_strategy, hashes_buffer)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; } DataType::ListView(_) => { let array = as_list_view_array(array)?; - hash_list_view_array(array, hash_strategy, hashes_buffer)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; } DataType::LargeListView(_) => { let array = as_large_list_view_array(array)?; - hash_list_view_array(array, hash_strategy, hashes_buffer)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; } DataType::Map(_, _) => { let array = as_map_array(array)?; - hash_map_array(array, hash_strategy, hashes_buffer)?; + hash_map_array(array, &child_hashing, hashes_buffer)?; } DataType::FixedSizeList(_,_) => { let array = as_fixed_size_list_array(array)?; - hash_fixed_list_array(array, hash_strategy, hashes_buffer)?; + hash_fixed_list_array(array, &child_hashing, hashes_buffer)?; } DataType::Union(_, _) => { let array = as_union_array(array)?; - hash_union_array(array, hash_strategy, hashes_buffer)?; + hash_union_array(array, &child_hashing, hashes_buffer)?; } DataType::RunEndEncoded(_, _) => downcast_run_array! { - array => hash_run_array(array, hash_strategy, hashes_buffer, rehash)?, + array => hash_run_array(array, &child_hashing, hashes_buffer, rehash)?, _ => unreachable!() } _ => { @@ -1121,11 +1364,99 @@ fn hash_single_array( Ok(()) } +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_single_array_with_hasher( + array: &dyn Array, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) -> Result<()> { + // The custom-hasher path reuses the shared structural combiners but keeps + // its own leaf dispatch and rehash semantics. + let child_hashing = BuildHasherChildHashing { hash_builder }; + + downcast_primitive_array! { + array => hash_array_primitive_with_hasher(array, hash_builder, hashes_buffer, rehash), + DataType::Null => hash_null_with_hasher(hash_builder, hashes_buffer, rehash), + DataType::Boolean => hash_array_with_hasher(&as_boolean_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::Utf8 => hash_array_with_hasher(&as_string_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::Utf8View => hash_generic_byte_view_array_with_hasher(as_string_view_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::LargeUtf8 => hash_array_with_hasher(&as_largestring_array(array), hash_builder, hashes_buffer, rehash), + DataType::Binary => hash_array_with_hasher(&as_generic_binary_array::(array)?, hash_builder, hashes_buffer, rehash), + DataType::BinaryView => hash_generic_byte_view_array_with_hasher(as_binary_view_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::LargeBinary => hash_array_with_hasher(&as_generic_binary_array::(array)?, hash_builder, hashes_buffer, rehash), + DataType::FixedSizeBinary(_) => { + let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); + hash_array_with_hasher(&array, hash_builder, hashes_buffer, rehash) + } + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, + _ => unreachable!() + } + DataType::Struct(_) => { + let array = as_struct_array(array)?; + hash_struct_array(array, &child_hashing, hashes_buffer)?; + } + DataType::List(_) => { + let array = as_list_array(array)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::LargeList(_) => { + let array = as_large_list_array(array)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::ListView(_) => { + let array = as_list_view_array(array)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; + } + DataType::LargeListView(_) => { + let array = as_large_list_view_array(array)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; + } + DataType::Map(_, _) => { + let array = as_map_array(array)?; + hash_map_array(array, &child_hashing, hashes_buffer)?; + } + DataType::FixedSizeList(_,_) => { + let array = as_fixed_size_list_array(array)?; + hash_fixed_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::Union(_, _) => { + let array = as_union_array(array)?; + hash_union_array(array, &child_hashing, hashes_buffer)?; + } + DataType::RunEndEncoded(_, _) => downcast_run_array! { + array => hash_run_array(array, &child_hashing, hashes_buffer, rehash)?, + _ => unreachable!() + } + _ => { + return _internal_err!( + "Unsupported data type in hasher: {}", + array.data_type() + ); + } + } + Ok(()) +} + /// Test version of `hash_single_array` that forces all hashes to collide to zero. #[cfg(feature = "force_hash_collisions")] -fn hash_single_array( +fn hash_single_array( + _array: &dyn Array, + _random_state: &RandomState, + hashes_buffer: &mut [u64], + _rehash: bool, +) -> Result<()> { + for hash in hashes_buffer.iter_mut() { + *hash = 0 + } + Ok(()) +} + +#[cfg(feature = "force_hash_collisions")] +fn hash_single_array_with_hasher( _array: &dyn Array, - _hash_strategy: &H, + _hash_builder: &S, hashes_buffer: &mut [u64], _rehash: bool, ) -> Result<()> { @@ -1172,20 +1503,24 @@ impl AsDynArray for &ArrayRef { } } -fn create_hashes_impl<'a, I, T, H>( +fn create_hashes_with_hasher_impl<'a, I, T, S>( arrays: I, - hash_strategy: &H, + hash_builder: &S, hashes_buffer: &'a mut [u64], ) -> Result<&'a mut [u64]> where I: IntoIterator, T: AsDynArray, - H: HashStrategy + ?Sized, + S: BuildHasher, { for (i, array) in arrays.into_iter().enumerate() { - // combine hashes with `combine_hashes` for all columns besides the first let rehash = i >= 1; - hash_single_array(array.as_dyn_array(), hash_strategy, hashes_buffer, rehash)?; + hash_single_array_with_hasher( + array.as_dyn_array(), + hash_builder, + hashes_buffer, + rehash, + )?; } Ok(hashes_buffer) } @@ -1203,13 +1538,15 @@ where I: IntoIterator, T: AsDynArray, { - create_hashes_impl(arrays, &DefaultHashStrategy { random_state }, hashes_buffer) + // Keep the default RandomState path concrete for the same benchmark reason as with_hashes. + for (i, array) in arrays.into_iter().enumerate() { + let rehash = i >= 1; + hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?; + } + Ok(hashes_buffer) } /// Creates hash values for every row using a caller-provided hash builder. -/// -/// The number of rows to hash is determined by `hashes_buffer.len()`. -/// `hashes_buffer` should be pre-sized appropriately. pub fn create_hashes_with_hasher<'a, I, T, S>( arrays: I, hash_builder: &S, @@ -1220,7 +1557,7 @@ where T: AsDynArray, S: BuildHasher, { - create_hashes_impl(arrays, &CustomHashStrategy { hash_builder }, hashes_buffer) + create_hashes_with_hasher_impl(arrays, hash_builder, hashes_buffer) } #[cfg(test)] @@ -2015,8 +2352,51 @@ mod tests { ); } + #[test] #[cfg(not(feature = "force_hash_collisions"))] + fn test_create_hashes_large_utf8_multi_column_rehash_matches_combine() { + let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); + let large_utf8: ArrayRef = Arc::new(LargeStringArray::from(vec![ + Some("repeat"), + None, + Some("repeat"), + Some("different"), + ])); + let random_state = RandomState::with_seed(0); + + let mut first_col_hashes = vec![0; int_array.len()]; + create_hashes([&int_array], &random_state, &mut first_col_hashes).unwrap(); + + let mut combined_hashes = vec![0; int_array.len()]; + create_hashes( + [&int_array, &large_utf8], + &random_state, + &mut combined_hashes, + ) + .unwrap(); + + let large_utf8 = large_utf8 + .as_any() + .downcast_ref::() + .unwrap(); + let expected = (0..large_utf8.len()) + .map(|i| { + if large_utf8.is_valid(i) { + combine_hashes( + large_utf8.value(i).hash_one(&random_state), + first_col_hashes[i], + ) + } else { + first_col_hashes[i] + } + }) + .collect::>(); + + assert_eq!(combined_hashes, expected); + } + #[test] + #[cfg(not(feature = "force_hash_collisions"))] fn test_create_hashes_with_custom_hasher() { let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 1, 4])); let hash_builder = BuildHasherDefault::::default(); From 501b2237a842801b7c3ab1ccccb5a7be4adcd338 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 9 Apr 2026 16:07:29 +0800 Subject: [PATCH 6/9] datafusion-common: restore hash_utils leaf fast path --- datafusion/common/src/hash_utils.rs | 75 ++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index f4bbbf7656784..33ea7292cf26d 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -209,13 +209,13 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: } pub trait HashValue { - fn hash_one(&self, state: &S) -> u64; + fn hash_one(&self, state: &RandomState) -> u64; /// Write this value into an existing hasher (same data as `hash_one`). fn hash_write(&self, hasher: &mut impl Hasher); } impl HashValue for &T { - fn hash_one(&self, state: &S) -> u64 { + fn hash_one(&self, state: &RandomState) -> u64 { T::hash_one(self, state) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -223,15 +223,35 @@ impl HashValue for &T { } } +#[cfg(not(feature = "force_hash_collisions"))] +// Keep custom BuildHasher leaf hashing off the default RandomState fast path. +trait BuildHasherHashValue { + fn hash_one_with_hasher(&self, state: &S) -> u64; +} + +#[cfg(not(feature = "force_hash_collisions"))] +impl BuildHasherHashValue for &T { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + T::hash_one_with_hasher(self, state) + } +} + macro_rules! hash_value { ($($t:ty),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &S) -> u64 { + fn hash_one(&self, state: &RandomState) -> u64 { state.hash_one(self) } fn hash_write(&self, hasher: &mut impl Hasher) { Hash::hash(self, hasher) } + } + + #[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(self) + } })+ }; } @@ -241,12 +261,19 @@ hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float_value { ($(($t:ty, $i:ty)),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &S) -> u64 { + fn hash_one(&self, state: &RandomState) -> u64 { state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) } fn hash_write(&self, hasher: &mut impl Hasher) { hasher.write(&self.to_ne_bytes()) } + } + + #[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) + } })+ }; } @@ -551,7 +578,7 @@ fn hash_array_primitive_with_hasher( hashes_buffer: &mut [u64], rehash: bool, ) where - T: ArrowPrimitiveType, + T: ArrowPrimitiveType, S: BuildHasher, { assert_eq!( @@ -563,23 +590,25 @@ fn hash_array_primitive_with_hasher( if array.null_count() == 0 { if rehash { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = combine_hashes(value.hash_one(hash_builder), *hash); + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); } } else { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = value.hash_one(hash_builder); + *hash = value.hash_one_with_hasher(hash_builder); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = - combine_hashes(value.hash_one(hash_builder), hashes_buffer[i]); + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one(hash_builder); + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); } } } @@ -592,7 +621,7 @@ fn hash_array_with_hasher( rehash: bool, ) where T: ArrayAccessor, - T::Item: HashValue, + T::Item: BuildHasherHashValue, S: BuildHasher, { assert_eq!( @@ -605,24 +634,26 @@ fn hash_array_with_hasher( if rehash { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = combine_hashes(value.hash_one(hash_builder), *hash); + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { let value = unsafe { array.value_unchecked(i) }; - *hash = value.hash_one(hash_builder); + *hash = value.hash_one_with_hasher(hash_builder); } } } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = - combine_hashes(value.hash_one(hash_builder), hashes_buffer[i]); + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); } } else { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one(hash_builder); + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); } } } @@ -664,17 +695,17 @@ fn hash_string_view_array_inner_with_hasher< let view_len = v as u32; if !HAS_BUFFERS || view_len <= 12 { if REHASH { - *hash = combine_hashes(v.hash_one(hash_builder), *hash); + *hash = combine_hashes(v.hash_one_with_hasher(hash_builder), *hash); } else { - *hash = v.hash_one(hash_builder); + *hash = v.hash_one_with_hasher(hash_builder); } continue; } let value = view_bytes(view_len, v); if REHASH { - *hash = combine_hashes(value.hash_one(hash_builder), *hash); + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); } else { - *hash = value.hash_one(hash_builder); + *hash = value.hash_one_with_hasher(hash_builder); } } } @@ -693,12 +724,12 @@ fn hash_generic_byte_view_array_with_hasher( ) { (false, false, false) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = view.hash_one(hash_builder); + *hash = view.hash_one_with_hasher(hash_builder); } } (false, false, true) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = combine_hashes(view.hash_one(hash_builder), *hash); + *hash = combine_hashes(view.hash_one_with_hasher(hash_builder), *hash); } } (false, true, false) => { From 87812d29f563bbb568e66f45d4d81b40a872516f Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 10 Apr 2026 09:27:08 +0800 Subject: [PATCH 7/9] datafusion-common: align parquet benchmark features --- datafusion/common/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index abe2ff718d095..d0c33a114af7c 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -49,7 +49,7 @@ parquet_encryption = [ ] force_hash_collisions = [] recursive_protection = ["dep:recursive"] -parquet = ["dep:parquet", "object_store"] +parquet = ["dep:parquet"] sql = ["sqlparser"] [[bench]] From 3a3bfc5760f980481c632d14b8225ea6955ebf04 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 15 Apr 2026 13:12:33 +0800 Subject: [PATCH 8/9] perf: fix dictionary hash regression by removing hasher type param from hot loop The #[inline(never)] hash_dictionary_inner carried a C: ChildHashing type parameter, causing different monomorphization for the RandomState path vs original code. This regressed dictionary_utf8_int32 by ~15%. Restructure: separate "hash dict values" (differs per hasher) from "scatter hashes to keys" (shared hot loop). The new hash_dictionary_scatter takes dict_hashes: &[u64] with no hasher parameter, so both paths call the exact same monomorphization. Co-Authored-By: Claude Opus 4.6 --- datafusion/common/src/hash_utils.rs | 103 ++++++++++++++++++---------- 1 file changed, 66 insertions(+), 37 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 33ea7292cf26d..4dfe0e40ef3f5 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -777,32 +777,30 @@ fn hash_generic_byte_view_array_with_hasher( } } -/// Hash dictionary array with compile-time specialization for null handling. +/// Scatter pre-computed dictionary value hashes to key positions. /// -/// Uses const generics to eliminate runtim branching in the hot loop: +/// Uses const generics to eliminate runtime branching in the hot loop: /// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys /// - `HAS_NULL_VALUES`: Whether to check for null dictionary values /// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false) +/// +/// Dictionary value hashing is done by the caller (`hash_dictionary` / +/// `hash_dictionary_with_child_hashing`) so this function has no hasher +/// type parameter — both the RandomState and custom-hasher paths call the +/// exact same monomorphization, preserving codegen quality. #[cfg(not(feature = "force_hash_collisions"))] #[inline(never)] -fn hash_dictionary_inner< +fn hash_dictionary_scatter< K: ArrowDictionaryKeyType, - C: ChildHashing + ?Sized, const HAS_NULL_KEYS: bool, const HAS_NULL_VALUES: bool, const MULTI_COL: bool, >( array: &DictionaryArray, - child_hashing: &C, + dict_hashes: &[u64], hashes_buffer: &mut [u64], -) -> Result<()> { - // Hash each dictionary value once, and then use that computed - // hash for each key value to avoid a potentially expensive - // redundant hashing for large dictionary elements (e.g. strings) +) { let dict_values = array.values(); - let mut dict_hashes = vec![0; dict_values.len()]; - child_hashing.create_hashes([dict_values], &mut dict_hashes)?; - if HAS_NULL_KEYS { for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) { if let Some(key) = key { @@ -828,66 +826,97 @@ fn hash_dictionary_inner< } } } - Ok(()) } -/// Hash the values in a dictionary array +/// Dispatch to the correct `hash_dictionary_scatter` variant based on +/// null presence and multi-column mode. #[cfg(not(feature = "force_hash_collisions"))] -fn hash_dictionary( +fn dispatch_dictionary_scatter( array: &DictionaryArray, - child_hashing: &impl ChildHashing, + dict_hashes: &[u64], hashes_buffer: &mut [u64], multi_col: bool, -) -> Result<()> { +) { let has_null_keys = array.keys().null_count() != 0; let has_null_values = array.values().null_count() != 0; - // Dispatcher based on null presence and multi-column mode - // Should reduce branching within hot loops match (has_null_keys, has_null_values, multi_col) { - (false, false, false) => hash_dictionary_inner::( + (false, false, false) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (false, false, true) => hash_dictionary_inner::( + (false, false, true) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (false, true, false) => hash_dictionary_inner::( + (false, true, false) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (false, true, true) => hash_dictionary_inner::( + (false, true, true) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (true, false, false) => hash_dictionary_inner::( + (true, false, false) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (true, false, true) => hash_dictionary_inner::( + (true, false, true) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (true, true, false) => hash_dictionary_inner::( + (true, true, false) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), - (true, true, true) => hash_dictionary_inner::( + (true, true, true) => hash_dictionary_scatter::( array, - child_hashing, + dict_hashes, hashes_buffer, ), } } +/// Hash the values in a dictionary array using the default RandomState path. +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_dictionary( + array: &DictionaryArray, + random_state: &RandomState, + hashes_buffer: &mut [u64], + multi_col: bool, +) -> Result<()> { + // Hash each dictionary value once, and then use that computed + // hash for each key value to avoid a potentially expensive + // redundant hashing for large dictionary elements (e.g. strings) + let dict_values = array.values(); + let mut dict_hashes = vec![0; dict_values.len()]; + create_hashes([dict_values], random_state, &mut dict_hashes)?; + dispatch_dictionary_scatter(array, &dict_hashes, hashes_buffer, multi_col); + Ok(()) +} + +/// Hash the values in a dictionary array using a custom hasher. +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_dictionary_with_child_hashing( + array: &DictionaryArray, + child_hashing: &impl ChildHashing, + hashes_buffer: &mut [u64], + multi_col: bool, +) -> Result<()> { + let dict_values = array.values(); + let mut dict_hashes = vec![0; dict_values.len()]; + child_hashing.create_hashes([dict_values], &mut dict_hashes)?; + dispatch_dictionary_scatter(array, &dict_hashes, hashes_buffer, multi_col); + Ok(()) +} + #[cfg(not(feature = "force_hash_collisions"))] fn hash_struct_array( array: &StructArray, @@ -1345,7 +1374,7 @@ fn hash_single_array( hash_array(&array, random_state, hashes_buffer, rehash) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, + array => hash_dictionary(array, random_state, hashes_buffer, rehash)?, _ => unreachable!() } DataType::Struct(_) => { @@ -1421,7 +1450,7 @@ fn hash_single_array_with_hasher( hash_array_with_hasher(&array, hash_builder, hashes_buffer, rehash) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, + array => hash_dictionary_with_child_hashing(array, &child_hashing, hashes_buffer, rehash)?, _ => unreachable!() } DataType::Struct(_) => { From 1ad2c2d0a0dc5e967873383add5307ca9fb717bb Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 15 Apr 2026 14:27:10 +0800 Subject: [PATCH 9/9] perf: move BuildHasher leaf functions after RandomState code for I-cache locality The _with_hasher leaf functions (~600 lines) were interleaved between the hot RandomState functions, polluting the instruction cache for the default path. Move all BuildHasherHashValue trait/impls and _with_hasher leaf functions to the end of the file (after create_hashes_with_hasher, before tests) so the hot default path stays tightly packed. Co-Authored-By: Claude Opus 4.6 --- datafusion/common/src/hash_utils.rs | 517 ++++++++++++++-------------- 1 file changed, 265 insertions(+), 252 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 4dfe0e40ef3f5..d6c50b2a7ce5e 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -223,19 +223,6 @@ impl HashValue for &T { } } -#[cfg(not(feature = "force_hash_collisions"))] -// Keep custom BuildHasher leaf hashing off the default RandomState fast path. -trait BuildHasherHashValue { - fn hash_one_with_hasher(&self, state: &S) -> u64; -} - -#[cfg(not(feature = "force_hash_collisions"))] -impl BuildHasherHashValue for &T { - fn hash_one_with_hasher(&self, state: &S) -> u64 { - T::hash_one_with_hasher(self, state) - } -} - macro_rules! hash_value { ($($t:ty),+) => { $(impl HashValue for $t { @@ -245,13 +232,6 @@ macro_rules! hash_value { fn hash_write(&self, hasher: &mut impl Hasher) { Hash::hash(self, hasher) } - } - - #[cfg(not(feature = "force_hash_collisions"))] - impl BuildHasherHashValue for $t { - fn hash_one_with_hasher(&self, state: &S) -> u64 { - state.hash_one(self) - } })+ }; } @@ -267,13 +247,6 @@ macro_rules! hash_float_value { fn hash_write(&self, hasher: &mut impl Hasher) { hasher.write(&self.to_ne_bytes()) } - } - - #[cfg(not(feature = "force_hash_collisions"))] - impl BuildHasherHashValue for $t { - fn hash_one_with_hasher(&self, state: &S) -> u64 { - state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) - } })+ }; } @@ -552,231 +525,6 @@ fn hash_generic_byte_view_array( } } -#[cfg(not(feature = "force_hash_collisions"))] -// The custom BuildHasher path intentionally mirrors the leaf helpers so the -// default path does not route its tight loops through generic abstractions. -fn hash_null_with_hasher( - hash_builder: &S, - hashes_buffer: &mut [u64], - mul_col: bool, -) { - if mul_col { - hashes_buffer.iter_mut().for_each(|hash| { - *hash = combine_hashes(hash_builder.hash_one(1), *hash); - }) - } else { - hashes_buffer.iter_mut().for_each(|hash| { - *hash = hash_builder.hash_one(1); - }) - } -} - -#[cfg(not(feature = "force_hash_collisions"))] -fn hash_array_primitive_with_hasher( - array: &PrimitiveArray, - hash_builder: &S, - hashes_buffer: &mut [u64], - rehash: bool, -) where - T: ArrowPrimitiveType, - S: BuildHasher, -{ - assert_eq!( - hashes_buffer.len(), - array.len(), - "hashes_buffer and array should be of equal length" - ); - - if array.null_count() == 0 { - if rehash { - for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); - } - } else { - for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - *hash = value.hash_one_with_hasher(hash_builder); - } - } - } else if rehash { - for i in array.nulls().unwrap().valid_indices() { - let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = combine_hashes( - value.hash_one_with_hasher(hash_builder), - hashes_buffer[i], - ); - } - } else { - for i in array.nulls().unwrap().valid_indices() { - let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); - } - } -} - -#[cfg(not(feature = "force_hash_collisions"))] -fn hash_array_with_hasher( - array: &T, - hash_builder: &S, - hashes_buffer: &mut [u64], - rehash: bool, -) where - T: ArrayAccessor, - T::Item: BuildHasherHashValue, - S: BuildHasher, -{ - assert_eq!( - hashes_buffer.len(), - array.len(), - "hashes_buffer and array should be of equal length" - ); - - if array.null_count() == 0 { - if rehash { - for (i, hash) in hashes_buffer.iter_mut().enumerate() { - let value = unsafe { array.value_unchecked(i) }; - *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); - } - } else { - for (i, hash) in hashes_buffer.iter_mut().enumerate() { - let value = unsafe { array.value_unchecked(i) }; - *hash = value.hash_one_with_hasher(hash_builder); - } - } - } else if rehash { - for i in array.nulls().unwrap().valid_indices() { - let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = combine_hashes( - value.hash_one_with_hasher(hash_builder), - hashes_buffer[i], - ); - } - } else { - for i in array.nulls().unwrap().valid_indices() { - let value = unsafe { array.value_unchecked(i) }; - hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); - } - } -} - -#[cfg(not(feature = "force_hash_collisions"))] -#[inline(never)] -fn hash_string_view_array_inner_with_hasher< - T: ByteViewType, - S: BuildHasher, - const HAS_NULLS: bool, - const HAS_BUFFERS: bool, - const REHASH: bool, ->( - array: &GenericByteViewArray, - hash_builder: &S, - hashes_buffer: &mut [u64], -) { - assert_eq!( - hashes_buffer.len(), - array.len(), - "hashes_buffer and array should be of equal length" - ); - - let buffers = array.data_buffers(); - let view_bytes = |view_len: u32, view: u128| { - let view = ByteView::from(view); - let offset = view.offset as usize; - unsafe { - let data = buffers.get_unchecked(view.buffer_index as usize); - data.get_unchecked(offset..offset + view_len as usize) - } - }; - - let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter()); - for (i, (hash, &v)) in hashes_and_views.enumerate() { - if HAS_NULLS && array.is_null(i) { - continue; - } - let view_len = v as u32; - if !HAS_BUFFERS || view_len <= 12 { - if REHASH { - *hash = combine_hashes(v.hash_one_with_hasher(hash_builder), *hash); - } else { - *hash = v.hash_one_with_hasher(hash_builder); - } - continue; - } - let value = view_bytes(view_len, v); - if REHASH { - *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); - } else { - *hash = value.hash_one_with_hasher(hash_builder); - } - } -} - -#[cfg(not(feature = "force_hash_collisions"))] -fn hash_generic_byte_view_array_with_hasher( - array: &GenericByteViewArray, - hash_builder: &S, - hashes_buffer: &mut [u64], - rehash: bool, -) { - match ( - array.null_count() != 0, - !array.data_buffers().is_empty(), - rehash, - ) { - (false, false, false) => { - for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = view.hash_one_with_hasher(hash_builder); - } - } - (false, false, true) => { - for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - *hash = combine_hashes(view.hash_one_with_hasher(hash_builder), *hash); - } - } - (false, true, false) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - (false, true, true) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - (true, false, false) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - (true, false, true) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - (true, true, false) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - (true, true, true) => { - hash_string_view_array_inner_with_hasher::( - array, - hash_builder, - hashes_buffer, - ) - } - } -} - /// Scatter pre-computed dictionary value hashes to key positions. /// /// Uses const generics to eliminate runtime branching in the hot loop: @@ -1620,6 +1368,271 @@ where create_hashes_with_hasher_impl(arrays, hash_builder, hashes_buffer) } +// --------------------------------------------------------------------------- +// Custom BuildHasher leaf hashing — kept below all RandomState code so the +// hot default path stays tightly packed in the instruction cache. +// --------------------------------------------------------------------------- + +#[cfg(not(feature = "force_hash_collisions"))] +trait BuildHasherHashValue { + fn hash_one_with_hasher(&self, state: &S) -> u64; +} + +#[cfg(not(feature = "force_hash_collisions"))] +impl BuildHasherHashValue for &T { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + T::hash_one_with_hasher(self, state) + } +} + +macro_rules! build_hasher_hash_value { + ($($t:ty),+) => { + $(#[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(self) + } + })+ + }; +} +build_hasher_hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64, u128); +build_hasher_hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); + +macro_rules! build_hasher_hash_float_value { + ($(($t:ty, $i:ty)),+) => { + $(#[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) + } + })+ + }; +} +build_hasher_hash_float_value!((half::f16, u16), (f32, u32), (f64, u64)); + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_null_with_hasher( + hash_builder: &S, + hashes_buffer: &mut [u64], + mul_col: bool, +) { + if mul_col { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = combine_hashes(hash_builder.hash_one(1), *hash); + }) + } else { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = hash_builder.hash_one(1); + }) + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_primitive_with_hasher( + array: &PrimitiveArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrowPrimitiveType, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } + } else { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = value.hash_one_with_hasher(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_with_hasher( + array: &T, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrayAccessor, + T::Item: BuildHasherHashValue, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } + } else { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = value.hash_one_with_hasher(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +#[inline(never)] +fn hash_string_view_array_inner_with_hasher< + T: ByteViewType, + S: BuildHasher, + const HAS_NULLS: bool, + const HAS_BUFFERS: bool, + const REHASH: bool, +>( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], +) { + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + let buffers = array.data_buffers(); + let view_bytes = |view_len: u32, view: u128| { + let view = ByteView::from(view); + let offset = view.offset as usize; + unsafe { + let data = buffers.get_unchecked(view.buffer_index as usize); + data.get_unchecked(offset..offset + view_len as usize) + } + }; + + let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter()); + for (i, (hash, &v)) in hashes_and_views.enumerate() { + if HAS_NULLS && array.is_null(i) { + continue; + } + let view_len = v as u32; + if !HAS_BUFFERS || view_len <= 12 { + if REHASH { + *hash = combine_hashes(v.hash_one_with_hasher(hash_builder), *hash); + } else { + *hash = v.hash_one_with_hasher(hash_builder); + } + continue; + } + let value = view_bytes(view_len, v); + if REHASH { + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } else { + *hash = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_generic_byte_view_array_with_hasher( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) { + match ( + array.null_count() != 0, + !array.data_buffers().is_empty(), + rehash, + ) { + (false, false, false) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = view.hash_one_with_hasher(hash_builder); + } + } + (false, false, true) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = combine_hashes(view.hash_one_with_hasher(hash_builder), *hash); + } + } + (false, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (false, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + } +} + #[cfg(test)] mod tests { use std::hash::{BuildHasherDefault, Hasher};