Describe the bug
ParquetSink registers an elapsed_compute metric, but its value is a single wall-clock timer spanning the entire write phase — from before writer tasks are spawned to after they all join (file_format.rs, spawn_writer_tasks_and_join):
```rust
let write_start = Instant::now();
// spawn tasks — each calls writer.write(&batch) then writer.close()
// join all tasks
elapsed_compute.add_elapsed(write_start);
```
This means elapsed_compute conflates three fundamentally different contributors:
- Upstream backpressure — time blocked on rx.recv().await waiting for the upstream plan to produce the next RecordBatch
- CPU encoding — Arrow → Parquet serialisation and column compression
- Object-store I/O — BufWriter flushes at row-group boundaries and the final close()
DataSinkExec::metrics() is a transparent passthrough to ParquetSink::metrics(), so this value propagates unchanged up the plan tree. Any metrics walker that sums ElapsedCompute across operators will misattribute I/O latency and upstream wait time as compute time.
To Reproduce
No response
Expected behavior
elapsed_compute for ParquetSink should reflect only CPU time (Arrow → Parquet encoding and compression), consistent with how elapsed_compute is used in other operators. I/O time and upstream wait should either be tracked separately or excluded.
Additional context
ParquetSink has two write paths: a sequential path (one AsyncArrowWriter per file, active when allow_single_file_parallelism = false) and a parallel path (output_single_parquet_file_parallelized, active when allow_single_file_parallelism = true, which is the default).
Sequential path. AsyncArrowWriter encodes batches one at a time. Encoding is synchronous (sync_writer.write(batch)?) and I/O is async, occurring only at row-group flush boundaries and on close(). The two phases are clearly separated inside the crate, but sync_writer is a private field — DataFusion cannot time them independently without changes to arrow-rs or an interception point at the AsyncFileWriter layer.
Parallel path. Each column in each row group is encoded in its own tokio task. A separate task assembles completed row groups and flushes them to the object store via a raw Box<dyn AsyncWrite>. CPU encoding is distributed across concurrent tasks and the I/O and CPU work are interleaved within the same concatenation loop — there is no clean trait boundary to intercept.