Select Git revision
customize-preview.js
agent.go 2.41 KiB
package tabacco
import (
"context"
"fmt"
"log"
"git.autistici.org/ai3/tools/tabacco/jobs"
"git.autistici.org/ai3/tools/tabacco/util"
)
// Agent holds a Manager and a Scheduler together, and runs periodic
// backup jobs for all known sources.
//
// An Agent is created with NewAgent and must be released with Close.
type Agent struct {
// mgr is the Manager created by NewAgent. It is handed to
// makeSchedule to build backup jobs, and is closed by Close.
mgr Manager
// sched is the job scheduler whose schedule is replaced whenever
// the configuration manager signals a change.
sched *jobs.Scheduler
// stopCh is closed by Close to make the schedule-reloading
// goroutine started in NewAgent return.
stopCh chan struct{}
}
// NewAgent builds an Agent on top of a freshly created Manager.
//
// Besides constructing the Manager (any error from NewManager is
// returned as-is), it creates a jobs.Scheduler and starts a background
// goroutine that waits on the channel returned by configMgr.Notify().
// Every time that channel fires, the schedule is rebuilt from the
// current configuration and, if a schedule could be built, installed
// on the Scheduler. The goroutine runs until Close is called on the
// returned Agent.
func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Agent, error) {
	mgr, err := NewManager(ctx, configMgr, ms)
	if err != nil {
		return nil, err
	}

	scheduler := jobs.NewScheduler()
	done := make(chan struct{})
	updates := configMgr.Notify()

	// reload rebuilds the schedule from the configuration that is
	// current at the time of the call. A non-nil error is only logged:
	// makeSchedule may still hand back a usable (partial) schedule, in
	// which case it is installed anyway.
	reload := func() {
		cfg := configMgr.current()
		next, err := makeSchedule(ctx, mgr, cfg.SourceSpecs(), cfg.Seed())
		if err != nil {
			log.Printf("error updating scheduler: %v", err)
		}
		if next == nil {
			// Nothing usable was produced; keep the old schedule.
			return
		}
		log.Printf("scheduler updated")
		scheduler.SetSchedule(next)
	}

	// Watch for configuration changes until the Agent is closed.
	go func() {
		for {
			select {
			case <-done:
				return
			case <-updates:
				reload()
			}
		}
	}()

	agent := &Agent{
		mgr:    mgr,
		sched:  scheduler,
		stopCh: done,
	}
	return agent, nil
}
// Close the Agent and all associated resources.
//
// It first closes stopCh, which makes the schedule-reloading goroutine
// started by NewAgent return, then closes the Manager and finally
// stops the Scheduler. The return value of the Manager's Close, if
// any, is ignored.
//
// Close must be called at most once: a second call would close the
// already-closed stopCh and panic.
func (a *Agent) Close() {
close(a.stopCh)
a.mgr.Close() // nolint
a.sched.Stop()
}
// makeSchedule builds a new jobs.Schedule holding one entry for every
// source spec that carries a non-empty 'schedule' attribute; specs
// without one are skipped.
//
// Each entry is registered under the source name together with a job
// factory that asks the Manager for a backup job of that source. A
// failure inside the factory is only logged, and whatever job value
// BackupJob returned is passed through unchanged.
//
// Registration errors are collected per source. If no source could be
// registered and at least one error was recorded, a nil Schedule is
// returned together with the collected errors. Otherwise the
// (possibly partial) Schedule is returned, along with the collected
// errors if there were any.
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
	var (
		schedule = jobs.NewSchedule(ctx, hostSeed)
		errs     = new(util.MultiError)
		added    int
	)

	for _, s := range sourceSpecs {
		if s.Schedule == "" {
			continue
		}

		// Take a per-iteration copy so the factory below always sees
		// its own source, whatever the loop variable semantics are.
		src := s
		newJob := func() jobs.Job {
			_, job, err := m.BackupJob(ctx, src)
			if err != nil {
				log.Printf("%s: can't create backup job: %v", src.Name, err)
			}
			return job
		}

		if err := schedule.Add(src.Name, src.Schedule, newJob); err != nil {
			errs.Add(fmt.Errorf("%s: %v", src.Name, err))
			continue
		}
		added++
	}

	// All sources failing is a fatal error, return a nil Schedule.
	if added == 0 && !errs.IsNil() {
		return nil, errs
	}
	return schedule, errs.OrNil()
}
// RunNow starts all jobs right now, regardless of their schedule.
//
// It simply delegates to the RunNow method of the Agent's Scheduler.
func (a *Agent) RunNow() {
a.sched.RunNow()
}