Commit 35f8a72e authored by ale's avatar ale

Refactor, take one

Try to make things simpler and clearer. Specifically:

* clarify the semantics of Source, Dataset and Atom
* separate the namespace of Atom paths from the Restic namespace
* simplify the command execution by delegating pipes to /bin/sh
  for efficiency

There's still a bunch to do.
parent a277f63f
Pipeline #3470 passed with stage
in 38 seconds
......@@ -63,7 +63,7 @@ func (a *Agent) Close() {
// Create a new jobs.Schedule that will trigger a separate backup for
// each configured data source that includes a 'schedule' attribute.
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
sched := jobs.NewSchedule(ctx, hostSeed)
merr := new(util.MultiError)
var good int
......@@ -72,9 +72,9 @@ func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, host
continue
}
// Bind spec to a new closure.
err := func(spec SourceSpec) error {
err := func(spec *SourceSpec) error {
return sched.Add(spec.Name, spec.Schedule, func() jobs.Job {
_, j, err := m.BackupJob(ctx, []SourceSpec{spec})
_, j, err := m.BackupJob(ctx, spec)
if err != nil {
log.Printf("%s: can't create backup job: %v", spec.Name, err)
}
......
......@@ -9,19 +9,19 @@ import (
type fakeManager struct{}
func (m *fakeManager) BackupJob(context.Context, []SourceSpec) (Backup, jobs.Job, error) {
return Backup{}, nil, nil
// BackupJob is a test stub: it returns an empty Backup, no job and no error.
func (m *fakeManager) BackupJob(context.Context, *SourceSpec) (*Backup, jobs.Job, error) {
return &Backup{}, nil, nil
}
func (m *fakeManager) Backup(context.Context, []SourceSpec) (Backup, error) {
return Backup{}, nil
// Backup is a test stub: it returns an empty Backup and no error.
func (m *fakeManager) Backup(context.Context, *SourceSpec) (*Backup, error) {
return &Backup{}, nil
}
func (m *fakeManager) RestoreJob(context.Context, FindRequest, string) (jobs.Job, error) {
// RestoreJob is a test stub: it returns no job and no error.
func (m *fakeManager) RestoreJob(context.Context, *FindRequest, string) (jobs.Job, error) {
return nil, nil
}
func (m *fakeManager) Restore(context.Context, FindRequest, string) error {
// Restore is a test stub: it always succeeds without doing anything.
func (m *fakeManager) Restore(context.Context, *FindRequest, string) error {
return nil
}
......@@ -34,27 +34,32 @@ func (m *fakeManager) GetStatus() ([]jobs.Status, []jobs.Status, []jobs.Status)
}
func TestMakeSchedule(t *testing.T) {
sourceSpecs := []SourceSpec{
{
sourceSpecs := []*SourceSpec{
&SourceSpec{
Name: "source1",
Handler: "file1",
Schedule: "@random_every 1d",
Atoms: []Atom{
{
Name: "user1",
RelativePath: "user1",
},
{
Name: "user2",
RelativePath: "user2",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "users",
Atoms: []Atom{
{
Name: "user1",
Path: "user1",
},
{
Name: "user2",
Path: "user2",
},
},
},
},
},
{
Name: "source2",
Handler: "dbpipe",
Schedule: "35 3 * * *",
AtomsCommand: "echo user1 user1 ; echo user2 user2",
&SourceSpec{
Name: "source2",
Handler: "dbpipe",
Schedule: "35 3 * * *",
DatasetsCommand: "echo user1 user1 ; echo user2 user2",
},
}
......
......@@ -77,8 +77,8 @@ func (c *restoreCommand) buildRestoreJob(ctx context.Context, mgr tabacco.Manage
return jobs.AsyncGroup(restoreJobs), nil
}
func (c *restoreCommand) newFindRequest(s string) tabacco.FindRequest {
return tabacco.FindRequest{
// newFindRequest wraps a user-supplied pattern string in a FindRequest.
func (c *restoreCommand) newFindRequest(s string) *tabacco.FindRequest {
return &tabacco.FindRequest{
Pattern: s,
}
}
......
......@@ -32,12 +32,24 @@ type Config struct {
RandomSeedFile string `yaml:"random_seed_file"`
MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"`
HandlerSpecs []HandlerSpec
SourceSpecs []SourceSpec
HandlerSpecs []*HandlerSpec
SourceSpecs []*SourceSpec
}
// RuntimeContext provides access to runtime objects whose lifetime is
// ultimately tied to the configuration.
type RuntimeContext interface {
Shell() *Shell
Close()
}
// The set of objects that are created from a Config. Can change, so
// its access is controlled by the ConfigManager. However it stays
// fixed during a running backup.
//
// This is an implementation of RuntimeContext.
type runtimeAssets struct {
handlerMap map[string]Handler
handlerMap map[string]*HandlerSpec
repo Repository
seed int64
shell *Shell
......@@ -47,18 +59,16 @@ func (a *runtimeAssets) Close() {
a.repo.Close() // nolint
}
func buildHandlerMap(specs []HandlerSpec, shell *Shell) (map[string]Handler, error) {
m := make(map[string]Handler)
merr := new(util.MultiError)
for _, spec := range specs {
h, err := spec.Parse(shell)
if err != nil {
merr.Add(err)
continue
}
m[spec.Name] = h
// Shell returns the Shell associated with these runtime assets
// (part of the RuntimeContext interface).
func (a *runtimeAssets) Shell() *Shell {
return a.shell
}
func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec {
m := make(map[string]*HandlerSpec)
for _, h := range specs {
m[h.Name] = h
}
return m, merr.OrNil()
return m
}
func (c *Config) parse() (*runtimeAssets, error) {
......@@ -68,7 +78,7 @@ func (c *Config) parse() (*runtimeAssets, error) {
// Parse the repository config. An error here is fatal, as we
// don't have a way to operate without a repository.
repo, err := c.Repository.Parse(shell)
repo, err := c.Repository.Parse()
if err != nil {
return nil, err
}
......@@ -76,16 +86,15 @@ func (c *Config) parse() (*runtimeAssets, error) {
merr := new(util.MultiError)
// Build the handlers.
handlerMap, err := buildHandlerMap(c.HandlerSpecs, shell)
if err != nil {
merr.Add(err)
}
handlerMap := buildHandlerMap(c.HandlerSpecs)
// Validate the sources (Parse is called later at runtime).
var srcs []SourceSpec
// Sources that fail the check are removed from the
// SourceSpecs array.
var srcs []*SourceSpec
for _, spec := range c.SourceSpecs {
if err := spec.Check(handlerMap); err != nil {
merr.Add(err)
merr.Add(fmt.Errorf("source %s: %v", spec.Name, err))
continue
}
srcs = append(srcs, spec)
......@@ -110,17 +119,17 @@ func (c *Config) parse() (*runtimeAssets, error) {
// The following functions read YAML files from .d-style directories. To be nice
// to the user, each file can contain either a single object or a list of
// multiple objects.
func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
var out []HandlerSpec
func readHandlersFromDir(dir string) ([]*HandlerSpec, error) {
var out []*HandlerSpec
err := foreachYAMLFile(dir, func(path string) error {
var specs []HandlerSpec
var specs []*HandlerSpec
log.Printf("reading handler: %s", path)
if err := readYAMLFile(path, &specs); err != nil {
var spec HandlerSpec
if err := readYAMLFile(path, &spec); err != nil {
return err
}
specs = []HandlerSpec{spec}
specs = append(specs, &spec)
}
out = append(out, specs...)
return nil
......@@ -128,17 +137,17 @@ func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
return out, err
}
func readSourcesFromDir(dir string) ([]SourceSpec, error) {
var out []SourceSpec
func readSourcesFromDir(dir string) ([]*SourceSpec, error) {
var out []*SourceSpec
err := foreachYAMLFile(dir, func(path string) error {
var specs []SourceSpec
var specs []*SourceSpec
log.Printf("reading source: %s", path)
if err := readYAMLFile(path, &specs); err != nil {
var spec SourceSpec
if err := readYAMLFile(path, &spec); err != nil {
return err
}
specs = []SourceSpec{spec}
specs = append(specs, &spec)
}
out = append(out, specs...)
return nil
......@@ -215,6 +224,7 @@ func foreachYAMLFile(dir string, f func(string) error) error {
merr := new(util.MultiError)
for _, path := range files {
if err := f(path); err != nil {
log.Printf("error loading yaml file %s: %v", path, err)
merr.Add(err)
}
}
......@@ -263,6 +273,8 @@ func (m *ConfigManager) Reload(config *Config) error {
assets, err := config.parse()
if assets == nil {
return err
} else if err != nil {
log.Printf("warning: errors in configuration: %v", err)
}
// Update config and notify listeners (in a separate
......@@ -303,7 +315,12 @@ func (m *ConfigManager) Notify() <-chan struct{} {
return ch
}
func (m *ConfigManager) getHandler(name string) (Handler, bool) {
// newRuntimeContext captures the current runtime assets into a
// RuntimeContext, giving callers a handle to the assets in use at
// this moment.
// NOTE(review): m.assets is read here without holding m.mx, unlike
// the other ConfigManager accessors — confirm this racy read is
// intentional.
func (m *ConfigManager) newRuntimeContext() RuntimeContext {
return m.assets
}
func (m *ConfigManager) getHandlerSpec(name string) (*HandlerSpec, bool) {
m.mx.Lock()
defer m.mx.Unlock()
h, ok := m.assets.handlerMap[name]
......@@ -322,22 +339,27 @@ func (m *ConfigManager) getQueueSpec() jobs.QueueSpec {
return m.config.Queue
}
func (m *ConfigManager) getSourceSpecs() []SourceSpec {
// getSourceSpecs returns the currently configured source specs under
// the config lock. The slice is returned as-is (not copied), so
// callers should treat it as read-only.
func (m *ConfigManager) getSourceSpecs() []*SourceSpec {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.SourceSpecs
}
func (m *ConfigManager) getSeed() int64 {
func (m *ConfigManager) findSource(name string) *SourceSpec {
m.mx.Lock()
defer m.mx.Unlock()
return m.assets.seed
for _, src := range m.config.SourceSpecs {
if src.Name == name {
return src
}
}
return nil
}
func (m *ConfigManager) getShell() *Shell {
func (m *ConfigManager) getSeed() int64 {
m.mx.Lock()
defer m.mx.Unlock()
return m.assets.shell
return m.assets.seed
}
func (m *ConfigManager) getWorkDir() string {
......
......@@ -4,9 +4,11 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"strings"
"testing"
"time"
)
func TestReadConfig(t *testing.T) {
......@@ -27,6 +29,7 @@ func TestConfigManager(t *testing.T) {
if err != nil {
t.Fatal("ReadConfig()", err)
}
log.Printf("loaded %d sources", len(conf.SourceSpecs))
mgr, err := NewConfigManager(conf)
if err != nil {
t.Fatal("NewConfigManager()", err)
......@@ -40,7 +43,12 @@ func TestConfigManager(t *testing.T) {
// Test the Notify() mechanism by checking that it triggers
// right away when setting up a new listener.
<-mgr.Notify()
tmr := time.NewTimer(1 * time.Second)
select {
case <-mgr.Notify():
case <-tmr.C:
t.Fatal("Notify() channel did not trigger")
}
}
func TestRandomSeed(t *testing.T) {
......@@ -61,7 +69,7 @@ func TestConfig_Parse(t *testing.T) {
type testdata struct {
config *Config
expectedOK bool
checkFn func(*runtimeAssets, []Dataset) error
checkFn func(*runtimeAssets, []*Dataset) error
}
tdd := []testdata{
// The following tests cover a few ways to generate
......@@ -69,24 +77,29 @@ func TestConfig_Parse(t *testing.T) {
// the README.
{
&Config{
SourceSpecs: []SourceSpec{
{
Name: "users/account1",
Handler: "file",
Atoms: []Atom{
{RelativePath: "/data/account1"},
},
},
{
Name: "users/account2",
Handler: "file",
Atoms: []Atom{
{RelativePath: "/data/account2"},
SourceSpecs: []*SourceSpec{
&SourceSpec{
Name: "users",
Handler: "file",
Schedule: "@random_every 24h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "account1",
Atoms: []Atom{
{Name: "account1"},
},
},
{
Name: "account2",
Atoms: []Atom{
{Name: "account2"},
},
},
},
},
},
HandlerSpecs: []HandlerSpec{
{
HandlerSpecs: []*HandlerSpec{
&HandlerSpec{
Name: "file",
Type: "file",
Params: map[string]interface{}{"path": "/"},
......@@ -98,18 +111,24 @@ func TestConfig_Parse(t *testing.T) {
},
{
&Config{
SourceSpecs: []SourceSpec{
{
Name: "users",
Handler: "file",
Atoms: []Atom{
{Name: "account1"},
{Name: "account2"},
SourceSpecs: []*SourceSpec{
&SourceSpec{
Name: "users",
Handler: "file",
Schedule: "@random_every 24h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "users",
Atoms: []Atom{
{Name: "account1"},
{Name: "account2"},
},
},
},
},
},
HandlerSpecs: []HandlerSpec{
{
HandlerSpecs: []*HandlerSpec{
&HandlerSpec{
Name: "file",
Type: "file",
Params: map[string]interface{}{"path": "/data"},
......@@ -121,15 +140,16 @@ func TestConfig_Parse(t *testing.T) {
},
{
&Config{
SourceSpecs: []SourceSpec{
{
Name: "users",
Handler: "file",
AtomsCommand: "echo account1; echo account2",
SourceSpecs: []*SourceSpec{
&SourceSpec{
Name: "users",
Handler: "file",
Schedule: "@random_every 24h",
DatasetsCommand: "echo '[{name: account1, atoms: [{name: account1}, {name: account2}]}]'",
},
},
HandlerSpecs: []HandlerSpec{
{
HandlerSpecs: []*HandlerSpec{
&HandlerSpec{
Name: "file",
Type: "file",
Params: map[string]interface{}{"path": "/data"},
......@@ -172,31 +192,27 @@ func TestConfig_Parse(t *testing.T) {
}
}
func parseAllSources(ra *runtimeAssets, specs []SourceSpec) ([]Dataset, error) {
var out []Dataset
func parseAllSources(ra *runtimeAssets, specs []*SourceSpec) ([]*Dataset, error) {
var out []*Dataset
for _, spec := range specs {
ds, err := spec.Parse(context.Background())
if err != nil {
return nil, err
}
dsb := ra.handlerMap[ds.Handler].DatasetsForBackup(ds)
out = append(out, dsb...)
out = append(out, ds...)
}
return out, nil
}
func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []Dataset) error {
func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []*Dataset) error {
var numAtoms int
for _, ds := range datasets {
if ds.Name == "" {
return errors.New("empty dataset name")
}
if ds.Handler != "file" {
return fmt.Errorf("expected handler 'file', got '%s'", ds.Handler)
}
for _, atom := range ds.Atoms {
if !strings.HasPrefix(atom.SourcePath, "/data/") {
return fmt.Errorf("bad atom source path: %s", atom.SourcePath)
if !strings.HasPrefix(atom.Name, "users/") {
return fmt.Errorf("bad atom name: %s", atom.Name)
}
numAtoms++
}
......
......@@ -10,48 +10,60 @@ import (
type fileHandler struct {
path string
spec HandlerSpec
}
func newFileHandler(spec HandlerSpec, _ *Shell) (Handler, error) {
path, ok := spec.Params["path"].(string)
if !ok || path == "" {
return nil, errors.New("missing path")
func newFileHandler(name string, params Params) (Handler, error) {
path := params.Get("path")
if path == "" {
return nil, errors.New("path not set")
}
return &fileHandler{path: path, spec: spec}, nil
return &fileHandler{path: path}, nil
}
func (h *fileHandler) Spec() HandlerSpec {
return h.spec
}
func (h *fileHandler) DatasetsForBackup(ds Dataset) []Dataset {
// Set SourcePath on all atoms.
var atoms []Atom
for _, atom := range ds.Atoms {
relPath := atom.RelativePath
if relPath == "" {
relPath = atom.Name
// Convert the atom to a path.
func atomPath(a Atom, root string) string {
// If the atom has a path, use that.
if a.Path != "" {
// If it's an absolute path, just use it.
if a.Path[0] == '/' {
return a.Path
}
atom.SourcePath = filepath.Join(h.path, relPath)
atoms = append(atoms, atom)
// Otherwise join it with the root path.
return filepath.Join(root, a.Path)
}
ds.Atoms = atoms
return []Dataset{ds}
// Join the name with the root path by default.
return filepath.Join(root, a.Name)
}
func (h *fileHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job {
func (h *fileHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
// Build the list of filesystem paths to pass to the
// Repository.Backup method.
var paths []string
for _, a := range ds.Atoms {
paths = append(paths, atomPath(a, h.path))
}
cmd := repo.BackupCmd(backup, ds, paths)
// Now pass those paths to the Backup method.
return jobs.JobFunc(func(ctx context.Context) error {
return repo.Backup(ctx, backup, ds, h.path /* UNUSED */)
return rctx.Shell().Run(ctx, cmd)
})
}
func (h *fileHandler) DatasetsForRestore(ds Dataset) []Dataset {
return []Dataset{ds}
}
func (h *fileHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
// Build the list of filesystem paths to pass to the
// Repository.Backup method.
var paths []string
for _, a := range ds.Atoms {
paths = append(paths, atomPath(a, h.path))
}
func (h *fileHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job {
// Call the repo Restore method.
return jobs.JobFunc(func(ctx context.Context) error {
return repo.Restore(ctx, backup, ds, target)
cmd, err := repo.RestoreCmd(ctx, rctx, backup, ds, paths, target)
if err != nil {
return err
}
return rctx.Shell().Run(ctx, cmd)
})
}
......@@ -4,9 +4,8 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"git.autistici.org/ai3/tools/tabacco/jobs"
)
......@@ -16,119 +15,74 @@ import (
// distinguish multiple atoms inside it.
type pipeHandler struct {
backupCmd, restoreCmd string
shell *Shell
spec HandlerSpec
}
func newPipeHandler(spec HandlerSpec, shell *Shell) (Handler, error) {
backupCmd, ok := spec.Params["backup_command"].(string)
if !ok || backupCmd == "" {
return nil, errors.New("missing backup_command")
func newPipeHandler(name string, params Params) (Handler, error) {
backupCmd := params.Get("backup_command")
if backupCmd == "" {
return nil, errors.New("backup_command not set")
}
restoreCmd, ok := spec.Params["restore_command"].(string)
if !ok || restoreCmd == "" {
return nil, errors.New("missing restore_command")
restoreCmd := params.Get("restore_command")
if restoreCmd == "" {
return nil, errors.New("restore_command not set")
}
return &pipeHandler{
spec: spec,
backupCmd: backupCmd,
restoreCmd: restoreCmd,
shell: shell,
}, nil
}
func (h *pipeHandler) Spec() HandlerSpec {
return h.spec
}
func (h *pipeHandler) DatasetsForBackup(ds Dataset) []Dataset {
var dsl []Dataset
for _, atom := range ds.Atoms {
atom.SourcePath = filepath.Join(ds.Name, atom.Name)
dsl = append(dsl, makeSingleAtomDataset(ds, atom))
}
return dsl
}
func (h *pipeHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job {
if len(ds.Atoms) > 1 {
panic("more than 1 atom in pipe source")
}
func (h *pipeHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
cmd := fmt.Sprintf(
"(%s) | %s",
expandVars(h.backupCmd, backup, ds),
repo.BackupStreamCmd(backup, ds),
)
return jobs.JobFunc(func(ctx context.Context) error {
return h.backupAtom(ctx, repo, backup, ds, ds.Atoms[0])
return rctx.Shell().Run(ctx, cmd)
})
}
func (h *pipeHandler) backupAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom) error {
return h.shell.RunStdoutPipe(
ctx,
expandVars(h.backupCmd, backup, ds, atom),
func(stdout io.Reader) error {
return repo.BackupStream(ctx, backup, singleAtomDataset(ds, atom), stdout)
},
)
}
func (h *pipeHandler) DatasetsForRestore(ds Dataset) []Dataset {
var dsl []Dataset
for _, atom := range ds.Atoms {
dsl = append(dsl, makeSingleAtomDataset(ds, atom))
}
return dsl
}
func (h *pipeHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job {
var restoreJobs []jobs.Job
for _, atom := range ds.Atoms {
func(atom Atom) {
restoreJobs = append(restoreJobs, jobs.JobFunc(func(ctx context.Context) error {
return h.restoreAtom(ctx, repo, backup, ds, atom, target)
}))
}(atom)
}
return jobs.AsyncGroup(restoreJobs)
}
func (h *pipeHandler) restoreAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom, target string) error {
return h.shell.RunStdinPipe(
ctx,
expandVars(h.restoreCmd, backup, ds, atom),
func(stdin io.Writer) error {
return repo.RestoreStream(ctx, backup, singleAtomDataset(ds, atom), target, stdin)
},
)
}
func singleAtomDataset(ds Dataset, atom Atom) Dataset {
return Dataset{
Name: ds.Name,
Handler: ds.Handler,
Atoms: []Atom{atom},
}
// RestoreJob returns a job that restores a dataset by piping the
// repository's restore stream into the handler's restore_command
// through the shell, as "<repo stream cmd> | (<restore_command>)".
// NOTE(review): the target argument is unused in this body —
// presumably it is accounted for by RestoreStreamCmd or the
// command itself; confirm.
func (h *pipeHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
return jobs.JobFunc(func(ctx context.Context) error {
// Build the repository-side command that emits the backup stream.
restoreCmd, err := repo.RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
if err != nil {
return err
}
// Pipe the stream into the user-configured restore command,
// after expanding backup/dataset variables in it.
cmd := fmt.Sprintf(
"%s | (%s)",
restoreCmd,
expandVars(h.restoreCmd, backup, ds),
)
return rctx.Shell().Run(ctx, cmd)
})
}
func expandVars(s string, backup Backup, ds Dataset, atom Atom) string {
func expandVars(s string, backup *Backup, ds *Dataset) string {
return os.Expand(s, func(key string) string {
switch key {
case "$":
return key
case "backup.id":
return backup.ID
case "ds.name":
return ds.Name
case "atom.name":
return atom.Name
case "atom.path":
return filepath.Join(ds.Name, atom.Name)
case "atom.names":
names := make([]string, 0, len(ds.Atoms))
for _, a := range ds.Atoms {
names = append(names, a.Name)
}
return strings.Join(names, " ")
case "atom.paths":
paths := make([]string, 0, len(ds.Atoms))
for _, a := range