Rewrite the parquet output adapter manager #712
Open
arhamchopra wants to merge 8 commits into
ParquetWriter builds RecordBatches and hands them to a pluggable RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++ file-writer hierarchy and unifies output conversion via visitCspValueType. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter, de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column files in split mode, and expand output tests. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Closure-based C++ sink writes parquet/IPC/split-column files directly, removing the per-batch C++<->Python hop. Adds FileExistsError. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove the unused manager-level m_indexSink/setIndexSink (per-basket index sinks are unaffected) and stop all writers before destroying any in stop(). Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a debug-only length assert in buildRecordBatch, and document that file_visitor runs synchronously on the engine thread. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Rewrite the parquet output adapter for RecordBatch-based writing
Replaces the old per-format C++ file-writer hierarchy (`FileWriterWrapper` / `ParquetFileWriterWrapper` / `ArrowIPCFileWriterWrapper` / `FileWriterWrapperContainer`) with a RecordBatch sink architecture: `ParquetWriter` builds Arrow `RecordBatch`es and hands them to a small `RecordBatchSink` callback interface, and a single closure-based C++ sink (`makeFileSink`) writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.

Motivation
The old output path had:
- `FileWriterWrapperContainer` to fan out to split-column files
- `DialectGenericListWriterInterface` for list columns, duplicating type dispatch already implemented in the shared Arrow layer
- `StructColumnArrayBuilder`, which re-implemented nested-struct serialization the Arrow nodes already do
- `parquet_dict_basket_output_adapter` (dead since basket output goes through the `parquet_dict_basket_writer` node)

The new implementation:
- Builds an `arrow::RecordBatch` per `batch_size` rows and writes it through a 4-callback sink, so the file backend is a single, swappable component
- Uses `ArrowFieldWriter` (via `ArrowBackedArrayBuilder`) for every column type: the same serialization code the `struct_to_record_batches` Arrow node and the input adapter use (scalars, structs, nested structs, lists)

Architecture
- `RecordBatchSink` (new) is a struct of four callbacks (`onStart` / `onBatch` / `onFileChange` / `onStop`), the only contract between the writer and the file backend.
- `makeFileSink` (new, `RecordBatchFileSink.cpp`) returns a closure-based sink (no class hierarchy) that owns overwrite checks, parent-directory creation, file rotation, compression, and the optional `file_visitor`. Compression is resolved through Arrow's own `arrow::util::Codec` API rather than a hardcoded name map.
- `ArrowBackedArrayBuilder` (new) bridges csp's row-at-a-time "may-not-tick → null" model onto the batch-oriented shared `ArrowFieldWriter`, in two modes: scratch (single-field struct the tick writes into) and external (reads a field directly from a published struct).
- `ParquetOutputAdapterManager` is simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.

What's removed
- `FileWriterWrapper` / `ParquetFileWriterWrapper` / `ArrowIPCFileWriterWrapper` / `FileWriterWrapperContainer`: the old per-format C++ file-writer hierarchy
- `DialectGenericListWriterInterface`: list columns now go through `ArrowBackedArrayBuilder`
- `StructColumnArrayBuilder`: nested-struct columns are written by the shared `ArrowFieldWriter::NestedStructWriter`
- `parquet_dict_basket_output_adapter`: registered but unreachable dead adapter
- `ParquetOutputAdapter.cpp` / `ArrowSingleColumnArrayBuilder.h` per-type boilerplate, folded into the shared Arrow writer

Bug fixes / hardening
Surfaced by a multi-model review of the new sink:
- `mkdir("")` on a bare filename: writing to a relative path with no directory component (e.g. `"out.parquet"`) no longer fails with `Invalid argument`; the empty dirname is guarded.
- Close-path exception safety around `file_visitor` (no double-close if it throws); the output stream and all split sub-writers are always closed even if one close fails (no leaked file descriptors).
- Compression is resolved via `arrow::util::Codec::GetCompressionType` + `IsAvailable` (case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.
- `writeTimestampColumn` honored: an explicitly requested timestamp column is no longer silently downgraded.
- `split_columns_to_files=True` now raises a clear error instead of a low-level `IOError`.
- `FileExistsError`: writing over an existing file with `allow_overwrite=False` raises Python `FileExistsError`.
- `stop()` flushes and closes all writers before destroying any of them.

Performance
File I/O stays in C++ — there is no per-batch Python crossing on the write path. Measured against an intermediate pure-Python sink design (same harness/box, min-of-5):
Write-bound, "cheap-C++" workloads (struct/rotation, where little engine work amortizes the per-batch crossing) gain the most; multi-column scalar writes are already ~92% shared C++/engine work and move marginally. A whole-file accuracy harness (schema, row groups, compression codec, metadata, and values) confirms the C++ sink produces logically identical output to the Python sink across 19 scenarios.
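For readers unfamiliar with the "min-of-5" convention used for the numbers above: each workload is run five times and the fastest run is kept, which filters out scheduler and cache noise. The benchmark harness itself is not shown in this description, so the following is only a minimal sketch of that measurement style; `min_of_n` and `speedup` are hypothetical helper names, not part of csp.

```python
import time
from typing import Callable


def min_of_n(workload: Callable[[], None], repeats: int = 5) -> float:
    """Run `workload` `repeats` times; return the fastest wall-clock time in seconds."""
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        workload()
        timings.append(time.perf_counter() - start)
    # The minimum is the run least disturbed by unrelated system activity.
    return min(timings)


def speedup(baseline: Callable[[], None],
            candidate: Callable[[], None],
            repeats: int = 5) -> float:
    """Baseline time divided by candidate time; above 1.0 means the candidate is faster."""
    return min_of_n(baseline, repeats) / min_of_n(candidate, repeats)
```

In a comparison like the one described here, `baseline` would wrap a write through the Python sink and `candidate` the same write through the C++ sink, both on the same machine.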
API compatibility
The public Python API (`ParquetWriter`, `ParquetOutputConfig`, `publish`, `publish_struct`, `publish_dict_basket`, `filename_provider`, `file_visitor`, `file_metadata` / `column_metadata`) is unchanged. `csp/tests/adapters/test_parquet_output.py` (65 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, overwrite/`FileExistsError`) and the existing `test_parquet.py` suite pass.
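As a mental model for the four-callback contract and the closure-based sink described under Architecture, here is a small Python stand-in. It is not the C++ implementation: the real sink writes Arrow data, while this sketch writes text lines, and the snake_case names (`on_start`, `make_file_sink`, and so on) are hypothetical. It also assumes the file is closed before `file_visitor` is called, which this description does not state explicitly.

```python
import os
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RecordBatchSink:
    """Stand-in for the four-callback contract (hypothetical Python shape)."""
    on_start: Callable[[str], None]        # open the first output file
    on_batch: Callable[[List[str]], None]  # write one batch of rows
    on_file_change: Callable[[str], None]  # rotate to a new output file
    on_stop: Callable[[], None]            # flush and close


def make_file_sink(allow_overwrite: bool = False,
                   file_visitor: Optional[Callable[[str], None]] = None) -> RecordBatchSink:
    """Return a sink whose state lives in a closure, not in a class hierarchy."""
    state = {"path": None, "handle": None}

    def _open(path: str) -> None:
        if not allow_overwrite and os.path.exists(path):
            raise FileExistsError(path)
        parent = os.path.dirname(path)
        if parent:  # a bare filename has an empty dirname, so skip directory creation
            os.makedirs(parent, exist_ok=True)
        state["handle"] = open(path, "w")
        state["path"] = path

    def _close() -> None:
        handle, path = state["handle"], state["path"]
        if handle is None:
            return
        # Clear state first so a throwing file_visitor cannot trigger a second close.
        state["handle"] = state["path"] = None
        handle.close()
        if file_visitor is not None:
            file_visitor(path)  # assumption: the visitor sees a fully closed file

    def _write(rows: List[str]) -> None:
        state["handle"].writelines(row + "\n" for row in rows)

    def _rotate(path: str) -> None:
        _close()
        _open(path)

    return RecordBatchSink(on_start=_open, on_batch=_write,
                           on_file_change=_rotate, on_stop=_close)
```

The writer only ever sees the four callables, so swapping the file backend means passing a different `RecordBatchSink` value rather than subclassing anything.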