[SYSTEMDS-3929] Speed up Parquet frame reader/writer#2528
Conversation
Rewrites the Parquet frame reader to read columns via parquet's column API (ColumnReadStoreImpl, ColumnReader) instead of creating a Group object per row. On TPC-H lineitem (~30M rows), read time went from 88.9s to 34.5s (2.6x faster). Also rewrites the writer around a custom WriteSupport, removing the per-row Group allocation, adding INT96 timestamp decoding, and fixing the parallel reader to use the sequential implementation instead of reimplementing row iteration per thread. ParquetWriter batches by buffered size internally, so the old writer's manual batch buffer was redundant and has been removed. Compression, dictionary encoding, and row-group size are benchmarked and configurable. The old Group-based implementations are kept under test scope as a benchmark baseline. Adds tests covering null handling, INT96 decoding, and round trips against public Parquet files.
|
Im curious how this compares to the Delta IO i have in #2515 . There the main bottleneck is decoding the Parquet files. |
There was a problem hiding this comment.
If possible avoid adding parquet files to the system,
instead rely on some known good Parquet writer to generate your initial inputs. e.g. Spark has a writer you can use for making inputs.
There was a problem hiding this comment.
Parquet files have been deleted and instead, Spark is used to write the test files. Will take a look at how this compares to the Delta IO solution in #2515.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2528 +/- ##
=========================================
Coverage 71.66% 71.66%
+ Complexity 49338 49328 -10
=========================================
Files 1580 1580
Lines 190516 190579 +63
Branches 37364 37368 +4
=========================================
+ Hits 136525 136582 +57
- Misses 43464 43472 +8
+ Partials 10527 10525 -2 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
[SYSTEMDS-3929] Speed up Parquet frame reader/writer
The old reader created a
Groupobject per row, boxing every decoded value into a wrapper object inside it, even though SystemDS frames are always flat. The parallel reader also converted every boxed value to a string and re-parsed it back to its target type inFrameBlock.set. This PR reads columns via parquet's internal column API (ColumnReadStoreImpl/ColumnReader) instead, avoiding theGroupallocation. On TPC-H lineitem (sf=5, ~30M rows, median of 7 runs on a cloud VM), read time went from 88.9s to 34.5s (2.6x faster).Also rewrites the writer to use a custom
WriteSupportwith tuned defaults, adds INT96 timestamp read support, and fixes row offsets for the parallel reader.The old writer had a tunable batch of 1000 rows. The batch size was benchmarked looking for a better default, but
ParquetWriteralready batches internally by buffered byte size, making the manual row-count batch on top of it redundant, so the new writer drops it and instead exposes that internal buffer size as the tunable row-group size. Compression codec and dictionary-encoding were benchmarked as well:ZSTDand per-column dictionary encoding (STRING_ONLY) are used as defaults. Row group size was benchmarked too and kept at parquet's default of 128MB. Note the batch-size comparison runs uncompressed to match the legacy writer's codec, while the row-group benchmark uses the ZSTD default, so absolute times differ between the two plots.TPC-H lineitem's comment column is ~84% unique, so dictionary encoding on it doesn't pay off, which is why
ALL_OFFwins on this benchmark. Typical frame workloads are assumed to have low-cardinality string columns where dictionaries pay off, soSTRING_ONLYis kept as the default.Also adds public Parquet test files under
src/test/resources/datasets/parquet/(duckdb, apache/parquet-testing, Titanic from HuggingFace) as a check against real-world files, and tests covering INT96 decoding, null handling and parallel/sequential equivalence.Possible next step: row-group-level parallel reads within a single file.
Limitations: INT96 decoding truncates nanosecond precision to milliseconds, and there's no logical-type mapping for dates/timestamps since FrameBlock has no such types.