Skip to content

perf: avoid JVM shuffle when sandwiched between non-Comet operators#4010

Open
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:avoid-jvm-shuffle-for-jvm-parent
Open

perf: avoid JVM shuffle when sandwiched between non-Comet operators#4010
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:avoid-jvm-shuffle-for-jvm-parent

Conversation

@andygrove
Copy link
Copy Markdown
Member

@andygrove andygrove commented Apr 20, 2026

Which issue does this PR close?

Closes #4004.

Rationale for this change

In TPC-DS plans such as q1, when partial/final hash aggregates cannot be converted to Comet (e.g. due to unsupported aggregate expressions or types), the shuffle between them is still wrapped as CometColumnarExchange (columnar shuffle). With a JVM operator on both sides of the shuffle, this adds a row -> arrow -> shuffle -> arrow -> row round trip with no Comet operator on either side able to consume columnar output:

HashAggregate
  +- CometNativeColumnarToRow
     +- CometColumnarExchange
        +- HashAggregate

The extra conversion is pure overhead compared to a vanilla Spark row-based shuffle.

What changes are included in this PR?

  • Added a post-transform pass revertRedundantColumnarShuffle in CometExecRule that detects CometShuffleExchangeExec with CometColumnarShuffle whose child is not a Comet plan and whose parent is not a Comet plan, and reverts it to the original Spark ShuffleExchangeExec (preserved in the originalPlan field).
  • The pass runs only in the COMET_EXEC_ENABLED=true branch of _apply, so users running with COMET_EXEC_ENABLED=false (shuffle-only mode) are unaffected.
  • Regenerated TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the new pattern.

After the fix, the q1 pattern above becomes:

HashAggregate
  +- Exchange
     +- HashAggregate

How are these changes tested?

  • New test CometExecRule should not wrap shuffle in CometColumnarShuffle when both sides are JVM in CometExecRuleSuite disables partial hash aggregate to force both aggregates to stay JVM, then asserts that the shuffle remains a plain ShuffleExchangeExec.
  • Existing CometShuffleSuite, CometExecSuite, CometShuffleFallbackStickinessSuite, and both TPC-DS plan-stability suites (v1.4 and v2.7) continue to pass against the regenerated golden files.

* nor the child is a Comet plan that can consume columnar output, that conversion is pure
* overhead (row->arrow->shuffle->arrow->row vs. row->shuffle->row).
*/
private def revertRedundantColumnarShuffle(plan: SparkPlan): SparkPlan = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the opttimzation @andygrove
I guess this is the first time where CometShuffleExchangeExec is reverted back to a plain ShuffleExchangeExec.

The two shuffle paths use different memory systems:

  • Comet columnar shuffle uses Comet's own memory pool. (off-heap)
  • Spark vanilla shuffle uses the JVM execution memory pool , with spills managed by ExternalSorter.

Users who have tuned their clusters for Comet (smaller JVM heap) could see unexpected spills after this chang, shifting shuffle memory pressure back to theJVM.
Additionally, Comet's Arrow IPC columnar format typically compresses better than Spark's row-based UnsafeRowSerializer path, so shuffle I/O mayalso increase.
It would be good to document or log when a shuffle is reverted so users can correlate any unexpected behavior with this optimization.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @karuppayya. I'm assuming the cost of doing two transitions (r2c then c2r) would outweigh the benefits of using Comet shuffle? I agree that it would be worth adding documentation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this optimization will improve performance and compute efficiency.

My main concern is determining the best recommendation for users to tune memory, particularly since they cannot explicitly disable it.

Also can it be a seperate rule in itself and have it only in org.apache.comet.CometSparkSessionExtensions.CometExecColumnar#postColumnarTransitions?

@andygrove andygrove changed the title perf: avoid JVM shuffle when sandwiched between non-Comet operators perf: avoid JVM shuffle when sandwiched between non-Comet operators [WIP] Apr 20, 2026
@andygrove
Copy link
Copy Markdown
Member Author

I may need to address the known issues with mixed Spark/Comet partial/final aggregates before making progress on this issue (#1267, #1389)

When a CometShuffleExchangeExec with CometColumnarShuffle has a non-Comet
child and a non-Comet parent, the columnar shuffle only adds
row->arrow->shuffle->arrow->row conversion overhead with no Comet operator
on either side to consume columnar output. Revert such shuffles to the
original Spark ShuffleExchangeExec after the main transform pass.

Closes apache#4004
The broader match that checked any non-Comet parent broke object-mode
Dataset plans in CometIcebergNativeSuite (DeserializeToObject around a
CometColumnarExchange over encoder nodes). A CometNativeColumnarToRowExec
elsewhere in the plan had its assertion child.supportsColumnar violated
when transform bubbled up the new row-based Exchange.

Restrict the match to the exact reported pattern: HashAggregateExec or
ObjectHashAggregateExec on both sides of the shuffle. Golden TPC-DS plans
are unchanged by this narrowing.
…ce Comet columnar shuffle

Without the tag, AQE re-plans each stage in isolation, and the isolated
subplan (which no longer shows the parent aggregate) converts the reverted
ShuffleExchangeExec back into a CometShuffleExchangeExec. Subsequent plan
canonicalization then fails because a ColumnarToRowExec ends up with a
non-columnar child.

Persist the revert decision via a TreeNodeTag on the ShuffleExchangeExec.
Both applyCometShuffle and the main transform now short-circuit when the
tag is set, so the decision survives re-entrancy.
@andygrove andygrove force-pushed the avoid-jvm-shuffle-for-jvm-parent branch from c856b23 to 5e1e49d Compare April 23, 2026 13:34
@andygrove andygrove changed the title perf: avoid JVM shuffle when sandwiched between non-Comet operators [WIP] perf: avoid JVM shuffle when sandwiched between non-Comet operators Apr 23, 2026
@andygrove andygrove marked this pull request as ready for review April 23, 2026 13:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Avoid JVM shuffle when parent stage will just convert back to rows

2 participants