perf: strength reduce hash partition modulo (up to 1.16x faster) #21900
Dandandan wants to merge 1 commit into apache:main
Conversation
run benchmarks

run benchmark tpch10
🤖 Benchmark running (GKE) — comparing perf-strength-reduce-hash-partition (b0091c6) to 65f337d (merge-base) using: clickbench_partitioned, tpcds, tpch
🤖 Benchmark completed (GKE) — tpch (base vs branch)

🤖 Benchmark running (GKE) — comparing perf-strength-reduce-hash-partition (b0091c6) to 65f337d (merge-base) using: tpch10

🤖 Benchmark completed (GKE) — clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE) — tpcds (base vs branch)
run benchmark tpch10

🤖 Benchmark completed (GKE) — tpch10 (base vs branch)
FYI @adriangb
adriangb left a comment:

This is very nice @Dandandan and the benchmarks look great. Some small nits / cleanups left.
```diff
     timer: metrics::Time,
-) -> Self {
-    Self {
+) -> Result<Self> {
```
Technically a breaking change but I think this is one of those things that is more so public for the workspace than it is for end users. So may not be worth calling out.
```diff
-            num_partitions: partitions,
+            num_partitions: _,
```
Dead code? Should we remove?
I think some more docstrings linking to explanations of what this is, other example implementations, why it helps, etc. would be nice.
Pull request overview

Improves the performance of hash-based repartitioning by replacing the per-row `% num_partitions` with a strength-reduced remainder computation specialized for the fixed partition count.

Changes:
- Added `StrengthReducedU64` (power-of-two mask or reciprocal-based reduction) to compute `hash % num_partitions` faster.
- Updated hash repartitioning to use the precomputed reducer instead of `%`.
- Added unit tests for reducer correctness and a new error path when `num_partitions == 0`.
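For intuition, the reciprocal-based reduction mentioned above can be sketched as follows. This is a hedged illustration of the general technique (a Lemire-style "fastmod"), shown for 32-bit values so the intermediate products fit in native integer types; `ReducedU32` is an illustrative name, not the PR's `StrengthReducedU64`:

```rust
/// Strength-reduced remainder for a fixed 32-bit divisor: precompute
/// m = ceil(2^64 / d) once, then each `x % d` costs two multiplications
/// instead of a hardware divide.
struct ReducedU32 {
    m: u64, // ceil(2^64 / d), stored modulo 2^64 (d == 1 maps to 0)
    d: u32,
}

impl ReducedU32 {
    fn new(d: u32) -> Self {
        assert!(d > 0, "divisor must be nonzero");
        // ceil(2^64 / d) == floor((2^64 - 1) / d) + 1 for every d >= 2;
        // the wrapping add makes d == 1 come out as 0, which still yields
        // the correct remainder (always 0) in `rem` below.
        Self { m: (u64::MAX / d as u64).wrapping_add(1), d }
    }

    #[inline]
    fn rem(&self, x: u32) -> u32 {
        // The low 64 bits of m * x encode the fractional part of x / d;
        // multiplying by d and taking the high 64 bits recovers x % d.
        let lowbits = self.m.wrapping_mul(x as u64);
        ((lowbits as u128 * self.d as u128) >> 64) as u32
    }
}
```

A production version would typically also special-case power-of-two divisors with a simple mask, `x & (d - 1)`, which avoids the multiplications entirely.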
Comments suppressed due to low confidence (1)

datafusion/physical-plan/src/repartition/mod.rs:596
- The docstring for `BatchPartitioner::try_new` says it only errors when the partitioning scheme is unsupported, but `Partitioning::Hash(_, 0)` will now also error via `new_hash_partitioner`. Please update the docs (or the behavior) so callers can rely on the documented error conditions.
```rust
/// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme.
///
/// This is a convenience constructor that delegates to the specialized
/// hash or round-robin constructors depending on the partitioning variant.
///
/// # Parameters
/// - `partitioning`: Partitioning scheme to apply (hash or round-robin).
/// - `timer`: Metric used to record time spent during repartitioning.
/// - `input_partition`: Index of the current input partition.
/// - `num_input_partitions`: Total number of input partitions.
///
/// # Errors
/// Returns an error if the provided partitioning scheme is not supported.
pub fn try_new(
    partitioning: Partitioning,
    timer: metrics::Time,
    input_partition: usize,
    num_input_partitions: usize,
) -> Result<Self> {
    match partitioning {
        Partitioning::Hash(exprs, num_partitions) => {
            Self::new_hash_partitioner(exprs, num_partitions, timer)
        }
        Partitioning::RoundRobinBatch(num_partitions) => {
            Ok(Self::new_round_robin_partitioner(
                num_partitions,
                timer,
                input_partition,
                num_input_partitions,
            ))
        }
        other => {
            not_impl_err!("Unsupported repartitioning scheme {other:?}")
        }
    }
}
```
```diff
 pub fn new_hash_partitioner(
     exprs: Vec<Arc<dyn PhysicalExpr>>,
     num_partitions: usize,
     timer: metrics::Time,
-) -> Self {
-    Self {
+) -> Result<Self> {
+    if num_partitions == 0 {
+        return internal_err!("Hash repartition requires at least one partition");
+    }
+
+    Ok(Self {
         state: BatchPartitionerState::Hash {
             exprs,
             num_partitions,
+            partition_reducer: StrengthReducedU64::new(num_partitions as u64),
             hash_buffer: vec![],
             indices: vec![vec![]; num_partitions],
         },
         timer,
-    }
+    })
 }
```
new_hash_partitioner is pub in the datafusion-physical-plan crate, but this change makes it fallible (-> Result<Self>) and introduces a new runtime error for num_partitions == 0. That is a breaking public API change for downstream crates. Consider preserving the existing signature (e.g., keep new_hash_partitioner infallible and add a new try_new_hash_partitioner / new_hash_partitioner_checked), or reduce visibility if this is intended to be internal-only.
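The compatibility pattern suggested above can be sketched on a toy type. This is a hypothetical illustration, not the PR's code: `Partitioner` stands in for `BatchPartitioner`, and the panic in the legacy constructor is one possible policy for the newly-invalid input:

```rust
// Toy stand-in so the sketch compiles on its own; the real code uses
// datafusion's BatchPartitioner with exprs, timer, etc.
struct Partitioner {
    num_partitions: usize,
}

impl Partitioner {
    /// New fallible constructor: callers that can handle errors use this.
    fn try_new_hash_partitioner(num_partitions: usize) -> Result<Self, String> {
        if num_partitions == 0 {
            return Err("Hash repartition requires at least one partition".into());
        }
        Ok(Self { num_partitions })
    }

    /// Old infallible signature preserved for downstream API compatibility;
    /// delegates to the fallible variant and panics on invalid input.
    fn new_hash_partitioner(num_partitions: usize) -> Self {
        Self::try_new_hash_partitioner(num_partitions)
            .expect("num_partitions must be nonzero")
    }
}
```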
## Which issue does this PR close?

## Rationale for this change

Performance improvement on large hash repartitions. TPC-H at a bigger scale factor shows the biggest benefit:

Details

## What changes are included in this PR?

Use strength reduction to speed up `hash % partition`.
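As an illustration of where the saving comes from, a hedged sketch of the repartition hot loop: the reducer is built once per partitioner and applied per row. Names are illustrative, only the power-of-two fast path is implemented here, and the real `StrengthReducedU64` also handles non-power-of-two divisors with a precomputed reciprocal rather than the `%` fallback used below:

```rust
/// Illustrative reducer: mask when the divisor is a power of two,
/// plain `%` otherwise (where the real code uses a reciprocal).
struct StrengthReducedU64 {
    divisor: u64,
    mask: Option<u64>, // Some(d - 1) when d is a power of two
}

impl StrengthReducedU64 {
    fn new(d: u64) -> Self {
        assert!(d > 0, "at least one partition required");
        let mask = if d.is_power_of_two() { Some(d - 1) } else { None };
        Self { divisor: d, mask }
    }

    #[inline]
    fn rem(&self, x: u64) -> u64 {
        match self.mask {
            Some(m) => x & m,         // no divide at all
            None => x % self.divisor, // real impl: reciprocal-based reduction
        }
    }
}

/// Bucket row indices by partition, building the reducer once up front
/// instead of paying for a hardware divide on every row.
fn partition_rows(hashes: &[u64], num_partitions: usize) -> Vec<Vec<usize>> {
    let reducer = StrengthReducedU64::new(num_partitions as u64);
    let mut indices = vec![vec![]; num_partitions];
    for (row, &h) in hashes.iter().enumerate() {
        indices[reducer.rem(h) as usize].push(row);
    }
    indices
}
```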
## Are these changes tested?

Existing tests.

## Are there any user-facing changes?

No.