Rewrite the parquet output adapter manager #712

Open
arhamchopra wants to merge 8 commits into main from ac/parquet_output_adapter

@arhamchopra arhamchopra commented Jun 12, 2026

Collaborator

Rewrite the parquet output adapter for RecordBatch-based writing

Replaces the old per-format C++ file-writer hierarchy (FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer) with a RecordBatch sink architecture: ParquetWriter builds Arrow RecordBatches and hands them to a small RecordBatchSink callback interface, and a single closure-based C++ sink (makeFileSink) writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.

Motivation

The old output path had:

  • A per-format writer-wrapper class hierarchy plus a FileWriterWrapperContainer to fan out to split-column files
  • A separate DialectGenericListWriterInterface for list columns, duplicating type dispatch already implemented in the shared Arrow layer
  • Per-type column-builder code (StructColumnArrayBuilder) that re-implemented nested-struct serialization the Arrow nodes already do
  • A registered-but-unreachable parquet_dict_basket_output_adapter (dead since basket output goes through the parquet_dict_basket_writer node)

The new implementation:

  • Builds one arrow::RecordBatch per batch_size rows and writes it through a 4-callback sink, so the file backend is a single, swappable component
  • Reuses the shared ArrowFieldWriter (via ArrowBackedArrayBuilder) for every column type — the same serialization code the struct_to_record_batches Arrow node and the input adapter use (scalars, structs, nested structs, lists)
  • Keeps all file I/O in C++ (no per-batch Python / Arrow C Data Interface hop on the write path)
  • Bakes file- and column-level metadata into the Arrow schema in C++, preserved identically in single-file and split-column modes

Architecture

ParquetOutputAdapter (per published column / struct / list)
  └→ ArrowBackedArrayBuilder      — wraps the shared ArrowFieldWriter (scratch or external mode)
ParquetWriter (EndCycleListener)
  ├→ accumulates rows, builds arrow::RecordBatch every batch_size rows
  ├→ bakes file_metadata / column_metadata into the schema in start()
  └→ onStart(schema) / onBatch(rb) / onFileChange(path) / onStop()
RecordBatchSink                   — struct of 4 std::function callbacks
  └→ makeFileSink(...)            — writes files entirely in C++:
        single   : one .parquet / .arrow file with all columns
        split    : a directory, one file per column
        rotation : onFileChange closes the current file and opens the next

RecordBatchSink (new) is a struct of four callbacks (onStart/onBatch/onFileChange/onStop) — the only contract between the writer and the file backend.
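
The four-callback contract can be modeled in a few lines. This is a minimal Python stand-in, not the C++ struct itself (which holds std::function members and Arrow types). The field names follow the callbacks listed above; `drive`, `recording_sink`, and the list-based "batches" are hypothetical and only illustrate the call order a writer might follow.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class RecordBatchSink:
    """Python model of the four-callback contract; types are simplified."""
    on_start: Callable[[Any], None]        # receives the schema once
    on_batch: Callable[[Any], None]        # receives each built batch
    on_file_change: Callable[[str], None]  # receives the next output path
    on_stop: Callable[[], None]            # flush and close


def drive(sink: RecordBatchSink, schema: Any, batches: List[Any]) -> None:
    """Hypothetical writer loop: start once, push batches, then stop."""
    sink.on_start(schema)
    for batch in batches:
        sink.on_batch(batch)
    sink.on_stop()


# A recording sink is enough to swap out the file backend in a test.
events: List[str] = []
recording_sink = RecordBatchSink(
    on_start=lambda schema: events.append(f"start:{schema}"),
    on_batch=lambda batch: events.append(f"batch:{len(batch)}"),
    on_file_change=lambda path: events.append(f"file:{path}"),
    on_stop=lambda: events.append("stop"),
)
drive(recording_sink, schema="x:int64", batches=[[1, 2, 3], [4]])
```

Because the writer only sees the four callables, a file backend, an in-memory recorder, or a per-basket sink can be substituted without touching the batching logic.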

makeFileSink (new, RecordBatchFileSink.cpp) returns a closure-based sink (no class hierarchy) that owns overwrite checks, parent-directory creation, file rotation, compression, and the optional file_visitor. Compression is resolved through Arrow's own arrow::util::Codec API rather than a hardcoded name map.
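
A closure-based sink of this kind might look like the sketch below. It is a Python model under stated assumptions: plain text files stand in for parquet/IPC output, and the names `make_file_sink`, `_open`, and `_close` are hypothetical. It only illustrates how shared closure state can carry the overwrite check, parent-directory creation, and rotation without a class hierarchy; compression and file_visitor are left out.

```python
import os
import tempfile
from typing import Callable, Dict, List


def make_file_sink(path: str, allow_overwrite: bool = False) -> Dict[str, Callable]:
    """Return four closures that share state; no class hierarchy involved."""
    state = {"path": path, "handle": None}

    def _open(target: str) -> None:
        if os.path.exists(target) and not allow_overwrite:
            raise FileExistsError(target)
        parent = os.path.dirname(target)
        if parent:  # a bare filename has an empty dirname; skip makedirs
            os.makedirs(parent, exist_ok=True)
        state["path"] = target
        state["handle"] = open(target, "w")

    def _close() -> None:
        # Detach the handle before closing so a failure cannot close it twice.
        handle, state["handle"] = state["handle"], None
        if handle is not None:
            handle.close()

    def on_start(schema: str) -> None:
        _open(state["path"])

    def on_batch(rows: List[int]) -> None:
        state["handle"].write(",".join(map(str, rows)) + "\n")

    def on_file_change(new_path: str) -> None:
        _close()         # rotation: finish the current file first
        _open(new_path)  # then open the next one

    def on_stop() -> None:
        _close()

    return {"on_start": on_start, "on_batch": on_batch,
            "on_file_change": on_file_change, "on_stop": on_stop}


out_dir = tempfile.mkdtemp()
first = os.path.join(out_dir, "day1", "out.txt")
second = os.path.join(out_dir, "day2", "out.txt")
sink = make_file_sink(first)
sink["on_start"]("x:int64")
sink["on_batch"]([1, 2])
sink["on_file_change"](second)
sink["on_batch"]([3])
sink["on_stop"]()
```

After this runs, the first file holds the batch written before rotation and the second holds the batch written after it; starting a second sink on `first` with the default `allow_overwrite=False` raises FileExistsError.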

ArrowBackedArrayBuilder (new) bridges csp's row-at-a-time "may-not-tick → null" model onto the batch-oriented shared ArrowFieldWriter, in two modes: scratch (single-field struct the tick writes into) and external (reads a field directly from a published struct).
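
The "may-not-tick → null" bridging can be sketched as follows. This is a simplified Python model of the scratch-style behavior only; `ColumnBuilder`, `set_value`, `end_cycle`, and `write_cycles` are hypothetical names, and Python lists stand in for Arrow arrays. The point is that every engine cycle appends exactly one entry per column, so columns stay the same length even when an input did not tick.

```python
from typing import Any, Dict, List, Optional


class ColumnBuilder:
    """Hypothetical stand-in for one column's builder."""

    def __init__(self) -> None:
        self._pending: Optional[Any] = None
        self._ticked = False
        self.values: List[Optional[Any]] = []

    def set_value(self, value: Any) -> None:
        self._pending, self._ticked = value, True

    def end_cycle(self) -> None:
        """Append exactly one entry per row: the value, or None if no tick."""
        self.values.append(self._pending if self._ticked else None)
        self._pending, self._ticked = None, False


def write_cycles(cycles: List[Dict[str, Any]], columns: List[str]) -> Dict[str, List]:
    """Feed each cycle's ticks to the builders, then close the row."""
    builders = {name: ColumnBuilder() for name in columns}
    for ticks in cycles:
        for name, value in ticks.items():
            builders[name].set_value(value)
        for builder in builders.values():
            builder.end_cycle()
    return {name: b.values for name, b in builders.items()}


table = write_cycles(
    [{"price": 10.0, "size": 5}, {"price": 10.5}, {"size": 7}],
    columns=["price", "size"],
)
```

Here `size` did not tick in the second cycle and `price` did not tick in the third, so each column gets a null in that row.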

ParquetOutputAdapterManager is simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.

What's removed

  • FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer — the old per-format C++ file-writer hierarchy
  • DialectGenericListWriterInterface — list columns now go through ArrowBackedArrayBuilder
  • StructColumnArrayBuilder — nested-struct columns are written by the shared ArrowFieldWriter::NestedStructWriter
  • parquet_dict_basket_output_adapter — registered but unreachable dead adapter
  • A large slice of ParquetOutputAdapter.cpp / ArrowSingleColumnArrayBuilder.h per-type boilerplate, folded into the shared Arrow writer

Bug fixes / hardening

Surfaced by a multi-model review of the new sink:

  • mkdir("") on a bare filename — writing to a relative path with no directory component (e.g. "out.parquet") no longer fails with Invalid argument; the empty dirname is guarded.
  • Close-path exception safety — the current writer is reset before the user file_visitor runs (no double-close if it throws), and the output stream / all split sub-writers are always closed even if one close fails (no leaked file descriptors).
  • Compression resolution via Arrow — codecs are resolved through arrow::util::Codec::GetCompressionType + IsAvailable (case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.
  • Explicit writeTimestampColumn honored — an explicitly requested timestamp column is no longer silently downgraded.
  • Single-file + dict basket fails fast — publishing a dict basket without split_columns_to_files=True now raises a clear error instead of a low-level IOError.
  • FileExistsError — writing over an existing file with allow_overwrite=False raises Python FileExistsError.
  • Removed a dead manager-level index sink and made adapter-manager stop() flush/close all writers before destroying any of them.
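
Two of the fixes above, closing every sub-writer even when one close fails and resolving codec names case-insensitively, can be illustrated with a small Python model. Everything here is hypothetical: `Closable`, `close_all`, and `resolve_codec` are illustrative names, and the `available` list stands in for whatever the Arrow build reports as supported.

```python
from typing import List, Optional


class Closable:
    """Hypothetical stand-in for a split-column sub-writer."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name, self.fail, self.closed = name, fail, False

    def close(self) -> None:
        self.closed = True  # the resource is released either way
        if self.fail:
            raise OSError(f"close failed: {self.name}")


def close_all(writers: List[Closable]) -> None:
    """Close every writer, then re-raise the first error, if any."""
    first_error: Optional[BaseException] = None
    for writer in writers:
        try:
            writer.close()
        except OSError as exc:
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error


def resolve_codec(name: str, available: List[str]) -> str:
    """Case-insensitive lookup with a clear error for unsupported codecs."""
    key = name.lower()
    if key not in available:
        raise ValueError(f"unsupported compression {name!r}; available: {available}")
    return key


writers = [Closable("a"), Closable("b", fail=True), Closable("c")]
error_message = ""
try:
    close_all(writers)
except OSError as exc:
    error_message = str(exc)
all_closed = all(w.closed for w in writers)
codec = resolve_codec("ZSTD", available=["snappy", "zstd"])
```

Deferring the raise until the loop finishes is what keeps writer "c" from being leaked when "b" fails.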

Performance

File I/O stays in C++ — there is no per-batch Python crossing on the write path. Measured against an intermediate pure-Python sink design (same harness/box, min-of-5):

| workload | C++ sink | Python sink | speedup |
|---|---|---|---|
| single 10-col, batch 8192 | ~535 ms | 551 ms | 1.03x |
| split 10-col, batch 8192 | ~538 ms | 561 ms | 1.04x |
| struct 3-field, batch 8192 | ~67 ms | 135 ms | 2.0x |
| single 10-col, batch 1024 | ~529 ms | 535 ms | 1.01x |
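
The speedup column appears to be the Python-sink time divided by the C++-sink time. That reading is an assumption, but it reproduces the reported figures when checked against the timings in the table above:

```python
# (C++ sink ms, Python sink ms), copied from the table above
timings = {
    "single 10-col, batch 8192": (535, 551),
    "split 10-col, batch 8192": (538, 561),
    "struct 3-field, batch 8192": (67, 135),
    "single 10-col, batch 1024": (529, 535),
}

# Assumed definition: speedup = Python sink time / C++ sink time
speedups = {name: round(py / cpp, 2) for name, (cpp, py) in timings.items()}
```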

Write-bound, "cheap-C++" workloads (struct/rotation, where little engine work amortizes the per-batch crossing) gain the most; multi-column scalar writes are already ~92% shared C++/engine work and move marginally. A whole-file accuracy harness (schema, row groups, compression codec, metadata, and values) confirms the C++ sink produces logically identical output to the Python sink across 19 scenarios.

Note: a clean A/B rebuild of main in the shared build tree was blocked by a concurrent build, so the table compares the new C++ sink against the earlier Python-sink baseline rather than main directly. Because the C++ sink performs a strict subset of the Python sink's work (no capsule export/import, no per-batch pyarrow call), it is at parity-or-better by construction.

API compatibility

The public Python API (ParquetWriter, ParquetOutputConfig, publish, publish_struct, publish_dict_basket, filename_provider, file_visitor, file_metadata / column_metadata) is unchanged. csp/tests/adapters/test_parquet_output.py (65 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, overwrite/FileExistsError) and the existing test_parquet.py suite pass.

ParquetWriter builds RecordBatches and hands them to a pluggable
RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++
file-writer hierarchy and unifies output conversion via visitCspValueType.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter,
de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column
files in split mode, and expand output tests.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Closure-based C++ sink writes parquet/IPC/split-column files directly,
removing the per-batch C++<->Python hop. Adds FileExistsError.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
  TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
  resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Remove the unused manager-level m_indexSink/setIndexSink (per-basket index
sinks are unaffected) and stop all writers before destroying any in stop().

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a
debug-only length assert in buildRecordBatch, and document that file_visitor
runs synchronously on the engine thread.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra marked this pull request as ready for review June 13, 2026 02:27