agent.go 2.39 KB
Newer Older
1 2 3 4 5 6 7
package tabacco

import (
	"context"
	"fmt"
	"log"

ale's avatar
ale committed
8 9
	"git.autistici.org/ai3/tools/tabacco/jobs"
	"git.autistici.org/ai3/tools/tabacco/util"
10 11
)

ale's avatar
ale committed
12
// Agent holds a Manager and a Scheduler together, and runs periodic
13
// backup jobs for all known sources.
ale's avatar
ale committed
14
type Agent struct {
15 16 17
	mgr    Manager
	sched  *jobs.Scheduler
	stopCh chan struct{}
18 19
}

ale's avatar
ale committed
20 21
// NewAgent creates a new Agent with the specified config.
func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Agent, error) {
22 23 24 25 26
	mgr, err := NewManager(ctx, configMgr, ms)
	if err != nil {
		return nil, err
	}

ale's avatar
ale committed
27 28
	// Create a Scheduler and register with the configMgr so we
	// can reload the schedule on configuration changes.
29
	sched := jobs.NewScheduler()
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
	stopCh := make(chan struct{})
	notifyCh := configMgr.Notify()
	go func() {
		for {
			select {
			case <-stopCh:
				return
			case <-notifyCh:
				schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
				if err != nil {
					log.Printf("error updating scheduler: %v", err)
				}
				if schedule != nil {
					log.Printf("scheduler updated")
					sched.SetSchedule(schedule)
				}
			}
47
		}
48
	}()
49

ale's avatar
ale committed
50
	return &Agent{
51 52 53
		mgr:    mgr,
		sched:  sched,
		stopCh: stopCh,
54 55 56
	}, nil
}

ale's avatar
ale committed
57
// Close the Agent and all associated resources.
58
func (a *Agent) Close() {
59
	close(a.stopCh)
60 61
	a.mgr.Close() // nolint
	a.sched.Stop()
62 63 64 65
}

// Create a new jobs.Schedule that will trigger a separate backup for
// each configured data source that includes a 'schedule' attribute.
ale's avatar
ale committed
66
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
67 68 69 70 71 72 73 74
	sched := jobs.NewSchedule(ctx, hostSeed)
	merr := new(util.MultiError)
	var good int
	for _, spec := range sourceSpecs {
		if spec.Schedule == "" {
			continue
		}
		// Bind spec to a new closure.
ale's avatar
ale committed
75
		err := func(spec *SourceSpec) error {
76
			return sched.Add(spec.Name, spec.Schedule, func() jobs.Job {
ale's avatar
ale committed
77
				_, j, err := m.BackupJob(ctx, spec)
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
				if err != nil {
					log.Printf("%s: can't create backup job: %v", spec.Name, err)
				}
				return j
			})
		}(spec)
		if err != nil {
			merr.Add(fmt.Errorf("%s: %v", spec.Name, err))
		} else {
			good++
		}
	}

	// All sources failing is a fatal error, return a nil Schedule.
	if good == 0 && !merr.IsNil() {
		return nil, merr
	}

	return sched, merr.OrNil()
}
ale's avatar
ale committed
98 99 100 101 102

// RunNow asks the Scheduler to start all jobs immediately, without
// waiting for their scheduled time.
func (a *Agent) RunNow() {
	a.sched.RunNow()
}