Skip to content
Snippets Groups Projects
Commit 6411732b authored by ale's avatar ale
Browse files

Allow configuration of performance-related Parquet parameters

Compression, page size, and row group size.
parent 30482aaf
No related branches found
No related tags found
No related merge requests found
package writer
import (
"errors"
"flag"
"strings"
"git.autistici.org/ai3/attic/pqlog/schema"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/layout"
......@@ -10,6 +14,42 @@ import (
pqwriter "github.com/xitongsys/parquet-go/writer"
)
var (
parquetCompression = parquetCompressionFlag{codec: parquet.CompressionCodec_SNAPPY}
parquetRowGroupSize = flag.Int("parquet-row-group-size", 32, "Parquet row group size (MB)")
parquetPageSize = flag.Int("parquet-page-size", 8192, "Parquet page size")
)
func init() {
flag.Var(&parquetCompression, "parquet-compression", "Parquet compression `algorithm`")
}
type parquetCompressionFlag struct {
codec parquet.CompressionCodec
}
func (p *parquetCompressionFlag) String() string {
return p.codec.String()
}
func (p *parquetCompressionFlag) Set(value string) error {
switch strings.ToLower(value) {
case "none":
p.codec = parquet.CompressionCodec_UNCOMPRESSED
case "snappy":
p.codec = parquet.CompressionCodec_SNAPPY
case "gzip":
p.codec = parquet.CompressionCodec_GZIP
case "lz4":
p.codec = parquet.CompressionCodec_LZ4
case "zstd":
p.codec = parquet.CompressionCodec_ZSTD
default:
return errors.New("unknown compression")
}
return nil
}
// Parquet marshaling code. Due to our peculiar data layout, there is
// nothing that immediately serves our purpose in the parquet-go
// library: the flattening of the data means that we can't simply feed
......@@ -36,9 +76,9 @@ func newTuplesWriter(schemaHandler *schema.ParquetSchemaHandler, pfile source.Pa
res := new(pqwriter.ParquetWriter)
res.SchemaHandler = schemaHandler.SchemaHandler
res.PFile = pfile
res.PageSize = 8 * 1024 //8K
res.RowGroupSize = 32 * 1024 * 1024 //32M
res.CompressionType = parquet.CompressionCodec_ZSTD
res.PageSize = int64(*parquetPageSize)
res.RowGroupSize = int64(*parquetRowGroupSize) * 1024 * 1024
res.CompressionType = parquetCompression.codec
res.PagesMapBuf = make(map[string][]*layout.Page)
res.DictRecs = make(map[string]*layout.DictRecType)
res.NP = np
......@@ -78,8 +118,6 @@ func genMarshalTuples(pschema *schema.ParquetSchemaHandler) func([]interface{},
table.RepetitionLevels = make([]int32, 0, len(records))
table.DefinitionLevels = make([]int32, 0, len(records))
//log.Printf("table %s: max_dl=%d, max_rl=%d", pathStr, table.MaxDefinitionLevel, table.MaxRepetitionLevel)
// Iterate over records.
var dl int32 = 1
var rl int32 = 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment