Commit 46f7117a authored by ale's avatar ale

Use a slightly nicer solution for ConfigManager.Notify

parent 3f136702
......@@ -12,8 +12,9 @@ import (
// Agent holds a Manager and a Scheduler together, and runs periodic
// backup jobs for all known sources.
type Agent struct {
mgr Manager
sched *jobs.Scheduler
mgr Manager
sched *jobs.Scheduler
stopCh chan struct{}
}
// NewAgent creates a new Agent with the specified config.
......@@ -30,27 +31,36 @@ func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (
// being called twice, if the unbuffered notify channel in
// Notify hasn't been read yet).
sched := jobs.NewScheduler()
f := func() {
schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
if err != nil {
log.Printf("error updating scheduler: %v", err)
}
if schedule != nil {
log.Printf("scheduler updated")
sched.SetSchedule(schedule)
stopCh := make(chan struct{})
notifyCh := configMgr.Notify()
go func() {
for {
select {
case <-stopCh:
return
case <-notifyCh:
schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
if err != nil {
log.Printf("error updating scheduler: %v", err)
}
if schedule != nil {
log.Printf("scheduler updated")
sched.SetSchedule(schedule)
}
}
}
}
configMgr.Notify(f)
f()
}()
return &Agent{
mgr: mgr,
sched: sched,
mgr: mgr,
sched: sched,
stopCh: stopCh,
}, nil
}
// Close the Agent and all associated resources.
func (a *Agent) Close() {
close(a.stopCh)
a.mgr.Close() // nolint
a.sched.Stop()
}
......
......@@ -158,7 +158,10 @@ func foreachYAMLFile(dir string, f func(string) error) error {
}
// ConfigManager holds all runtime data derived from the configuration
// itself, so it can be easily reloaded in a single place.
// itself, so it can be easily reloaded by calling Reload(). Listeners
// should register themselves with Notify() in order to be updated
// when the configuration changes (there is currently no way to
// unregister).
type ConfigManager struct {
mx sync.Mutex
config *Config
......@@ -169,7 +172,7 @@ type ConfigManager struct {
// Listeners are notified on every reload.
notifyCh chan struct{}
listeners []func()
listeners []chan struct{}
}
// NewConfigManager creates a new ConfigManager.
......@@ -182,8 +185,11 @@ func NewConfigManager(config *Config) (*ConfigManager, error) {
}
go func() {
for range m.notifyCh {
for _, f := range m.listeners {
f()
for _, lch := range m.listeners {
select {
case lch <- struct{}{}:
default:
}
}
}
}()
......@@ -243,10 +249,13 @@ func (m *ConfigManager) Close() {
}
// Notify the caller when the configuration is reloaded.
func (m *ConfigManager) Notify(f func()) {
func (m *ConfigManager) Notify() <-chan struct{} {
m.mx.Lock()
m.listeners = append(m.listeners, f)
m.mx.Unlock()
defer m.mx.Unlock()
ch := make(chan struct{}, 1)
m.listeners = append(m.listeners, ch)
return ch
}
func (m *ConfigManager) getHandler(name string) (Handler, bool) {
......
......@@ -32,8 +32,9 @@ body { background: white; font-family: "Helvetica", sans-serif; }
<h1>tabacco</h1>
<p><a href="/debug/jobs">Job status</a></p>
<p><a href="/debug/sched">Schedule</a></p>
<p><a href="/debug/jobs">job status</a></p>
<p><a href="/debug/sched">schedule</a></p>
<p><a href="/metrics">metrics</a></p>
{{template "footer"}}
`
......@@ -187,5 +188,5 @@ func (a *Agent) StartHTTPServer(addr string) {
h.Handle("/metrics", promhttp.Handler())
h.HandleFunc("/", a.handleDebugPage)
go http.ListenAndServe(addr, h)
go http.ListenAndServe(addr, h) // nolint
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment