Commit ed6da2b3 authored by ale

Move jobs library to a separate package

The Job and Scheduler objects are fairly generic.
parent cf0fc44a
@@ -7,6 +7,8 @@ import (
"regexp"
"strings"
"time"
"git.autistici.org/ale/tabacco/jobs"
)
// Backup is the over-arching entity describing a high level backup
@@ -122,9 +124,9 @@ type Repository interface {
// Manager for backups and restores.
type Manager interface {
BackupJob(context.Context, []SourceSpec) (Backup, Job, error)
BackupJob(context.Context, []SourceSpec) (Backup, jobs.Job, error)
Backup(context.Context, []SourceSpec) (Backup, error)
RestoreJob(context.Context, FindRequest, string) (Job, error)
RestoreJob(context.Context, FindRequest, string) (jobs.Job, error)
Restore(context.Context, FindRequest, string) error
Close() error
}
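
With this change, callers get a generic jobs.Job back and drive it themselves. A minimal sketch of the new call site (assuming an existing Manager mgr, a context ctx, and a spec list specs; error handling abbreviated):

backup, job, err := mgr.BackupJob(ctx, specs)
if err != nil {
	return err
}
// RunContext executes the job synchronously under ctx.
if err := job.RunContext(ctx); err != nil {
	log.Printf("backup %s failed: %v", backup.ID, err)
}
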
@@ -5,6 +5,8 @@ import (
"fmt"
"testing"
"time"
"git.autistici.org/ale/tabacco/jobs"
)
type dummyMetadataEntry struct {
@@ -146,12 +148,12 @@ func TestBackup(t *testing.T) {
},
},
{
Name: "source2",
Handler: "dbpipe",
AtomsScript: "echo user1 user1 ; echo user2 user2",
Name: "source2",
Handler: "dbpipe",
AtomsCommand: "echo user1 user1 ; echo user2 user2",
},
}
queueSpec := MultiQueueSpec{
queueSpec := jobs.QueueSpec{
Workers: map[string]int{"backup": 2},
}
......
@@ -71,12 +71,12 @@ func (c *daemonCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...in
return subcommands.ExitFailure
}
mgr, err := tabacco.NewManager(ctx, configMgr, store)
d, err := tabacco.NewDaemon(ctx, configMgr, store)
if err != nil {
log.Printf("error: %v", err)
return subcommands.ExitFailure
}
defer mgr.Close() // nolint
defer d.Close() // nolint
// Wait for the outermost Context to terminate (presumably due to SIGTERM).
log.Printf("backup manager started")
......
package tabacco
import (
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
@@ -9,15 +11,19 @@ import (
"sync"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ale/tabacco/jobs"
"git.autistici.org/ale/tabacco/util"
"gopkg.in/yaml.v2"
)
var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
// Config is the global configuration object. While the actual
// configuration is spread over multiple files and directories, this
// holds it all together.
type Config struct {
Hostname string `yaml:"hostname"`
Queue MultiQueueSpec `yaml:"queue_config"`
Queue jobs.QueueSpec `yaml:"queue_config"`
Repository RepositorySpec `yaml:"repository"`
DryRun bool `yaml:"dry_run"`
@@ -112,7 +118,7 @@ func ReadConfig(path string) (*Config, error) {
}
func logMultiError(prefix string, err error) {
if merr, ok := err.(*multiError); ok {
if merr, ok := err.(*util.MultiError); ok {
for _, e := range merr.Errors() {
log.Printf("%s%v", prefix, e)
}
@@ -139,13 +145,13 @@ func foreachYAMLFile(dir string, f func(string) error) error {
if err != nil {
return err
}
merr := new(multiError)
merr := new(util.MultiError)
for _, path := range files {
if err := f(path); err != nil {
merr.Add(err)
}
}
return merr.orNil()
return merr.OrNil()
}
// ConfigManager holds all runtime data derived from the configuration
@@ -155,6 +161,7 @@ type ConfigManager struct {
config *Config
handlerMap map[string]Handler
repo Repository
seed int64
// Listeners are notified on every reload.
notifyCh chan struct{}
@@ -198,6 +205,12 @@ func (m *ConfigManager) Reload(config *Config) error {
return err
}
seedFile := defaultSeedFile
if config.RandomSeedFile != "" {
seedFile = config.RandomSeedFile
}
seed := mustGetSeed(seedFile)
// Update config and notify listeners (in a separate
// goroutine that does not hold the lock).
m.mx.Lock()
@@ -208,6 +221,7 @@ func (m *ConfigManager) Reload(config *Config) error {
m.repo = repo
m.handlerMap = handlerMap
m.config = config
m.seed = seed
m.notifyCh <- struct{}{}
return nil
}
@@ -241,7 +255,7 @@ func (m *ConfigManager) getRepository() Repository {
return m.repo
}
func (m *ConfigManager) getQueueSpec() MultiQueueSpec {
func (m *ConfigManager) getQueueSpec() jobs.QueueSpec {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.Queue
@@ -253,8 +267,33 @@ func (m *ConfigManager) getSourceSpecs() []SourceSpec {
return m.config.SourceSpecs
}
func (m *ConfigManager) getSeedFile() string {
func (m *ConfigManager) getSeed() int64 {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.RandomSeedFile
return m.seed
}
func mustGetSeed(path string) int64 {
if data, err := ioutil.ReadFile(path); err == nil && len(data) == 8 { // nolint: gosec
if seed := binary.LittleEndian.Uint64(data); seed > 0 {
return int64(seed)
}
}
seed, data := randomSeed()
if err := ioutil.WriteFile(path, data, 0600); err != nil {
log.Printf("warning: can't write random seed file: %v", err)
}
return int64(seed)
}
// Generate a random uint64, and return it along with its byte
// representation (encoding/binary, little-endian).
func randomSeed() (uint64, []byte) {
// Initialize the seed from a secure source.
var b [8]byte
if _, err := rand.Read(b[:]); err != nil { // nolint: gosec
panic(err)
}
seed := binary.LittleEndian.Uint64(b[:])
return seed, b[:]
}
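
Because the seed is persisted, repeated runs on the same host derive the same schedule jitter. A small illustration of that round-trip (path hypothetical):

// The first call finds no file, generates a seed and writes it out;
// the second call reads the same eight bytes back.
s1 := mustGetSeed("/tmp/seed.example")
s2 := mustGetSeed("/tmp/seed.example")
fmt.Println(s1 == s2) // true, unless the write failed
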
package tabacco
import (
"context"
"fmt"
"log"
"net/http"
"git.autistici.org/ale/tabacco/jobs"
"git.autistici.org/ale/tabacco/util"
)
// Daemon holds a Manager and a Scheduler together, and runs periodic
// backup jobs for all known sources.
type Daemon struct {
mgr Manager
sched *jobs.Scheduler
}
// NewDaemon creates a new Daemon with the specified config.
func NewDaemon(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Daemon, error) {
mgr, err := NewManager(ctx, configMgr, ms)
if err != nil {
return nil, err
}
// Create a Scheduler, and register a listener on configMgr
// that rebuilds the schedule on every configuration reload.
sched := jobs.NewScheduler()
configMgr.Notify(func() {
schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
if err != nil {
log.Printf("error updating scheduler: %v", err)
}
if schedule != nil {
sched.SetSchedule(schedule)
}
})
return &Daemon{
mgr: mgr,
sched: sched,
}, nil
}
// Close the Daemon and all associated resources.
func (d *Daemon) Close() {
d.mgr.Close() // nolint
d.sched.Stop()
}
func (d *Daemon) startHTTPServer(addr string) error {
//http.Handle("/debug/jobs", d.mgr.StateManager)
http.Handle("/debug/sched", d.sched)
go http.ListenAndServe(addr, nil)
return nil
}
// Create a new jobs.Schedule that will trigger a separate backup for
// each configured data source that includes a 'schedule' attribute.
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
sched := jobs.NewSchedule(ctx, hostSeed)
merr := new(util.MultiError)
var good int
for _, spec := range sourceSpecs {
if spec.Schedule == "" {
continue
}
// Bind spec to a new closure.
err := func(spec SourceSpec) error {
return sched.Add(spec.Name, spec.Schedule, func() jobs.Job {
_, j, err := m.BackupJob(ctx, []SourceSpec{spec})
if err != nil {
log.Printf("%s: can't create backup job: %v", spec.Name, err)
}
return j
})
}(spec)
if err != nil {
merr.Add(fmt.Errorf("%s: %v", spec.Name, err))
} else {
good++
}
}
// All sources failing is a fatal error: return a nil Schedule.
if good == 0 && !merr.IsNil() {
return nil, merr
}
return sched, merr.OrNil()
}
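
For reference, a source spec only participates in scheduling when its Schedule field is set; something like the following (values hypothetical, mirroring the test fixtures) would produce one schedule entry:

spec := SourceSpec{
	Name:     "source1",
	Handler:  "file1",
	Schedule: "@every 1h", // standard robfig/cron syntax
}
schedule, err := makeSchedule(ctx, mgr, []SourceSpec{spec}, hostSeed)
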
package tabacco
package jobs
import (
"html/template"
@@ -112,8 +112,8 @@ func init() {
// ServeHTTP implements the job status debug handler, by making the
// stateManager object match the http.Handler interface.
func (j *stateManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pending, running, done := j.getJobsStatus()
func (j *StateManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pending, running, done := j.getStatus()
w.Header().Set("Content-Type", "text/html")
_ = debugTpl.Lookup("state_manager_debug_page").Execute(w, map[string]interface{}{
......
......
package tabacco
package jobs
import (
"context"
"encoding/binary"
"fmt"
"hash/crc64"
"io/ioutil"
"log"
"math/rand"
"sort"
@@ -16,10 +13,112 @@ import (
"github.com/robfig/cron"
)
var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
// JobGeneratorFunc is a function that returns a new Job.
type JobGeneratorFunc func() Job
// The Scheduler runs backup jobs periodically, according to the
// schedule specified in the source spec.
// Exit status of a named cron job.
type cronJobExitStatus struct {
name string
err error
}
// A Schedule configures a Scheduler with job generators.
type Schedule struct {
hostSeed int64
rootCtx context.Context
// Runtime components. The Schedule actually runs the cron
// jobs; the Scheduler is just a switching wrapper around the
// active Schedule. But the Scheduler maintains a persistent
// (across reloads) log of the last error / success for every
// job, so we use a channel to link the two.
c *cron.Cron
mx sync.Mutex
notifyCh chan cronJobExitStatus
}
// NewSchedule creates a new Schedule. The context passed to this
// function is the one that all scheduled jobs will be using, so use
// it for global cancellation, or just pass context.Background().
func NewSchedule(ctx context.Context, hostSeed int64) *Schedule {
return &Schedule{
hostSeed: hostSeed,
rootCtx: ctx,
c: cron.New(),
}
}
// Add a task to the schedule.
func (s *Schedule) Add(name, schedStr string, jobFn JobGeneratorFunc) error {
sched, err := parseSchedule(schedStr, s.hostSeed)
if err != nil {
return err
}
s.c.Schedule(sched, &cronJob{
name: name,
fn: jobFn,
ctx: s.rootCtx,
schedule: s,
})
return nil
}
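
From outside the package, assembling a Schedule looks roughly like this (a sketch; newBackupJob is a hypothetical generator):

sched := jobs.NewSchedule(ctx, hostSeed)
err := sched.Add("nightly-backup", "@daily", func() jobs.Job {
	// Returning nil makes cronJob.Run skip this tick.
	return newBackupJob()
})
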
func (s *Schedule) notify(name string, err error) {
s.mx.Lock()
if s.notifyCh != nil {
s.notifyCh <- cronJobExitStatus{name: name, err: err}
}
s.mx.Unlock()
}
func (s *Schedule) start(notifyCh chan cronJobExitStatus) {
s.mx.Lock()
s.notifyCh = notifyCh
s.mx.Unlock()
s.c.Start()
}
func (s *Schedule) stop() {
s.c.Stop()
s.mx.Lock()
s.notifyCh = nil
s.mx.Unlock()
}
// A cronJob implements cron.Job. It will generate a new Job and start
// it in a new goroutine, so, from the point of view of the cron
// package, cron jobs are instantaneous.
type cronJob struct {
name string
fn JobGeneratorFunc
ctx context.Context
// Link back to the Schedule so we can notify the Scheduler
// about exit status.
schedule *Schedule
}
func (j *cronJob) Run() {
job := j.fn()
if job == nil {
return
}
go func() {
log.Printf("scheduled job %s starting", j.name)
err := job.RunContext(j.ctx)
if err != nil {
log.Printf("scheduled job %s failed: %v", j.name, err)
} else {
log.Printf("scheduled job %s succeeded", j.name)
}
j.schedule.notify(j.name, err)
}()
}
// A Scheduler triggers Jobs on a periodic schedule. It uses job
// generators (functions that return Jobs) to create new jobs at the
// desired time.
//
// The standard cron syntax (documentation available at
// https://github.com/robfig/cron) is extended with the syntax:
@@ -32,104 +131,68 @@ var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
// over time because the random seed it's generated from is saved in a
// file.
type Scheduler struct {
mgr Manager
hostSeed int64
rootCtx context.Context
mx sync.Mutex
cur *Schedule
mx sync.Mutex
c *cron.Cron
notifyMx sync.Mutex
lastError map[string]error
notifyCh chan cronJobExitStatus
}
// NewScheduler creates a new Scheduler.
func NewScheduler(ctx context.Context, m Manager, sourceSpecs []SourceSpec, seedFile string) (*Scheduler, error) {
if seedFile == "" {
seedFile = defaultSeedFile
}
hostSeed := mustGetSeed(seedFile)
func NewScheduler() *Scheduler {
s := &Scheduler{
mgr: m,
hostSeed: hostSeed,
rootCtx: ctx,
}
if err := s.updateSchedule(sourceSpecs); err != nil {
return nil, err
lastError: make(map[string]error),
notifyCh: make(chan cronJobExitStatus),
}
return s, nil
}
type scheduleAndJob struct {
sched cron.Schedule
spec SourceSpec
}
func (s *Scheduler) updateSchedule(sourceSpecs []SourceSpec) error {
// Parse first, schedule later.
merr := new(multiError)
var tmp []scheduleAndJob
for _, spec := range sourceSpecs {
// Only schedule sources that have a 'schedule' attribute defined.
if spec.Schedule != "" {
sched, err := parseSchedule(spec.Schedule, s.hostSeed)
if err != nil {
merr.Add(fmt.Errorf("%s: bad schedule: %v", spec.Name, err))
continue
}
tmp = append(tmp, scheduleAndJob{sched: sched, spec: spec})
go func() {
for ex := range s.notifyCh {
s.notifyMx.Lock()
s.lastError[ex.name] = ex.err
s.notifyMx.Unlock()
}
}
if !merr.isNil() {
return merr
}
}()
return s
}
// Create a new cron scheduler and schedule all the jobs.
c := cron.New()
for _, sj := range tmp {
c.Schedule(sj.sched, &startBackupCronJob{mgr: s.mgr, spec: sj.spec, ctx: s.rootCtx})
func (s *Scheduler) getLastErrorString(name string) string {
s.notifyMx.Lock()
defer s.notifyMx.Unlock()
err := s.lastError[name]
if err == nil {
return ""
}
return err.Error()
}
// Stop the previous cron job, if any, and start this one.
// SetSchedule replaces the current schedule with a new one.
func (s *Scheduler) SetSchedule(schedule *Schedule) {
s.mx.Lock()
defer s.mx.Unlock()
if s.c != nil {
s.c.Stop()
if s.cur != nil {
s.cur.stop()
}
c.Start()
s.c = c
return nil
}
type startBackupCronJob struct {
mgr Manager
spec SourceSpec
ctx context.Context
}
func (j *startBackupCronJob) Run() {
go func() {
backup, err := j.mgr.Backup(j.ctx, []SourceSpec{j.spec})
if err != nil {
log.Printf("%s: error: could not create backup: %v", j.spec.Name, err)
return
}
log.Printf("%s: scheduled backup %s completed successfully", j.spec.Name, backup.ID)
}()
s.cur = schedule
s.cur.start(s.notifyCh)
s.mx.Unlock()
}
// Stop the scheduler (won't affect running jobs).
func (s *Scheduler) Stop() {
s.c.Stop()
s.mx.Lock()
if s.cur != nil {
s.cur.stop()
}
s.mx.Unlock()
close(s.notifyCh)
}
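
The split means a single long-lived Scheduler keeps the per-job error log while Schedules come and go with each reload. A sketch of that lifecycle from the caller's side (buildSchedule is a hypothetical helper returning a *jobs.Schedule):

scheduler := jobs.NewScheduler()
scheduler.SetSchedule(buildSchedule(ctx)) // initial schedule

// On configuration reload: the old Schedule is stopped, the
// per-job error log survives.
scheduler.SetSchedule(buildSchedule(ctx))

// At shutdown (running jobs are not interrupted).
scheduler.Stop()
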
// CronJobStatus represents the status of a job, either scheduled,
// running, or terminated in the past.
type CronJobStatus struct {
Spec SourceSpec
Prev time.Time
Next time.Time
//RunAt time.Time
//Running bool
//Error string
Name string
Prev time.Time
Next time.Time
LastError string
}
type cronJobStatusList struct {
@@ -145,27 +208,28 @@ func cronJobStatusListOrderByName(list []CronJobStatus) *cronJobStatusList {
return &cronJobStatusList{
list: list,
lessFn: func(i, j int) bool {
return list[i].Spec.Name < list[j].Spec.Name
return list[i].Name < list[j].Name
},
}
}
// SchedulerStatus holds information about the scheduler state, and
// the past executions.
type SchedulerStatus struct {
Scheduled []CronJobStatus `json:"scheduled"`
}
// getStatus returns the current status of the scheduled jobs.
func (s *Scheduler) getStatus() []CronJobStatus {
s.mx.Lock()
defer s.mx.Unlock()
if s.cur == nil {
return nil
}
var jobs []CronJobStatus
for _, entry := range s.c.Entries() {
// Get the startBackupCronJob behind the cron.Job interface.
if job, ok := entry.Job.(*startBackupCronJob); ok {
for _, entry := range s.cur.c.Entries() {
// Get the cronJob behind the cron.Job interface.
if job, ok := entry.Job.(*cronJob); ok {
jobs = append(jobs, CronJobStatus{
Spec: job.spec,
Prev: entry.Prev,
Next: entry.Next,
Name: job.name,
Prev: entry.Prev,
Next: entry.Next,
LastError: s.getLastErrorString(job.name),
})
}
}
@@ -219,19 +283,6 @@ func (s *randomPeriodicSchedule) Next(now time.Time) time.Time {
return next
}
func mustGetSeed(path string) int64 {
if data, err := ioutil.ReadFile(path); err == nil && len(data) == 8 { // nolint: gosec
if seed := binary.LittleEndian.Uint64(data); seed > 0 {
return int64(seed)
}
}
seed, data := randomSeed()
if err := ioutil.WriteFile(path, data, 0600); err != nil {
log.Printf("warning: can't write random seed file: %v", err)
}
return int64(seed)
}
var crc64Table *crc64.Table
func init() {
......
package tabacco
package jobs
import (
"context"
@@ -12,49 +12,45 @@ import (
"time"
)
type fakeManager struct {
counts map[string]int
type testJobCounter struct {
mx sync.Mutex
counter map[string]int
}
func newFakeManager() *fakeManager {
return &fakeManager{
counts: make(map[string]int),
func newTestJobCounter() *testJobCounter {
return &testJobCounter{
counter: make(map[string]int),
}
}
func (f *fakeManager) BackupJob(_ context.Context, specs []SourceSpec) (Backup, Job, error) {
j := newJob("backup", func(_ context.Context) error {
for _, spec := range specs {
f.counts[spec.Name] = f.counts[spec.Name] + 1
}
return nil
})
return Backup{ID: "haha"}, j, nil
}
func (f *fakeManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup, error) {
backup, job, err := f.BackupJob(ctx, sourceSpecs)
if err != nil {
return backup, err
}
err = job.RunContext(ctx)
return backup, err
func (c *testJobCounter) incr(key string) {
c.mx.Lock()
c.counter[key]++
c.mx.Unlock()
}
func (f *fakeManager) RestoreJob(_ context.Context, _ FindRequest, _ string) (Job, error) {
return newJob("restore", func(_ context.Context) error { return nil }), nil
func (c *testJobCounter) withCounter(key string, job Job) Job {
return JobFunc(func(ctx context.Context) error {
c.incr(key)
return job.RunContext(ctx)
})
}
func (f *fakeManager) Restore(ctx context.Context, req FindRequest, target string) error {
job, err := f.RestoreJob(ctx, req, target)
if err != nil {
return err
// waitJob returns a JobGeneratorFunc that creates jobs that wait a little.
func waitJob(jc *testJobCounter, key string) JobGeneratorFunc {
return func() Job {
return jc.withCounter(key, JobFunc(func(ctx context.Context) error {
c := time.After(300 * time.Millisecond)
select {
case <-c:
case <-ctx.Done():
return ctx.Err()
}
return nil
}))
}
return job.RunContext(ctx)
}
func (f *fakeManager) Close() error { return nil }
func TestScheduler(t *testing.T) {
tmpf, err := ioutil.TempFile("", "")
if err != nil {
@@ -63,54 +59,35 @@ func TestScheduler(t *testing.T) {
tmpf.Close()
defer os.Remove(tmpf.Name()) // nolint
m := newFakeManager()
sourceSpecs := []SourceSpec{
{
Name: "source1",
Handler: "file1",
Schedule: "@every 1s",
Atoms: []Atom{
{
Name: "user1",
RelativePath: "user1",
},
{
Name: "user2",
RelativePath: "user2",