Commit bb99d517 authored by ale

Simplify internal interfaces

Clear separation between configuration and its parsed results, which
are now maintained in the RuntimeContext to keep them consistent
within backup/restore jobs.
parent 434f164b
Pipeline #3487 passed in 41 seconds
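The shape of the change, seen from the consumer side: instead of calling individual getters on the ConfigManager (getSourceSpecs, getSeed, ...), a job captures one parsed-config snapshot and reads everything from it. A minimal sketch of the pattern with stand-in fields (the real parsedConfig below also carries handlers, sources, repository and shell):

package main

import "sync"

// Stand-in for parsedConfig: one immutable snapshot of everything
// derived from a Config.
type parsedConfig struct {
	seed    int64
	workDir string
}

func (p *parsedConfig) Seed() int64     { return p.seed }
func (p *parsedConfig) WorkDir() string { return p.workDir }

// Stand-in ConfigManager: the mutex guards only the pointer swap.
type ConfigManager struct {
	mx     sync.Mutex
	parsed *parsedConfig
}

func (m *ConfigManager) current() *parsedConfig {
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.parsed
}

func (m *ConfigManager) Reload(p *parsedConfig) {
	m.mx.Lock()
	defer m.mx.Unlock()
	m.parsed = p // running jobs keep the snapshot they captured
}

func main() {
	m := &ConfigManager{parsed: &parsedConfig{seed: 42, workDir: "/var/tmp"}}
	snap := m.current() // a job captures its view once...
	m.Reload(&parsedConfig{seed: 7, workDir: "/tmp"})
	println(snap.Seed()) // ...and still sees 42 after the reload
}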
@@ -35,7 +35,8 @@ func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (
case <-stopCh:
return
case <-notifyCh:
- schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
+ config := configMgr.current()
+ schedule, err := makeSchedule(ctx, mgr, config.SourceSpecs(), config.Seed())
if err != nil {
log.Printf("error updating scheduler: %v", err)
}
...
@@ -23,7 +23,7 @@ var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
// holds it all together.
type Config struct {
Hostname string `yaml:"hostname"`
- Queue jobs.QueueSpec `yaml:"queue_config"`
+ Queue *jobs.QueueSpec `yaml:"queue_config"`
Repository RepositorySpec `yaml:"repository"`
DryRun bool `yaml:"dry_run"`
DefaultNiceLevel int `yaml:"default_nice_level"`
@@ -37,30 +37,54 @@ type Config struct {
}
// RuntimeContext provides access to runtime objects whose lifetime is
- // ultimately tied to the configuration.
+ // ultimately tied to the configuration. Configuration can change
+ // during the lifetime of the process, but we want backup jobs to have
+ // a consistent view of the configuration while they execute, so
+ // access to the current version of the configuration is controlled by
+ // the ConfigManager.
type RuntimeContext interface {
Shell() *Shell
Repo() Repository
QueueSpec() *jobs.QueueSpec
Seed() int64
WorkDir() string
SourceSpecs() []*SourceSpec
FindSource(string) *SourceSpec
HandlerSpec(string) *HandlerSpec
Close()
}
- // The set of objects that are created from a Config. Can change, so
- // its access is controlled by the ConfigManager. However it stays
- // fixed during a running backup.
- //
- // This is an implementation of RuntimeContext.
- type runtimeAssets struct {
- handlerMap map[string]*HandlerSpec
- repo Repository
- seed int64
- shell *Shell
+ // The set of objects that are created from a Config and that the main
+ // code cares about.
+ type parsedConfig struct {
+ handlerMap map[string]*HandlerSpec
+ sourceSpecs []*SourceSpec
+ sourceSpecsByName map[string]*SourceSpec
+ queue *jobs.QueueSpec
+ repo Repository
+ seed int64
+ shell *Shell
+ workDir string
}
- func (a *runtimeAssets) Close() {
+ func (a *parsedConfig) Close() {
a.repo.Close() // nolint
}
- func (a *runtimeAssets) Shell() *Shell {
- return a.shell
+ func (a *parsedConfig) Shell() *Shell { return a.shell }
+ func (a *parsedConfig) Repo() Repository { return a.repo }
+ func (a *parsedConfig) QueueSpec() *jobs.QueueSpec { return a.queue }
+ func (a *parsedConfig) Seed() int64 { return a.seed }
+ func (a *parsedConfig) WorkDir() string { return a.workDir }
+ func (a *parsedConfig) SourceSpecs() []*SourceSpec { return a.sourceSpecs }
+ func (a *parsedConfig) HandlerSpec(name string) *HandlerSpec {
+ return a.handlerMap[name]
+ }
+ func (a *parsedConfig) FindSource(name string) *SourceSpec {
+ return a.sourceSpecsByName[name]
+ }
func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec {
@@ -77,7 +101,7 @@ func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec {
return m
}
- func (c *Config) parse() (*runtimeAssets, error) {
+ func (c *Config) parse() (*parsedConfig, error) {
shell := NewShell(c.DryRun)
shell.SetNiceLevel(c.DefaultNiceLevel)
shell.SetIOClass(c.DefaultIOClass)
@@ -98,21 +122,20 @@ func (c *Config) parse() (*runtimeAssets, error) {
// Sources that fail the check are removed from the
// SourceSpecs array. We also check that sources have unique
// names.
- tmp := make(map[string]struct{})
+ srcMap := make(map[string]*SourceSpec)
var srcs []*SourceSpec
for _, spec := range c.SourceSpecs {
if err := spec.Check(handlerMap); err != nil {
merr.Add(fmt.Errorf("source %s: %v", spec.Name, err))
continue
}
- if _, ok := tmp[spec.Name]; ok {
+ if _, ok := srcMap[spec.Name]; ok {
merr.Add(fmt.Errorf("duplicated source %s", spec.Name))
continue
}
- tmp[spec.Name] = struct{}{}
+ srcMap[spec.Name] = spec
srcs = append(srcs, spec)
}
- c.SourceSpecs = srcs
// Read (or create) the seed file.
seedFile := defaultSeedFile
@@ -121,11 +144,15 @@ func (c *Config) parse() (*runtimeAssets, error) {
}
seed := mustGetSeed(seedFile)
- return &runtimeAssets{
- shell: shell,
- repo: repo,
- handlerMap: handlerMap,
- seed: seed,
+ return &parsedConfig{
+ handlerMap: handlerMap,
+ sourceSpecs: srcs,
+ sourceSpecsByName: srcMap,
+ queue: c.Queue,
+ shell: shell,
+ repo: repo,
+ seed: seed,
+ workDir: c.WorkDir,
}, merr.OrNil()
}
@@ -251,8 +278,7 @@ func foreachYAMLFile(dir string, f func(string) error) error {
// unregister).
type ConfigManager struct {
mx sync.Mutex
- config *Config
- assets *runtimeAssets
+ parsed *parsedConfig
// Listeners are notified on every reload.
notifyCh chan struct{}
@@ -285,8 +311,8 @@ func NewConfigManager(config *Config) (*ConfigManager, error) {
// Reload the configuration (at least, the parts of it that can be
// dynamically reloaded).
func (m *ConfigManager) Reload(config *Config) error {
- assets, err := config.parse()
- if assets == nil {
+ parsed, err := config.parse()
+ if parsed == nil {
return err
} else if err != nil {
log.Printf("warning: errors in configuration: %v", err)
@@ -296,13 +322,12 @@ func (m *ConfigManager) Reload(config *Config) error {
// goroutine, that does not hold the lock).
m.mx.Lock()
defer m.mx.Unlock()
- if m.assets != nil {
- m.assets.Close() // nolint
+ if m.parsed != nil {
+ m.parsed.Close() // nolint
}
log.Printf("loaded new config: %d handlers, %d sources", len(assets.handlerMap), len(config.SourceSpecs))
m.assets = assets
m.config = config
log.Printf("loaded new config: %d handlers, %d sources", len(parsed.handlerMap), len(parsed.sourceSpecs))
m.parsed = parsed
m.notifyCh <- struct{}{}
return nil
}
@@ -311,8 +336,8 @@ func (m *ConfigManager) Reload(config *Config) error {
func (m *ConfigManager) Close() {
m.mx.Lock()
close(m.notifyCh)
- if m.assets != nil {
- m.assets.Close()
+ if m.parsed != nil {
+ m.parsed.Close()
}
m.mx.Unlock()
}
@@ -330,57 +355,16 @@ func (m *ConfigManager) Notify() <-chan struct{} {
return ch
}
- // Captures current runtime assets into a RuntimeContext
- func (m *ConfigManager) newRuntimeContext() RuntimeContext {
- return m.assets
- }
- func (m *ConfigManager) getHandlerSpec(name string) (*HandlerSpec, bool) {
- m.mx.Lock()
- defer m.mx.Unlock()
- h, ok := m.assets.handlerMap[name]
- return h, ok
- }
- func (m *ConfigManager) getRepository() Repository {
- m.mx.Lock()
- defer m.mx.Unlock()
- return m.assets.repo
- }
- func (m *ConfigManager) getQueueSpec() jobs.QueueSpec {
- m.mx.Lock()
- defer m.mx.Unlock()
- return m.config.Queue
- }
- func (m *ConfigManager) getSourceSpecs() []*SourceSpec {
- m.mx.Lock()
- defer m.mx.Unlock()
- return m.config.SourceSpecs
- }
- func (m *ConfigManager) findSource(name string) *SourceSpec {
- m.mx.Lock()
- defer m.mx.Unlock()
- for _, src := range m.config.SourceSpecs {
- if src.Name == name {
- return src
- }
- }
- return nil
- }
- func (m *ConfigManager) getSeed() int64 {
- m.mx.Lock()
- defer m.mx.Unlock()
- return m.assets.seed
+ // NewRuntimeContext returns a new RuntimeContext, capturing current
+ // configuration and runtime assets.
+ func (m *ConfigManager) NewRuntimeContext() RuntimeContext {
+ return m.current()
+ }
- func (m *ConfigManager) getWorkDir() string {
+ func (m *ConfigManager) current() *parsedConfig {
m.mx.Lock()
defer m.mx.Unlock()
- return m.config.WorkDir
+ return m.parsed
}
func mustGetSeed(path string) int64 {
...
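The Reload path above parses the new Config outside the lock and only swaps the parsed pointer while holding it, so a snapshot handed out earlier is never mutated. A hypothetical test pinning down that guarantee (oldConfig and newConfig are placeholder fixtures, not from this commit; note that Reload does Close() the old snapshot's repository):

func TestSnapshotStableAcrossReload(t *testing.T) {
	mgr, err := NewConfigManager(oldConfig)
	if err != nil {
		t.Fatal(err)
	}
	defer mgr.Close()

	rctx := mgr.NewRuntimeContext()
	before := len(rctx.SourceSpecs())

	// Swaps mgr.parsed and notifies listeners; rctx is untouched.
	if err := mgr.Reload(newConfig); err != nil {
		t.Fatal(err)
	}

	if len(rctx.SourceSpecs()) != before {
		t.Fatal("snapshot changed under a running job")
	}
}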
@@ -36,8 +36,8 @@ func TestConfigManager(t *testing.T) {
defer mgr.Close()
// Test one of the accessor methods.
- if s := mgr.getSourceSpecs(); len(s) != 1 {
- t.Fatalf("getSourceSpecs() bad result: %+v", s)
+ if s := mgr.current().SourceSpecs(); len(s) != 1 {
+ t.Fatalf("current().SourceSpecs() bad result: %+v", s)
}
// Test the Notify() mechanism by checking that it triggers
@@ -68,7 +68,7 @@ func TestConfig_Parse(t *testing.T) {
type testdata struct {
config *Config
expectedOK bool
- checkFn func(*runtimeAssets, []*Dataset) error
+ checkFn func([]*Dataset) error
}
tdd := []testdata{
// The following tests cover a few ways to generate
@@ -166,29 +166,29 @@
"password": "hello",
}
- ra, err := td.config.parse()
+ parsed, err := td.config.parse()
if err != nil && td.expectedOK {
t.Errorf("unexpected error for config %+v: %v", td.config, err)
} else if err == nil && !td.expectedOK {
t.Errorf("missing error for config %+v", td.config)
} else {
- datasets, err := parseAllSources(ra, td.config.SourceSpecs)
+ datasets, err := parseAllSources(parsed.SourceSpecs())
if err != nil {
t.Errorf("failed to parse sources %+v: %v", td.config.SourceSpecs, err)
}
if td.checkFn != nil {
- if err := td.checkFn(ra, datasets); err != nil {
+ if err := td.checkFn(datasets); err != nil {
t.Errorf("check failed for config %+v: %v", td.config, err)
}
}
}
- if ra != nil {
- ra.Close()
+ if parsed != nil {
+ parsed.Close()
}
}
}
- func parseAllSources(ra *runtimeAssets, specs []*SourceSpec) ([]*Dataset, error) {
+ func parseAllSources(specs []*SourceSpec) ([]*Dataset, error) {
var out []*Dataset
for _, spec := range specs {
ds, err := spec.Parse(context.Background())
@@ -200,7 +200,7 @@ func parseAllSources(ra *runtimeAssets, specs []*SourceSpec) ([]*Dataset, error)
return out, nil
}
- func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []*Dataset) error {
+ func checkTwoUserAccountsAtoms(datasets []*Dataset) error {
var numAtoms int
for _, ds := range datasets {
if ds.ID == "" {
...
@@ -30,14 +30,14 @@ func atomPath(a Atom, root string) string {
return filepath.Join(root, a.Name)
}
- func (h *fileHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
+ func (h *fileHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job {
// Build the list of filesystem paths to pass to the
// Repository.Backup method.
var paths []string
for _, a := range ds.Atoms {
paths = append(paths, atomPath(a, h.path))
}
- cmd := repo.BackupCmd(backup, ds, paths)
+ cmd := rctx.Repo().BackupCmd(backup, ds, paths)
// Now pass those paths to the Backup method.
return jobs.JobFunc(func(ctx context.Context) error {
@@ -45,7 +45,7 @@ func (h *fileHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Ba
})
}
- func (h *fileHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
+ func (h *fileHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Dataset, target string) jobs.Job {
// Build the list of filesystem paths to pass to the
// Repository.Backup method.
var paths []string
@@ -55,7 +55,7 @@ func (h *fileHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *B
// Call the repo Restore method.
return jobs.JobFunc(func(ctx context.Context) error {
- cmd, err := repo.RestoreCmd(ctx, rctx, backup, ds, paths, target)
+ cmd, err := rctx.Repo().RestoreCmd(ctx, rctx, backup, ds, paths, target)
if err != nil {
return err
}
...
@@ -60,21 +60,21 @@ func newPipeHandler(name string, params Params) (Handler, error) {
return h, nil
}
- func (h *pipeHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
+ func (h *pipeHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job {
cmd := fmt.Sprintf(
"(%s)%s | %s",
expandVars(h.backupCmd, backup, ds),
h.compressSuffix(),
- repo.BackupStreamCmd(backup, ds),
+ rctx.Repo().BackupStreamCmd(backup, ds),
)
return jobs.JobFunc(func(ctx context.Context) error {
return rctx.Shell().Run(ctx, cmd)
})
}
- func (h *pipeHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
+ func (h *pipeHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Dataset, target string) jobs.Job {
return jobs.JobFunc(func(ctx context.Context) error {
- restoreCmd, err := repo.RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
+ restoreCmd, err := rctx.Repo().RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
if err != nil {
return err
}
...
@@ -191,7 +191,7 @@ type QueueSpec struct {
// NewQueueManager returns a new QueueManager with the provided
// configuration.
- func NewQueueManager(spec QueueSpec) *QueueManager {
+ func NewQueueManager(spec *QueueSpec) *QueueManager {
q := make(map[string]chan struct{})
for name, n := range spec.Workers {
q[name] = make(chan struct{}, n)
...
@@ -27,7 +27,7 @@ type tabaccoManager struct {
func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) {
// If we can't create a workdirManager, it probably means we
// don't have permissions to the WorkDir, which is bad.
- wm, err := newWorkdirManager(configMgr.getWorkDir())
+ wm, err := newWorkdirManager(configMgr.current().WorkDir())
if err != nil {
return nil, err
}
@@ -35,7 +35,7 @@ func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore)
// Note: the queue configuration won't be reloaded.
return &tabaccoManager{
ExclusiveLockManager: jobs.NewExclusiveLockManager(),
- QueueManager: jobs.NewQueueManager(configMgr.getQueueSpec()),
+ QueueManager: jobs.NewQueueManager(configMgr.current().QueueSpec()),
StateManager: jobs.NewStateManager(),
workdirManager: wm,
@@ -82,9 +82,8 @@ func (m *tabaccoManager) withMetadata(j jobs.Job, backup *Backup, ds *Dataset) j
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
func (m *tabaccoManager) prepareBackupJob(rctx RuntimeContext, backup *Backup) jobs.Job {
- repo := m.configMgr.getRepository()
return jobs.JobFunc(func(ctx context.Context) error {
- return repo.Init(ctx, rctx)
+ return rctx.Repo().Init(ctx, rctx)
//log.Printf("preparing backup %s", backup.ID)
//return repo.Prepare(ctx, backup)
})
@@ -115,8 +114,8 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext,
if err != nil {
return nil, err
}
- hspec, ok := m.configMgr.getHandlerSpec(src.Handler)
- if !ok {
+ hspec := rctx.HandlerSpec(src.Handler)
+ if hspec == nil {
return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
}
h, err := hspec.Parse(src)
@@ -129,10 +128,9 @@
//
// TODO: get the timeout from the SourceSpec.
var backupJobs []jobs.Job
- repo := m.configMgr.getRepository()
for _, ds := range dsl {
backupJobs = append(backupJobs, m.withMetadata(
- h.BackupJob(rctx, repo, backup, ds),
+ h.BackupJob(rctx, backup, ds),
backup,
ds,
))
@@ -177,7 +175,7 @@ func (m *tabaccoManager) BackupJob(ctx context.Context, src *SourceSpec) (*Backu
b := newBackup("")
// Create a RuntimeContext.
- rctx := m.configMgr.newRuntimeContext()
+ rctx := m.configMgr.NewRuntimeContext()
j, err := m.makeBackupJob(ctx, rctx, b, src)
return b, j, err
@@ -195,8 +193,8 @@ func (m *tabaccoManager) Backup(ctx context.Context, src *SourceSpec) (*Backup,
func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src *SourceSpec, dsl []*Dataset, target string) (jobs.Job, error) {
// Just need the Handler.
- hspec, ok := m.configMgr.getHandlerSpec(src.Handler)
- if !ok {
+ hspec := rctx.HandlerSpec(src.Handler)
+ if hspec == nil {
return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
}
h, err := hspec.Parse(src)
@@ -208,11 +206,10 @@ func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src
// doBackupDataset() that binds together the context, backup,
// ds and target via the closure.
var restoreJobs []jobs.Job
- repo := m.configMgr.getRepository()
for _, ds := range dsl {
restoreJobs = append(
restoreJobs,
- h.RestoreJob(rctx, repo, backup, ds, target),
+ h.RestoreJob(rctx, backup, ds, target),
)
}
@@ -267,7 +264,7 @@ func (m *tabaccoManager) RestoreJob(ctx context.Context, req *FindRequest, targe
}
// Create a RuntimeContext.
- rctx := m.configMgr.newRuntimeContext()
+ rctx := m.configMgr.NewRuntimeContext()
var restoreJobs []jobs.Job
merr := new(util.MultiError)
@@ -275,7 +272,7 @@ func (m *tabaccoManager) RestoreJob(ctx context.Context, req *FindRequest, targe
// Group the datasets by source, find the source and create the restore jobs.
for srcName, dsl := range groupDatasetsBySource(b.Datasets) {
- src := m.configMgr.findSource(srcName)
+ src := rctx.FindSource(srcName)
if src == nil {
merr.Add(fmt.Errorf("unknown source '%s'", srcName))
continue
...
@@ -202,7 +202,7 @@ func TestManager_Backup(t *testing.T) {
DatasetsCommand: "echo '[{name: users, atoms: [{name: user1}, {name: user2}]}]'",
},
}
- queueSpec := jobs.QueueSpec{
+ queueSpec := &jobs.QueueSpec{
Workers: map[string]int{"backup": 2},
}
@@ -225,7 +225,7 @@
}
defer m.Close()
- for _, src := range configMgr.getSourceSpecs() {
+ for _, src := range configMgr.current().SourceSpecs() {
backup, err := m.Backup(context.TODO(), src)
if err != nil {
t.Fatal(err)
...
@@ -67,7 +67,7 @@ func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePatte
},
},
}
- queueSpec := jobs.QueueSpec{
+ queueSpec := &jobs.QueueSpec{
Workers: map[string]int{"backup": 2, "restore": 1},
}
sourceSpecs := []*SourceSpec{source}
@@ -90,7 +90,7 @@
}
defer m.Close()
- backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs()[0])
+ backup, err := m.Backup(context.TODO(), configMgr.current().SourceSpecs()[0])
if err != nil {
t.Fatal(err)
}
...
@@ -118,8 +118,8 @@ type MetadataStore interface {
// Handler can backup and restore a specific class of datasets.
type Handler interface {
- BackupJob(RuntimeContext, Repository, *Backup, *Dataset) jobs.Job
- RestoreJob(RuntimeContext, Repository, *Backup, *Dataset, string) jobs.Job
+ BackupJob(RuntimeContext, *Backup, *Dataset) jobs.Job
+ RestoreJob(RuntimeContext, *Backup, *Dataset, string) jobs.Job
}
// Repository is the interface to a remote repository.
...
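With the extra Repository argument gone from Handler, everything a job touches now flows from the RuntimeContext it was created with, so a single operation can no longer mix the repository of one config generation with the sources of another. An illustrative consumer of the new surface (not part of the commit; runAllBackups is made up):

func runAllBackups(ctx context.Context, m Manager, configMgr *ConfigManager) error {
	// One snapshot for the whole pass; a reload that lands while we
	// iterate is only picked up by the next NewRuntimeContext call.
	rctx := configMgr.NewRuntimeContext()
	for _, src := range rctx.SourceSpecs() {
		if _, err := m.Backup(ctx, src); err != nil {
			return err
		}
	}
	return nil
}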