From 6411732b72a46c567189e1a1895e016974df04b7 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Fri, 29 Dec 2023 22:02:03 +0000 Subject: [PATCH] Allow configuration of performance-related Parquet parameters Compression, page size, and row group size. --- writer/marshal.go | 48 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/writer/marshal.go b/writer/marshal.go index 13b765e..675f40a 100644 --- a/writer/marshal.go +++ b/writer/marshal.go @@ -1,6 +1,10 @@ package writer import ( + "errors" + "flag" + "strings" + "git.autistici.org/ai3/attic/pqlog/schema" "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/layout" @@ -10,6 +14,42 @@ import ( pqwriter "github.com/xitongsys/parquet-go/writer" ) +var ( + parquetCompression = parquetCompressionFlag{codec: parquet.CompressionCodec_SNAPPY} + parquetRowGroupSize = flag.Int("parquet-row-group-size", 32, "Parquet row group size (MB)") + parquetPageSize = flag.Int("parquet-page-size", 8192, "Parquet page size") +) + +func init() { + flag.Var(&parquetCompression, "parquet-compression", "Parquet compression `algorithm`") +} + +type parquetCompressionFlag struct { + codec parquet.CompressionCodec +} + +func (p *parquetCompressionFlag) String() string { + return p.codec.String() +} + +func (p *parquetCompressionFlag) Set(value string) error { + switch strings.ToLower(value) { + case "none": + p.codec = parquet.CompressionCodec_UNCOMPRESSED + case "snappy": + p.codec = parquet.CompressionCodec_SNAPPY + case "gzip": + p.codec = parquet.CompressionCodec_GZIP + case "lz4": + p.codec = parquet.CompressionCodec_LZ4 + case "zstd": + p.codec = parquet.CompressionCodec_ZSTD + default: + return errors.New("unknown compression") + } + return nil +} + // Parquet marshaling code. Due to our peculiar data layout, there is // nothing that immediately serves our purpose in the parquet-go // library: the flattening of the data means that we can't simply feed @@ -36,9 +76,9 @@ func newTuplesWriter(schemaHandler *schema.ParquetSchemaHandler, pfile source.Pa res := new(pqwriter.ParquetWriter) res.SchemaHandler = schemaHandler.SchemaHandler res.PFile = pfile - res.PageSize = 8 * 1024 //8K - res.RowGroupSize = 32 * 1024 * 1024 //32M - res.CompressionType = parquet.CompressionCodec_ZSTD + res.PageSize = int64(*parquetPageSize) + res.RowGroupSize = int64(*parquetRowGroupSize) * 1024 * 1024 + res.CompressionType = parquetCompression.codec res.PagesMapBuf = make(map[string][]*layout.Page) res.DictRecs = make(map[string]*layout.DictRecType) res.NP = np @@ -78,8 +118,6 @@ func genMarshalTuples(pschema *schema.ParquetSchemaHandler) func([]interface{}, table.RepetitionLevels = make([]int32, 0, len(records)) table.DefinitionLevels = make([]int32, 0, len(records)) - //log.Printf("table %s: max_dl=%d, max_rl=%d", pathStr, table.MaxDefinitionLevel, table.MaxRepetitionLevel) - // Iterate over records. var dl int32 = 1 var rl int32 = 0 -- GitLab