agent.go 2.28 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
package tabacco

import (
	"context"
	"fmt"
	"log"

	"git.autistici.org/ale/tabacco/jobs"
	"git.autistici.org/ale/tabacco/util"
)

ale's avatar
ale committed
12
// Agent holds a Manager and a Scheduler together, and runs periodic
13
// backup jobs for all known sources.
ale's avatar
ale committed
14
type Agent struct {
15 16 17
	mgr    Manager
	sched  *jobs.Scheduler
	stopCh chan struct{}
18 19
}

ale's avatar
ale committed
20 21
// NewAgent creates a new Agent with the specified config.
func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Agent, error) {
22 23 24 25 26
	mgr, err := NewManager(ctx, configMgr, ms)
	if err != nil {
		return nil, err
	}

ale's avatar
ale committed
27 28
	// Create a Scheduler and register with the configMgr so we
	// can reload the schedule on configuration changes.
29
	sched := jobs.NewScheduler()
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
	stopCh := make(chan struct{})
	notifyCh := configMgr.Notify()
	go func() {
		for {
			select {
			case <-stopCh:
				return
			case <-notifyCh:
				schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
				if err != nil {
					log.Printf("error updating scheduler: %v", err)
				}
				if schedule != nil {
					log.Printf("scheduler updated")
					sched.SetSchedule(schedule)
				}
			}
47
		}
48
	}()
49

ale's avatar
ale committed
50
	return &Agent{
51 52 53
		mgr:    mgr,
		sched:  sched,
		stopCh: stopCh,
54 55 56
	}, nil
}

ale's avatar
ale committed
57
// Close the Agent and all associated resources.
58
func (a *Agent) Close() {
59
	close(a.stopCh)
60 61
	a.mgr.Close() // nolint
	a.sched.Stop()
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
}

// makeSchedule builds a new jobs.Schedule containing one entry for
// every source spec that has a non-empty 'schedule' attribute; each
// entry triggers a separate backup of that single source.
//
// Sources that cannot be added to the schedule are collected into a
// util.MultiError. If no source could be added and at least one
// failed, the returned Schedule is nil. Otherwise the Schedule is
// returned together with the accumulated errors (via OrNil).
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
	var (
		result   = jobs.NewSchedule(ctx, hostSeed)
		failures = new(util.MultiError)
		added    int
	)

	for i := range sourceSpecs {
		// Take a per-iteration copy so that the job factory below
		// captures this source and not a shared loop variable.
		src := sourceSpecs[i]
		if src.Schedule == "" {
			continue
		}

		// Factory invoked by the schedule to obtain the backup job;
		// a creation failure is only logged, and whatever BackupJob
		// returned as job is handed back unchanged.
		newJob := func() jobs.Job {
			_, job, jobErr := m.BackupJob(ctx, []SourceSpec{src})
			if jobErr != nil {
				log.Printf("%s: can't create backup job: %v", src.Name, jobErr)
			}
			return job
		}

		if addErr := result.Add(src.Name, src.Schedule, newJob); addErr != nil {
			failures.Add(fmt.Errorf("%s: %v", src.Name, addErr))
			continue
		}
		added++
	}

	// All sources failing is a fatal error, return a nil Schedule.
	if added == 0 && !failures.IsNil() {
		return nil, failures
	}

	return result, failures.OrNil()
}