
feat: LogicalPlanningPipeline#21829

Draft
notfilippo wants to merge 12 commits intoapache:mainfrom
notfilippo:filippo.rossi/logical-planning-pipeline

notfilippo (Member) commented Apr 24, 2026

POC for design validation only — not ready for review or merge. Will open a proper issue when the design is more stable.

Why

DataFusion's planning architecture has two implicit constraints that block a class of real use cases:

1. All rules are synchronous. No async I/O is possible during planning. Systems that need to resolve remote metadata — catalog fetches, schema-on-read stores like Elasticsearch, Delta Lake, or Iceberg — must either block a thread or pre-resolve everything before planning begins. Neither is acceptable at scale.

2. The pipeline is a closed two-slot system. Frameworks and engines built on DataFusion have no way to inject custom planning stages without replacing the entire Analyzer or Optimizer. There is no named hook, no ordering guarantee, no enable/disable knob.

Concrete scenarios that motivate this:

  • Schema-on-read catalogs (Elasticsearch, Delta Lake, Iceberg): field schemas are resolved by async metadata fetches. An AsyncAnalyzerRule can do those fetches and rewrite table scans in-place, without blocking or pre-warming a cache.
  • Join-driven metadata enrichment: you may want to fetch metadata based on the shape of a join in the plan. That requires seeing the plan first, then doing async I/O, then transforming the plan, which is exactly the async rule model.
  • Custom pipeline stages: an engine embedding DataFusion may need a pre-analysis phase that rewrites vendor-specific SQL constructs before TypeCoercion runs, or a post-optimization phase that applies cost-based rewrites using stats fetched asynchronously.

What

1. Add async rule variants

New traits alongside the existing sync ones:

// analysis
pub trait AsyncAnalyzerRule { async fn analyze(...) }

// optimization
pub trait AsyncOptimizerRule { async fn rewrite(...) }
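
To make the shape of an async rule concrete, here is a minimal std-only sketch. Everything in it is an assumption for illustration: `ToyPlan`, `ToyAsyncAnalyzerRule`, `ResolveSchema`, `fetch_columns`, and the tiny `block_on` helper are hypothetical and are not the traits or signatures in this PR. The rule awaits a (pretend) remote schema lookup and then rewrites the plan.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy stand-in for a logical plan; NOT DataFusion's LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
struct ToyPlan {
    table: String,
    columns: Vec<String>,
}

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical async rule trait. Returning a boxed future keeps the trait
// usable behind `Arc<dyn ...>`.
trait ToyAsyncAnalyzerRule: Send + Sync {
    fn name(&self) -> &str;
    fn analyze<'a>(&'a self, plan: ToyPlan) -> BoxFuture<'a, Result<ToyPlan, String>>;
}

// Pretend remote schema lookup; a real rule would await network I/O here.
async fn fetch_columns(table: &str) -> Result<Vec<String>, String> {
    match table {
        "logs" => Ok(vec!["ts".to_string(), "message".to_string()]),
        other => Err(format!("unknown table: {other}")),
    }
}

struct ResolveSchema;

impl ToyAsyncAnalyzerRule for ResolveSchema {
    fn name(&self) -> &str {
        "resolve_schema"
    }

    fn analyze<'a>(&'a self, mut plan: ToyPlan) -> BoxFuture<'a, Result<ToyPlan, String>> {
        Box::pin(async move {
            // Only scans with an unresolved schema trigger the fetch.
            if plan.columns.is_empty() {
                plan.columns = fetch_columns(&plan.table).await?;
            }
            Ok(plan)
        })
    }
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this demo only: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_rule(table: &str) -> Result<ToyPlan, String> {
    let rule: Arc<dyn ToyAsyncAnalyzerRule> = Arc::new(ResolveSchema);
    let plan = ToyPlan { table: table.to_string(), columns: vec![] };
    block_on(rule.analyze(plan))
}

fn main() {
    println!("rule: {}", ResolveSchema.name());
    println!("{:?}", run_rule("logs"));
    println!("{:?}", run_rule("missing"));
}
```

In a real session the future would be driven by the runtime that already powers the async planning path rather than by a hand-rolled `block_on`.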

2. Unify everything into an extensible pipeline

Replaces the two hardcoded slots with a single LogicalPlanningPipeline — an ordered Vec<Phase> where each phase is named, can be enabled/disabled, carries a Strategy (Once or FixedPoint), and holds rules of one type:

Phase variant      Rule type
SyncAnalysis       AnalyzerRule
SyncOptimization   OptimizerRule
AsyncAnalysis      AsyncAnalyzerRule
AsyncOptimization  AsyncOptimizerRule

The design is directly inspired by Spark SQL's Batch(name, strategy, rules*) and Apache Calcite's HepPlanner batches: named, ordered phases with explicit iteration strategy, strict analysis/optimization separation.
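
As a rough illustration of named phases with an iteration strategy, the sketch below models a plan as a string and rules as plain functions. The `Phase` struct, `run_phase`, `run_pipeline`, and the toy rule are hypothetical names chosen for this example, not the types in the PR; only the `Once` / `FixedPoint` distinction and the enabled flag come from the description above.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Strategy {
    Once,
    FixedPoint,
}

// Toy rule type: takes a "plan" (a string here) and returns a rewritten one.
type Rule = fn(&str) -> String;

struct Phase {
    name: &'static str,
    enabled: bool,
    strategy: Strategy,
    rules: Vec<Rule>,
}

// Toy rule: removes one leading double negation per application.
fn collapse_double_not(plan: &str) -> String {
    plan.replacen("NOT NOT ", "", 1)
}

// Runs every rule of a phase once, or repeatedly until nothing changes
// (bounded by `max_passes`), depending on the phase strategy.
fn run_phase(phase: &Phase, plan: String, max_passes: usize) -> String {
    if !phase.enabled {
        return plan;
    }
    let passes = match phase.strategy {
        Strategy::Once => 1,
        Strategy::FixedPoint => max_passes,
    };
    let mut current = plan;
    for _ in 0..passes {
        let before = current.clone();
        for rule in &phase.rules {
            current = rule(&current);
        }
        if current == before {
            break; // converged
        }
    }
    current
}

// Phases run strictly in the order they appear in the slice.
fn run_pipeline(phases: &[Phase], plan: &str, max_passes: usize) -> String {
    phases
        .iter()
        .fold(plan.to_string(), |p, phase| run_phase(phase, p, max_passes))
}

fn build_phases(strategy: Strategy, enabled: bool) -> Vec<Phase> {
    vec![Phase {
        name: "simplify",
        enabled,
        strategy,
        rules: vec![collapse_double_not as Rule],
    }]
}

fn demo(strategy: Strategy, enabled: bool) -> String {
    run_pipeline(&build_phases(strategy, enabled), "NOT NOT NOT NOT x", 10)
}

fn main() {
    for strategy in [Strategy::Once, Strategy::FixedPoint] {
        let phases = build_phases(strategy, true);
        let out = run_pipeline(&phases, "NOT NOT NOT NOT x", 10);
        println!("{} ({:?}) -> {}", phases[0].name, strategy, out);
    }
}
```

With `Once` the toy rule fires a single time and leaves `NOT NOT x`; with `FixedPoint` the phase keeps iterating until the plan stops changing and ends at `x`.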

The default pipeline has two phases that replicate existing behavior exactly:

"analysis"      SyncAnalysis     FixedPoint  ← Analyzer::new().rules
"optimization"  SyncOptimization FixedPoint  ← Optimizer::new().rules

SessionState replaces analyzer: Analyzer and optimizer: Optimizer with logical_pipeline: LogicalPlanningPipeline. Users can inject custom phases (including async ones) at any position:

let mut phase = AsyncAnalysisPhase::new("my-async-phase", Strategy::Once);
phase.rules.push(Arc::new(MyAsyncRule));

let state = SessionStateBuilder::new()
    .with_phase_before(DEFAULT_ANALYSIS_PHASE, Phase::AsyncAnalysis(phase))
    .build();
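
The positional insertion can be pictured as a lookup by phase name in the ordered list. The sketch below is a toy model over phase names only; `insert_before`, `insert_after`, the error on an unknown anchor, and the `post-optimization` phase name are assumptions for illustration and may differ from what the builder methods in this PR do.

```rust
// Inserts `new_phase` immediately before the phase named `anchor`.
// Returning an error for an unknown anchor is an assumption of this sketch.
fn insert_before(phases: &mut Vec<String>, anchor: &str, new_phase: &str) -> Result<(), String> {
    let idx = phases
        .iter()
        .position(|p| p.as_str() == anchor)
        .ok_or_else(|| format!("no phase named {anchor:?}"))?;
    phases.insert(idx, new_phase.to_string());
    Ok(())
}

// Inserts `new_phase` immediately after the phase named `anchor`.
fn insert_after(phases: &mut Vec<String>, anchor: &str, new_phase: &str) -> Result<(), String> {
    let idx = phases
        .iter()
        .position(|p| p.as_str() == anchor)
        .ok_or_else(|| format!("no phase named {anchor:?}"))?;
    phases.insert(idx + 1, new_phase.to_string());
    Ok(())
}

fn demo_order() -> Vec<String> {
    // The two default phase names from the description.
    let mut phases = vec!["analysis".to_string(), "optimization".to_string()];
    insert_before(&mut phases, "analysis", "my-async-phase").unwrap();
    insert_after(&mut phases, "optimization", "post-optimization").unwrap();
    phases
}

fn main() {
    println!("{:?}", demo_order());
}
```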

The sync optimize() path (DataFrame API, PREPARE) fails fast with a clear error if an async phase is registered. The async create_physical_plan() path handles all four variants, including EXPLAIN output that captures per-rule intermediate plans across both sync and async phases.
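
The fail-fast check on the sync path amounts to scanning the registered phases before running anything. A minimal sketch, assuming a hypothetical `PhaseKind` enum and `ensure_sync_only` helper (the actual error type and message in the PR are not shown here):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum PhaseKind {
    SyncAnalysis,
    SyncOptimization,
    AsyncAnalysis,
    AsyncOptimization,
}

impl PhaseKind {
    fn is_async(self) -> bool {
        matches!(self, PhaseKind::AsyncAnalysis | PhaseKind::AsyncOptimization)
    }
}

// Rejects the whole pipeline up front if any registered phase is async,
// instead of failing midway through planning.
fn ensure_sync_only(phases: &[(&str, PhaseKind)]) -> Result<(), String> {
    match phases.iter().find(|(_, kind)| kind.is_async()) {
        Some((name, kind)) => Err(format!(
            "phase {name:?} is {kind:?}; the sync planning path cannot run async phases"
        )),
        None => Ok(()),
    }
}

fn default_phases() -> Vec<(&'static str, PhaseKind)> {
    vec![
        ("analysis", PhaseKind::SyncAnalysis),
        ("optimization", PhaseKind::SyncOptimization),
    ]
}

fn with_async_phase() -> Vec<(&'static str, PhaseKind)> {
    let mut phases = default_phases();
    phases.insert(0, ("my-async-phase", PhaseKind::AsyncAnalysis));
    phases.push(("stats-rewrite", PhaseKind::AsyncOptimization)); // hypothetical name
    phases
}

fn main() {
    println!("{:?}", ensure_sync_only(&default_phases()));
    println!("{:?}", ensure_sync_only(&with_async_phase()));
}
```

Checking before the first rule runs means a caller on the sync path gets the error without any partial planning work having happened.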

Known gaps

  • SessionState::analyzer() / optimizer() now return owned values instead of references.

github-actions bot added the optimizer (Optimizer rules) and core (Core DataFusion crate) labels Apr 24, 2026
notfilippo marked this pull request as draft April 24, 2026 09:55
notfilippo changed the title from "feat: replace 4 hardcoded planning slots with extensible LogicalPlanningPipeline" to "feat: LogicalPlanningPipeline" Apr 24, 2026
@notfilippo notfilippo force-pushed the filippo.rossi/logical-planning-pipeline branch 4 times, most recently from d7ff22c to fb1e7d4 Compare April 24, 2026 12:24
…lanning

Replaces the hardcoded Analyzer/Optimizer slots in SessionState with an
ordered Vec<Phase> pipeline. Each phase carries its own strategy (Once |
FixedPoint), rule list, and enabled flag.

Key additions:
- SyncPhase<T> / AsyncPhase<T> generic phase containers
- SyncAnalysisPhase, SyncOptimizationPhase, AsyncAnalysisPhase, AsyncOptimizationPhase type aliases
- LogicalPlanningPipeline with apply_sync, apply_sync_explained, apply (async)
- SessionStateBuilder gains with_logical_pipeline, with_phase, with_phase_before, with_phase_after
- EXPLAIN captures per-rule StringifiedPlans via apply_sync_explained
- Analysis and optimization phases both use LogicalPlanSignature for early convergence
- Async max_passes reads config.optimizer.max_passes, not a hardcoded constant
- planning_pipeline.rs split into logical_pipeline/ module (mod, pipeline, sync_phase, async_phase)
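
For readers unfamiliar with signature-based convergence, the idea can be sketched as follows: hash each intermediate plan, remember the hashes, and stop as soon as a plan repeats or the pass budget runs out. This is a toy model over strings using the standard library hasher; `signature` and `fixed_point` are hypothetical helpers and not the code in this commit.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Toy plan signature: a hash of the plan text.
fn signature(plan: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    plan.hash(&mut hasher);
    hasher.finish()
}

// Applies `rule` until a previously seen plan shows up again or `max_passes`
// is reached. Returns the final plan and the number of passes executed.
fn fixed_point(plan: &str, rule: fn(&str) -> String, max_passes: usize) -> (String, usize) {
    let mut seen = HashSet::new();
    seen.insert(signature(plan));
    let mut current = plan.to_string();
    let mut passes = 0;
    while passes < max_passes {
        current = rule(&current);
        passes += 1;
        if !seen.insert(signature(&current)) {
            break; // plan already seen: converged or cycling
        }
    }
    (current, passes)
}

// Toy rule: strips one pair of redundant outer parentheses.
fn strip_outer_parens(plan: &str) -> String {
    plan.strip_prefix('(')
        .and_then(|s| s.strip_suffix(')'))
        .unwrap_or(plan)
        .to_string()
}

// Toy rule that never settles: flips between two plans.
fn toggle(plan: &str) -> String {
    if plan == "a" { "b".to_string() } else { "a".to_string() }
}

fn main() {
    println!("{:?}", fixed_point("(((a)))", strip_outer_parens, 10));
    println!("{:?}", fixed_point("(((a)))", strip_outer_parens, 2));
    println!("{:?}", fixed_point("a", toggle, 10));
}
```

Because the set holds every earlier signature rather than only the previous one, the loop also terminates early when rules oscillate between plans, not just when a pass is a no-op.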
@notfilippo notfilippo force-pushed the filippo.rossi/logical-planning-pipeline branch from fb1e7d4 to 6bf1c30 Compare April 24, 2026 12:57
@notfilippo notfilippo force-pushed the filippo.rossi/logical-planning-pipeline branch from e589f21 to 7124d84 Compare April 24, 2026 13:14
@notfilippo notfilippo force-pushed the filippo.rossi/logical-planning-pipeline branch from 70b288b to 22c9160 Compare April 24, 2026 14:16