From dd897141aa60e4507b08d27a398a4aa770257bb3 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sat, 30 Dec 2023 09:08:25 +0000 Subject: [PATCH] Address some linter warnings, report Close() errors on Parquet files --- cmd/pqlogd/main.go | 2 +- cmd/pqlogd/mtls.go | 2 +- http/ingest.go | 4 ++-- http/query.go | 3 ++- integrationtest/integrationtest.go | 2 +- schema/schema.go | 8 ++++---- writer/writer.go | 17 +++++++++++------ 7 files changed, 22 insertions(+), 16 deletions(-) diff --git a/cmd/pqlogd/main.go b/cmd/pqlogd/main.go index 286f43a..1bf36e8 100644 --- a/cmd/pqlogd/main.go +++ b/cmd/pqlogd/main.go @@ -97,7 +97,7 @@ func main() { <-sigCh log.Printf("received termination signal, exiting...") ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - httpSrv.Shutdown(ctx) + httpSrv.Shutdown(ctx) // nolint:errcheck cancel() httpSrv.Close() }() diff --git a/cmd/pqlogd/mtls.go b/cmd/pqlogd/mtls.go index f0180ee..4a78ae3 100644 --- a/cmd/pqlogd/mtls.go +++ b/cmd/pqlogd/mtls.go @@ -14,7 +14,7 @@ var ( tlsCA = flag.String("tls-ca", "", "TLS CA certificate") ) -// tlsConfig returns a tls.Config built from command-line options. +// buildTLSConfig returns a tls.Config built from command-line options. func buildTLSConfig() (*tls.Config, error) { if *tlsCert == "" || *tlsKey == "" { return nil, nil diff --git a/http/ingest.go b/http/ingest.go index 4226413..3a64cc7 100644 --- a/http/ingest.go +++ b/http/ingest.go @@ -146,7 +146,7 @@ func (s *IngestServer) DebugHandler(w http.ResponseWriter, req *http.Request) { s.mx.Unlock() w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(out) + _ = json.NewEncoder(w).Encode(out) } func (s *IngestServer) SchemaHandler(w http.ResponseWriter, req *http.Request) { @@ -158,5 +158,5 @@ func (s *IngestServer) SchemaHandler(w http.ResponseWriter, req *http.Request) { s.mx.Unlock() w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(out) + _ = json.NewEncoder(w).Encode(out) } diff --git a/http/query.go b/http/query.go index de28665..cbe499b 100644 --- a/http/query.go +++ b/http/query.go @@ -70,7 +70,7 @@ func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) e if err != nil { return fmt.Errorf("failed to create transaction: %w", err) } - defer tx.Rollback() + defer tx.Rollback() // nolint:errcheck remoteGlob := q.src.ParquetURI(q.glob) _, err = tx.Exec(fmt.Sprintf("CREATE VIEW %s AS SELECT * FROM read_parquet('%s', union_by_name=true)", table, remoteGlob)) @@ -79,6 +79,7 @@ func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) e } defer func() { // Well, ok, we have the Rollback() but. + // nolint:errcheck tx.Exec(fmt.Sprintf("DROP VIEW %s", table)) }() diff --git a/integrationtest/integrationtest.go b/integrationtest/integrationtest.go index 1a7e10a..b06f480 100644 --- a/integrationtest/integrationtest.go +++ b/integrationtest/integrationtest.go @@ -77,7 +77,7 @@ func randomRecord(rec *testRecord) { rec.A0 = rand.Int63n(3) rec.A1 = rand.Float64() * 128.0 } else { - rec.Message = fmt.Sprintf("this is a test message, quite boring") + rec.Message = "this is a test message, quite boring" } } diff --git a/schema/schema.go b/schema/schema.go index 64d0da0..0d80deb 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -153,9 +153,9 @@ func NewGuesser() *Guesser { } type serializableGuesser struct { - Columns map[string]int - Errors map[string]string - VersionCounter int64 + Columns map[string]int `json:"columns"` + Errors map[string]string `json:"errors"` + VersionCounter int64 `json:"version_counter"` } func (sg *serializableGuesser) Guesser() *Guesser { @@ -208,7 +208,7 @@ type Schema struct { upcast map[string]upcastFunc } -// GetSchema returns the best possible version of a schema given our +// NewSchema returns the best possible version of a schema given our // current data type guesses. func (g *Guesser) NewSchema() *Schema { cols := make(map[string]Type) diff --git a/writer/writer.go b/writer/writer.go index 633d509..03532bd 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -15,7 +15,7 @@ import ( type Writer interface { Reopen(*schema.ParquetSchemaHandler) error Write(any) error - Close() + Close() error } // The ParquetWriter writes a sequence of sequentially numbered @@ -41,9 +41,10 @@ func (w *ParquetWriter) Write(obj any) error { return w.pw.Write(obj) } -func (w *ParquetWriter) Close() { +func (w *ParquetWriter) Close() error { + var err error if w.pw != nil { - w.pw.WriteStop() + err = w.pw.WriteStop() w.pw = nil } if w.lw != nil { @@ -51,16 +52,20 @@ func (w *ParquetWriter) Close() { w.lw = nil } if w.renameTo != "" { - if err := w.src.Rename(w.renameTo+".tmp", w.renameTo); err != nil { - log.Printf("warning: could not finalize parquet file %s: %v", w.renameTo, err) + // Only rename the file if WriteStop was successful. + if err == nil { + err = w.src.Rename(w.renameTo+".tmp", w.renameTo) } w.renameTo = "" } + return err } // (Re)open the output with a new schema. func (w *ParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error { - w.Close() + if err := w.Close(); err != nil { + log.Printf("error finalizing Parquet file: %v", err) + } path, err := w.pg.Next() if err != nil { -- GitLab