
ParquetSink's elapsed_compute accounts for IO, not just CPU #21797

@fred1268

Description


Describe the bug

ParquetSink registers an elapsed_compute metric, but its value is a single wall-clock timer spanning the entire write phase — from before writer tasks are spawned to after they all join (file_format.rs, spawn_writer_tasks_and_join):

```rust
let write_start = Instant::now();
// spawn tasks — each calls writer.write(&batch) then writer.close()
// join all tasks
elapsed_compute.add_elapsed(write_start);
```

This means elapsed_compute conflates three fundamentally different contributors:

  1. Upstream backpressure — time blocked on rx.recv().await waiting for the upstream plan to produce the next RecordBatch
  2. CPU encoding — Arrow → Parquet serialisation and column compression
  3. Object-store I/O — BufWriter flushes at row-group boundaries and the final close()

DataSinkExec::metrics() is a transparent passthrough to ParquetSink::metrics(), so this value propagates unchanged up the plan tree. Any metrics walker that sums ElapsedCompute across operators will misattribute I/O latency and upstream wait time as compute time.
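To make the misattribution concrete, here is a minimal, self-contained sketch of the kind of naive metrics walker described above. The `PlanNode` type and `total_elapsed_compute` function are illustrative stand-ins, not DataFusion's actual API:

```rust
use std::time::Duration;

// Toy stand-in for a plan node exposing an elapsed_compute metric;
// the names are hypothetical, not DataFusion's real types.
struct PlanNode {
    elapsed_compute: Duration,
    children: Vec<PlanNode>,
}

// Naive walker that sums elapsed_compute over the whole tree. If the
// sink's metric actually contains I/O and upstream wait, that time is
// reported here as compute.
fn total_elapsed_compute(node: &PlanNode) -> Duration {
    node.elapsed_compute
        + node
            .children
            .iter()
            .map(total_elapsed_compute)
            .sum::<Duration>()
}

fn main() {
    let scan = PlanNode {
        elapsed_compute: Duration::from_millis(300),
        children: vec![],
    };
    // Sink reports 1000 ms, of which perhaps only a fraction was encoding.
    let sink = PlanNode {
        elapsed_compute: Duration::from_millis(1000),
        children: vec![scan],
    };
    println!("{:?}", total_elapsed_compute(&sink)); // 1.3s, all attributed to compute
}
```

A tool like this has no way to tell that most of the sink's 1000 ms was spent waiting on the network or on the upstream plan.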

To Reproduce

No response

Expected behavior

elapsed_compute for ParquetSink should reflect only CPU time (Arrow → Parquet encoding and compression), consistent with how elapsed_compute is used in other operators. I/O time and upstream wait should either be tracked separately or excluded.
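One possible shape for the fix, sketched with standard-library types only: accumulate a separate timer per phase of the write loop instead of one wall-clock span. The phase names, `WriteMetrics` struct, and loop structure below are hypothetical, not DataFusion's actual code:

```rust
use std::time::{Duration, Instant};

// Hypothetical per-phase accumulators for the sink's write loop.
struct WriteMetrics {
    upstream_wait: Duration,   // blocked on rx.recv().await
    elapsed_compute: Duration, // encoding + compression only
    io_time: Duration,         // row-group flushes and close()
}

// Run a closure and add its elapsed wall time to one accumulator.
fn timed<T>(acc: &mut Duration, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = f();
    *acc += start.elapsed();
    out
}

fn write_loop(batches: Vec<u32>) -> WriteMetrics {
    let mut m = WriteMetrics {
        upstream_wait: Duration::ZERO,
        elapsed_compute: Duration::ZERO,
        io_time: Duration::ZERO,
    };
    for batch in batches {
        // Stand-ins for recv / encode / flush; sleeps simulate waiting.
        timed(&mut m.upstream_wait, || std::thread::sleep(Duration::from_millis(1)));
        let _encoded = timed(&mut m.elapsed_compute, || batch.wrapping_mul(31));
        timed(&mut m.io_time, || std::thread::sleep(Duration::from_millis(1)));
    }
    m
}

fn main() {
    let m = write_loop(vec![1, 2, 3]);
    // Only elapsed_compute would feed the ElapsedCompute metric.
    assert!(m.io_time >= Duration::from_millis(3));
}
```

With this split, `elapsed_compute` stays comparable across operators, and I/O time becomes available as its own metric rather than being silently folded in.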

Additional context

ParquetSink has two write paths: a sequential path (one AsyncArrowWriter per file, active when allow_single_file_parallelism = false) and a parallel path (output_single_parquet_file_parallelized, active when allow_single_file_parallelism = true, which is the default).

Sequential path. AsyncArrowWriter encodes batches one at a time. Encoding is synchronous (sync_writer.write(batch)?) and I/O is async, occurring only at row-group flush boundaries and on close(). The two phases are clearly separated inside the crate, but sync_writer is a private field — DataFusion cannot time them independently without changes to arrow-rs or an interception point at the AsyncFileWriter layer.
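If an interception point at the writer layer were available, the timing could live in a wrapper rather than in arrow-rs itself. A minimal sketch of that idea, using a simplified synchronous `FileWriter` trait as a stand-in for the real `AsyncFileWriter` boundary (all names here are illustrative):

```rust
use std::time::{Duration, Instant};

// Simplified stand-in for the async file-writer boundary; the real
// interception point would be arrow-rs's AsyncFileWriter trait.
trait FileWriter {
    fn write(&mut self, buf: &[u8]);
}

// In-memory writer used as the wrapped inner implementation.
struct VecWriter(Vec<u8>);

impl FileWriter for VecWriter {
    fn write(&mut self, buf: &[u8]) {
        self.0.extend_from_slice(buf);
    }
}

// Wrapper that accumulates time spent inside the inner writer, so
// I/O can be reported separately from encoding time.
struct TimedWriter<W: FileWriter> {
    inner: W,
    io_time: Duration,
}

impl<W: FileWriter> FileWriter for TimedWriter<W> {
    fn write(&mut self, buf: &[u8]) {
        let start = Instant::now();
        self.inner.write(buf);
        self.io_time += start.elapsed();
    }
}

fn main() {
    let mut w = TimedWriter { inner: VecWriter(Vec::new()), io_time: Duration::ZERO };
    w.write(b"row group bytes");
    assert_eq!(w.inner.0.len(), 15);
}
```

Since `AsyncArrowWriter` only touches its underlying writer at flush boundaries and on `close()`, everything accumulated in such a wrapper would be I/O time, and the remainder of the wall-clock span would approximate encoding.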

Parallel path. Each column in each row group is encoded in its own tokio task. A separate task assembles completed row groups and flushes them to the object store via a raw Box<dyn AsyncWrite>. CPU encoding is distributed across concurrent tasks and the I/O and CPU work are interleaved within the same concatenation loop — there is no clean trait boundary to intercept.

Metadata

Labels

bug