diff --git a/writer/writer.go b/writer/writer.go index fdc8173efab55bb30863e35350af64792174407a..3f968ee5b2aaaa9e614298d7d78327fd7eed7042 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -102,17 +102,12 @@ func NewRotating(src Source, pg *pathgen.Generator, period time.Duration) *Rotat } func (w *RotatingParquetWriter) Write(obj any) error { - now := time.Now() w.counter++ - if now.After(w.deadline) || w.counter > maxRecordsPerFile { + if time.Now().After(w.deadline) || w.counter > maxRecordsPerFile { if err := w.Reopen(w.schemaHandler); err != nil { return err } - - // Round to period for more accurate alignment. - w.deadline = now.Add(w.period).Round(w.period) - log.Printf("rotating output, new deadline=%v", w.deadline) } return w.ParquetWriter.Write(obj) @@ -122,7 +117,12 @@ func (w *RotatingParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandle if err := w.ParquetWriter.Reopen(schemaHandler); err != nil { return err } + w.schemaHandler = schemaHandler w.counter = 0 + + // Round to period for more accurate alignment. + w.deadline = time.Now().Add(w.period).Round(w.period) + return nil }