Draft
…lanning

Replaces the hardcoded Analyzer/Optimizer slots in SessionState with an ordered Vec<Phase> pipeline. Each phase carries its own strategy (Once | FixedPoint), rule list, and enabled flag.

Key additions:
- SyncPhase<T> / AsyncPhase<T> generic phase containers
- SyncAnalysisPhase, SyncOptimizationPhase, AsyncAnalysisPhase, AsyncOptimizationPhase type aliases
- LogicalPlanningPipeline with apply_sync, apply_sync_explained, apply (async)
- SessionStateBuilder gains with_logical_pipeline, with_phase, with_phase_before, with_phase_after
- EXPLAIN captures per-rule StringifiedPlans via apply_sync_explained
- Analysis and optimization phases both use LogicalPlanSignature for early convergence
- Async max_passes reads config.optimizer.max_passes, not a hardcoded constant
- planning_pipeline.rs split into logical_pipeline/ module (mod, pipeline, sync_phase, async_phase)
…d #[must_use] to insert_before/after
…e and AsyncOptimizerRule
…pturing per-rule plans across all phase types
…pipeline directly, remove staging buffers
…nction_rewrites off generic SyncPhase<T>
…for function_rewrites, remove duplicate field from SessionState
…n doc links, format Cargo.toml
Why
DataFusion's planning architecture has two implicit constraints that block a class of real use cases:
1. All rules are synchronous. No async I/O is possible during planning. Systems that need to resolve remote metadata — catalog fetches, schema-on-read stores like Elasticsearch, Delta Lake, or Iceberg — must either block a thread or pre-resolve everything before planning begins. Neither is acceptable at scale.
2. The pipeline is a closed two-slot system. Frameworks and engines built on DataFusion have no way to inject custom planning stages without replacing the entire Analyzer or Optimizer. There is no named hook, no ordering guarantee, no enable/disable knob.

Concrete scenarios that motivate this:
- … AsyncAnalyzerRule can do those fetches and rewrite table scans in-place, without blocking or pre-warming a cache.
- … TypeCoercion runs, or a post-optimization phase that applies cost-based rewrites using stats fetched asynchronously.

What
1. Add async rule variants
New traits alongside the existing sync ones:
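The trait definitions themselves are not reproduced in this text. As a rough sketch only, an async rule variant could look something like the following. Plan, BoxFuture, the analyze signature, ResolveRemoteTables, and the block_on helper are all illustrative stand-ins built on the standard library, not the actual DataFusion API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy stand-in for a logical plan (hypothetical, not DataFusion's LogicalPlan).
#[derive(Debug, Clone, PartialEq)]
pub struct Plan(pub String);

/// Boxed future alias so the trait stays object-safe without extra crates.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Assumed shape of an async analyzer rule: same idea as a sync rule,
/// but the rewrite returns a future so it can await remote I/O.
pub trait AsyncAnalyzerRule: Send + Sync {
    fn name(&self) -> &str;
    fn analyze<'a>(&'a self, plan: Plan) -> BoxFuture<'a, Result<Plan, String>>;
}

/// Hypothetical rule that would resolve table metadata from a remote catalog.
pub struct ResolveRemoteTables;

impl AsyncAnalyzerRule for ResolveRemoteTables {
    fn name(&self) -> &str {
        "resolve_remote_tables"
    }

    fn analyze<'a>(&'a self, plan: Plan) -> BoxFuture<'a, Result<Plan, String>> {
        Box::pin(async move {
            // A real rule would await a catalog or schema fetch here.
            Ok(Plan(plan.0.replace("UnresolvedScan", "ResolvedScan")))
        })
    }
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this demo only. It polls in a loop, so it is
/// not a substitute for a real async runtime.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(ResolveRemoteTables.analyze(plan)) drives the rule to completion in this toy setup; in a real engine the session's async runtime would do that instead.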
2. Unify everything into an extensible pipeline
Replaces the two hardcoded slots with a single LogicalPlanningPipeline: an ordered Vec<Phase> where each phase is named, can be enabled/disabled, carries a Strategy (Once or FixedPoint), and holds rules of one type:

- SyncAnalysis: AnalyzerRule
- SyncOptimization: OptimizerRule
- AsyncAnalysis: AsyncAnalyzerRule
- AsyncOptimization: AsyncOptimizerRule

The design is directly inspired by Spark SQL's Batch(name, strategy, rules*) and Apache Calcite's HepPlanner batches: named, ordered phases with explicit iteration strategy, strict analysis/optimization separation.

The default pipeline has two phases that replicate existing behavior exactly:
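The phase model can be illustrated with a small self-contained sketch. Everything here is a simplified assumption: Plan is a toy list of operator names, rules are plain functions, the phase names "analysis" and "optimization" and their strategies are chosen for the example, and convergence is detected by plain equality rather than by LogicalPlanSignature.

```rust
/// Toy plan: a list of operator names (hypothetical stand-in).
#[derive(Debug, Clone, PartialEq)]
pub struct Plan(pub Vec<String>);

/// In this sketch a rule is just a function from plan to plan.
pub type Rule = fn(Plan) -> Plan;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Strategy {
    Once,
    FixedPoint { max_passes: usize },
}

pub struct Phase {
    pub name: String,
    pub enabled: bool,
    pub strategy: Strategy,
    pub rules: Vec<Rule>,
}

impl Phase {
    /// Runs every rule in order, repeating until the plan stops changing
    /// or the pass budget is used up. Disabled phases are skipped.
    pub fn apply(&self, mut plan: Plan) -> Plan {
        if !self.enabled {
            return plan;
        }
        let passes = match self.strategy {
            Strategy::Once => 1,
            Strategy::FixedPoint { max_passes } => max_passes,
        };
        for _ in 0..passes {
            let before = plan.clone();
            for rule in &self.rules {
                plan = rule(plan);
            }
            if plan == before {
                break; // converged early
            }
        }
        plan
    }
}

pub struct Pipeline {
    pub phases: Vec<Phase>,
}

impl Pipeline {
    /// Applies the phases strictly in their stored order.
    pub fn apply(&self, plan: Plan) -> Plan {
        self.phases.iter().fold(plan, |p, phase| phase.apply(p))
    }
}

/// Example analysis rule: resolves placeholder scans.
pub fn resolve(mut plan: Plan) -> Plan {
    for op in plan.0.iter_mut() {
        if op.as_str() == "UnresolvedScan" {
            *op = "Scan".to_string();
        }
    }
    plan
}

/// Example optimization rule: merges one adjacent pair of filters per pass.
pub fn merge_filters(mut plan: Plan) -> Plan {
    let pair = plan.0.windows(2).position(|w| w[0] == "Filter" && w[1] == "Filter");
    if let Some(i) = pair {
        plan.0.remove(i);
    }
    plan
}

/// Two-phase pipeline in the spirit of the default described above.
pub fn default_pipeline() -> Pipeline {
    Pipeline {
        phases: vec![
            Phase {
                name: "analysis".to_string(),
                enabled: true,
                strategy: Strategy::Once,
                rules: vec![resolve as Rule],
            },
            Phase {
                name: "optimization".to_string(),
                enabled: true,
                strategy: Strategy::FixedPoint { max_passes: 10 },
                rules: vec![merge_filters as Rule],
            },
        ],
    }
}
```

Because merge_filters removes only one filter per pass, the FixedPoint strategy is what lets the optimization phase collapse a run of three filters into one before it detects convergence and stops.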
SessionState replaces analyzer: Analyzer and optimizer: Optimizer with logical_pipeline: LogicalPlanningPipeline. Users can inject custom phases (including async ones) at any position:

The sync optimize() path (DataFrame API, PREPARE) fails fast with a clear error if an async phase is registered. The async create_physical_plan() path handles all four variants, including EXPLAIN output that captures per-rule intermediate plans across both sync and async phases.

Known gaps
SessionState::analyzer()/optimizer()now return owned values instead of references.
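For reference, the phase injection and sync fail-fast behaviour described under What can be modelled with a toy pipeline. This is a sketch under assumptions: the method names with_phase and with_phase_before echo the builder methods listed in the commit message, but their signatures, the Phase and PhaseKind types, the check_sync_only helper, and the error text are all hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PhaseKind {
    SyncAnalysis,
    SyncOptimization,
    AsyncAnalysis,
    AsyncOptimization,
}

impl PhaseKind {
    pub fn is_async(self) -> bool {
        matches!(self, PhaseKind::AsyncAnalysis | PhaseKind::AsyncOptimization)
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct Phase {
    pub name: String,
    pub kind: PhaseKind,
}

#[derive(Debug, Default)]
pub struct Pipeline {
    phases: Vec<Phase>,
}

impl Pipeline {
    /// Appends a phase at the end of the pipeline.
    pub fn with_phase(mut self, phase: Phase) -> Self {
        self.phases.push(phase);
        self
    }

    /// Inserts a phase directly before the phase named `anchor`.
    pub fn with_phase_before(mut self, anchor: &str, phase: Phase) -> Result<Self, String> {
        let idx = self
            .phases
            .iter()
            .position(|p| p.name == anchor)
            .ok_or_else(|| format!("no phase named '{anchor}'"))?;
        self.phases.insert(idx, phase);
        Ok(self)
    }

    /// Phase names in execution order.
    pub fn names(&self) -> Vec<&str> {
        self.phases.iter().map(|p| p.name.as_str()).collect()
    }

    /// Guard a sync entry point could run first: reject the pipeline
    /// as soon as any async phase is present.
    pub fn check_sync_only(&self) -> Result<(), String> {
        match self.phases.iter().find(|p| p.kind.is_async()) {
            Some(p) => Err(format!(
                "phase '{}' is async; use the async planning path",
                p.name
            )),
            None => Ok(()),
        }
    }
}
```

Checking up front, before any rule runs, keeps the sync path from returning a partially rewritten plan.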