fix: fix elapsed_compute metric in ParquetSink to report encoding time only #21825
fred1268 wants to merge 1 commit into apache:main
Conversation
```rust
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);

let write_start = datafusion_common::instant::Instant::now();
// Sequential path only: not registered in MetricsSet — used internally to
// compute elapsed_compute = total_write_time - io_time.
let total_write_time = Time::new();
let io_time = Time::new();
```
One of the things I see here is that the `elapsed_compute` metric is completely wrong.
Measuring IO and compensating with `total_write_time` sounds like it can work, but it feels like a patch compensating for the fact that `elapsed_compute` is not being measured correctly. It would be interesting to see if we can measure `elapsed_compute` correctly in the first place.
In https://github.com/datafusion-contrib/datafusion-distributed we have this:
It allows wrapping any arbitrary future, something like this:
```rust
async move {
    // do some async work
}.with_elapsed_compute(elapsed_compute)
```
I could imagine how something similar could be used here for properly measuring `elapsed_compute`, although the change would be bigger.
If you think this can help avoid the mess, it might be worth trying, otherwise, what you have here is an improvement.
```rust
let _timer = encoding_time.timer();
finalized_rg.push(writer.close()?);
```
Isn't this just measuring the writer.close() method here? Is there nothing to be measured in the tasks above?
EDIT: I see now that the encoding_time is also passed while instantiating the tasks. It's a bit messy, but I think it should work.
```rust
#[expect(clippy::too_many_arguments)]
fn spawn_parquet_parallel_serialization_task(
```
oh no... clippy kicked in.
I see this file was a bit messy before.
Here is one idea about how to make it better:
I see a pattern where file_write_tasks is set only once in the FileSink::spawn_writer_tasks_and_join implementation, and then tasks are spawned for it.
I also see that several of the arguments passed to these long functions are not per-path; they are global to the whole FileSink::spawn_writer_tasks_and_join call, for example: schema, props, skip_arrow_metadata, parallel_options, pool.
One idea that comes to mind is to gather all these "contextual" variables in a struct that is responsible for task spawning, for example:
```rust
struct FileWriteTasks {
    file_write_tasks: JoinSet<
        std::result::Result<(Path, ParquetMetaData), DataFusionError>,
    >,
    schema: SchemaRef,
    properties: WriterProperties,
    parquet_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
}

impl FileWriteTasks {
    fn spawn_task_foo(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }

    fn spawn_task_bar(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }
}
```
That would allow the contextual variables to not need to be threaded all the way down the callstack.
Actually, your new encoding_time etc. variables fall into this category of "contextual variables" that are in scope for the whole duration of FileSink::spawn_writer_tasks_and_join, so they could just be fields of FileWriteTasks.
Which issue does this PR close?
Rationale for this change
`ParquetSink` registered an `elapsed_compute` metric using a single wall-clock timer that spanned the entire write operation — upstream batch wait, CPU Arrow→Parquet encoding, and object-store I/O all rolled into one number. This made the metric misleading: it inflated `elapsed_compute` with I/O latency, which is inconsistent with how every other operator in DataFusion reports this metric (CPU time only).

What changes are included in this PR?
Two write paths are fixed independently:

Sequential path (`allow_single_file_parallelism = false` or CDC enabled):
- A `TimingWriter<W>` wrapper implements `AsyncFileWriter` and records the wall-clock time spent in I/O calls (write/complete) into `io_time`.
- Total wall-clock time around `writer.write()` and `writer.close()` is accumulated in `total_write_time`.
- After all tasks join, `elapsed_compute` is set to `total_write_time − io_time`, isolating pure Arrow→Parquet encoding time.

Parallel path (`allow_single_file_parallelism = true`, default):
- `encoding_time: Time` (a clone of the registered `elapsed_compute` metric) is threaded through the five-function call chain down to the two leaf sites: `writer.write()` in `column_serializer_task` and `writer.close()` in `spawn_rg_join_and_finalize_task`. Since `Time` is an `Arc<AtomicUsize>`, all concurrent column tasks accumulate directly into the registered metric.
- `append_to_row_group()` in `concatenate_parallel_row_groups` is interleaved with I/O and cannot be cleanly isolated, so it is excluded from `elapsed_compute`. This is acceptable since it operates on already-encoded data and represents a small fraction of total encoding CPU time.

Are these changes tested?
Yes. Two tests are added/extended in `datafusion/core/src/dataframe/parquet.rs`:

- `test_parquet_sink_metrics_sequential` (new): verifies `elapsed_compute > 0` with `allow_single_file_parallelism = false`.
- `test_parquet_sink_metrics_parallel`: extended with an `elapsed_compute > 0` assertion (previously missing for the parallel path).

The existing `test_parquet_sink_metrics` test (parallel path, default config) already asserted `elapsed_compute > 0` and continues to pass.

Are there any user-facing changes?
No API changes. The `elapsed_compute` metric was already surfaced — this PR makes its value accurate rather than introducing a new metric.