agent.go 2.4 KB
Newer Older
ale's avatar
ale committed
1 2 3 4 5 6 7
package tabacco

import (
	"context"
	"fmt"
	"log"

ale's avatar
ale committed
8 9
	"git.autistici.org/ai3/tools/tabacco/jobs"
	"git.autistici.org/ai3/tools/tabacco/util"
ale's avatar
ale committed
10 11
)

ale's avatar
ale committed
12
// Agent holds a Manager and a Scheduler together, and runs periodic
ale's avatar
ale committed
13
// backup jobs for all known sources.
ale's avatar
ale committed
14
type Agent struct {
15 16 17
	mgr    Manager
	sched  *jobs.Scheduler
	stopCh chan struct{}
ale's avatar
ale committed
18 19
}

ale's avatar
ale committed
20 21
// NewAgent creates a new Agent with the specified config.
func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Agent, error) {
ale's avatar
ale committed
22 23 24 25 26
	mgr, err := NewManager(ctx, configMgr, ms)
	if err != nil {
		return nil, err
	}

ale's avatar
ale committed
27 28
	// Create a Scheduler and register with the configMgr so we
	// can reload the schedule on configuration changes.
ale's avatar
ale committed
29
	sched := jobs.NewScheduler()
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
	stopCh := make(chan struct{})
	notifyCh := configMgr.Notify()
	go func() {
		for {
			select {
			case <-stopCh:
				return
			case <-notifyCh:
				schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
				if err != nil {
					log.Printf("error updating scheduler: %v", err)
				}
				if schedule != nil {
					log.Printf("scheduler updated")
					sched.SetSchedule(schedule)
				}
			}
ale's avatar
ale committed
47
		}
48
	}()
ale's avatar
ale committed
49

ale's avatar
ale committed
50
	return &Agent{
51 52 53
		mgr:    mgr,
		sched:  sched,
		stopCh: stopCh,
ale's avatar
ale committed
54 55 56
	}, nil
}

ale's avatar
ale committed
57
// Close the Agent and all associated resources.
ale's avatar
ale committed
58
func (a *Agent) Close() {
59
	close(a.stopCh)
ale's avatar
ale committed
60 61
	a.mgr.Close() // nolint
	a.sched.Stop()
ale's avatar
ale committed
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
}

// makeSchedule builds a new jobs.Schedule with one entry for every
// source spec that has a non-empty 'schedule' attribute; each entry
// triggers a separate backup of just that source.
//
// Specs that cannot be added to the schedule are collected into a
// util.MultiError. If nothing was added and at least one spec failed,
// a nil Schedule is returned together with the collected errors.
// Otherwise the Schedule is returned along with the result of OrNil()
// on the collected errors (presumably nil when nothing failed).
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
	var (
		schedule = jobs.NewSchedule(ctx, hostSeed)
		failures = new(util.MultiError)
		added    int
	)

	for i := range sourceSpecs {
		// Take a per-iteration copy so that the job factory below
		// captures this spec and no other.
		src := sourceSpecs[i]
		if src.Schedule == "" {
			continue
		}

		// Factory handed to the schedule; a failure to create the
		// job is only logged, and whatever BackupJob returned as
		// the job is passed through unchanged.
		newJob := func() jobs.Job {
			_, job, jobErr := m.BackupJob(ctx, []SourceSpec{src})
			if jobErr != nil {
				log.Printf("%s: can't create backup job: %v", src.Name, jobErr)
			}
			return job
		}

		if addErr := schedule.Add(src.Name, src.Schedule, newJob); addErr != nil {
			failures.Add(fmt.Errorf("%s: %v", src.Name, addErr))
			continue
		}
		added++
	}

	// All sources failing is a fatal error, return a nil Schedule.
	if added == 0 && !failures.IsNil() {
		return nil, failures
	}

	return schedule, failures.OrNil()
}
ale's avatar
ale committed
98 99 100 101 102

// RunNow starts all jobs right now, regardless of their schedule.
//
// It simply delegates to the RunNow method of the Agent's Scheduler.
// NOTE(review): whether the call blocks until the jobs complete depends
// on jobs.Scheduler.RunNow, which is not visible here — confirm.
func (a *Agent) RunNow() {
	a.sched.RunNow()
}