package acmeserver

import (
	"context"
	"sync"
	"time"
)

// runWithUpdates periodically invokes fn in a background goroutine until
// ctx is done.
//
// Updates are long-term jobs, so they should be interruptible. Each update
// runs in its own goroutine with a context derived from ctx, and is
// cancelled when a new configuration value arrives on reloadCh or when
// runWithUpdates returns. A semaphore ensures that only one update
// goroutine is running at any given time: a tick that fires while an
// update is still in progress is skipped rather than queued.
//
// Parameters:
//   - ctx: parent context; when it is done, the running update (if any) is
//     cancelled and awaited, then the function returns.
//   - fn: the update function. It receives the per-update context and the
//     most recent value read from reloadCh (nil until the first reload).
//   - reloadCh: delivers new configuration values. Receiving a value
//     cancels and waits for the running update; the new value is used by
//     updates started on subsequent ticks.
//   - updateInterval: period between update attempts. It is passed directly
//     to time.NewTicker, which panics on non-positive durations.
func runWithUpdates(ctx context.Context, fn func(context.Context, interface{}), reloadCh <-chan interface{}, updateInterval time.Duration) {
	// Function to cancel the current update, and the associated
	// WaitGroup to wait for its termination.
	var upCancel context.CancelFunc
	var wg sync.WaitGroup

	// One-slot semaphore, held for the entire lifetime of an update
	// goroutine.
	sem := make(chan struct{}, 1)

	startUpdate := func(value interface{}) context.CancelFunc {
		// Acquire the semaphore, return if we fail to.
		// Equivalent to a 'try-lock' construct.
		select {
		case sem <- struct{}{}:
		default:
			return nil
		}

		uctx, cancel := context.WithCancel(ctx)
		wg.Add(1)
		go func() {
			// Deferred calls run in reverse order: release the
			// context, then the semaphore, and only then signal
			// the WaitGroup. Once wg.Wait() returns the semaphore
			// is guaranteed to be free again.
			defer wg.Done()
			defer func() { <-sem }()
			defer cancel()
			fn(uctx, value)
		}()
		return cancel
	}

	// Cancel the running update, if any, and wait for it to
	// terminate. Called on config updates and when exiting.
	cancelUpdate := func() {
		if upCancel != nil {
			upCancel()
			upCancel = nil
		}
		wg.Wait()
	}
	defer cancelUpdate()

	var cur interface{}
	tick := time.NewTicker(updateInterval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			// Do not cancel a running update on tick: startUpdate
			// returns nil in that case and the current cancel
			// function is kept.
			if cancel := startUpdate(cur); cancel != nil {
				upCancel = cancel
			}
		case value := <-reloadCh:
			// Cancel the running update when configuration is
			// reloaded. The new value is picked up on the next tick.
			cancelUpdate()
			cur = value
		case <-ctx.Done():
			return
		}
	}
}