Gene.bordegaray/2026/04/partition routed dynamic filters#21832
Gene.bordegaray/2026/04/partition routed dynamic filters#21832gene-bordegaray wants to merge 3 commits intoapache:mainfrom
Conversation
|
I have not read the code in detail (1.6k LOC) but I wonder how this will work in a world where the partition index of an opener is much less clear. Morselized scans (#20529) mean that filters will get applied at the morsel stage. Morsels won't belong to partitions per se / will be shared across partitions. So it seems to me that this entire feature is incompatible with the direction in which DataFusion is headed, unless I'm missing something. |
The important thing to point out is that this only uses partition-index routing when the scan still has a constant partition identity, which is the preserved file partitioned case we are targetting here. The If future morselized scans no longer have that identity for a morsel, they can fall back to the global dynamic filter, so this is still very valuable use case for partitioned data wee are seeing in production and should not be blocking the morselized driven work. Let me know thoughts, maybe @alamb @NGA-TRAN @gabotechs have thoughts |
| mut self, | ||
| partition_batches: Vec<Vec<RecordBatch>>, | ||
| ) -> Self { | ||
| self.partition_batches = partition_batches; |
There was a problem hiding this comment.
did this to eliminate review churn but can consolidate
|
I thought last call we had about this the conclusion was that the right way to go about this is to add native range partitioning support to DataFusion? I think that will be a big task but will be justifiable as benefiting a wider group of users and will ultimately land the codebase in a better place. |
We left off with this summary of the convo: #21207 (comment) and converged on an approach like this. Even with range partitioning, the need to route filters to a particular partition would be needed on partitioned data (without the RepartitonExec in the plan) this work would support this if the new partitioning is ever needed. |
|
@adriangb: Thanks for the proposal to add native range partitioning support to DataFusion. That’s one of the solutions Gene suggested. However, from our recent sync with @alamb and the ParadeDB team, Andrew recommended simply mapping the partition index of the build side and probe side — which is what this PR implements. The extra code is needed because we only apply this for range partitioning (not hash), and only when partition preservation is enabled and no repartitioning occurs beforehand, which aligns with the optimizer rules. This approach benefits all types of range partitions. Gene has a detailed analysis here: #21207 (comment) @alamb: If the extended solution isn’t what you had in mind, what would you suggest? Should we move toward native range partitioning support as Adrian proposed, or is the current PR approach sufficient for now? |
|
I would also want to highlight how important supporting partitioned data is for large, scalable systems. We have this feature in use and have seen amazing results by eliminating repartitions with pre-partitioned data and pushing dynamic filters down to the correct partition of that unshuffled data: Here are some metrics on this: I am sure others that use partitioned data will appreciate such results and the contributors at Datadog plan to continue to strengthen DataFusion's support of pre-partitioned data 😄 |
|
@gene-bordegaray those are indeed amazing numbers!! I think part of the tension here is that this is not a minor feature, no matter how it’s implemented it seems (we’ve tried multiple ways). We (Pydantic) don’t have data that is laid out like this so it’s hard for me to understand the nuances of how this should all work and also justify the time to help get it all across the line. If there are other users of DataFusion other than DataDog that have similar systems or want to be able to have them bringing them to the table would be very helpful because they can help shape a solution that works for everyone and also donate review and development time. @NGA-TRAN I actually discussed this with @alamb this morning. Maybe I am misunderstanding one or the both sides of the conversation, but I was trying to transmit the outcome of my discussion with him which was to approach this from the angle of adding general support for range partitioning. |
I am not quite sure what you are referring to here. .I am checking this PR and the discussions out now |
|
@adriangb @alamb @gene-bordegaray: Let me know if this approach is feasible:
I call it global-planning property but it is only a property of a plan so it can have a better name. My assumption is that this would be simple and require minimal code changes. Or the join property is always false. We add an optimization rule and traverse the plan bottom-up to set that property to true in the same fashion above |
It might be feasible, but I don't think it is general (aka would not be used by other users of DataFusion) and makes is more complicated for a single usecase. Thus I am not sure we should add it to this repo If you want to proveed with this approach, I think you could add it as a user defined PhysicalOptimizer pass to fix up the plan / DynamicFilter for your uase case. I also wrote up a summary of my thoughts here, including another possible alternate workaround Let me know what you think |
I recommend a 2 phase strategy:
|
Which issue does this PR close?
I have split it up into three commits to make it easier to review
Rationale for this change
What changes are included in this PR?
Extensively covered in #21207 via this comment: #21207 (comment)
Are these changes tested?
Yes:
Are there any user-facing changes?
Yes:
datafusion_physical_plan::joins::DynamicFilterRoutingModeEnum with Variants:
HashJoinExecdynamic_filter_routing_mode: DynamicFilterRoutingModeHashJoinExecBuilderwith_dynamic_filter_routing_modeDynamicFilterPhysicalExprupdate_partitioned(),partition_filter()Protos
DynamicFilterRoutingMode->dynamic_filter_routing_mode = 11