From 05afde3558c603baa6719f438ec68938a82a9db6 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Fri, 29 Dec 2023 18:16:28 +0000 Subject: [PATCH] Fix the List marshaler Generate the correct RL / DL values for our tuples, including repeated values (lists). --- README.md | 179 +++++++++++++++++++++++++++++++++++++++++ cmd/pqlogd/main.go | 2 +- http/ingest.go | 16 +--- http/query.go | 36 ++++++--- schema/parquet.go | 6 +- schema/schema.go | 55 ------------- schema/schema_debug.go | 73 +++++++++++++++++ writer/marshal.go | 138 +++++++++++++++++-------------- writer/source.go | 27 +++++++ writer/writer.go | 52 ++++++++---- 10 files changed, 425 insertions(+), 159 deletions(-) create mode 100644 README.md create mode 100644 schema/schema_debug.go create mode 100644 writer/source.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..3953f95 --- /dev/null +++ b/README.md @@ -0,0 +1,179 @@ +# pqlog + +*pqlog* is an indexing and retrieval service for structured and +unstructured logs (e.g. *syslog* and more). Its primary capability is +its ability to *scale down* for small installations, using very few +resources while maintaining a certain level of usefulness, offering an +alternative to heavyweight stacks like ELK in this scenario. + +## Overview + +The system's functionality is split into two parts: + +* *Ingestion* (or *indexing*), processing incoming logs and writing + them to log storage; +* *Query*, the ability to interrogate the log storage analytically. + +### Ingestion + +Ingestion presents an interface for external clients (syslog daemons, +fluentd, *etc*) to send logs for indexing, in batches. Incoming log +entries are converted to a unified "flattened" schema. + +The flattened schema maps structured attributes to a flat, one-level +structure by using their paths (dot-separated) as field names. As an +example, an input record such as: + +```json +{ + "a": { + "foo": "bar", + "baz": "arg" + }, + "b": 42 +} +``` + +would get internally converted to a "flat" structure like the following: + +```json +{ + "a.foo": "bar", + "a.baz": "arg", + "b": 42 +} +``` + +So, when querying the database, one should refer to the *foo* field as +*a.foo*. + +The flattened schema supports lists of scalar values (numbers, +strings), but it does not support nesting structures within lists, as +it would not know how to unroll them. For instance, the following +record has a problem: + +```json +{ + "a": [ + { + "foo": "bar" + }, + { + "baz": "arg" + } + ] +} +``` + +In this situation the ingestion stage will just ignore the problematic +field (*a* in the above example). + +The flattening is ultimately necessary because of an implementation +detail: the underlying query engine (DuckDB), although it fully +supports structured schemas, can have trouble reconciling different +STRUCTs across different Parquet files. But it's a TODO to get rid of +this limitation. + +The flattened records are then written to [Parquet]() files, which are +rotated periodically (and when they reach a certain size). These files +can be stored remotely, although the current implementation only +supports local filesystem. + +The ingestion API endpoint is at */ingest*, and it expects a POST +request with a ND-JSON request body: newline-delimited JSON-encoded +records, no additional headers or footers. + +### Schema unification + +As it ingests records, *pqlog* learns the data types of the incoming +attributes, determining an aggregate schema for all the records. 
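+
+For instance (an illustrative pair of records, not taken from any real
+installation), ingesting the two ND-JSON records
+
+```
+{"a": {"foo": "bar"}, "b": 42}
+{"a": {"foo": "baz"}, "tags": ["x", "y"]}
+```
+
+would result in an aggregate schema with the flattened fields *a.foo*,
+*b* and *tags*, each with a guessed data type.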
+Since in the "flat" schema all nodes are leaves, it knows about the
+following fundamental data types:
+
+* `STRING`
+* `NUMBER`
+* `TIMESTAMP` is an RFC3339-formatted string
+* `LIST` is just a repeated `STRING`
+
+The mere existence of an aggregate schema, necessary in order to query
+the data, imposes some restrictions on the structure of the data. After
+all, it is not unreasonable to expect the data to have some level of
+consistency if you want to be able to query it in aggregate.
+
+When the schema detector finds conflicting data types for a field, it
+will try to promote them to a common type where possible, handling
+common cases such as STRING / NUMBER conflicts by upgrading the values
+to STRING. But where such type promotions aren't possible, the
+problematic field will be dropped from the schema.
+
+### Querying
+
+The query engine is [DuckDB](https://duckdb.org), which can just [read
+the Parquet files, even remotely]() and run fast analytical queries on
+them.
+
+One thing of note is that, in the current implementation, it is only
+possible to query fully written Parquet files. The implication is that
+you won't see logs until the ingestion server decides it's time to
+finalize the current Parquet file. For this reason, it might be
+sensible to set the *--rotation-interval* option to a few minutes.
+
+The query API is at */query* and it takes a full SQL query as the *q*
+parameter. The response will be JSON-encoded. Since the table to query
+is created on-the-fly with every request, its name is not known in
+advance to the caller: the SQL query should contain the placeholder
+`$table`, which will be replaced at execution time with the name of
+the temporary table to use.
+
+## Running
+
+The service ships as a single binary, *pqlogd*.
+
+From an operational perspective, the two components of the service
+(ingestion and query) can be run independently, and are controlled by
+the command-line options *--enable-ingest* and *--enable-query*,
+respectively. In all cases, the server will listen on the HTTP address
+specified by *--addr*.
+
+### Ingestion service
+
+The ingestion service needs to know where to store its output. This is
+controlled by the *--dir* command-line option, which currently only
+accepts a local path (S3 support TODO).
+
+Log files will be written in this directory, rotated every
+*--rotation-interval*.
+
+The schema detector works better if it can persist its internal state
+and recover it on startup: to make it do so, specify the
+*--state-file* option.
+
+An example:
+
+```
+$ pqlogd --dir ./data --enable-ingest --addr :3000 &
+$ curl -s -XPOST --data-binary '{"a": "foo", "b": 42}' http://localhost:3000/ingest
+$ kill %1 # Causes the Parquet files to be finalized
+```
+
+### Query service
+
+The query service also needs to know where the Parquet data files are
+located, with the *--dir* option.
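+
+Any HTTP client can issue queries. As a minimal sketch (not part of
+the repository, and assuming a query service listening on
+*localhost:3001*), a Go client for the */query* endpoint could look
+like this:
+
+```go
+package main
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+)
+
+func main() {
+	// $table is replaced server-side with the name of the temporary
+	// DuckDB view created for this request.
+	resp, err := http.PostForm("http://localhost:3001/query",
+		url.Values{"q": {"SELECT COUNT(*) AS c FROM $table"}})
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+
+	body, _ := io.ReadAll(resp.Body)
+	fmt.Println(string(body)) // JSON-encoded results
+}
+```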
+ +Continuing the example above: + +``` +$ pqlogd --dir ./data --enable-query --addr :3001 & +$ curl -s -XPOST -d "q=SELECT COUNT(*) AS c FROM \$table WHERE a='foo'" http://localhost:3001/query +[{"c": 1}] +``` + +### Debugging + +The server offers some debugging endpoints which might be useful to +understand what it is doing: + +* */schema* will return the current schema in JSON format +* */debug/schema* will return a human-readable dump of the internal + state of the schema guesser diff --git a/cmd/pqlogd/main.go b/cmd/pqlogd/main.go index 98fa7f9..3d0efde 100644 --- a/cmd/pqlogd/main.go +++ b/cmd/pqlogd/main.go @@ -49,7 +49,7 @@ func main() { log.Fatal(err) } - w := writer.NewRotating(pathGen, *rotationInterval) + w := writer.NewRotating(writer.NewLocalSource(), pathGen, *rotationInterval) defer w.Close() isrv := pqhttp.NewIngest(w, *stateFile) diff --git a/http/ingest.go b/http/ingest.go index 5e874d8..9882166 100644 --- a/http/ingest.go +++ b/http/ingest.go @@ -6,7 +6,6 @@ import ( "io" "log" "net/http" - "strings" "sync" "git.autistici.org/ai3/attic/pqlog/schema" @@ -105,7 +104,7 @@ func (s *IngestServer) ingestBatch(r io.Reader) (int, error) { s.numSchemaChanges++ s.schema = s.guesser.NewSchema() s.pschema = s.schema.ParquetSchemaHandler() - if err := s.writer.Reopen(s.pschema.SchemaHandler); err != nil { + if err := s.writer.Reopen(s.pschema); err != nil { return 0, err } if s.stateFile != "" { @@ -130,10 +129,6 @@ func (s *IngestServer) IngestHandler(w http.ResponseWriter, req *http.Request) { http.Error(w, "Bad method", http.StatusMethodNotAllowed) return } - if !strings.HasPrefix(req.Header.Get("Content-Type"), "application/json") { - http.Error(w, "Bad Content-Type", http.StatusBadRequest) - return - } n, err := s.ingestBatch(req.Body) if err != nil { @@ -160,12 +155,9 @@ func (s *IngestServer) DebugHandler(w http.ResponseWriter, req *http.Request) { func (s *IngestServer) SchemaHandler(w http.ResponseWriter, req *http.Request) { s.mx.Lock() - out := struct { - Version int64 `json:"version"` - Fields map[string]string `json:"fields"` - }{ - Version: s.schema.Version, - Fields: s.schema.Debug(), + var out *schema.SchemaDebugInfo + if s.schema != nil { + out = s.schema.Debug() } s.mx.Unlock() diff --git a/http/query.go b/http/query.go index b614ed0..5551336 100644 --- a/http/query.go +++ b/http/query.go @@ -1,12 +1,14 @@ package pqhttp import ( + "bufio" "context" "database/sql" "encoding/json" "errors" "fmt" "io" + "log" "math/rand" "net/http" "path/filepath" @@ -45,18 +47,18 @@ func (q *QueryServer) Close() { } func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) error { - table := fmt.Sprintf("logs-%08x", rand.Int63()) + table := fmt.Sprintf("logs%08x", rand.Int63()) query = strings.Replace(query, "$table", table, -1) tx, err := q.db.BeginTx(ctx, nil) if err != nil { - return err + return fmt.Errorf("failed to create transaction: %w", err) } defer tx.Rollback() _, err = tx.Exec(fmt.Sprintf("CREATE VIEW %s AS SELECT * FROM read_parquet('%s', union_by_name=true)", table, q.glob)) if err != nil { - return err + return fmt.Errorf("failed to create DuckDB view: %w", err) } defer func() { // Well, ok, we have the Rollback() but. @@ -67,30 +69,41 @@ func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) e rows, err := tx.Query(query) if err != nil { // Report this as a client-originated error. 
- return newQueryError(err) + return newQueryError(fmt.Errorf("query execution error: %w", err)) } defer rows.Close() - io.WriteString(w, "{\"status\":\"ok\",\"results\":[") + bw := bufio.NewWriter(w) + + _, _ = io.WriteString(bw, "{\"status\":\"ok\",\"results\":[") cols, _ := rows.ColumnTypes() + first := true for rows.Next() { + if first { + first = false + } else { + _, _ = io.WriteString(bw, ",") + } + values := make([]any, len(cols)) for i := 0; i < len(cols); i++ { - values[i] = reflect.Zero(cols[i].ScanType()).Interface() + value := reflect.Zero(cols[i].ScanType()).Interface() + values[i] = &value } if err := rows.Scan(values...); err != nil { - return err + return fmt.Errorf("Scan error: %w", err) } row := make(map[string]any, len(cols)) for i := 0; i < len(cols); i++ { row[cols[i].Name()] = values[i] } - if err := json.NewEncoder(w).Encode(row); err != nil { - return err + if err := json.NewEncoder(bw).Encode(row); err != nil { + return fmt.Errorf("JSON encode error: %w", err) } - io.WriteString(w, ",\n") } - io.WriteString(w, "]}\n") + _, _ = io.WriteString(bw, "]}\n") + + bw.Flush() return rows.Err() } @@ -115,6 +128,7 @@ func (q *QueryServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { if errors.Is(err, &queryError{}) { status = http.StatusBadRequest } + log.Printf("query error: %v", err) http.Error(w, err.Error(), status) } } diff --git a/schema/parquet.go b/schema/parquet.go index 4ed5189..cb2149f 100644 --- a/schema/parquet.go +++ b/schema/parquet.go @@ -92,7 +92,7 @@ func (s *Schema) ParquetSchemaHandler() *ParquetSchemaHandler { func (s *ParquetSchemaHandler) TuplesToRow(tuples []Tuple) []any { row := make([]any, s.numFields) - copy(row, s.zeros) + //copy(row, s.zeros) for _, t := range tuples { idx := s.fieldIndexMap[t.key] value := t.value @@ -103,3 +103,7 @@ func (s *ParquetSchemaHandler) TuplesToRow(tuples []Tuple) []any { } return row } + +func (s *ParquetSchemaHandler) Zero(i int) any { + return s.zeros[i] +} diff --git a/schema/schema.go b/schema/schema.go index 031f2b5..64d0da0 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -135,26 +135,6 @@ func (g typeGuesser) datatype() (Type, upcastFunc, bool) { return t, nil, true } -func (g typeGuesser) debugState() ColumnDebugState { - var n int - var ds ColumnDebugState - for i := 0; i < maxTypeBits; i++ { - j := Type(1 << i) - if g.isSet(j) { - n++ - ds.Types = append(ds.Types, j.String()) - } - } - if n > 1 { - s := "type conflict" - if up, _, ok := g.supportedUpcast(); ok { - s = fmt.Sprintf("upcast to %s", up.String()) - } - ds.Notes = s - } - return ds -} - // The Guesser keeps track of the columns we see (after // flattening the input) and their associated data types, and infers a // schema over time, to the extent where it is possible to do so. @@ -220,32 +200,6 @@ func (g *Guesser) Save(path string) error { return json.NewEncoder(f).Encode(&sg) } -// ColumnDebugState holds information about a specific column. -type ColumnDebugState struct { - Types []string `json:"types"` - Notes string `json:"notes,omitempty"` -} - -// GuesserDebugState represents the state of the Guesser in human-readable form. 
-type GuesserDebugState struct { - Columns map[string]ColumnDebugState `json:"fields"` - Errors map[string]string `json:"errors,omitempty"` -} - -func (g *Guesser) Debug() *GuesserDebugState { - state := GuesserDebugState{ - Columns: make(map[string]ColumnDebugState), - Errors: make(map[string]string), - } - for name, col := range g.columns { - state.Columns[name] = col.debugState() - } - for name, err := range g.errors { - state.Errors[name] = err - } - return &state -} - // This structure holds the inferred schema of the datastream at a // specific point in time. type Schema struct { @@ -254,15 +208,6 @@ type Schema struct { upcast map[string]upcastFunc } -// Debug returns a map representation of the schema. -func (s *Schema) Debug() map[string]string { - deb := make(map[string]string) - for name, col := range s.columns { - deb[name] = col.String() - } - return deb -} - // GetSchema returns the best possible version of a schema given our // current data type guesses. func (g *Guesser) NewSchema() *Schema { diff --git a/schema/schema_debug.go b/schema/schema_debug.go new file mode 100644 index 0000000..d12d452 --- /dev/null +++ b/schema/schema_debug.go @@ -0,0 +1,73 @@ +package schema + +import "fmt" + +func (g typeGuesser) debugState() ColumnDebugState { + var n int + var ds ColumnDebugState + for i := 0; i < maxTypeBits; i++ { + j := Type(1 << i) + if g.isSet(j) { + n++ + ds.Types = append(ds.Types, j.String()) + } + } + if n > 1 { + s := "type conflict" + if up, _, ok := g.supportedUpcast(); ok { + s = fmt.Sprintf("upcast to %s", up.String()) + } + ds.Notes = s + } + return ds +} + +// ColumnDebugState holds information about a specific column. +type ColumnDebugState struct { + Types []string `json:"types"` + Notes string `json:"notes,omitempty"` +} + +// GuesserDebugState represents the state of the Guesser in human-readable form. +type GuesserDebugState struct { + Columns map[string]ColumnDebugState `json:"fields"` + Errors map[string]string `json:"errors,omitempty"` +} + +// Debug returns information on the Guesser internals. +func (g *Guesser) Debug() *GuesserDebugState { + state := GuesserDebugState{ + Columns: make(map[string]ColumnDebugState), + Errors: make(map[string]string), + } + for name, col := range g.columns { + state.Columns[name] = col.debugState() + } + for name, err := range g.errors { + state.Errors[name] = err + } + return &state +} + +type SchemaFieldDebugInfo struct { + Name string `json:"name"` + Type string `json:"type"` +} + +type SchemaDebugInfo struct { + Version int64 `json:"version"` + Fields []SchemaFieldDebugInfo `json:"fields"` +} + +// Debug returns a human-friendly representation of the schema. +func (s *Schema) Debug() *SchemaDebugInfo { + var info SchemaDebugInfo + info.Version = s.Version + for name, col := range s.columns { + info.Fields = append(info.Fields, SchemaFieldDebugInfo{ + Name: name, + Type: col.String(), + }) + } + return &info +} diff --git a/writer/marshal.go b/writer/marshal.go index 726b804..13b765e 100644 --- a/writer/marshal.go +++ b/writer/marshal.go @@ -1,6 +1,7 @@ package writer import ( + "git.autistici.org/ai3/attic/pqlog/schema" "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/layout" "github.com/xitongsys/parquet-go/parquet" @@ -31,13 +32,13 @@ import ( // reason NewSchemaHandlerFromSchemaHandler (which gets called in that // case) is not 'transparent' and returns something different than the // original schema... 
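+// newTuplesWriter therefore mirrors pqwriter.NewParquetWriter, but it
+// keeps the provided schema handler as-is and installs our own
+// marshal function (see genMarshalTuples below).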
-func newTuplesWriter(schemaHandler *pqschema.SchemaHandler, pfile source.ParquetFile, np int64) (*pqwriter.ParquetWriter, error) { +func newTuplesWriter(schemaHandler *schema.ParquetSchemaHandler, pfile source.ParquetFile, np int64) (*pqwriter.ParquetWriter, error) { res := new(pqwriter.ParquetWriter) - res.SchemaHandler = schemaHandler + res.SchemaHandler = schemaHandler.SchemaHandler res.PFile = pfile - res.PageSize = 8 * 1024 //8K - res.RowGroupSize = 128 * 1024 * 1024 //128M - res.CompressionType = parquet.CompressionCodec_SNAPPY + res.PageSize = 8 * 1024 //8K + res.RowGroupSize = 32 * 1024 * 1024 //32M + res.CompressionType = parquet.CompressionCodec_ZSTD res.PagesMapBuf = make(map[string][]*layout.Page) res.DictRecs = make(map[string]*layout.DictRecType) res.NP = np @@ -46,75 +47,88 @@ func newTuplesWriter(schemaHandler *pqschema.SchemaHandler, pfile source.Parquet res.Footer.Schema = append(res.Footer.Schema, res.SchemaHandler.SchemaElements...) res.Offset = 4 _, err := res.PFile.Write([]byte("PAR1")) - res.MarshalFunc = marshalTuples + res.MarshalFunc = genMarshalTuples(schemaHandler) return res, err } // marshalTuples is the Parquet marshaler for the tuplesToRow() format. -func marshalTuples(records []interface{}, schemaHandler *pqschema.SchemaHandler) (*map[string]*layout.Table, error) { - res := make(map[string]*layout.Table) - if ln := len(records); ln <= 0 { - return &res, nil - } +func genMarshalTuples(pschema *schema.ParquetSchemaHandler) func([]interface{}, *pqschema.SchemaHandler) (*map[string]*layout.Table, error) { + return func(records []interface{}, schemaHandler *pqschema.SchemaHandler) (*map[string]*layout.Table, error) { + res := make(map[string]*layout.Table) + if ln := len(records); ln <= 0 { + return &res, nil + } - // Iterate over columns. - for i := 0; i < len(records[0].([]interface{})); i++ { - pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName - table := layout.NewEmptyTable() - res[pathStr] = table - table.Path = common.StrToPath(pathStr) + // Iterate over columns. 
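+		// Each record is one flattened row ([]any) in the tuplesToRow()
+		// format, with one value per schema column.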
+ for i := 0; i < len(records[0].([]interface{})); i++ { + pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName + table := layout.NewEmptyTable() + res[pathStr] = table + table.Path = common.StrToPath(pathStr) - schema := schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] - isOptional := true - if *schema.RepetitionType != parquet.FieldRepetitionType_OPTIONAL { - isOptional = false - } + schema := schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] - if isOptional { - table.MaxDefinitionLevel = 1 - } else { - table.MaxDefinitionLevel = 0 - } + table.MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(table.Path) + table.MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(table.Path) + table.RepetitionType = schema.GetRepetitionType() + table.Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] + table.Info = schemaHandler.Infos[i+1] + // Pre-allocate these arrays for efficiency + table.Values = make([]interface{}, 0, len(records)) + table.RepetitionLevels = make([]int32, 0, len(records)) + table.DefinitionLevels = make([]int32, 0, len(records)) - table.MaxRepetitionLevel = 0 - table.RepetitionType = parquet.FieldRepetitionType_OPTIONAL - table.Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] - table.Info = schemaHandler.Infos[i+1] - // Pre-allocate these arrays for efficiency - table.Values = make([]interface{}, 0, len(records)) - table.RepetitionLevels = make([]int32, 0, len(records)) - table.DefinitionLevels = make([]int32, 0, len(records)) + //log.Printf("table %s: max_dl=%d, max_rl=%d", pathStr, table.MaxDefinitionLevel, table.MaxRepetitionLevel) - // Iterate over records. - for j := 0; j < len(records); j++ { - var val any - if rec := records[j].([]interface{})[i]; rec != nil { - //val = types.InterfaceToParquetType(rec, table.Schema.Type) - val = rec - } - switch tval := val.(type) { - case []any: - // "Unroll" lists into repeated string values. - var rl int32 - if len(tval) < 1 { - tval = []any{""} - } - for _, lval := range tval { - table.Values = append(table.Values, lval) - table.DefinitionLevels = append(table.DefinitionLevels, 1) - table.RepetitionLevels = append(table.RepetitionLevels, rl) - rl++ - } - default: - table.Values = append(table.Values, val) - table.RepetitionLevels = append(table.RepetitionLevels, 0) - if val == nil || !isOptional { - table.DefinitionLevels = append(table.DefinitionLevels, 0) + // Iterate over records. 
+ var dl int32 = 1 + var rl int32 = 0 + for j := 0; j < len(records); j++ { + val := records[j].([]interface{})[i] + + if val == nil { + encodeNil(table, pschema.Zero(i), dl, rl) } else { - table.DefinitionLevels = append(table.DefinitionLevels, 1) + encode(table, val, dl, rl) } } } + return &res, nil + } +} + +func encode(table *layout.Table, value any, dl, rl int32) { + switch tvalue := value.(type) { + case []any: + encodeSlice(table, tvalue, dl, rl) + default: + encodeValue(table, value, dl, rl) + } +} + +func encodeValue(table *layout.Table, value any, dl, rl int32) { + //value = types.InterfaceToParquetType(value, table.Schema.Type) + table.Values = append(table.Values, value) + table.DefinitionLevels = append(table.DefinitionLevels, dl) + table.RepetitionLevels = append(table.RepetitionLevels, rl) +} + +func encodeNil(table *layout.Table, value any, dl, rl int32) { + table.Values = append(table.Values, value) + table.DefinitionLevels = append(table.DefinitionLevels, dl-1) + table.RepetitionLevels = append(table.RepetitionLevels, rl) +} + +func encodeSlice(table *layout.Table, l []any, dl, rl int32) { + if len(l) == 0 { + encode(table, nil, dl, rl) + return + } + for i := 0; i < len(l); i++ { + var nodeRL = rl + 1 + if i == 0 { + nodeRL = rl + } + encodeValue(table, l[i], dl, nodeRL) } - return &res, nil } diff --git a/writer/source.go b/writer/source.go new file mode 100644 index 0000000..b449dfc --- /dev/null +++ b/writer/source.go @@ -0,0 +1,27 @@ +package writer + +import ( + "os" + + "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/source" +) + +type Source interface { + Create(string) (source.ParquetFile, error) + Rename(string, string) error +} + +type localSource struct{} + +func NewLocalSource() Source { + return new(localSource) +} + +func (l *localSource) Create(path string) (source.ParquetFile, error) { + return local.NewLocalFileWriter(path) +} + +func (l *localSource) Rename(old, new string) error { + return os.Rename(old, new) +} diff --git a/writer/writer.go b/writer/writer.go index 223338a..fdc8173 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -5,16 +5,15 @@ import ( "log" "time" + "git.autistici.org/ai3/attic/pqlog/schema" "git.autistici.org/ai3/attic/pqlog/writer/pathgen" - "github.com/xitongsys/parquet-go-source/local" - pqschema "github.com/xitongsys/parquet-go/schema" "github.com/xitongsys/parquet-go/source" pqwriter "github.com/xitongsys/parquet-go/writer" ) // Writer is the interface for our restartable ParquetWriter. type Writer interface { - Reopen(*pqschema.SchemaHandler) error + Reopen(*schema.ParquetSchemaHandler) error Write(any) error Close() } @@ -23,15 +22,18 @@ type Writer interface { // Parquet files, switching to a new one whenever the schema changes. // It guarantees the creation of consistent Parquet files. 
type ParquetWriter struct { - pg *pathgen.Generator + pg *pathgen.Generator + src Source - lw source.ParquetFile - pw *pqwriter.ParquetWriter + lw source.ParquetFile + pw *pqwriter.ParquetWriter + renameTo string } -func New(pg *pathgen.Generator) *ParquetWriter { +func New(src Source, pg *pathgen.Generator) *ParquetWriter { return &ParquetWriter{ - pg: pg, + src: src, + pg: pg, } } @@ -42,14 +44,22 @@ func (w *ParquetWriter) Write(obj any) error { func (w *ParquetWriter) Close() { if w.pw != nil { w.pw.WriteStop() + w.pw = nil } if w.lw != nil { w.lw.Close() + w.lw = nil + } + if w.renameTo != "" { + if err := w.src.Rename(w.renameTo+".tmp", w.renameTo); err != nil { + log.Printf("warning: could not finalize parquet file %s: %v", w.renameTo, err) + } + w.renameTo = "" } } // (Re)open the output with a new schema. -func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error { +func (w *ParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error { w.Close() path, err := w.pg.Next() @@ -59,10 +69,11 @@ func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error { log.Printf("creating parquet file %s (%d fields)", path, len(schemaHandler.Infos)-1) - w.lw, err = local.NewLocalFileWriter(path) + w.lw, err = w.src.Create(path + ".tmp") if err != nil { return fmt.Errorf("can't create Parquet file: %w", err) } + w.renameTo = path w.pw, err = newTuplesWriter(schemaHandler, w.lw, 4) if err != nil { @@ -71,40 +82,47 @@ func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error { return nil } +const maxRecordsPerFile = 1000000 + // A RotatingParquetWriter will rotate the output file on a specific period. type RotatingParquetWriter struct { *ParquetWriter + + counter int64 period time.Duration deadline time.Time - schemaHandler *pqschema.SchemaHandler + schemaHandler *schema.ParquetSchemaHandler } -func NewRotating(pg *pathgen.Generator, period time.Duration) *RotatingParquetWriter { +func NewRotating(src Source, pg *pathgen.Generator, period time.Duration) *RotatingParquetWriter { return &RotatingParquetWriter{ - ParquetWriter: New(pg), + ParquetWriter: New(src, pg), period: period, } } func (w *RotatingParquetWriter) Write(obj any) error { now := time.Now() + w.counter++ - if now.After(w.deadline) { + if now.After(w.deadline) || w.counter > maxRecordsPerFile { if err := w.Reopen(w.schemaHandler); err != nil { return err } - // Add one minute to avoid hitting the same value. - w.deadline = now.Add(1 * time.Minute).Round(w.period) + // Round to period for more accurate alignment. + w.deadline = now.Add(w.period).Round(w.period) + log.Printf("rotating output, new deadline=%v", w.deadline) } return w.ParquetWriter.Write(obj) } -func (w *RotatingParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error { +func (w *RotatingParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error { if err := w.ParquetWriter.Reopen(schemaHandler); err != nil { return err } w.schemaHandler = schemaHandler + w.counter = 0 return nil } -- GitLab