Skip to content
Snippets Groups Projects
Commit a40e140e authored by ale's avatar ale
Browse files

Fix error in computing rotation deadlines

parent e4f14c88
No related branches found
No related tags found
No related merge requests found
......@@ -102,17 +102,12 @@ func NewRotating(src Source, pg *pathgen.Generator, period time.Duration) *Rotat
}
func (w *RotatingParquetWriter) Write(obj any) error {
now := time.Now()
w.counter++
if now.After(w.deadline) || w.counter > maxRecordsPerFile {
if time.Now().After(w.deadline) || w.counter > maxRecordsPerFile {
if err := w.Reopen(w.schemaHandler); err != nil {
return err
}
// Round to period for more accurate alignment.
w.deadline = now.Add(w.period).Round(w.period)
log.Printf("rotating output, new deadline=%v", w.deadline)
}
return w.ParquetWriter.Write(obj)
......@@ -122,7 +117,12 @@ func (w *RotatingParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandle
if err := w.ParquetWriter.Reopen(schemaHandler); err != nil {
return err
}
w.schemaHandler = schemaHandler
w.counter = 0
// Round to period for more accurate alignment.
w.deadline = time.Now().Add(w.period).Round(w.period)
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment