
fix: fix elapsed_compute metric in ParquetSink to report encoding time only #21825

Open
fred1268 wants to merge 1 commit into apache:main from fred1268:issue-21297

Conversation

@fred1268

Which issue does this PR close?

Rationale for this change

ParquetSink registered an elapsed_compute metric using a single wall-clock timer that spanned the entire write operation: upstream batch wait, CPU Arrow→Parquet encoding, and object-store I/O were all rolled into one number. This made the metric misleading: it inflated elapsed_compute with I/O latency, which is inconsistent with how every other operator in DataFusion reports this metric (CPU time only).

What changes are included in this PR?

Two write paths are fixed independently:

Sequential path (allow_single_file_parallelism = false or CDC enabled):

  • A new TimingWriter<W> wrapper implements AsyncFileWriter and records wall-clock time spent in I/O calls (write / complete).
  • The total time inside writer.write() and writer.close() is accumulated in total_write_time. After all tasks join, elapsed_compute is set to total_write_time − io_time, isolating pure Arrow→Parquet encoding time.

Parallel path (allow_single_file_parallelism = true, default):

  • encoding_time: Time (a clone of the registered elapsed_compute metric) is threaded through the five-function call chain down to the two leaf sites: writer.write() in column_serializer_task and writer.close() in spawn_rg_join_and_finalize_task. Since Time is Arc<AtomicUsize>, all concurrent column tasks accumulate directly into the registered metric.
  • Note: on the parallel path, append_to_row_group() in concatenate_parallel_row_groups is interleaved with I/O and cannot be cleanly isolated. It is excluded from elapsed_compute. This is acceptable since it operates on already-encoded data and represents a small fraction of total encoding CPU time.

Are these changes tested?

Yes. Two tests are added/extended in datafusion/core/src/dataframe/parquet.rs:

  • test_parquet_sink_metrics_sequential (new): verifies elapsed_compute > 0 with allow_single_file_parallelism = false.
  • test_parquet_sink_metrics_parallel: extended with an elapsed_compute > 0 assertion (was previously missing for the parallel path).

The existing test_parquet_sink_metrics test (parallel path, default config) already asserted elapsed_compute > 0 and continues to pass.

Are there any user-facing changes?

No API changes. The elapsed_compute metric was already surfaced — this PR makes its value accurate rather than introducing a new metric.

@github-actions bot added labels core (Core DataFusion crate) and datasource (Changes to the datasource crate) on Apr 24, 2026
Comment on lines 1377 to +1381
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);

let write_start = datafusion_common::instant::Instant::now();
// Sequential path only: not registered in MetricsSet — used internally to
// compute elapsed_compute = total_write_time - io_time.
let total_write_time = Time::new();
let io_time = Time::new();
Contributor:

One of the things I see here is that the elapsed_compute metric is completely wrong.

Measuring IO and compensating with total_write_time sounds like it can work, but it feels like a patch compensating for the fact that elapsed_compute is not being measured correctly. It would be interesting to see if we can measure elapsed_compute correctly in the first place.

Contributor:

In https://github.com/datafusion-contrib/datafusion-distributed we have this:

https://github.com/datafusion-contrib/datafusion-distributed/blob/2b6c918e0636e0645fafcc109252262f94d6c452/src/worker/worker_connection_pool.rs#L455-L486

This allows wrapping an arbitrary future, something like this:

async move {
    // do some async work
}.with_elapsed_compute(elapsed_compute)

I could imagine how something similar could just be used here for properly measuring elapsed_compute, although the change would be bigger.

Contributor:

If you think this can help avoid the mess, it might be worth trying; otherwise, what you have here is an improvement.

Comment on lines +1637 to 1638
let _timer = encoding_time.timer();
finalized_rg.push(writer.close()?);
Contributor:

Isn't this just measuring the writer.close() method here? Is there nothing to be measured in the tasks above?

EDIT: I see now that the encoding_time is also passed while instantiating the tasks. It's a bit messy, but I think it should work.

Comment on lines +1653 to 1654
#[expect(clippy::too_many_arguments)]
fn spawn_parquet_parallel_serialization_task(
Contributor:

oh no... clippy kicked in.

I see this file was a bit messy before.

Contributor:

Here is one idea about how to make it better:

I see a pattern where file_write_tasks is set only once in the FileSink::spawn_writer_tasks_and_join implementation, and then tasks are spawned for it.

I see that several of the arguments passed to these long functions are not per-path, and are instead global to the whole FileSink::spawn_writer_tasks_and_join call, for example: schema, props, skip_arrow_metadata, parallel_options, pool.

One idea that comes to mind is to gather all these "contextual" variables in a struct that is responsible for task spawning, for example:

struct FileWriteTasks {
    file_write_tasks: JoinSet<
        std::result::Result<(Path, ParquetMetaData), DataFusionError>,
    >,
    schema: SchemaRef,
    properties: WriterProperties,
    parquet_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>
}

impl FileWriteTasks {
    fn spawn_task_foo(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }

    fn spawn_task_bar(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }
}

That would spare the contextual variables from being threaded all the way down the call stack.

Actually, your new encoding_time, etc. variables fall into this category of "contextual variables" that are in scope for the whole duration of FileSink::spawn_writer_tasks_and_join, so they could just be fields of FileWriteTasks.



Successfully merging this pull request may close these issues.

ParquetSink's elapsed_compute accounts for IO, not just CPU
