Commit 3ece81fd authored by ale

Allow reloading of configuration

Use a configuration manager to handle runtime config-derived data and
eventually allow live configuration reloading.
parent 124dfaf7
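
The diff below adds ReadConfig and a configManager with a reload() method, but nothing in this commit actually triggers a reload yet. As a rough in-package sketch only (not part of this commit), a SIGHUP-driven reload could be wired up as follows; the watchConfig helper and the signal convention are assumptions:

package tabacco

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// watchConfig (hypothetical) re-reads the configuration on every SIGHUP
// and swaps it into the running configManager via reload().
func watchConfig(mgr *configManager, path string) {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP)
	go func() {
		for range ch {
			cfg, err := ReadConfig(path)
			if err != nil {
				log.Printf("error reloading configuration: %v", err)
				continue
			}
			if err := mgr.reload(cfg); err != nil {
				log.Printf("error applying new configuration: %v", err)
			}
		}
	}()
}
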
@@ -156,20 +156,25 @@ func TestBackup(t *testing.T) {
}
// Run the backup.
m, err := NewManager(
context.TODO(),
handlerSpecs,
repoSpec,
store,
queueSpec,
NewShell(true),
)
configMgr, err := newConfigManager(&Config{
Queue: queueSpec,
Repository: repoSpec,
HandlerSpecs: handlerSpecs,
SourceSpecs: sourceSpecs,
DryRun: true,
})
if err != nil {
t.Fatal(err)
}
defer configMgr.Close()
m, err := NewManager(context.TODO(), configMgr, store)
if err != nil {
t.Fatal(err)
}
defer m.Close()
backup, err := m.Backup(context.TODO(), sourceSpecs)
backup, err := m.Backup(context.TODO(), configMgr.SourceSpecs())
if err != nil {
t.Fatal(err)
}
......
package tabacco
import (
"errors"
"io/ioutil"
"log"
"path/filepath"
"sync"
"gopkg.in/yaml.v2"
)
// Config is the global configuration object. While the actual
// configuration is spread over multiple files and directories, this
// holds it all together.
type Config struct {
Hostname string `yaml:"hostname"`
Queue MultiQueueSpec `yaml:"queue_config"`
Repository RepositorySpec `yaml:"repository"`
DryRun bool `yaml:"dry_run"`
DefaultNiceLevel int `yaml:"default_nice_level"`
DefaultIOClass int `yaml:"default_io_class"`
HandlerSpecs []HandlerSpec
SourceSpecs []SourceSpec
}
func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
var out []HandlerSpec
err := foreachYAMLFile(dir, func(path string) error {
var spec HandlerSpec
if err := readYAMLFile(path, &spec); err != nil {
return err
}
out = append(out, spec)
return nil
})
return out, err
}
func readSourcesFromDir(dir string) ([]SourceSpec, error) {
var out []SourceSpec
err := foreachYAMLFile(dir, func(path string) error {
var spec SourceSpec
if err := readYAMLFile(path, &spec); err != nil {
return err
}
out = append(out, spec)
return nil
})
return out, err
}
// ReadConfig reads the configuration from the given path. Sources and
// handlers are read from the 'sources' and 'handlers' subdirectories
// of the directory containing the main configuration file.
func ReadConfig(path string) (*Config, error) {
var config Config
if err := readYAMLFile(path, &config); err != nil {
return nil, err
}
dir := filepath.Dir(path)
sourceSpecs, err := readSourcesFromDir(filepath.Join(dir, "sources"))
if err != nil {
logMultiError("warning: source configuration error: ", err)
if len(sourceSpecs) == 0 {
return nil, errors.New("no configured sources")
}
}
config.SourceSpecs = sourceSpecs
handlerSpecs, err := readHandlersFromDir(filepath.Join(dir, "handlers"))
if err != nil {
logMultiError("warning: handler configuration error: ", err)
if len(handlerSpecs) == 0 {
return nil, errors.New("no configured handlers")
}
}
config.HandlerSpecs = handlerSpecs
return &config, nil
}
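
For illustration only (not part of this commit), here is how the split configuration gets loaded; the /etc/tabacco base directory is an assumption, while the layout itself is what ReadConfig implements:

// Sketch, in package tabacco. ReadConfig reads the main file, then one
// spec per *.yml file from the sibling directories:
//
//   /etc/tabacco/config.yml       -> Config (hostname, queue, repository, ...)
//   /etc/tabacco/handlers/*.yml   -> one HandlerSpec each
//   /etc/tabacco/sources/*.yml    -> one SourceSpec each
func loadExampleConfig() (*configManager, error) {
	cfg, err := ReadConfig("/etc/tabacco/config.yml")
	if err != nil {
		return nil, err
	}
	return newConfigManager(cfg)
}
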
func logMultiError(prefix string, err error) {
if merr, ok := err.(*multiError); ok {
for _, e := range merr.Errors() {
log.Printf("%s%v", prefix, e)
}
} else {
log.Printf("%s%v", prefix, err)
}
}
func readYAMLFile(path string, obj interface{}) error {
data, err := ioutil.ReadFile(path) // nolint: gosec
if err != nil {
return err
}
return yaml.UnmarshalStrict(data, obj)
}
func foreachYAMLFile(dir string, f func(string) error) error {
files, err := filepath.Glob(filepath.Join(dir, "*.yml"))
if err != nil {
return err
}
merr := new(multiError)
for _, path := range files {
if err := f(path); err != nil {
merr.Add(err)
}
}
return merr.orNil()
}
// The config manager holds all runtime data derived from the
// configuration itself, so it can be easily reloaded in a single
// place.
type configManager struct {
mx sync.Mutex
config *Config
handlerMap map[string]Handler
repo Repository
shell *Shell
}
func newConfigManager(config *Config) (*configManager, error) {
m := new(configManager)
if err := m.reload(config); err != nil {
return nil, err
}
return m, nil
}
func (m *configManager) reload(config *Config) error {
m.mx.Lock()
defer m.mx.Unlock()
shell := NewShell(config.DryRun)
shell.SetNiceLevel(config.DefaultNiceLevel)
shell.SetIOClass(config.DefaultIOClass)
m.shell = shell
handlerMap, err := buildHandlerMap(config.HandlerSpecs, shell)
if err != nil {
return err
}
m.handlerMap = handlerMap
repo, err := config.Repository.Parse(shell)
if err != nil {
return err
}
if m.repo != nil {
m.repo.Close() // nolint
}
m.repo = repo
m.config = config
return nil
}
func (m *configManager) Close() {
m.mx.Lock()
if m.repo != nil {
m.repo.Close() // nolint
}
m.mx.Unlock()
}
func (m *configManager) Shell() *Shell {
m.mx.Lock()
defer m.mx.Unlock()
return m.shell
}
func (m *configManager) HandlerMap() map[string]Handler {
m.mx.Lock()
defer m.mx.Unlock()
return m.handlerMap
}
func (m *configManager) Repository() Repository {
m.mx.Lock()
defer m.mx.Unlock()
return m.repo
}
func (m *configManager) QueueSpec() MultiQueueSpec {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.Queue
}
func (m *configManager) SourceSpecs() []SourceSpec {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.SourceSpecs
}
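
These accessors take the lock and return the current values on every call, which is the point of the indirection: callers are expected to ask the configManager at time of use rather than capture the repository or handler map once. A minimal in-package sketch of that pattern (not part of this diff; context is the standard library package):

func prepareWithCurrentConfig(ctx context.Context, mgr *configManager, b Backup) error {
	// Fetch the repository at call time, so a reload() that replaced it
	// is picked up by the next job instead of a stale instance.
	repo := mgr.Repository()
	if err := repo.Init(ctx); err != nil {
		return err
	}
	return repo.Prepare(ctx, b)
}
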
@@ -7,12 +7,12 @@ import (
// HandlerSpec defines the configuration for a handler.
type HandlerSpec struct {
// Handler name (unique global identifier).
Name string `json:"name"`
Name string `yaml:"name"`
// Handler type, one of the known types.
Type string `json:"type"`
Type string `yaml:"type"`
Params map[string]interface{} `json:"params"`
Params map[string]interface{} `yaml:"params"`
}
// Parse a HandlerSpec and return a Handler instance.
......
@@ -140,7 +140,7 @@ type multiQueueManager struct {
// MultiQueueSpec describes the configuration of named queues.
type MultiQueueSpec struct {
Workers map[string]int `json:"workers"`
Workers map[string]int `yaml:"workers"`
}
func newMultiQueueManager(spec MultiQueueSpec) *multiQueueManager {
......
@@ -15,35 +15,27 @@ type tabaccoManager struct {
*multiQueueManager
*stateManager
handlerMap map[string]Handler
repo Repository
ms MetadataStore
shell *Shell
configMgr *configManager
ms MetadataStore
}
// NewManager creates a new Manager.
func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, ms MetadataStore, queueSpec MultiQueueSpec, shell *Shell) (Manager, error) {
handlerMap, repo, err := parseSpecs(ctx, handlerSpecs, repoSpec, shell)
if err != nil {
return nil, err
}
func NewManager(ctx context.Context, configMgr *configManager, ms MetadataStore) (Manager, error) {
// Note: the queue configuration won't be reloaded.
return &tabaccoManager{
exclusiveLockManager: newExclusiveLockManager(),
multiQueueManager: newMultiQueueManager(queueSpec),
multiQueueManager: newMultiQueueManager(configMgr.QueueSpec()),
stateManager: newStateManager(),
handlerMap: handlerMap,
repo: repo,
ms: ms,
shell: shell,
configMgr: configMgr,
ms: ms,
}, nil
}
// Close the Manager and free all associated resources (those owned by
// this object).
func (m *tabaccoManager) Close() error {
return m.repo.Close()
return nil
}
// Prepare the repository for a new backup. This is a synchronous
@@ -51,12 +43,13 @@ func (m *tabaccoManager) Close() error {
// backup tasks too soon.
func (m *tabaccoManager) prepareBackupJob(backup Backup) Job {
return newJob("backup_prep", func(ctx context.Context) error {
if err := m.repo.Init(ctx); err != nil {
repo := m.configMgr.Repository()
if err := repo.Init(ctx); err != nil {
log.Printf("repository init failed: %v", err)
return err
}
log.Printf("preparing backup")
return m.repo.Prepare(ctx, backup)
log.Printf("preparing backup %s", backup.ID)
return repo.Prepare(ctx, backup)
})
}
@@ -64,12 +57,12 @@ func (m *tabaccoManager) prepareBackupJob(backup Backup) Job {
func (m *tabaccoManager) backupDatasetJob(backup Backup, ds Dataset) Job {
jobID := fmt.Sprintf("backup_%s", ds.Name)
j := newJob(jobID, func(ctx context.Context) (err error) {
h, ok := m.handlerMap[ds.Handler]
h, ok := m.configMgr.HandlerMap()[ds.Handler]
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
ds, err = h.Backup(ctx, m.repo, backup, ds)
ds, err = h.Backup(ctx, m.configMgr.Repository(), backup, ds)
if err != nil {
return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
}
@@ -134,11 +127,11 @@ func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (
func (m *tabaccoManager) restoreDatasetJob(backup Backup, ds Dataset, target string) Job {
j := newJob(fmt.Sprintf("restore_%s_%s", ds.Name, backup.ID), func(ctx context.Context) error {
log.Printf("restoring %+v %+v", ds, backup)
h, ok := m.handlerMap[ds.Handler]
h, ok := m.configMgr.HandlerMap()[ds.Handler]
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
return h.Restore(ctx, m.repo, backup, ds, target)
return h.Restore(ctx, m.configMgr.Repository(), backup, ds, target)
})
return m.withDefaults(j, "restore")
}
@@ -184,17 +177,3 @@ func newBackup(host string) Backup {
Timestamp: time.Now(),
}
}
func parseSpecs(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, shell *Shell) (map[string]Handler, Repository, error) {
handlerMap, err := buildHandlerMap(handlerSpecs, shell)
if err != nil {
return nil, nil, err
}
repo, err := repoSpec.Parse(shell)
if err != nil {
return nil, nil, err
}
return handlerMap, repo, nil
}
@@ -6,9 +6,9 @@ import (
// RepositorySpec defines the configuration of a repository.
type RepositorySpec struct {
Name string `json:"name"`
Type string `json:"type"`
Params map[string]interface{} `json:"params"`
Name string `yaml:"name"`
Type string `yaml:"type"`
Params map[string]interface{} `yaml:"params"`
}
// Parse a RepositorySpec and return a Repository instance.
......
@@ -75,24 +75,28 @@ func TestRestic(t *testing.T) {
},
}
queueSpec := MultiQueueSpec{
Workers: map[string]int{"backup": 2},
Workers: map[string]int{"backup": 2, "restore": 1},
}
// Run the backup.
m, err := NewManager(
context.TODO(),
handlerSpecs,
repoSpec,
store,
queueSpec,
NewShell(false),
)
configMgr, err := newConfigManager(&Config{
Queue: queueSpec,
Repository: repoSpec,
HandlerSpecs: handlerSpecs,
SourceSpecs: sourceSpecs,
})
if err != nil {
t.Fatal(err)
}
defer configMgr.Close()
m, err := NewManager(context.TODO(), configMgr, store)
if err != nil {
t.Fatal(err)
}
defer m.Close()
backup, err := m.Backup(context.TODO(), sourceSpecs)
backup, err := m.Backup(context.TODO(), configMgr.SourceSpecs())
if err != nil {
t.Fatal(err)
}
......
@@ -4,11 +4,13 @@ import (
"context"
"encoding/binary"
"fmt"
"hash/crc64"
"io/ioutil"
"log"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/robfig/cron"
@@ -28,7 +30,12 @@ import (
// over time because the random seed it's generated from is saved in a
// file.
type Scheduler struct {
c *cron.Cron
mgr Manager
hostSeed int64
rootCtx context.Context
mx sync.Mutex
c *cron.Cron
}
// NewScheduler creates a new Scheduler.
@@ -36,27 +43,57 @@ func NewScheduler(ctx context.Context, m Manager, sourceSpecs []SourceSpec, seed
if seedFile == "" {
seedFile = "/var/tmp/.tabacco_scheduler_seed"
}
rnd := mustGetRand(seedFile)
c := cron.New()
hostSeed := mustGetSeed(seedFile)
s := &Scheduler{
c: c,
mgr: m,
hostSeed: hostSeed,
rootCtx: ctx,
}
if err := s.updateSchedule(sourceSpecs); err != nil {
return nil, err
}
return s, nil
}
type scheduleAndJob struct {
sched cron.Schedule
spec SourceSpec
}
func (s *Scheduler) updateSchedule(sourceSpecs []SourceSpec) error {
// Parse first, schedule later.
merr := new(multiError)
var tmp []scheduleAndJob
for _, spec := range sourceSpecs {
if spec.Schedule != "" {
sched, err := parseSchedule(rnd, spec.Schedule)
sched, err := parseSchedule(spec.Schedule, s.hostSeed)
if err != nil {
merr.Add(fmt.Errorf("%s: bad schedule: %v", spec.Name, err))
continue
}
c.Schedule(sched, &startBackupCronJob{mgr: m, spec: spec, ctx: ctx})
tmp = append(tmp, scheduleAndJob{sched: sched, spec: spec})
}
}
c.Start()
if !merr.isNil() {
return merr
}
return s, merr.orNil()
// Create a new cron scheduler and schedule all the jobs.
c := cron.New()
for _, sj := range tmp {
c.Schedule(sj.sched, &startBackupCronJob{mgr: s.mgr, spec: sj.spec, ctx: s.rootCtx})
}
// Stop the previous cron job, if any, and start this one.
s.mx.Lock()
defer s.mx.Unlock()
if s.c != nil {
s.c.Stop()
}
c.Start()
s.c = c
return nil
}
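
updateSchedule is what makes the scheduler reload-friendly: parse everything first, then build a fresh cron instance and swap it in under the lock. This commit only calls it from NewScheduler; a hypothetical hook tying it to the config manager could be as small as:

// Hypothetical, not in this commit: refresh the cron schedule from the
// current source specs after a configuration reload.
func refreshSchedules(s *Scheduler, mgr *configManager) error {
	return s.updateSchedule(mgr.SourceSpecs())
}
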
type startBackupCronJob struct {
@@ -111,15 +148,6 @@ func jobStatusListOrderByName(list []JobStatus) *jobStatusList {
}
}
// func jobStatusListOrderByRunTime(list []JobStatus) *jobStatusList {
// return &jobStatusList{
// list: list,
// lessFn: func(i, j int) bool {
// return list[i].RunAt.After(list[j].RunAt)
// },
// }
// }
// SchedulerStatus holds information about the scheduler state, and
// the past executions.
type SchedulerStatus struct {
@@ -149,14 +177,14 @@ func (s *Scheduler) GetStatus() *SchedulerStatus {
return status
}
func parseSchedule(rnd *rand.Rand, s string) (cron.Schedule, error) {
func parseSchedule(s string, hostSeed int64) (cron.Schedule, error) {
switch {
case strings.HasPrefix(s, "@random_every "):
period, err := time.ParseDuration(s[14:])
if err != nil {
return nil, err
}
return randomScheduleEvery(rnd, period), nil
return randomScheduleEvery(makeScheduleRand(s, hostSeed), period), nil
default:
return cron.Parse(s)
}
@@ -192,15 +220,28 @@ func (s *randomPeriodicSchedule) Next(now time.Time) time.Time {
return next
}
func mustGetRand(path string) *rand.Rand {
func mustGetSeed(path string) int64 {
if data, err := ioutil.ReadFile(path); err == nil && len(data) == 8 { // nolint: gosec
if seed := binary.LittleEndian.Uint64(data); seed > 0 {
return rand.New(rand.NewSource(int64(seed)))
return int64(seed)
}
}
seed, data := randomSeed()
if err := ioutil.WriteFile(path, data, 0600); err != nil {
log.Printf("warning: can't write random seed file: %v", err)
}
return rand.New(rand.NewSource(int64(seed)))
return int64(seed)
}
var crc64Table *crc64.Table
func init() {
crc64Table = crc64.MakeTable(crc64.ISO)
}
func makeScheduleRand(name string, hostSeed int64) *rand.Rand {
// Mix the schedule spec string into the host seed, so different
// schedules get different (but stable) random sequences.
crc := int64(crc64.Checksum([]byte(name), crc64Table) & 0x7fffffff)
seed := hostSeed ^ crc
return rand.New(rand.NewSource(seed))
}
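
makeScheduleRand makes the "@random_every" jitter reproducible: the CRC-64 of the schedule string is mixed into the per-host seed, so the pseudo-random offsets do not drift between restarts (as long as the seed file survives) while still differing between hosts. An illustrative in-package sketch (time is already imported by this file):

// Sketch only: for the same host seed and spec string, the schedule's
// rand source is seeded identically on every run.
func exampleNextRun(hostSeed int64) (time.Time, error) {
	sched, err := parseSchedule("@random_every 24h", hostSeed)
	if err != nil {
		return time.Time{}, err
	}
	return sched.Next(time.Now()), nil
}
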
@@ -10,16 +10,16 @@ import (
// SourceSpec defines the configuration for a data source.
type SourceSpec struct {
Name string `json:"name"`
Handler string `json:"handler"`
Name string `yaml:"name"`
Handler string `yaml:"handler"`
// Schedule to run the backup on.
Schedule string `json:"schedule"`
Schedule string `yaml:"schedule"`
// Define atoms statically, or use a script to generate them
// dynamically on every new backup.
Atoms []Atom `json:"atoms,omitempty"`
AtomsScript string `json:"atoms_script,omitempty"`
Atoms []Atom `yaml:"atoms"`
AtomsScript string `yaml:"atoms_script"`
//Params map[string]interface{} `json:"params"`
}
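
With the struct tags switched to yaml, each file under sources/ is parsed strictly against the fields above. A hypothetical example file and how it maps onto SourceSpec (the handler name, schedule and script path are made up; yaml is the gopkg.in/yaml.v2 package already used by readYAMLFile):

// Hypothetical sources/etc-files.yml content; UnmarshalStrict rejects
// any key that does not match one of the yaml tags above.
var exampleSource = []byte(`
name: etc-files
handler: file
schedule: "@random_every 24h"
atoms_script: /usr/local/bin/list-etc-atoms
`)

func parseExampleSource() (SourceSpec, error) {
	var spec SourceSpec
	err := yaml.UnmarshalStrict(exampleSource, &spec)
	return spec, err
}
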
......
@@ -34,6 +34,10 @@ func (m *multiError) orNil() error {
return nil
}
func (m *multiError) isNil() bool {
return len(m.errors) == 0
}
// Generate a random unique ID. It will return an identifier
// consisting of 32 ascii-friendly bytes (16 random bytes,
// hex-encoded).
......