diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 01f6048b964b3755c2e0662285a1ef1101cc7e8c..dafc98a611d8c72e1af64232a32ac2ec558b9bb6 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -12,6 +12,7 @@ test: image: "ai/test:go" script: - "./install_restic_for_tests.sh" + - "apt-get install -y liblz4-tool" - "go-test-runner ." except: - master diff --git a/agent.go b/agent.go index 533b21ef54545eb5fd7b0eb76f71febfefdfff69..a121fa9dbd5923fbecdb8fed4536a03eb72c8886 100644 --- a/agent.go +++ b/agent.go @@ -35,7 +35,8 @@ func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) ( case <-stopCh: return case <-notifyCh: - schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed()) + config := configMgr.current() + schedule, err := makeSchedule(ctx, mgr, config.SourceSpecs(), config.Seed()) if err != nil { log.Printf("error updating scheduler: %v", err) } @@ -63,7 +64,7 @@ func (a *Agent) Close() { // Create a new jobs.Schedule that will trigger a separate backup for // each configured data source that includes a 'schedule' attribute. -func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) { +func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) { sched := jobs.NewSchedule(ctx, hostSeed) merr := new(util.MultiError) var good int @@ -72,9 +73,9 @@ func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, host continue } // Bind spec to a new closure. - err := func(spec SourceSpec) error { + err := func(spec *SourceSpec) error { return sched.Add(spec.Name, spec.Schedule, func() jobs.Job { - _, j, err := m.BackupJob(ctx, []SourceSpec{spec}) + _, j, err := m.BackupJob(ctx, spec) if err != nil { log.Printf("%s: can't create backup job: %v", spec.Name, err) } diff --git a/agent_test.go b/agent_test.go index 2ef2c1650ce1e73b52d9f460b1eafb72123182c2..f579f4f903b8b33e4f19aa26b4aa7d1b78036a55 100644 --- a/agent_test.go +++ b/agent_test.go @@ -9,19 +9,19 @@ import ( type fakeManager struct{} -func (m *fakeManager) BackupJob(context.Context, []SourceSpec) (Backup, jobs.Job, error) { - return Backup{}, nil, nil +func (m *fakeManager) BackupJob(context.Context, *SourceSpec) (*Backup, jobs.Job, error) { + return &Backup{}, nil, nil } -func (m *fakeManager) Backup(context.Context, []SourceSpec) (Backup, error) { - return Backup{}, nil +func (m *fakeManager) Backup(context.Context, *SourceSpec) (*Backup, error) { + return &Backup{}, nil } -func (m *fakeManager) RestoreJob(context.Context, FindRequest, string) (jobs.Job, error) { +func (m *fakeManager) RestoreJob(context.Context, *FindRequest, string) (jobs.Job, error) { return nil, nil } -func (m *fakeManager) Restore(context.Context, FindRequest, string) error { +func (m *fakeManager) Restore(context.Context, *FindRequest, string) error { return nil } @@ -34,27 +34,31 @@ func (m *fakeManager) GetStatus() ([]jobs.Status, []jobs.Status, []jobs.Status) } func TestMakeSchedule(t *testing.T) { - sourceSpecs := []SourceSpec{ - { - Name: "source1", + sourceSpecs := []*SourceSpec{ + &SourceSpec{ + Name: "source1/users", Handler: "file1", Schedule: "@random_every 1d", - Atoms: []Atom{ - { - Name: "user1", - RelativePath: "user1", - }, - { - Name: "user2", - RelativePath: "user2", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + { + Name: "user1", + Path: "user1", + }, + { + Name: "user2", + Path: "user2", + }, + }, }, }, }, - { - Name: "source2", - Handler: "dbpipe", - 
Schedule: "35 3 * * *", - AtomsCommand: "echo user1 user1 ; echo user2 user2", + &SourceSpec{ + Name: "source2", + Handler: "dbpipe", + Schedule: "35 3 * * *", + DatasetsCommand: "echo user1 user1 ; echo user2 user2", }, } diff --git a/cmd/tabacco/query.go b/cmd/tabacco/query.go new file mode 100644 index 0000000000000000000000000000000000000000..30a1f290ca2aed5834b75b61900e8238940bf651 --- /dev/null +++ b/cmd/tabacco/query.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "log" + "os" + "time" + + "github.com/google/subcommands" + + "git.autistici.org/ai3/tools/tabacco" + mdbc "git.autistici.org/ai3/tools/tabacco/metadb/client" +) + +var rpcTimeout = 120 * time.Second + +type queryCommand struct { + configPath string + host string + numVersions int +} + +func (c *queryCommand) Name() string { return "query" } +func (c *queryCommand) Synopsis() string { return "query the backup metadata database" } +func (c *queryCommand) Usage() string { + return `query [] + Query the backup metadata database. + +` +} + +func (c *queryCommand) SetFlags(f *flag.FlagSet) { + f.StringVar(&c.configPath, "config", "/etc/tabacco/agent.yml", "configuration `file`") + f.StringVar(&c.host, "host", "", "filter by host") + f.IntVar(&c.numVersions, "num-versions", 1, "return the most recent `N` versions") +} + +func (c *queryCommand) buildRequest(f *flag.FlagSet) (*tabacco.FindRequest, error) { + if f.NArg() != 1 { + return nil, errors.New("error: wrong number of arguments") + } + return &tabacco.FindRequest{ + Pattern: f.Arg(0), + Host: c.host, + NumVersions: c.numVersions, + }, nil +} + +func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { + req, err := c.buildRequest(f) + if err != nil { + log.Printf("error in request: %v", err) + return subcommands.ExitUsageError + } + + // Parse configuration and connect to the metadata store. + config, err := tabacco.ReadConfig(c.configPath) + if err != nil { + log.Printf("error reading config: %v", err) + return subcommands.ExitFailure + } + store, err := mdbc.New(config.MetadataStoreBackend) + if err != nil { + log.Printf("error in metadata client config: %v", err) + return subcommands.ExitFailure + } + + // Make the RPC. + rctx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + result, err := store.FindAtoms(rctx, req) + + if err != nil { + log.Printf("FindAtoms() error: %v", err) + return subcommands.ExitFailure + } + + data, _ := json.MarshalIndent(result, "", " ") + os.Stdout.Write(data) + + return subcommands.ExitSuccess +} + +func init() { + subcommands.Register(&queryCommand{}, "") +} diff --git a/cmd/tabacco/restore.go b/cmd/tabacco/restore.go index 88cdc653918c7f37d7b333da63dd90589148b8dc..efa5b5e6afc389549cb99009b1a21fc9ef375764 100644 --- a/cmd/tabacco/restore.go +++ b/cmd/tabacco/restore.go @@ -77,8 +77,8 @@ func (c *restoreCommand) buildRestoreJob(ctx context.Context, mgr tabacco.Manage return jobs.AsyncGroup(restoreJobs), nil } -func (c *restoreCommand) newFindRequest(s string) tabacco.FindRequest { - return tabacco.FindRequest{ +func (c *restoreCommand) newFindRequest(s string) *tabacco.FindRequest { + return &tabacco.FindRequest{ Pattern: s, } } diff --git a/config.go b/config.go index a31c475e6b7860d2815671b15878b92bcfc079cc..bcbbe7c1d59172c654c568814fedd6ee1b42375f 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,7 @@ var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed" // holds it all together. 
type Config struct { Hostname string `yaml:"hostname"` - Queue jobs.QueueSpec `yaml:"queue_config"` + Queue *jobs.QueueSpec `yaml:"queue_config"` Repository RepositorySpec `yaml:"repository"` DryRun bool `yaml:"dry_run"` DefaultNiceLevel int `yaml:"default_nice_level"` @@ -32,43 +32,83 @@ type Config struct { RandomSeedFile string `yaml:"random_seed_file"` MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"` - HandlerSpecs []HandlerSpec - SourceSpecs []SourceSpec + HandlerSpecs []*HandlerSpec + SourceSpecs []*SourceSpec } -type runtimeAssets struct { - handlerMap map[string]Handler - repo Repository - seed int64 - shell *Shell +// RuntimeContext provides access to runtime objects whose lifetime is +// ultimately tied to the configuration. Configuration can change +// during the lifetime of the process, but we want backup jobs to have +// a consistent view of the configuration while they execute, so +// access to the current version of the configuration is controlled to +// the ConfigManager. +type RuntimeContext interface { + Shell() *Shell + Repo() Repository + QueueSpec() *jobs.QueueSpec + Seed() int64 + WorkDir() string + SourceSpecs() []*SourceSpec + FindSource(string) *SourceSpec + HandlerSpec(string) *HandlerSpec + Close() } -func (a *runtimeAssets) Close() { +// The set of objects that are created from a Config and that the main +// code cares about. +type parsedConfig struct { + handlerMap map[string]*HandlerSpec + sourceSpecs []*SourceSpec + sourceSpecsByName map[string]*SourceSpec + queue *jobs.QueueSpec + + repo Repository + seed int64 + shell *Shell + workDir string +} + +func (a *parsedConfig) Close() { a.repo.Close() // nolint } -func buildHandlerMap(specs []HandlerSpec, shell *Shell) (map[string]Handler, error) { - m := make(map[string]Handler) - merr := new(util.MultiError) - for _, spec := range specs { - h, err := spec.Parse(shell) - if err != nil { - merr.Add(err) - continue - } - m[spec.Name] = h +func (a *parsedConfig) Shell() *Shell { return a.shell } +func (a *parsedConfig) Repo() Repository { return a.repo } +func (a *parsedConfig) QueueSpec() *jobs.QueueSpec { return a.queue } +func (a *parsedConfig) Seed() int64 { return a.seed } +func (a *parsedConfig) WorkDir() string { return a.workDir } +func (a *parsedConfig) SourceSpecs() []*SourceSpec { return a.sourceSpecs } + +func (a *parsedConfig) HandlerSpec(name string) *HandlerSpec { + return a.handlerMap[name] +} + +func (a *parsedConfig) FindSource(name string) *SourceSpec { + return a.sourceSpecsByName[name] +} + +func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec { + // Create a handler map with a default 'file' spec. + m := map[string]*HandlerSpec{ + "file": &HandlerSpec{ + Name: "file", + Type: "file", + }, + } + for _, h := range specs { + m[h.Name] = h } - return m, merr.OrNil() + return m } -func (c *Config) parse() (*runtimeAssets, error) { +func (c *Config) parse() (*parsedConfig, error) { shell := NewShell(c.DryRun) shell.SetNiceLevel(c.DefaultNiceLevel) shell.SetIOClass(c.DefaultIOClass) // Parse the repository config. An error here is fatal, as we // don't have a way to operate without a repository. - repo, err := c.Repository.Parse(shell) + repo, err := c.Repository.Parse() if err != nil { return nil, err } @@ -76,21 +116,26 @@ func (c *Config) parse() (*runtimeAssets, error) { merr := new(util.MultiError) // Build the handlers. 
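The RuntimeContext/parsedConfig split above is the heart of this refactor: instead of reading mutable state through individual ConfigManager getters, a job captures one consistent snapshot and uses it for its whole run. A minimal sketch of the intended call pattern, assuming package tabacco and the "log" import; logUnknownHandlers is a hypothetical helper, not part of this change:

```go
func logUnknownHandlers(mgr *ConfigManager) {
	// One immutable view of the configuration; a concurrent Reload() will
	// not change what this function sees while it runs.
	rctx := mgr.NewRuntimeContext()

	for _, src := range rctx.SourceSpecs() {
		// Handler lookup goes through the same snapshot.
		if rctx.HandlerSpec(src.Handler) == nil {
			log.Printf("source %s references unknown handler %q", src.Name, src.Handler)
		}
	}
	// Per the manager code in this diff, the snapshot is closed by the
	// ConfigManager on reload/shutdown, so callers do not Close() it here.
}
```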
- handlerMap, err := buildHandlerMap(c.HandlerSpecs, shell) - if err != nil { - merr.Add(err) - } + handlerMap := buildHandlerMap(c.HandlerSpecs) // Validate the sources (Parse is called later at runtime). - var srcs []SourceSpec + // Sources that fail the check are removed from the + // SourceSpecs array. We also check that sources have unique + // names. + srcMap := make(map[string]*SourceSpec) + var srcs []*SourceSpec for _, spec := range c.SourceSpecs { if err := spec.Check(handlerMap); err != nil { - merr.Add(err) + merr.Add(fmt.Errorf("source %s: %v", spec.Name, err)) continue } + if _, ok := srcMap[spec.Name]; ok { + merr.Add(fmt.Errorf("duplicated source %s", spec.Name)) + continue + } + srcMap[spec.Name] = spec srcs = append(srcs, spec) } - c.SourceSpecs = srcs // Read (or create) the seed file. seedFile := defaultSeedFile @@ -99,28 +144,32 @@ func (c *Config) parse() (*runtimeAssets, error) { } seed := mustGetSeed(seedFile) - return &runtimeAssets{ - shell: shell, - repo: repo, - handlerMap: handlerMap, - seed: seed, + return &parsedConfig{ + handlerMap: handlerMap, + sourceSpecs: srcs, + sourceSpecsByName: srcMap, + queue: c.Queue, + shell: shell, + repo: repo, + seed: seed, + workDir: c.WorkDir, }, merr.OrNil() } // The following functions read YAML files from .d-style directories. To be nice // to the user, each file can contain either a single object or a list of // multiple objects. -func readHandlersFromDir(dir string) ([]HandlerSpec, error) { - var out []HandlerSpec +func readHandlersFromDir(dir string) ([]*HandlerSpec, error) { + var out []*HandlerSpec err := foreachYAMLFile(dir, func(path string) error { - var specs []HandlerSpec + var specs []*HandlerSpec log.Printf("reading handler: %s", path) if err := readYAMLFile(path, &specs); err != nil { var spec HandlerSpec if err := readYAMLFile(path, &spec); err != nil { return err } - specs = []HandlerSpec{spec} + specs = append(specs, &spec) } out = append(out, specs...) return nil @@ -128,17 +177,17 @@ func readHandlersFromDir(dir string) ([]HandlerSpec, error) { return out, err } -func readSourcesFromDir(dir string) ([]SourceSpec, error) { - var out []SourceSpec +func readSourcesFromDir(dir string) ([]*SourceSpec, error) { + var out []*SourceSpec err := foreachYAMLFile(dir, func(path string) error { - var specs []SourceSpec + var specs []*SourceSpec log.Printf("reading source: %s", path) if err := readYAMLFile(path, &specs); err != nil { var spec SourceSpec if err := readYAMLFile(path, &spec); err != nil { return err } - specs = []SourceSpec{spec} + specs = append(specs, &spec) } out = append(out, specs...) return nil @@ -215,6 +264,7 @@ func foreachYAMLFile(dir string, f func(string) error) error { merr := new(util.MultiError) for _, path := range files { if err := f(path); err != nil { + log.Printf("error loading yaml file %s: %v", path, err) merr.Add(err) } } @@ -228,8 +278,7 @@ func foreachYAMLFile(dir string, f func(string) error) error { // unregister). type ConfigManager struct { mx sync.Mutex - config *Config - assets *runtimeAssets + parsed *parsedConfig // Listeners are notified on every reload. 
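One consequence of buildHandlerMap always injecting a default 'file' handler, combined with the handler/source parameter merge added in handlers.go below, is that plain filesystem sources no longer need an explicit HandlerSpec. A sketch of such a config, assuming SourceSpec.Params uses the same Params map type as handlers (field names follow this diff; values are illustrative):

```go
var exampleConfig = &Config{
	SourceSpecs: []*SourceSpec{
		{
			Name:     "home",
			Handler:  "file", // resolved by the built-in default handler
			Schedule: "@random_every 24h",
			Params:   Params{"path": "/home"}, // supplies the handler's "path" via the param merge
			Datasets: []*DatasetSpec{
				{Atoms: []Atom{{Name: "alice"}, {Name: "bob"}}},
			},
		},
	},
}
```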
notifyCh chan struct{} @@ -246,12 +295,14 @@ func NewConfigManager(config *Config) (*ConfigManager, error) { } go func() { for range m.notifyCh { + m.mx.Lock() for _, lch := range m.listeners { select { case lch <- struct{}{}: default: } } + m.mx.Unlock() } }() return m, nil @@ -260,22 +311,23 @@ func NewConfigManager(config *Config) (*ConfigManager, error) { // Reload the configuration (at least, the parts of it that can be // dynamically reloaded). func (m *ConfigManager) Reload(config *Config) error { - assets, err := config.parse() - if assets == nil { + parsed, err := config.parse() + if parsed == nil { return err + } else if err != nil { + log.Printf("warning: errors in configuration: %v", err) } // Update config and notify listeners (in a separate // goroutine, that does not hold the lock). m.mx.Lock() defer m.mx.Unlock() - if m.assets != nil { - m.assets.Close() // nolint + if m.parsed != nil { + m.parsed.Close() // nolint } - log.Printf("loaded new config: %d handlers, %d sources", len(assets.handlerMap), len(config.SourceSpecs)) - m.assets = assets - m.config = config + log.Printf("loaded new config: %d handlers, %d sources", len(parsed.handlerMap), len(parsed.sourceSpecs)) + m.parsed = parsed m.notifyCh <- struct{}{} return nil } @@ -284,8 +336,8 @@ func (m *ConfigManager) Reload(config *Config) error { func (m *ConfigManager) Close() { m.mx.Lock() close(m.notifyCh) - if m.assets != nil { - m.assets.Close() + if m.parsed != nil { + m.parsed.Close() } m.mx.Unlock() } @@ -303,47 +355,16 @@ func (m *ConfigManager) Notify() <-chan struct{} { return ch } -func (m *ConfigManager) getHandler(name string) (Handler, bool) { - m.mx.Lock() - defer m.mx.Unlock() - h, ok := m.assets.handlerMap[name] - return h, ok -} - -func (m *ConfigManager) getRepository() Repository { - m.mx.Lock() - defer m.mx.Unlock() - return m.assets.repo -} - -func (m *ConfigManager) getQueueSpec() jobs.QueueSpec { - m.mx.Lock() - defer m.mx.Unlock() - return m.config.Queue -} - -func (m *ConfigManager) getSourceSpecs() []SourceSpec { - m.mx.Lock() - defer m.mx.Unlock() - return m.config.SourceSpecs -} - -func (m *ConfigManager) getSeed() int64 { - m.mx.Lock() - defer m.mx.Unlock() - return m.assets.seed -} - -func (m *ConfigManager) getShell() *Shell { - m.mx.Lock() - defer m.mx.Unlock() - return m.assets.shell +// NewRuntimeContext returns a new RuntimeContext, capturing current +// configuration and runtime assets. +func (m *ConfigManager) NewRuntimeContext() RuntimeContext { + return m.current() } -func (m *ConfigManager) getWorkDir() string { +func (m *ConfigManager) current() *parsedConfig { m.mx.Lock() defer m.mx.Unlock() - return m.config.WorkDir + return m.parsed } func mustGetSeed(path string) int64 { diff --git a/config_test.go b/config_test.go index dcf00e5eeed5c33ab4dfdf1e77a772cb0e16d126..80933e9224068198796f9f5e7e6484687e21b7e1 100644 --- a/config_test.go +++ b/config_test.go @@ -4,9 +4,10 @@ import ( "context" "errors" "fmt" + "log" "os" - "strings" "testing" + "time" ) func TestReadConfig(t *testing.T) { @@ -27,6 +28,7 @@ func TestConfigManager(t *testing.T) { if err != nil { t.Fatal("ReadConfig()", err) } + log.Printf("loaded %d sources", len(conf.SourceSpecs)) mgr, err := NewConfigManager(conf) if err != nil { t.Fatal("NewConfigManager()", err) @@ -34,13 +36,18 @@ func TestConfigManager(t *testing.T) { defer mgr.Close() // Test one of the accessor methods. 
- if s := mgr.getSourceSpecs(); len(s) != 1 { - t.Fatalf("getSourceSpecs() bad result: %+v", s) + if s := mgr.current().SourceSpecs(); len(s) != 1 { + t.Fatalf("current().SourceSpecs() bad result: %+v", s) } // Test the Notify() mechanism by checking that it triggers // right away when setting up a new listener. - <-mgr.Notify() + tmr := time.NewTimer(1 * time.Second) + select { + case <-mgr.Notify(): + case <-tmr.C: + t.Fatal("Notify() channel did not trigger") + } } func TestRandomSeed(t *testing.T) { @@ -61,7 +68,7 @@ func TestConfig_Parse(t *testing.T) { type testdata struct { config *Config expectedOK bool - checkFn func(*runtimeAssets, []Dataset) error + checkFn func([]*Dataset) error } tdd := []testdata{ // The following tests cover a few ways to generate @@ -69,24 +76,27 @@ func TestConfig_Parse(t *testing.T) { // the README. { &Config{ - SourceSpecs: []SourceSpec{ - { - Name: "users/account1", - Handler: "file", - Atoms: []Atom{ - {RelativePath: "/data/account1"}, - }, - }, - { - Name: "users/account2", - Handler: "file", - Atoms: []Atom{ - {RelativePath: "/data/account2"}, + SourceSpecs: []*SourceSpec{ + &SourceSpec{ + Name: "users", + Handler: "file", + Schedule: "@random_every 24h", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + {Name: "account1"}, + }, + }, + { + Atoms: []Atom{ + {Name: "account2"}, + }, + }, }, }, }, - HandlerSpecs: []HandlerSpec{ - { + HandlerSpecs: []*HandlerSpec{ + &HandlerSpec{ Name: "file", Type: "file", Params: map[string]interface{}{"path": "/"}, @@ -98,18 +108,23 @@ func TestConfig_Parse(t *testing.T) { }, { &Config{ - SourceSpecs: []SourceSpec{ - { - Name: "users", - Handler: "file", - Atoms: []Atom{ - {Name: "account1"}, - {Name: "account2"}, + SourceSpecs: []*SourceSpec{ + &SourceSpec{ + Name: "users", + Handler: "file", + Schedule: "@random_every 24h", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + {Name: "account1"}, + {Name: "account2"}, + }, + }, }, }, }, - HandlerSpecs: []HandlerSpec{ - { + HandlerSpecs: []*HandlerSpec{ + &HandlerSpec{ Name: "file", Type: "file", Params: map[string]interface{}{"path": "/data"}, @@ -121,15 +136,16 @@ func TestConfig_Parse(t *testing.T) { }, { &Config{ - SourceSpecs: []SourceSpec{ - { - Name: "users", - Handler: "file", - AtomsCommand: "echo account1; echo account2", + SourceSpecs: []*SourceSpec{ + &SourceSpec{ + Name: "users", + Handler: "file", + Schedule: "@random_every 24h", + DatasetsCommand: "echo '[{atoms: [{name: account1}, {name: account2}]}]'", }, }, - HandlerSpecs: []HandlerSpec{ - { + HandlerSpecs: []*HandlerSpec{ + &HandlerSpec{ Name: "file", Type: "file", Params: map[string]interface{}{"path": "/data"}, @@ -150,53 +166,51 @@ func TestConfig_Parse(t *testing.T) { "password": "hello", } - ra, err := td.config.parse() + parsed, err := td.config.parse() if err != nil && td.expectedOK { t.Errorf("unexpected error for config %+v: %v", td.config, err) } else if err == nil && !td.expectedOK { t.Errorf("missing error for config %+v", td.config) } else { - datasets, err := parseAllSources(ra, td.config.SourceSpecs) + datasets, err := parseAllSources(parsed.SourceSpecs()) if err != nil { t.Errorf("failed to parse sources %+v: %v", td.config.SourceSpecs, err) } if td.checkFn != nil { - if err := td.checkFn(ra, datasets); err != nil { + if err := td.checkFn(datasets); err != nil { t.Errorf("check failed for config %+v: %v", td.config, err) } } } - if ra != nil { - ra.Close() + if parsed != nil { + parsed.Close() } } } -func parseAllSources(ra *runtimeAssets, specs []SourceSpec) 
([]Dataset, error) { - var out []Dataset +func parseAllSources(specs []*SourceSpec) ([]*Dataset, error) { + var out []*Dataset for _, spec := range specs { ds, err := spec.Parse(context.Background()) if err != nil { return nil, err } - dsb := ra.handlerMap[ds.Handler].DatasetsForBackup(ds) - out = append(out, dsb...) + out = append(out, ds...) } return out, nil } -func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []Dataset) error { +func checkTwoUserAccountsAtoms(datasets []*Dataset) error { var numAtoms int for _, ds := range datasets { - if ds.Name == "" { - return errors.New("empty dataset name") - } - if ds.Handler != "file" { - return fmt.Errorf("expected handler 'file', got '%s'", ds.Handler) + if ds.ID == "" { + return errors.New("empty dataset ID") } for _, atom := range ds.Atoms { - if !strings.HasPrefix(atom.SourcePath, "/data/") { - return fmt.Errorf("bad atom source path: %s", atom.SourcePath) + switch atom.Name { + case "account1", "account2": + default: + return fmt.Errorf("bad atom name: %s", atom.Name) } numAtoms++ } diff --git a/handler_file.go b/handler_file.go index c82abb3e2c3756da7bf387bffee6d040b4f65af5..1da94871b15d636e464fb2a65f2575708881113f 100644 --- a/handler_file.go +++ b/handler_file.go @@ -2,7 +2,6 @@ package tabacco import ( "context" - "errors" "path/filepath" "git.autistici.org/ai3/tools/tabacco/jobs" @@ -10,48 +9,56 @@ import ( type fileHandler struct { path string - spec HandlerSpec } -func newFileHandler(spec HandlerSpec, _ *Shell) (Handler, error) { - path, ok := spec.Params["path"].(string) - if !ok || path == "" { - return nil, errors.New("missing path") - } - return &fileHandler{path: path, spec: spec}, nil -} - -func (h *fileHandler) Spec() HandlerSpec { - return h.spec +func newFileHandler(name string, params Params) (Handler, error) { + return &fileHandler{path: params.Get("path")}, nil } -func (h *fileHandler) DatasetsForBackup(ds Dataset) []Dataset { - // Set SourcePath on all atoms. - var atoms []Atom - for _, atom := range ds.Atoms { - relPath := atom.RelativePath - if relPath == "" { - relPath = atom.Name +// Convert the atom to a path. +func atomPath(a Atom, root string) string { + // If the atom has a path, use that. + if a.Path != "" { + // If it's an absolute path, just use it. + if a.Path[0] == '/' { + return a.Path } - atom.SourcePath = filepath.Join(h.path, relPath) - atoms = append(atoms, atom) + // Otherwise join it with the root path. + return filepath.Join(root, a.Path) } - ds.Atoms = atoms - return []Dataset{ds} + // Join the name with the root path by default. + return filepath.Join(root, a.Name) } -func (h *fileHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job { +func (h *fileHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job { + // Build the list of filesystem paths to pass to the + // Repository.Backup method. + var paths []string + for _, a := range ds.Atoms { + paths = append(paths, atomPath(a, h.path)) + } + cmd := rctx.Repo().BackupCmd(backup, ds, paths) + + // Now pass those paths to the Backup method. return jobs.JobFunc(func(ctx context.Context) error { - return repo.Backup(ctx, backup, ds, h.path /* UNUSED */) + return rctx.Shell().Run(ctx, cmd) }) } -func (h *fileHandler) DatasetsForRestore(ds Dataset) []Dataset { - return []Dataset{ds} -} +func (h *fileHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Dataset, target string) jobs.Job { + // Build the list of filesystem paths to pass to the + // Repository.Backup method. 
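The new atomPath helper above decides which filesystem path each atom maps to. A quick illustration of its three cases with a handler root of /data (demoAtomPaths is a hypothetical snippet in package tabacco, using only atomPath as defined above and the "fmt" import):

```go
func demoAtomPaths() {
	root := "/data"
	fmt.Println(atomPath(Atom{Name: "u1", Path: "/srv/mail/u1"}, root)) // "/srv/mail/u1": absolute paths win
	fmt.Println(atomPath(Atom{Name: "u2", Path: "boxes/u2"}, root))     // "/data/boxes/u2": relative paths join the root
	fmt.Println(atomPath(Atom{Name: "u3"}, root))                       // "/data/u3": fall back to the atom name
}
```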
+ var paths []string + for _, a := range ds.Atoms { + paths = append(paths, atomPath(a, h.path)) + } -func (h *fileHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job { + // Call the repo Restore method. return jobs.JobFunc(func(ctx context.Context) error { - return repo.Restore(ctx, backup, ds, target) + cmd, err := rctx.Repo().RestoreCmd(ctx, rctx, backup, ds, paths, target) + if err != nil { + return err + } + return rctx.Shell().Run(ctx, cmd) }) } diff --git a/handler_pipe.go b/handler_pipe.go index cb40e5230f7f91f937ca0e73000f49c6fdc1e065..8e06e23fc8aa5e8b663e42ea7cf55b987e630124 100644 --- a/handler_pipe.go +++ b/handler_pipe.go @@ -4,9 +4,8 @@ import ( "context" "errors" "fmt" - "io" "os" - "path/filepath" + "strings" "git.autistici.org/ai3/tools/tabacco/jobs" ) @@ -15,120 +14,115 @@ import ( // generates a single file on the repository, and thus it can't // distinguish multiple atoms inside it. type pipeHandler struct { - backupCmd, restoreCmd string - shell *Shell - spec HandlerSpec + backupCmd string + restoreCmd string + compress bool + compressCmd string + decompressCmd string } -func newPipeHandler(spec HandlerSpec, shell *Shell) (Handler, error) { - backupCmd, ok := spec.Params["backup_command"].(string) - if !ok || backupCmd == "" { - return nil, errors.New("missing backup_command") - } - restoreCmd, ok := spec.Params["restore_command"].(string) - if !ok || restoreCmd == "" { - return nil, errors.New("missing restore_command") - } - return &pipeHandler{ - spec: spec, - backupCmd: backupCmd, - restoreCmd: restoreCmd, - shell: shell, - }, nil -} +const ( + defaultCompress = false + defaultCompressCmd = "lz4c -3z - -" + defaultDecompressCmd = "lz4c -d - -" +) -func (h *pipeHandler) Spec() HandlerSpec { - return h.spec -} +func newPipeHandler(name string, params Params) (Handler, error) { + backupCmd := params.Get("backup_command") + if backupCmd == "" { + return nil, errors.New("backup_command not set") + } -func (h *pipeHandler) DatasetsForBackup(ds Dataset) []Dataset { - var dsl []Dataset - for _, atom := range ds.Atoms { - atom.SourcePath = filepath.Join(ds.Name, atom.Name) - dsl = append(dsl, makeSingleAtomDataset(ds, atom)) + restoreCmd := params.Get("restore_command") + if restoreCmd == "" { + return nil, errors.New("restore_command not set") } - return dsl -} -func (h *pipeHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job { - if len(ds.Atoms) > 1 { - panic("more than 1 atom in pipe source") + // Create the pipeHandler with defaults, which can be + // overriden from Params. 
+ h := &pipeHandler{ + backupCmd: backupCmd, + restoreCmd: restoreCmd, + compress: defaultCompress, + compressCmd: defaultCompressCmd, + decompressCmd: defaultDecompressCmd, + } + if b, ok := params.GetBool("compress"); ok { + h.compress = b + } + if s := params.Get("compress_command"); s != "" { + h.compressCmd = s + } + if s := params.Get("decompress_command"); s != "" { + h.decompressCmd = s } - return jobs.JobFunc(func(ctx context.Context) error { - return h.backupAtom(ctx, repo, backup, ds, ds.Atoms[0]) - }) + return h, nil } -func (h *pipeHandler) backupAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom) error { - return h.shell.RunStdoutPipe( - ctx, - expandVars(h.backupCmd, backup, ds, atom), - func(stdout io.Reader) error { - return repo.BackupStream(ctx, backup, singleAtomDataset(ds, atom), stdout) - }, +func (h *pipeHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job { + cmd := fmt.Sprintf( + "(%s)%s | %s", + expandVars(h.backupCmd, backup, ds), + h.compressSuffix(), + rctx.Repo().BackupStreamCmd(backup, ds), ) + return jobs.JobFunc(func(ctx context.Context) error { + return rctx.Shell().Run(ctx, cmd) + }) } -func (h *pipeHandler) DatasetsForRestore(ds Dataset) []Dataset { - var dsl []Dataset - for _, atom := range ds.Atoms { - dsl = append(dsl, makeSingleAtomDataset(ds, atom)) - } - return dsl +func (h *pipeHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Dataset, target string) jobs.Job { + return jobs.JobFunc(func(ctx context.Context) error { + restoreCmd, err := rctx.Repo().RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx)) + if err != nil { + return err + } + cmd := fmt.Sprintf( + "%s | %s(%s)", + restoreCmd, + h.decompressPrefix(), + expandVars(h.restoreCmd, backup, ds), + ) + return rctx.Shell().Run(ctx, cmd) + }) } -func (h *pipeHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job { - var restoreJobs []jobs.Job - for _, atom := range ds.Atoms { - func(atom Atom) { - restoreJobs = append(restoreJobs, jobs.JobFunc(func(ctx context.Context) error { - return h.restoreAtom(ctx, repo, backup, ds, atom, target) - })) - }(atom) +func (h *pipeHandler) compressSuffix() string { + if !h.compress { + return "" } - return jobs.AsyncGroup(restoreJobs) + return fmt.Sprintf(" | %s", h.compressCmd) } -func (h *pipeHandler) restoreAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom, target string) error { - return h.shell.RunStdinPipe( - ctx, - expandVars(h.restoreCmd, backup, ds, atom), - func(stdin io.Writer) error { - return repo.RestoreStream(ctx, backup, singleAtomDataset(ds, atom), target, stdin) - }, - ) -} - -func singleAtomDataset(ds Dataset, atom Atom) Dataset { - return Dataset{ - Name: ds.Name, - Handler: ds.Handler, - Atoms: []Atom{atom}, +func (h *pipeHandler) decompressPrefix() string { + if !h.compress { + return "" } + return fmt.Sprintf("%s | ", h.decompressCmd) } -func expandVars(s string, backup Backup, ds Dataset, atom Atom) string { +func expandVars(s string, backup *Backup, ds *Dataset) string { return os.Expand(s, func(key string) string { switch key { + case "$": + return key case "backup.id": return backup.ID - case "ds.name": - return ds.Name - case "atom.name": - return atom.Name - case "atom.path": - return filepath.Join(ds.Name, atom.Name) + case "atom.names": + names := make([]string, 0, len(ds.Atoms)) + for _, a := range ds.Atoms { + names = append(names, a.Name) + } + return strings.Join(names, " ") + case "atom.paths": + 
paths := make([]string, 0, len(ds.Atoms)) + for _, a := range ds.Atoms { + paths = append(paths, a.Path) + } + return strings.Join(paths, " ") default: return os.Getenv(key) } }) } - -func makeSingleAtomDataset(ds Dataset, atom Atom) Dataset { - return Dataset{ - Name: fmt.Sprintf("/%s.%s", ds.Name, atom.Name), - Handler: ds.Handler, - Atoms: []Atom{atom}, - } -} diff --git a/handlers.go b/handlers.go index c0039c6f0da0b990c42b48d5ece2036867ee77c0..afc6958ad0aa8ffa7a26c6d26a16465d2394abf9 100644 --- a/handlers.go +++ b/handlers.go @@ -13,25 +13,30 @@ type HandlerSpec struct { // Handler type, one of the known types. Type string `yaml:"type"` - Params map[string]interface{} `yaml:"params"` - - PreBackupCommand string `yaml:"pre_backup_command"` - PostBackupCommand string `yaml:"post_backup_command"` - PreRestoreCommand string `yaml:"pre_restore_command"` - PostRestoreCommand string `yaml:"post_restore_command"` + Params Params `yaml:"params"` } // Parse a HandlerSpec and return a Handler instance. -func (spec *HandlerSpec) Parse(shell *Shell) (Handler, error) { +func (spec *HandlerSpec) Parse(src *SourceSpec) (Handler, error) { if spec.Name == "" { - return nil, errors.New("name is empty") + return nil, errors.New("name is not set") + } + + // Merge parameters from the handler spec and the source, with + // preference to the latter. + params := make(map[string]interface{}) + for k, v := range spec.Params { + params[k] = v + } + for k, v := range src.Params { + params[k] = v } switch spec.Type { case "file": - return newFileHandler(*spec, shell) + return newFileHandler(spec.Name, params) case "pipe": - return newPipeHandler(*spec, shell) + return newPipeHandler(spec.Name, params) default: return nil, fmt.Errorf("%s: unknown handler type '%s'", spec.Name, spec.Type) } diff --git a/jobs/job.go b/jobs/job.go index c82529296936586bf6a09158ad929453435a8f5f..42571ee782ba7b2b0f0ecb956790dfff8c73ecab 100644 --- a/jobs/job.go +++ b/jobs/job.go @@ -191,7 +191,7 @@ type QueueSpec struct { // NewQueueManager returns a new QueueManager with the provided // configuration. -func NewQueueManager(spec QueueSpec) *QueueManager { +func NewQueueManager(spec *QueueSpec) *QueueManager { q := make(map[string]chan struct{}) for name, n := range spec.Workers { q[name] = make(chan struct{}, n) diff --git a/manager.go b/manager.go index 7f97410a1c19520aaad6461fa49182e9c497560f..7d89dc24b38a733728d57d999e0331a1903f33de 100644 --- a/manager.go +++ b/manager.go @@ -2,6 +2,7 @@ package tabacco import ( "context" + "errors" "fmt" "log" "os" @@ -26,7 +27,7 @@ type tabaccoManager struct { func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) { // If we can't create a workdirManager, it probably means we // don't have permissions to the WorkDir, which is bad. - wm, err := newWorkdirManager(configMgr.getWorkDir()) + wm, err := newWorkdirManager(configMgr.current().WorkDir()) if err != nil { return nil, err } @@ -34,7 +35,7 @@ func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) // Note: the queue configuration won't be reloaded. 
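To make the pipe-handler plumbing above concrete: BackupJob builds a single shell pipeline out of the expanded backup_command, the optional compression stage, and the repository's stream command. A short sketch of the variable expansion; the restic-style BackupStreamCmd output mentioned in the comment is an assumption, not something this diff defines:

```go
func demoPipeCommand() {
	backup := &Backup{ID: "abc123"}
	ds := &Dataset{Atoms: []Atom{{Name: "user1"}, {Name: "user2"}}}

	// ${backup.id} and ${atom.names} are handled by expandVars above.
	fmt.Println(expandVars("echo ${backup.id} ${atom.names}", backup, ds))
	// prints: echo abc123 user1 user2

	// With compress: true and the default compressCmd, BackupJob then runs
	// roughly: (echo abc123 user1 user2) | lz4c -3z - - | <BackupStreamCmd>
}
```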
return &tabaccoManager{ ExclusiveLockManager: jobs.NewExclusiveLockManager(), - QueueManager: jobs.NewQueueManager(configMgr.getQueueSpec()), + QueueManager: jobs.NewQueueManager(configMgr.current().QueueSpec()), StateManager: jobs.NewStateManager(), workdirManager: wm, @@ -46,27 +47,29 @@ func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) // Close the Manager and free all associated resources (those owned by // this object). func (m *tabaccoManager) Close() error { + m.workdirManager.Close() + return nil } type metadataJob struct { jobs.Job ms MetadataStore - backup Backup - ds Dataset + backup *Backup + ds *Dataset } func (j *metadataJob) RunContext(ctx context.Context) error { err := j.Job.RunContext(ctx) if err == nil { if merr := j.ms.AddDataset(ctx, j.backup, j.ds); merr != nil { - log.Printf("%s: error saving metadata: %v", j.ds.Name, merr) + log.Printf("%s@%s: error saving metadata: %v", j.ds.Source, j.ds.ID, merr) } } return err } -func (m *tabaccoManager) withMetadata(j jobs.Job, backup Backup, ds Dataset) jobs.Job { +func (m *tabaccoManager) withMetadata(j jobs.Job, backup *Backup, ds *Dataset) jobs.Job { return &metadataJob{ Job: j, ms: m.ms, @@ -78,28 +81,46 @@ func (m *tabaccoManager) withMetadata(j jobs.Job, backup Backup, ds Dataset) job // Prepare the repository for a new backup. This is a synchronous // operation: we need to wait for it to complete to avoid running the // backup tasks too soon. -func (m *tabaccoManager) prepareBackupJob(backup Backup) jobs.Job { +func (m *tabaccoManager) prepareBackupJob(rctx RuntimeContext, backup *Backup) jobs.Job { return jobs.JobFunc(func(ctx context.Context) error { - repo := m.configMgr.getRepository() - if err := repo.Init(ctx); err != nil { - log.Printf("repository init failed: %v", err) - return err - } - log.Printf("preparing backup %s", backup.ID) - return repo.Prepare(ctx, backup) + return rctx.Repo().Init(ctx, rctx) + //log.Printf("preparing backup %s", backup.ID) + //return repo.Prepare(ctx, backup) }) } -func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) jobs.Job { +func (m *tabaccoManager) wrapWithCommands(rctx RuntimeContext, initJob, backupJob jobs.Job, pre, post string) jobs.Job { var out []jobs.Job + if initJob != nil { + out = append(out, initJob) + } + if pre != "" { + out = append(out, m.commandJob(rctx, pre)) + } + out = append(out, backupJob) + if post != "" { + out = append(out, m.commandJob(rctx, post)) + } - // Let Handlers modify the Dataset if necessary, or generate - // more than one. - dsl := h.DatasetsForBackup(ds) + if len(out) == 1 { + return out[0] + } + return jobs.SyncGroup(out) +} - // Run pre_backup_command. - if cmd := h.Spec().PreBackupCommand; cmd != "" { - out = append(out, m.datasetCommandJob(cmd, backup, ds)) +func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext, backup *Backup, src *SourceSpec) (jobs.Job, error) { + // Compile the source and the associated Handler. + dsl, err := src.Parse(ctx) + if err != nil { + return nil, err + } + hspec := rctx.HandlerSpec(src.Handler) + if hspec == nil { + return nil, fmt.Errorf("unknown handler '%s'", src.Handler) + } + h, err := hspec.Parse(src) + if err != nil { + return nil, err } // The actual backup operation. Assemble all the backup jobs @@ -107,22 +128,21 @@ func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) // // TODO: get the timeout from the SourceSpec. 
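Note that the pre/post hooks that used to live on HandlerSpec now live on the SourceSpec, together with the per-source timeout consumed by jobs.WithTimeout below. A hypothetical source showing the moved fields (Timeout is assumed to be a time.Duration; the commands, schedule and DatasetsCommand output are examples only):

```go
var exampleSource = &SourceSpec{
	Name:              "db",
	Handler:           "dbpipe",
	Schedule:          "@random_every 24h",
	Timeout:           6 * time.Hour,
	PreBackupCommand:  "touch /var/lib/db/.backup_in_progress",
	PostBackupCommand: "rm -f /var/lib/db/.backup_in_progress",
	DatasetsCommand:   "echo '[{name: main, atoms: [{name: main}]}]'",
}
```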
var backupJobs []jobs.Job - for _, realDS := range dsl { + for _, ds := range dsl { backupJobs = append(backupJobs, m.withMetadata( - h.BackupJob(m.configMgr.getRepository(), backup, realDS), + h.BackupJob(rctx, backup, ds), backup, - realDS, + ds, )) } - out = append(out, jobs.WithTimeout( - jobs.AsyncGroup(backupJobs), - 24*time.Hour, - )) - - // Run post_backup_command. - if cmd := h.Spec().PostBackupCommand; cmd != "" { - out = append(out, m.datasetCommandJob(cmd, backup, ds)) - } + + backupJob := m.wrapWithCommands( + rctx, + m.prepareBackupJob(rctx, backup), + jobs.WithTimeout(jobs.AsyncGroup(backupJobs), src.Timeout), + src.PreBackupCommand, + src.PostBackupCommand, + ) // Group the jobs (sequentially) if there's more than one of // them. Give the final job a status and a user-visible name, @@ -132,14 +152,14 @@ func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) // in the 'backup' queue for concurrency limiting. // // Oh, and here is where we add per-dataset instrumentation. - id := fmt.Sprintf("backup-dataset-%s", ds.Name) + id := fmt.Sprintf("backup-source-%s", src.Name) return m.WithQueue( m.WithStatus( m.WithExclusiveLock( m.withWorkDir( withInstrumentation( - jobs.SyncGroup(out), - ds.Name, + backupJob, + src.Name, ), ), id, @@ -147,51 +167,23 @@ func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) id, ), "backup", - ) + ), nil } -// BackupJob returns a single Job that backs up one or more sources to -// the configured destination repository. -func (m *tabaccoManager) BackupJob(ctx context.Context, sourceSpecs []SourceSpec) (Backup, jobs.Job, error) { - // Parse the source specs and obtain Datasets. Errors here are - // logged but *not* fatal, unless there are errors and the - // list of non-erroring sources is nil. - - backup := newBackup("") - prepJob := m.prepareBackupJob(backup) - var backupJobs []jobs.Job - - merr := new(util.MultiError) - for _, spec := range sourceSpecs { - h, ok := m.configMgr.getHandler(spec.Handler) - if !ok { - return Backup{}, nil, fmt.Errorf("inconsistency: no '%s' handler", spec.Handler) - } - - ds, err := spec.Parse(ctx) - if err != nil { - merr.Add(err) - continue - } +func (m *tabaccoManager) BackupJob(ctx context.Context, src *SourceSpec) (*Backup, jobs.Job, error) { + // Create a new Backup. + b := newBackup("") - // Create the backup job and add it to our list. - backupJobs = append(backupJobs, m.backupDatasetJob(h, backup, ds)) - } + // Create a RuntimeContext. + rctx := m.configMgr.NewRuntimeContext() - // Run the job to initialize the repository before anything else. - j := m.WithStatus( - jobs.SyncGroup([]jobs.Job{ - prepJob, - jobs.AsyncGroup(backupJobs), - }), - fmt.Sprintf("backup-%s", backup.ID), - ) - return backup, j, nil + j, err := m.makeBackupJob(ctx, rctx, b, src) + return b, j, err } // Backup just runs the BackupJob synchronously. -func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup, error) { - backup, job, err := m.BackupJob(ctx, sourceSpecs) +func (m *tabaccoManager) Backup(ctx context.Context, src *SourceSpec) (*Backup, error) { + backup, job, err := m.BackupJob(ctx, src) if err != nil { return backup, err } @@ -199,29 +191,35 @@ func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) ( return backup, err } -func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset, target string) jobs.Job { - var out []jobs.Job - - dsl := h.DatasetsForRestore(ds) - - // Run pre_restore_command. 
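BackupJob and Backup now operate on a single *SourceSpec rather than a slice, so a caller that wants to back up everything simply loops over the sources from a RuntimeContext, as the agent scheduler and the tests in this diff do. A minimal sketch (backupEverything is a hypothetical driver; error handling trimmed):

```go
func backupEverything(ctx context.Context, mgr Manager, cfgMgr *ConfigManager) {
	for _, src := range cfgMgr.NewRuntimeContext().SourceSpecs() {
		if _, err := mgr.Backup(ctx, src); err != nil {
			log.Printf("backup of source %s failed: %v", src.Name, err)
		}
	}
}
```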
- if cmd := h.Spec().PreRestoreCommand; cmd != "" { - out = append(out, m.datasetCommandJob(cmd, backup, ds)) +func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src *SourceSpec, dsl []*Dataset, target string) (jobs.Job, error) { + // Just need the Handler. + hspec := rctx.HandlerSpec(src.Handler) + if hspec == nil { + return nil, fmt.Errorf("unknown handler '%s'", src.Handler) + } + h, err := hspec.Parse(src) + if err != nil { + return nil, err } // The actual backup operation. Just a thin wrapper around // doBackupDataset() that binds together the context, backup, // ds and target via the closure. var restoreJobs []jobs.Job - for _, realDS := range dsl { - restoreJobs = append(restoreJobs, h.RestoreJob(m.configMgr.getRepository(), backup, realDS, target)) + for _, ds := range dsl { + restoreJobs = append( + restoreJobs, + h.RestoreJob(rctx, backup, ds, target), + ) } - out = append(out, jobs.AsyncGroup(restoreJobs)) - // Run post_restore_command. - if cmd := h.Spec().PostRestoreCommand; cmd != "" { - out = append(out, m.datasetCommandJob(cmd, backup, ds)) - } + restoreJob := m.wrapWithCommands( + rctx, + nil, + jobs.AsyncGroup(restoreJobs), + src.PreRestoreCommand, + src.PostRestoreCommand, + ) // Group the jobs (sequentially) if there's more than one of // them. Give the final job a status and a user-visible name, @@ -229,45 +227,71 @@ func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset, // leave-running policy, so no more than one restore per // datasource can run at any given time. Finally, the job runs // in the 'restore' queue for concurrency limiting. - id := fmt.Sprintf("restore_%s", ds.Name) + id := fmt.Sprintf("restore-source-%s", src.Name) return m.WithQueue( m.WithStatus( - m.WithExclusiveLock(jobs.SyncGroup(out), id, false), + m.WithExclusiveLock( + restoreJob, + id, + false), id, ), "restore", - ) + ), nil +} + +func groupDatasetsBySource(dsl []*Dataset) map[string][]*Dataset { + m := make(map[string][]*Dataset) + for _, ds := range dsl { + m[ds.Source] = append(m[ds.Source], ds) + } + return m } // RestoreJob creates a job that restores the results of the // FindRequest (with NumVersions=1) onto the given target directory. -func (m *tabaccoManager) RestoreJob(ctx context.Context, req FindRequest, target string) (jobs.Job, error) { - // Find the atoms relevant to this restore. +func (m *tabaccoManager) RestoreJob(ctx context.Context, req *FindRequest, target string) (jobs.Job, error) { + // Find the atoms relevant to this restore. The results will + // be grouped in Backups and Datasets that only include the + // relevant Atoms. req.NumVersions = 1 - versions, err := m.ms.FindAtoms(ctx, req) + backups, err := m.ms.FindAtoms(ctx, req) if err != nil { return nil, err } + if len(backups) == 0 { + return nil, errors.New("no results found for query") + } + + // Create a RuntimeContext. + rctx := m.configMgr.NewRuntimeContext() var restoreJobs []jobs.Job - for _, vv := range versions { - ds := vv[0].Dataset - backup := vv[0].Backup - - h, ok := m.configMgr.getHandler(ds.Handler) - if !ok { - log.Printf("%s: unknown handler '%s'", ds.Name, ds.Handler) - continue + merr := new(util.MultiError) + for _, b := range backups { + // Group the datasets by source, find the source and create the restore jobs. 
+ for srcName, dsl := range groupDatasetsBySource(b.Datasets) { + + src := rctx.FindSource(srcName) + if src == nil { + merr.Add(fmt.Errorf("unknown source '%s'", srcName)) + continue + } + + j, err := m.makeRestoreJob(rctx, b, src, dsl, target) + if err != nil { + merr.Add(fmt.Errorf("source %s: %v", srcName, err)) + continue + } + restoreJobs = append(restoreJobs, j) } - - restoreJobs = append(restoreJobs, m.restoreDatasetJob(h, backup, ds, target)) } - return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), nil + return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), merr.OrNil() } // Restore just runs the RestoreJob synchronously. -func (m *tabaccoManager) Restore(ctx context.Context, req FindRequest, target string) error { +func (m *tabaccoManager) Restore(ctx context.Context, req *FindRequest, target string) error { job, err := m.RestoreJob(ctx, req, target) if err != nil { return err @@ -277,23 +301,19 @@ func (m *tabaccoManager) Restore(ctx context.Context, req FindRequest, target st // Create a new Backup object with its own unique ID (which actually // consists of 16 random bytes, hex-encoded). -func newBackup(host string) Backup { +func newBackup(host string) *Backup { if host == "" { host, _ = os.Hostname() // nolint } - return Backup{ + return &Backup{ ID: util.RandomID(), Host: host, Timestamp: time.Now(), } } -func (m *tabaccoManager) datasetCommandJob(cmd string, backup Backup, ds Dataset) jobs.Job { - env := map[string]string{ - "BACKUP_ID": backup.ID, - "DATASET_NAME": ds.Name, - } +func (m *tabaccoManager) commandJob(rctx RuntimeContext, cmd string) jobs.Job { return jobs.JobFunc(func(ctx context.Context) error { - return m.configMgr.getShell().RunWithEnv(ctx, cmd, env) + return rctx.Shell().Run(ctx, cmd) }) } diff --git a/manager_test.go b/manager_test.go index 7593fb11be6ea4a6f83d1002918dbfa23fbc4693..8e4ac8bad9f86739d0381ee56e456f5825c5c08c 100644 --- a/manager_test.go +++ b/manager_test.go @@ -2,7 +2,9 @@ package tabacco import ( "context" - "fmt" + "log" + "path/filepath" + "sync" "testing" "time" @@ -12,16 +14,18 @@ import ( type dummyMetadataEntry struct { backupID string backupTS time.Time - name string - dsName string + dsID string host string - handler string + source string + path string atom Atom } -func (e dummyMetadataEntry) match(req FindRequest) bool { - if req.Pattern != "" && !req.matchPattern(e.name) { - return false +func (e dummyMetadataEntry) match(req *FindRequest) bool { + if req.Pattern != "" { + if !req.matchPattern(e.path) { + return false + } } if req.Host != "" && req.Host != e.host { return false @@ -29,15 +33,15 @@ func (e dummyMetadataEntry) match(req FindRequest) bool { return true } -func (e dummyMetadataEntry) toDataset() Dataset { - return Dataset{ - Name: e.dsName, - Handler: e.handler, +func (e dummyMetadataEntry) toDataset() *Dataset { + return &Dataset{ + ID: e.dsID, + Source: e.source, } } -func (e dummyMetadataEntry) toBackup() Backup { - return Backup{ +func (e dummyMetadataEntry) toBackup() *Backup { + return &Backup{ ID: e.backupID, Timestamp: e.backupTS, Host: e.host, @@ -45,60 +49,104 @@ func (e dummyMetadataEntry) toBackup() Backup { } type dummyMetadataStore struct { + mx sync.Mutex log []dummyMetadataEntry } -func (d *dummyMetadataStore) FindAtoms(_ context.Context, req FindRequest) ([][]Version, error) { - tmp := make(map[string]map[string][]dummyMetadataEntry) - for _, l := range d.log { - if !l.match(req) { - continue +// Argh! 
This is copy&pasted from server/service.go, but with minor +// modifications due to the different types... terrible. +func keepNumVersions(dbAtoms []dummyMetadataEntry, numVersions int) []dummyMetadataEntry { + // numVersions == 0 is remapped to 1. + if numVersions < 1 { + numVersions = 1 + } + + count := 0 + tmp := make(map[string][]dummyMetadataEntry) + for _, a := range dbAtoms { + l := tmp[a.path] + if len(l) < numVersions { + l = append(l, a) + count++ + } + tmp[a.path] = l + } + out := make([]dummyMetadataEntry, 0, count) + for _, l := range tmp { + out = append(out, l...) + } + return out +} + +func groupByBackup(dbAtoms []dummyMetadataEntry) []*Backup { + // As we scan through dbAtoms, aggregate into Backups and Datasets. + backups := make(map[string]*Backup) + dsm := make(map[string]map[string]*Dataset) + + for _, atom := range dbAtoms { + // Create the Backup object if it does not exist. + b, ok := backups[atom.backupID] + if !ok { + b = atom.toBackup() + backups[atom.backupID] = b } - m, ok := tmp[l.name] + // Create the Dataset object for this Backup in the + // two-level map (creating the intermediate map if + // necessary). + tmp, ok := dsm[atom.backupID] if !ok { - m = make(map[string][]dummyMetadataEntry) - tmp[l.name] = m + tmp = make(map[string]*Dataset) + dsm[atom.backupID] = tmp } - m[l.backupID] = append(m[l.name], l) - } - - count := req.NumVersions - if count < 1 { - count = 1 - } - - var out [][]Version - for _, dsmap := range tmp { - var dsVersions []Version - for _, dslog := range dsmap { - v := Version{ - Dataset: dslog[0].toDataset(), - Backup: dslog[0].toBackup(), - } - if len(dslog) > count { - dslog = dslog[len(dslog)-count:] - } - for _, l := range dslog { - v.Dataset.Atoms = append(v.Dataset.Atoms, l.atom) - } - dsVersions = append(dsVersions, v) + // Match datasets by their unique ID. + ds, ok := tmp[atom.dsID] + if !ok { + ds = atom.toDataset() + tmp[atom.dsID] = ds + b.Datasets = append(b.Datasets, ds) } - out = append(out, dsVersions) + + // Finally, add the atom to the dataset. 
+ ds.Atoms = append(ds.Atoms, atom.atom) + } + + out := make([]*Backup, 0, len(backups)) + for _, b := range backups { + out = append(out, b) } - return out, nil + return out } -func (d *dummyMetadataStore) AddDataset(_ context.Context, backup Backup, ds Dataset) error { +func (d *dummyMetadataStore) FindAtoms(_ context.Context, req *FindRequest) ([]*Backup, error) { + d.mx.Lock() + defer d.mx.Unlock() + + var tmp []dummyMetadataEntry + for _, l := range d.log { + if !l.match(req) { + continue + } + tmp = append(tmp, l) + } + + return groupByBackup(keepNumVersions(tmp, req.NumVersions)), nil +} + +func (d *dummyMetadataStore) AddDataset(_ context.Context, backup *Backup, ds *Dataset) error { + d.mx.Lock() + defer d.mx.Unlock() + + log.Printf("AddDataset: %+v", *ds) for _, atom := range ds.Atoms { - name := fmt.Sprintf("%s/%s", ds.Name, atom.Name) + path := filepath.Join(ds.Source, atom.Name) d.log = append(d.log, dummyMetadataEntry{ backupID: backup.ID, backupTS: backup.Timestamp, host: backup.Host, - name: name, - dsName: ds.Name, - handler: ds.Handler, + path: path, + dsID: ds.ID, + source: ds.Source, atom: atom, }) } @@ -115,46 +163,46 @@ func TestManager_Backup(t *testing.T) { "password": "testpass", }, } - handlerSpecs := []HandlerSpec{ - { + handlerSpecs := []*HandlerSpec{ + &HandlerSpec{ Name: "file1", Type: "file", Params: map[string]interface{}{ "path": "/source/of/file1", }, - PreBackupCommand: "echo hello", + //PreBackupCommand: "echo hello", }, - { + &HandlerSpec{ Name: "dbpipe", Type: "pipe", Params: map[string]interface{}{ - "backup_command": "echo ${backup.id} ${ds.name} ${atom.name}", + "backup_command": "echo ${backup.id} ${ds.name} ${atom.names}", "restore_command": "cat", }, }, } - sourceSpecs := []SourceSpec{ - { - Name: "source1", - Handler: "file1", - Atoms: []Atom{ - { - Name: "user1", - RelativePath: "user1", + sourceSpecs := []*SourceSpec{ + &SourceSpec{ + Name: "source1", + Handler: "file1", + Schedule: "@random_every 1h", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{{Name: "user1"}}, }, - { - Name: "user2", - RelativePath: "user2", + &DatasetSpec{ + Atoms: []Atom{{Name: "user2"}}, }, }, }, - { - Name: "source2", - Handler: "dbpipe", - AtomsCommand: "echo user1 user1 ; echo user2 user2", + &SourceSpec{ + Name: "source2", + Handler: "dbpipe", + Schedule: "@random_every 1h", + DatasetsCommand: "echo '[{name: users, atoms: [{name: user1}, {name: user2}]}]'", }, } - queueSpec := jobs.QueueSpec{ + queueSpec := &jobs.QueueSpec{ Workers: map[string]int{"backup": 2}, } @@ -177,30 +225,35 @@ func TestManager_Backup(t *testing.T) { } defer m.Close() - backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs()) - if err != nil { - t.Fatal(err) - } - if backup.ID == "" || backup.Host == "" { - t.Fatalf("empty fields in backup: %+v", backup) + for _, src := range configMgr.current().SourceSpecs() { + backup, err := m.Backup(context.TODO(), src) + if err != nil { + t.Fatal(err) + } + if backup.ID == "" || backup.Host == "" { + t.Fatalf("empty fields in backup: %+v", backup) + } } // Try to find atoms in the metadata store. // Let's try with a pattern first. 
- resp, err := store.FindAtoms(context.TODO(), FindRequest{Pattern: "source1/*", NumVersions: 1}) + resp, err := store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/*", NumVersions: 1}) if err != nil { t.Fatal("FindAtoms", err) } - if len(resp) != 2 { - t.Fatalf("bad response: %+v", resp) + if len(resp) != 1 { + t.Fatalf("bad FindAtoms(source1/*) response: %+v", resp) + } + if l := len(resp[0].Datasets); l != 2 { + t.Fatalf("bad number of datasets returned by FindAtoms(source1/*): got %d, expected 2", l) } // A pattern matching a single atom. - resp, err = store.FindAtoms(context.TODO(), FindRequest{Pattern: "source1/user2"}) + resp, err = store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/user2"}) if err != nil { t.Fatal("FindAtoms", err) } if len(resp) != 1 { - t.Fatalf("bad response: %+v", resp) + t.Fatalf("bad FindAtoms(source1/user2) response: %+v", resp) } } diff --git a/metadb/client/client.go b/metadb/client/client.go index ace690310d85b7a83392dd049d8852a44b1475d0..2fd61d59f2c1e76473364c8db0eb6ca99b0d9e19 100644 --- a/metadb/client/client.go +++ b/metadb/client/client.go @@ -11,6 +11,7 @@ type metadbClient struct { backend clientutil.Backend } +// New creates a new client for a remote MetadataStore. func New(config *clientutil.BackendConfig) (tabacco.MetadataStore, error) { be, err := clientutil.NewBackend(config) if err != nil { @@ -20,11 +21,16 @@ func New(config *clientutil.BackendConfig) (tabacco.MetadataStore, error) { } type addDatasetRequest struct { - Backup tabacco.Backup `json:"backup"` - Dataset tabacco.Dataset `json:"dataset"` + Backup *tabacco.Backup `json:"backup"` + Dataset *tabacco.Dataset `json:"dataset"` } -func (c *metadbClient) AddDataset(ctx context.Context, backup tabacco.Backup, ds tabacco.Dataset) error { +func (c *metadbClient) AddDataset(ctx context.Context, backup *tabacco.Backup, ds *tabacco.Dataset) error { + // Ensure that the backup has no Datasets + if len(backup.Datasets) > 0 { + panic("AddDataset client called with non-empty backup.Datasets") + } + req := addDatasetRequest{ Backup: backup, Dataset: ds, @@ -32,8 +38,8 @@ func (c *metadbClient) AddDataset(ctx context.Context, backup tabacco.Backup, ds return c.backend.Call(ctx, "", "/api/add_dataset", &req, nil) } -func (c *metadbClient) FindAtoms(ctx context.Context, req tabacco.FindRequest) ([][]tabacco.Version, error) { - var resp [][]tabacco.Version - err := c.backend.Call(ctx, "", "/api/find_atoms", &req, &resp) +func (c *metadbClient) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*tabacco.Backup, error) { + var resp []*tabacco.Backup + err := c.backend.Call(ctx, "", "/api/find_atoms", req, &resp) return resp, err } diff --git a/metadb/migrations/1_initialize_schema.down.sql b/metadb/migrations/1_initialize_schema.down.sql index 2242f002e30cffc1b20a4d0b5833bad1e36d5556..698b5d08cd7d26bc1e56204bdbf4b40c4a5b68f0 100644 --- a/metadb/migrations/1_initialize_schema.down.sql +++ b/metadb/migrations/1_initialize_schema.down.sql @@ -1,5 +1,5 @@ -DROP INDEX idx_log_backup_id_and_dataset_name; +DROP INDEX idx_log_backup_id_and_dataset_id; DROP INDEX idx_log_backup_id; DROP INDEX idx_log_primary; DROP TABLE log; diff --git a/metadb/migrations/1_initialize_schema.up.sql b/metadb/migrations/1_initialize_schema.up.sql index fbf4d6e924bdcd1d7ca54ae33b3aee59cc12da71..5677d0f9dabdb39baf2217a4149c8228eafa6358 100644 --- a/metadb/migrations/1_initialize_schema.up.sql +++ b/metadb/migrations/1_initialize_schema.up.sql @@ -6,14 +6,13 @@ CREATE TABLE log ( backup_id VARCHAR(128), 
backup_timestamp DATETIME, backup_host VARCHAR(128), - dataset_name VARCHAR(128), - dataset_handler VARCHAR(128), + dataset_id VARCHAR(128), + dataset_source VARCHAR(128), atom_name VARCHAR(255), - atom_path VARCHAR(255), - atom_source_path TEXT, - atom_relative_path TEXT + atom_full_path VARCHAR(255), + atom_path VARCHAR(255) ); -CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_path); +CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_id, atom_name); CREATE INDEX idx_log_backup_id ON log (backup_id); -CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name); +CREATE INDEX idx_log_backup_id_and_dataset_id ON log (backup_id, dataset_id); diff --git a/metadb/migrations/bindata.go b/metadb/migrations/bindata.go index dde27ea68c9efdf597558264ffb6f124229f97ea..45ce2f5fdb60bdd7116319141af6802edc792656 100644 --- a/metadb/migrations/bindata.go +++ b/metadb/migrations/bindata.go @@ -46,7 +46,7 @@ func (fi bindataFileInfo) Sys() interface{} { } var __1_initialize_schemaDownSql = []byte(` -DROP INDEX idx_log_backup_id_and_dataset_name; +DROP INDEX idx_log_backup_id_and_dataset_id; DROP INDEX idx_log_backup_id; DROP INDEX idx_log_primary; DROP TABLE log; @@ -63,7 +63,7 @@ func _1_initialize_schemaDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 123, mode: os.FileMode(436), modTime: time.Unix(1532974389, 0)} + info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 121, mode: os.FileMode(420), modTime: time.Unix(1560930730, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -76,17 +76,16 @@ CREATE TABLE log ( backup_id VARCHAR(128), backup_timestamp DATETIME, backup_host VARCHAR(128), - dataset_name VARCHAR(128), - dataset_handler VARCHAR(128), + dataset_id VARCHAR(128), + dataset_source VARCHAR(128), atom_name VARCHAR(255), - atom_path VARCHAR(255), - atom_source_path TEXT, - atom_relative_path TEXT + atom_full_path VARCHAR(255), + atom_path VARCHAR(255) ); -CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_path); +CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_id, atom_name); CREATE INDEX idx_log_backup_id ON log (backup_id); -CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name); +CREATE INDEX idx_log_backup_id_and_dataset_id ON log (backup_id, dataset_id); `) func _1_initialize_schemaUpSqlBytes() ([]byte, error) { @@ -99,7 +98,7 @@ func _1_initialize_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 604, mode: os.FileMode(436), modTime: time.Unix(1532985344, 0)} + info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 568, mode: os.FileMode(420), modTime: time.Unix(1560930732, 0)} a := &asset{bytes: bytes, info: info} return a, nil } diff --git a/metadb/server/http.go b/metadb/server/http.go index 01e2c7caa55418f5b96f27bb0dff3ce8d1426061..beae5e926c82bcce2e7edb723004dbcf53cbf0bc 100644 --- a/metadb/server/http.go +++ b/metadb/server/http.go @@ -38,7 +38,7 @@ func (s *httpServer) handleFindAtoms(w http.ResponseWriter, r *http.Request) { return } - resp, err := s.FindAtoms(r.Context(), req) + resp, err := s.FindAtoms(r.Context(), &req) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) log.Printf("FindAtoms(%+v) error: %v", req, err) diff --git a/metadb/server/service.go b/metadb/server/service.go index 11a6fad7bb32a9a772318210b04b9dc6c41ee607..30b53cd0e5f21721fec5df1e34605eddda285138 
100644 --- a/metadb/server/service.go +++ b/metadb/server/service.go @@ -14,28 +14,34 @@ import ( // An atom, as represented in the database, denormalized. type dbAtom struct { - BackupID string - BackupTimestamp time.Time - BackupHost string - DatasetName string - DatasetHandler string - AtomName string - AtomPath string - AtomSourcePath string - AtomRelativePath string + BackupID string + BackupTimestamp time.Time + BackupHost string + DatasetID string + DatasetSource string + AtomName string + AtomFullPath string + AtomPath string } func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom { var out []dbAtom for _, atom := range ds.Atoms { + + // It is here that we 'materialize' the concept of Atom names + // as paths, by concatenating source/dataset/atom and storing + // it as the atom name. + path := filepath.Join(ds.Source, atom.Name) + out = append(out, dbAtom{ BackupID: backup.ID, BackupTimestamp: backup.Timestamp, BackupHost: backup.Host, - DatasetName: ds.Name, - DatasetHandler: ds.Handler, + DatasetID: ds.ID, + DatasetSource: ds.Source, AtomName: atom.Name, - AtomPath: filepath.Join(ds.Name, atom.Name), + AtomPath: atom.Path, + AtomFullPath: path, }) } return out @@ -51,73 +57,76 @@ func (a *dbAtom) getBackup() *tabacco.Backup { func (a *dbAtom) getDataset() *tabacco.Dataset { return &tabacco.Dataset{ - Name: a.DatasetName, - Handler: a.DatasetHandler, + ID: a.DatasetID, + Source: a.DatasetSource, } } func (a *dbAtom) getAtom() tabacco.Atom { return tabacco.Atom{ - Name: a.AtomName, - RelativePath: a.AtomRelativePath, - SourcePath: a.AtomSourcePath, + Name: a.AtomName, + Path: a.AtomPath, } } -func normalizeAtoms(dbAtoms []dbAtom, numVersions int) [][]tabacco.Version { +func keepNumVersions(dbAtoms []*dbAtom, numVersions int) []*dbAtom { // numVersions == 0 is remapped to 1. if numVersions < 1 { numVersions = 1 } - // Accumulate versions keyed by backup ID first, dataset name - // next. Preserve the ordering of backups in dbAtoms, which we - // are going to use later to apply a per-dataset limit. - backupMap := make(map[string]*tabacco.Backup) - dsMap := make(map[string]map[string]*tabacco.Dataset) - var backupsInOrder []string + count := 0 + tmp := make(map[string][]*dbAtom) + for _, a := range dbAtoms { + l := tmp[a.AtomFullPath] + if len(l) < numVersions { + l = append(l, a) + count++ + } + tmp[a.AtomFullPath] = l + } + out := make([]*dbAtom, 0, count) + for _, l := range tmp { + out = append(out, l...) + } + return out +} + +func groupByBackup(dbAtoms []*dbAtom) []*tabacco.Backup { + // As we scan through dbAtoms, aggregate into Backups and Datasets. + backups := make(map[string]*tabacco.Backup) + dsm := make(map[string]map[string]*tabacco.Dataset) for _, atom := range dbAtoms { // Create the Backup object if it does not exist. - if _, ok := backupMap[atom.BackupID]; !ok { - backupMap[atom.BackupID] = atom.getBackup() - backupsInOrder = append(backupsInOrder, atom.BackupID) + b, ok := backups[atom.BackupID] + if !ok { + b = atom.getBackup() + backups[atom.BackupID] = b } // Create the Dataset object for this Backup in the // two-level map (creating the intermediate map if // necessary). 
- tmp, ok := dsMap[atom.BackupID] + tmp, ok := dsm[atom.BackupID] if !ok { tmp = make(map[string]*tabacco.Dataset) - dsMap[atom.BackupID] = tmp + dsm[atom.BackupID] = tmp } - ds, ok := tmp[atom.DatasetName] + ds, ok := tmp[atom.DatasetID] if !ok { ds = atom.getDataset() - tmp[atom.DatasetName] = ds + tmp[atom.DatasetID] = ds + b.Datasets = append(b.Datasets, ds) } // Finally, add the atom to the dataset. ds.Atoms = append(ds.Atoms, atom.getAtom()) } - // Now dump the maps to a Version array. - var out [][]tabacco.Version - dsCount := make(map[string]int) - for _, backupID := range backupsInOrder { - tmp := dsMap[backupID] - backup := backupMap[backupID] - var tmpv []tabacco.Version - for _, ds := range tmp { - if dsCount[ds.Name] < numVersions { - tmpv = append(tmpv, tabacco.Version{Backup: *backup, Dataset: *ds}) - dsCount[ds.Name]++ - } - } - if len(tmpv) > 0 { - out = append(out, tmpv) - } + out := make([]*tabacco.Backup, 0, len(backups)) + for _, b := range backups { + out = append(out, b) } return out } @@ -152,10 +161,10 @@ var statements = map[string]string{ "insert_atom": ` INSERT INTO log ( backup_id, backup_timestamp, backup_host, - dataset_name, dataset_handler, - atom_name, atom_path, atom_source_path, atom_relative_path + dataset_id, dataset_source, + atom_name, atom_path, atom_full_path ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ? ) `, } @@ -174,12 +183,11 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba dbAtom.BackupID, dbAtom.BackupTimestamp, dbAtom.BackupHost, - dbAtom.DatasetName, - dbAtom.DatasetHandler, + dbAtom.DatasetID, + dbAtom.DatasetSource, dbAtom.AtomName, dbAtom.AtomPath, - dbAtom.AtomSourcePath, - dbAtom.AtomRelativePath, + dbAtom.AtomFullPath, ); err != nil { return err } @@ -191,7 +199,7 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba // FindAtoms searches for atoms meeting a particular criteria and // returns them grouped by backup and dataset (the atoms will be // contained within the dataset). -func (s *Service) FindAtoms(ctx context.Context, req tabacco.FindRequest) ([][]tabacco.Version, error) { +func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*tabacco.Backup, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return nil, err @@ -207,16 +215,16 @@ func (s *Service) FindAtoms(ctx context.Context, req tabacco.FindRequest) ([][]t args = append(args, req.Host) } if req.Pattern != "" { - where = append(where, "atom_path LIKE ?") + where = append(where, "atom_name LIKE ?") args = append(args, strings.Replace(req.Pattern, "*", "%", -1)) } // Build the final query and execute it. 
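The two helpers above replace the old normalizeAtoms(): keepNumVersions caps how many rows survive per atom full path (the newest ones, since the query below orders by backup_timestamp DESC), and groupByBackup folds the surviving flat rows back into Backup objects with nested Datasets and Atoms. A minimal sketch of the intended data flow, written as a fragment against the package-internal dbAtom type; the IDs are invented and this is illustration only, not part of the patch:

// Three denormalized rows for the same atom, newest backup first
// (FindAtoms already sorts by backup_timestamp DESC).
rows := []*dbAtom{
	{BackupID: "b3", DatasetID: "d3", DatasetSource: "file", AtomName: "user1", AtomFullPath: "file/user1"},
	{BackupID: "b2", DatasetID: "d2", DatasetSource: "file", AtomName: "user1", AtomFullPath: "file/user1"},
	{BackupID: "b1", DatasetID: "d1", DatasetSource: "file", AtomName: "user1", AtomFullPath: "file/user1"},
}

// Keep at most the two most recent versions of every atom path...
recent := keepNumVersions(rows, 2) // b3 and b2 survive, b1 is dropped

// ...then group what is left into Backup -> Dataset -> Atom objects
// (one Backup per backup_id, each holding its own Datasets).
for _, b := range groupByBackup(recent) {
	fmt.Printf("%s: %d dataset(s)\n", b.ID, len(b.Datasets))
}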
q := fmt.Sprintf( - `SELECT + `SELECT backup_id, backup_timestamp, backup_host, - dataset_name, dataset_handler, - atom_name, atom_path, atom_source_path, atom_relative_path + dataset_id, dataset_source, + atom_name, atom_path FROM log WHERE %s ORDER BY backup_timestamp DESC`, strings.Join(where, " AND "), @@ -227,22 +235,22 @@ func (s *Service) FindAtoms(ctx context.Context, req tabacco.FindRequest) ([][]t } defer rows.Close() // nolint - var atoms []dbAtom + var atoms []*dbAtom for rows.Next() { var a dbAtom if err := rows.Scan( &a.BackupID, &a.BackupTimestamp, &a.BackupHost, - &a.DatasetName, &a.DatasetHandler, - &a.AtomName, &a.AtomPath, &a.AtomSourcePath, &a.AtomRelativePath, + &a.DatasetID, &a.DatasetSource, + &a.AtomName, &a.AtomPath, ); err != nil { log.Printf("bad row: %v", err) continue } - atoms = append(atoms, a) + atoms = append(atoms, &a) } if err := rows.Err(); err != nil { return nil, err } - return normalizeAtoms(atoms, req.NumVersions), nil + return groupByBackup(keepNumVersions(atoms, req.NumVersions)), nil } diff --git a/metadb/server/service_test.go b/metadb/server/service_test.go index 2d3ddbd5e02f25a55a19bf8edf36674976a1763e..27766c9c2d9c27cd0853ef193235d1e17172127f 100644 --- a/metadb/server/service_test.go +++ b/metadb/server/service_test.go @@ -21,16 +21,15 @@ func addTestEntry(t *testing.T, svc *Service, backupID, host, dsName string) { Timestamp: time.Now(), }, tabacco.Dataset{ - Name: dsName, - Handler: "file", + Source: "file", Atoms: []tabacco.Atom{ { - Name: "sub1", - SourcePath: "/path/dataset1/sub1", + Name: dsName + "/sub1", + Path: "/path/dataset1/sub1", }, { - Name: "sub2", - SourcePath: "/path/dataset1/sub2", + Name: dsName + "/sub2", + Path: "/path/dataset1/sub2", }, }, }, @@ -48,7 +47,7 @@ func TestService_AddDataset(t *testing.T) { } defer svc.Close() - addTestEntry(t, svc, "1234", "host1", "dataset1") + addTestEntry(t, svc, "1234", "host1", "file/dataset1") } func TestService_FindAtoms(t *testing.T) { @@ -61,14 +60,14 @@ func TestService_FindAtoms(t *testing.T) { // Create 10 fake backups, which differ only in host. for i := 0; i < 10; i++ { - addTestEntry(t, svc, fmt.Sprintf("backup%06d", i), fmt.Sprintf("host%d", i), "dataset1") + addTestEntry(t, svc, fmt.Sprintf("backup%06d", i), fmt.Sprintf("host%d", i), "file/dataset1") } // Searching for a specific atom (common to all backups) // should return exactly 10 results. vv, err := svc.FindAtoms( context.Background(), - tabacco.FindRequest{ + &tabacco.FindRequest{ Pattern: "*/sub1", NumVersions: 10, }, @@ -84,7 +83,7 @@ func TestService_FindAtoms(t *testing.T) { // return a single result. vv, err = svc.FindAtoms( context.Background(), - tabacco.FindRequest{ + &tabacco.FindRequest{ Pattern: "*/sub1", Host: "host7", }, diff --git a/repository.go b/repository.go index cbf97c1cbe4cffe97c3fd2af90ec941c21e789cb..f88366ff376a919cf52e4c8356269a15fa7acbcd 100644 --- a/repository.go +++ b/repository.go @@ -13,14 +13,14 @@ type RepositorySpec struct { } // Parse a RepositorySpec and return a Repository instance. 
-func (spec *RepositorySpec) Parse(shell *Shell) (Repository, error) { +func (spec *RepositorySpec) Parse() (Repository, error) { if spec.Name == "" { return nil, errors.New("name is empty") } switch spec.Type { case "restic": - return newResticRepository(spec.Params, shell) + return newResticRepository(spec.Params) default: return nil, fmt.Errorf("unknown repository type '%s'", spec.Type) diff --git a/repository_restic.go b/repository_restic.go index 2570f5b40b62936724888e2a7bf6b9fd22ef888f..0e7122012f8dc61d35dbbf2fad3232475a7e3b50 100644 --- a/repository_restic.go +++ b/repository_restic.go @@ -7,11 +7,13 @@ import ( "fmt" "io" "io/ioutil" + "log" "os" "os/exec" "path/filepath" "regexp" "strings" + "sync" "time" "github.com/hashicorp/go-version" @@ -21,10 +23,11 @@ type resticRepository struct { bin string uri string passwordFile string - shell *Shell excludes []string excludeFiles []string autoPrune bool + + initialized sync.Once } func (r *resticRepository) resticCmd() string { @@ -69,7 +72,7 @@ func checkResticVersion(bin string) error { } // newResticRepository returns a restic repository. -func newResticRepository(params map[string]interface{}, shell *Shell) (Repository, error) { +func newResticRepository(params map[string]interface{}) (Repository, error) { uri, ok := params["uri"].(string) if !ok || uri == "" { return nil, errors.New("missing uri") @@ -112,7 +115,6 @@ func newResticRepository(params map[string]interface{}, shell *Shell) (Repositor passwordFile: tmpf.Name(), excludes: ex, excludeFiles: exf, - shell: shell, autoPrune: autoPrune, }, nil } @@ -121,48 +123,49 @@ func (r *resticRepository) Close() error { return os.Remove(r.passwordFile) } -func (r *resticRepository) Init(ctx context.Context) error { - // Restic init will fail the second time we run it, ignore - // errors. - return r.shell.Run(ctx, fmt.Sprintf( - "%s init --quiet || true", - r.resticCmd(), - )) +func (r *resticRepository) Init(ctx context.Context, rctx RuntimeContext) error { + r.initialized.Do(func() { + // Restic init will fail if the repository is already + // initialized, ignore errors (but log them). 
+ if err := rctx.Shell().Run(ctx, fmt.Sprintf( + "%s init --quiet || true", + r.resticCmd(), + )); err != nil { + log.Printf("restic repository init failed (likely harmless): %v", err) + } + }) + return nil } -func (r *resticRepository) Prepare(ctx context.Context, backup Backup) error { +func (r *resticRepository) Prepare(ctx context.Context, rctx RuntimeContext, backup *Backup) error { if !r.autoPrune { return nil } - return r.shell.Run(ctx, fmt.Sprintf( + return rctx.Shell().Run(ctx, fmt.Sprintf( "%s forget --host %s --keep-last 10 --prune", r.resticCmd(), backup.Host, )) } -func (r *resticRepository) Backup(ctx context.Context, backup Backup, ds Dataset, sourcePath string) error { - cmd := fmt.Sprintf( - "%s backup --cleanup-cache --exclude-caches --one-file-system --tag %s --tag backup_id=%s", +func resticBackupTags(backup *Backup, ds *Dataset) string { + return fmt.Sprintf("--tag dataset_id=%s --tag backup_id=%s", ds.ID, backup.ID) +} + +func (r *resticRepository) BackupCmd(backup *Backup, ds *Dataset, sourcePaths []string) string { + return fmt.Sprintf( + "%s backup --cleanup-cache --exclude-caches --one-file-system %s %s", r.resticCmd(), - ds.Name, - backup.ID, + resticBackupTags(backup, ds), + strings.Join(sourcePaths, " "), ) - for _, atom := range ds.Atoms { - if atom.SourcePath == "" { - return errors.New("atom without source path") - } - cmd += fmt.Sprintf(" %s", atom.SourcePath) - } - return r.shell.Run(ctx, cmd) } -func (r *resticRepository) getSnapshotID(ctx context.Context, backup Backup, ds Dataset) (string, error) { - data, err := r.shell.Output(ctx, fmt.Sprintf( - "%s snapshots --json --tag backup_id=%s --tag %s", +func (r *resticRepository) getSnapshotID(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset) (string, error) { + data, err := rctx.Shell().Output(ctx, fmt.Sprintf( + "%s snapshots --json %s", r.resticCmd(), - backup.ID, - ds.Name, + resticBackupTags(backup, ds), )) if err != nil { return "", err @@ -177,41 +180,59 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, backup Backup, ds return snaps[0].ShortID, nil } -func (r *resticRepository) Restore(ctx context.Context, backup Backup, ds Dataset, target string) error { - snap, err := r.getSnapshotID(ctx, backup, ds) +func (r *resticRepository) RestoreCmd(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset, paths []string, target string) (string, error) { + snap, err := r.getSnapshotID(ctx, rctx, backup, ds) if err != nil { - return err + return "", err } - cmd := fmt.Sprintf( - "%s restore", - r.resticCmd(), - ) - for _, atom := range ds.Atoms { - if atom.SourcePath != "" { - cmd += fmt.Sprintf(" --include %s", filepath.Join(atom.SourcePath)) - } + + cmd := []string{ + fmt.Sprintf("%s restore", r.resticCmd()), } - cmd += fmt.Sprintf(" --target %s", target) - cmd += fmt.Sprintf(" %s", snap) - return r.shell.Run(ctx, cmd) -} -func (r *resticRepository) BackupStream(ctx context.Context, backup Backup, ds Dataset, input io.Reader) error { - // Try to do the obvious thing with naming. 
- name := ds.Name - if len(ds.Atoms) == 1 { - name = fmt.Sprintf("%s.%s", ds.Name, ds.Atoms[0].Name) + for _, path := range paths { + cmd = append(cmd, fmt.Sprintf("--include %s", path)) } - return r.shell.Run(ctx, fmt.Sprintf( - "%s backup --stdin --stdin-filename %s", + + cmd = append(cmd, fmt.Sprintf("--target %s", target)) + cmd = append(cmd, snap) + return strings.Join(cmd, " "), nil +} + +// A special path for stdin datasets that is likely to be unused by the +// rest of the filesystem (the path namespace in Restic is global). +func datasetStdinPath(ds *Dataset) string { + dsPath := filepath.Join(ds.Source, ds.ID) + return fmt.Sprintf("/STDIN_%s", strings.Replace(dsPath, "/", "_", -1)) +} + +func (r *resticRepository) BackupStreamCmd(backup *Backup, ds *Dataset) string { + fakePath := datasetStdinPath(ds) + return fmt.Sprintf( + "%s backup --cleanup-cache --exclude-caches %s --stdin --stdin-filename %s", r.resticCmd(), - name, - )) + resticBackupTags(backup, ds), + fakePath, + ) } -func (r *resticRepository) RestoreStream(_ context.Context, backup Backup, ds Dataset, target string, output io.Writer) error { - // TODO. - return nil +func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset, target string) (string, error) { + snap, err := r.getSnapshotID(ctx, rctx, backup, ds) + if err != nil { + return "", err + } + + fakePath := datasetStdinPath(ds) + targetPath := filepath.Base(fakePath) + + // Restore the file to a temporary directory, then pipe it. + return fmt.Sprintf( + "(%s restore --target %s %s 1>&2 && cat %s)", + r.resticCmd(), + target, + snap, + filepath.Join(target, targetPath), + ), nil } // Data about a snapshot, obtained from 'restic snapshots --json'. diff --git a/repository_restic_test.go b/repository_restic_test.go index 0df393f270f9b2406eca19e570696757e2841ce5..6b349918ff1e87b4cd464d10c2ad6693b1ca8045 100644 --- a/repository_restic_test.go +++ b/repository_restic_test.go @@ -39,7 +39,7 @@ func createTempDirWithData(t *testing.T) string { } // nolint: gocyclo -func TestRestic(t *testing.T) { +func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePattern string, checkFn func(testing.TB, string)) { // Check that we can actually run restic. if err := checkResticVersion("restic"); err != nil { t.Skip("can't run restic: ", err) @@ -47,9 +47,6 @@ func TestRestic(t *testing.T) { store := &dummyMetadataStore{} - tmpdir := createTempDirWithData(t) - defer os.RemoveAll(tmpdir) - repoSpec := RepositorySpec{ Name: "main", Type: "restic", @@ -58,34 +55,22 @@ func TestRestic(t *testing.T) { "password": "testpass", }, } - handlerSpecs := []HandlerSpec{ - { + handlerSpecs := []*HandlerSpec{ + // 'file' is predefined. + &HandlerSpec{ Name: "data", - Type: "file", + Type: "pipe", Params: map[string]interface{}{ - "path": filepath.Join(tmpdir, "data"), - }, - }, - } - sourceSpecs := []SourceSpec{ - { - Name: "source1", - Handler: "data", - Atoms: []Atom{ - { - Name: "f1", - RelativePath: "file1", - }, - { - Name: "f2", - RelativePath: "file2", - }, + "backup_command": "echo data", + // The restore command also verifies the data. + "restore_command": "read row ; test \"x$$row\" = xdata", }, }, } - queueSpec := jobs.QueueSpec{ + queueSpec := &jobs.QueueSpec{ Workers: map[string]int{"backup": 2, "restore": 1}, } + sourceSpecs := []*SourceSpec{source} // Run the backup. 
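For stream ("pipe") datasets the repository no longer runs restic itself; it returns command strings that the caller composes into a shell pipeline. A rough sketch of what the builders above produce for a hypothetical dataset (identifiers invented; the restic prefix is whatever resticCmd() expands to), not part of the patch:

// Fragment assuming an already-configured *resticRepository in r.
ds := &Dataset{ID: "abc123", Source: "data"}
bk := &Backup{ID: "bk000001", Host: "host1"}

// datasetStdinPath(ds) yields "/STDIN_data_abc123": a fake path in
// restic's global path namespace, so stream snapshots cannot collide
// with real filesystem backups.
_ = datasetStdinPath(ds)

// BackupStreamCmd(bk, ds) produces roughly:
//   <restic...> backup --cleanup-cache --exclude-caches
//     --tag dataset_id=abc123 --tag backup_id=bk000001
//     --stdin --stdin-filename /STDIN_data_abc123
// and the handler is expected to pipe the source's backup_command into it.
cmd := r.BackupStreamCmd(bk, ds)

// RestoreStreamCmd restores that fake file under a temporary target
// directory and then cats it, so its output can be piped into the
// source's restore_command.
_ = cmd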
configMgr, err := NewConfigManager(&Config{ @@ -105,7 +90,7 @@ func TestRestic(t *testing.T) { } defer m.Close() - backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs()) + backup, err := m.Backup(context.TODO(), configMgr.current().SourceSpecs()[0]) if err != nil { t.Fatal(err) } @@ -133,12 +118,19 @@ func TestRestic(t *testing.T) { // Now try to restore. err = m.Restore( context.TODO(), - FindRequest{Pattern: "source1/*"}, + &FindRequest{Pattern: restorePattern}, tmpdir+"/restore", ) if err != nil { t.Fatal("Restore", err) } + + if checkFn != nil { + checkFn(t, tmpdir) + } +} + +func checkRestoredData(t testing.TB, tmpdir string) { data, err := ioutil.ReadFile(filepath.Join(tmpdir, "restore", tmpdir, "data", "file1")) if err != nil { t.Fatalf("data/file1 has not been restored: %v", err) @@ -147,3 +139,89 @@ func TestRestic(t *testing.T) { t.Fatalf("data/file1 has bad restored contents: %s", string(data)) } } + +func TestRestic(t *testing.T) { + tmpdir := createTempDirWithData(t) + defer os.RemoveAll(tmpdir) + + runResticTest( + t, tmpdir, + &SourceSpec{ + Name: "source1", + Handler: "file", + Schedule: "@random_every 1h", + Params: map[string]interface{}{ + "path": filepath.Join(tmpdir, "data"), + }, + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + { + Name: "f1", + Path: "file1", + }, + { + Name: "f2", + Path: "file2", + }, + }, + }, + }, + }, + "source1/*", + checkRestoredData, + ) +} + +func TestRestic_Stream(t *testing.T) { + tmpdir := createTempDirWithData(t) + defer os.RemoveAll(tmpdir) + + runResticTest( + t, tmpdir, + &SourceSpec{ + Name: "source1", + Handler: "data", + Schedule: "@random_every 1h", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + { + Name: "f1", + }, + }, + }, + }, + }, + "source1/*", + nil, + ) +} + +func TestRestic_Stream_Compress(t *testing.T) { + tmpdir := createTempDirWithData(t) + defer os.RemoveAll(tmpdir) + + runResticTest( + t, tmpdir, + &SourceSpec{ + Name: "source1", + Handler: "data", + Schedule: "@random_every 1h", + Datasets: []*DatasetSpec{ + &DatasetSpec{ + Atoms: []Atom{ + { + Name: "f1", + }, + }, + }, + }, + Params: map[string]interface{}{ + "compress": true, + }, + }, + "source1/*", + nil, + ) +} diff --git a/shell.go b/shell.go index 8fda1cad709a953d13b078398593004a09b2d492..f26c3e88ff56d2b4e9a947fb8aaa61d233f3e005 100644 --- a/shell.go +++ b/shell.go @@ -2,8 +2,6 @@ package tabacco import ( "context" - "fmt" - "io" "io/ioutil" "log" "os" @@ -80,7 +78,10 @@ func (s *Shell) command(ctx context.Context, arg string) *exec.Cmd { if s.dryRun { args = []string{"/bin/echo", arg} } else { - args = []string{"/bin/sh", "-c", arg} + // The pipefail option is necessary for us to detect + // when the first command in a pipeline fails, but we + // need bash for that. + args = []string{"/bin/bash", "-o", "pipefail", "-c", arg} } if s.niceLevel != 0 { @@ -108,60 +109,6 @@ func (s *Shell) command(ctx context.Context, arg string) *exec.Cmd { return c } -// RunStdoutPipe runs a command with a function connected to its -// standard output via a pipe. -func (s *Shell) RunStdoutPipe(ctx context.Context, arg string, fn func(io.Reader) error) error { - cmd := s.command(ctx, arg) - log.Printf("stdout_pipe: %s", arg) - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - if err := cmd.Start(); err != nil { - return err - } - - // The error from fn takes precedence over the command exit - // status. We still need to call cmd.Wait() in any case. 
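The /bin/sh to bash switch above is load-bearing: stream backups and restores are built as shell pipelines, and with plain sh a pipeline's exit status is that of its last command, so a failing dump would be silently treated as success. A self-contained sketch of the behaviour pipefail provides (illustrative, not part of the patch):

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// With plain sh, only the last command's status is reported,
	// so the failing producer goes unnoticed:
	shErr := exec.Command("/bin/sh", "-c", "false | cat").Run() // nil

	// With bash -o pipefail, the first failure propagates to the
	// pipeline's exit status, which Shell.Run can then report:
	bashErr := exec.Command("/bin/bash", "-o", "pipefail", "-c", "false | cat").Run() // non-nil

	fmt.Println(shErr == nil, bashErr != nil) // true true
}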
- ferr := fn(stdout) - if ferr != nil { - // By calling stdout.Close() early we're hoping that - // the child process gets a SIGPIPE. - stdout.Close() - } - werr := cmd.Wait() - - if ferr != nil { - return ferr - } - return werr -} - -// RunStdinPipe runs a command with a function connected to its -// standard input via a pipe. -func (s *Shell) RunStdinPipe(ctx context.Context, arg string, fn func(io.Writer) error) error { - cmd := s.command(ctx, arg) - log.Printf("stdin_pipe: %s", arg) - stdin, err := cmd.StdinPipe() - if err != nil { - return err - } - if err := cmd.Start(); err != nil { - return err - } - - // The error from fn takes precedence over the command exit - // status. We still need to call cmd.Wait() in any case. - ferr := fn(stdin) - stdin.Close() // nolint - werr := cmd.Wait() - - if ferr != nil { - return ferr - } - return werr -} - // Run a command. func (s *Shell) Run(ctx context.Context, arg string) error { c := s.command(ctx, arg) @@ -169,16 +116,6 @@ func (s *Shell) Run(ctx context.Context, arg string) error { return c.Run() } -// RunWithEnv runs a command with additional environment variables. -func (s *Shell) RunWithEnv(ctx context.Context, arg string, envMap map[string]string) error { - c := s.command(ctx, arg) - for k, v := range envMap { - c.Env = append(c.Env, fmt.Sprintf("%s=%s", k, v)) - } - c.Stdout = os.Stdout - return c.Run() -} - // Output runs a command and returns the standard output. func (s *Shell) Output(ctx context.Context, arg string) ([]byte, error) { return s.command(ctx, arg).Output() diff --git a/source.go b/source.go index 28edecfbfee81d1210e1811d579497bd7628e930..9e7e7e7191f1000546fa3911a5b11d868c51ffcd 100644 --- a/source.go +++ b/source.go @@ -1,15 +1,62 @@ package tabacco import ( - "bufio" - "bytes" "context" "errors" "fmt" + "os" "os/exec" + "time" + + "git.autistici.org/ai3/tools/tabacco/util" + "gopkg.in/yaml.v2" ) -// SourceSpec defines the configuration for a data source. +// DatasetSpec describes a dataset in the configuration. +type DatasetSpec struct { + //Name string `yaml:"name"` + + Atoms []Atom `yaml:"atoms"` + AtomsCommand string `yaml:"atoms_command"` +} + +// Parse a DatasetSpec and return a Dataset. +func (spec *DatasetSpec) Parse(ctx context.Context, src *SourceSpec) (*Dataset, error) { + // Build the atoms list, invoking the atoms_command if necessary. + var atoms []Atom + atoms = append(atoms, spec.Atoms...) + if spec.AtomsCommand != "" { + var cmdAtoms []Atom + if err := runYAMLCommand(ctx, spec.AtomsCommand, &cmdAtoms); err != nil { + return nil, fmt.Errorf("source %s: error in atoms command: %v", src.Name, err) + } + atoms = append(atoms, cmdAtoms...) + } + + return &Dataset{ + ID: util.RandomID(), + Source: src.Name, + Atoms: atoms, + }, nil +} + +// Check syntactical validity of the DatasetSpec. +func (spec *DatasetSpec) Check() error { + if len(spec.Atoms) > 0 && spec.AtomsCommand != "" { + return errors.New("can't specify both 'atoms' and 'atoms_command'") + } + if len(spec.Atoms) == 0 && spec.AtomsCommand == "" { + return errors.New("must specify one of 'atoms' or 'atoms_command'") + } + return nil +} + +// SourceSpec defines the configuration for a data source. Data +// sources can dynamically or statically generate one or more +// Datasets, each containing one or more Atoms. +// +// Handlers are launched once per Dataset, and they know how to deal +// with backing up / restoring individual Atoms. 
type SourceSpec struct { Name string `yaml:"name"` Handler string `yaml:"handler"` @@ -17,93 +64,94 @@ type SourceSpec struct { // Schedule to run the backup on. Schedule string `yaml:"schedule"` - // Define atoms statically, or use a script to generate them + // Define Datasets statically, or use a script to generate them // dynamically on every new backup. - Atoms []Atom `yaml:"atoms"` - AtomsCommand string `yaml:"atoms_command"` + Datasets []*DatasetSpec `yaml:"datasets"` + DatasetsCommand string `yaml:"datasets_command"` + + // Commands to run before and after operations on the source. + PreBackupCommand string `yaml:"pre_backup_command"` + PostBackupCommand string `yaml:"post_backup_command"` + PreRestoreCommand string `yaml:"pre_restore_command"` + PostRestoreCommand string `yaml:"post_restore_command"` + + Params Params `yaml:"params"` - //Params map[string]interface{} `json:"params"` + // Timeout for execution of the entire backup operation. + Timeout time.Duration `yaml:"timeout"` } -// Parse a SourceSpec and return a Dataset instance. -func (spec *SourceSpec) Parse(ctx context.Context) (ds Dataset, err error) { - // Invoke the atoms_command if necessary. - atoms := spec.Atoms - if spec.AtomsCommand != "" { - atoms, err = runAtomsCommand(ctx, spec.AtomsCommand) - if err != nil { - return +// Parse a SourceSpec and return one or more Datasets. +func (spec *SourceSpec) Parse(ctx context.Context) ([]*Dataset, error) { + // Build the atoms list, invoking the atoms_command if + // necessary, and creating actual atoms with absolute names. + dspecs := append([]*DatasetSpec{}, spec.Datasets...) + if spec.DatasetsCommand != "" { + var cmdSpecs []*DatasetSpec + if err := runYAMLCommand(ctx, spec.DatasetsCommand, &cmdSpecs); err != nil { + return nil, fmt.Errorf("error in datasets command: %v", err) } + dspecs = append(dspecs, cmdSpecs...) } - ds = normalizeDataset(Dataset{ - Name: spec.Name, - Handler: spec.Handler, - Atoms: atoms, - }) - return + // Call Parse on all datasets. + datasets := make([]*Dataset, 0, len(dspecs)) + for _, dspec := range dspecs { + ds, err := dspec.Parse(ctx, spec) + if err != nil { + return nil, fmt.Errorf("error parsing dataset: %v", err) + } + datasets = append(datasets, ds) + } + return datasets, nil } -// Check that the configuration is valid. Not an alternative to +// Check syntactical validity of the SourceSpec. Not an alternative to // validation at usage time, but it provides an early warning to the // user. Checks the handler name against a string set of handler // names. 
-func (spec *SourceSpec) Check(handlers map[string]Handler) error { +func (spec *SourceSpec) Check(handlers map[string]*HandlerSpec) error { + if spec.Timeout == 0 { + spec.Timeout = 24 * time.Hour + } + if spec.Name == "" { - return errors.New("name is empty") + return errors.New("source name is not set") + } + if spec.Schedule == "" { + return errors.New("schedule is not set") + } + if spec.Handler == "" { + return errors.New("handler is not set") } if _, ok := handlers[spec.Handler]; !ok { return fmt.Errorf("unknown handler '%s'", spec.Handler) } - if len(spec.Atoms) > 0 && spec.AtomsCommand != "" { - return errors.New("can't specify both 'atoms' and 'atoms_command'") + if len(spec.Datasets) > 0 && spec.DatasetsCommand != "" { + return errors.New("can't specify both 'datasets' and 'datasets_command'") } - return nil -} - -func runAtomsCommand(ctx context.Context, cmd string) ([]Atom, error) { - c := exec.Command("/bin/sh", "-c", cmd) // #nosec - stdout, err := c.StdoutPipe() - if err != nil { - return nil, err - } - defer stdout.Close() // nolint: errcheck - if err := c.Start(); err != nil { - return nil, err + if len(spec.Datasets) == 0 && spec.DatasetsCommand == "" { + return errors.New("must specify one of 'datasets' or 'datasets_command'") } - var atoms []Atom - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - parts := bytes.Fields(scanner.Bytes()) - atom := Atom{Name: string(parts[0])} - if len(parts) == 2 { - atom.RelativePath = string(parts[1]) + // Check the datasets, at least those that are provided + // statically. + merr := new(util.MultiError) + for _, ds := range spec.Datasets { + if err := ds.Check(); err != nil { + merr.Add(err) } - atoms = append(atoms, atom) } - return atoms, scanner.Err() + return merr.OrNil() } -func normalizeDataset(ds Dataset) Dataset { - // If the Dataset has no atoms, add an empty one. - if len(ds.Atoms) == 0 { - ds.Atoms = []Atom{Atom{}} - } - - // If there are multiple atoms, and some (or all) have empty - // RelativePaths, just set their RelativePath equal to their - // Name. - if len(ds.Atoms) > 1 { - var atoms []Atom - for _, atom := range ds.Atoms { - if atom.RelativePath == "" { - atom.RelativePath = atom.Name - } - atoms = append(atoms, atom) - } - ds.Atoms = atoms +func runYAMLCommand(ctx context.Context, cmd string, obj interface{}) error { + c := exec.Command("/bin/sh", "-c", cmd) // #nosec + c.Stderr = os.Stderr + output, err := c.Output() + if err != nil { + return err } - return ds + return yaml.Unmarshal(output, obj) } diff --git a/testdata/sources/source.yml b/testdata/sources/source.yml index 5a5305a9b4d64599b846e478db8c004e79c4eaf6..f0c7875387bbad1e30c034518e54b83a87c5cf05 100644 --- a/testdata/sources/source.yml +++ b/testdata/sources/source.yml @@ -1,3 +1,7 @@ name: source1 handler: file schedule: "@random_every 2m" +params: + path: /usr/share/misc +datasets: + - atoms: [{name: magic}] diff --git a/types.go b/types.go index be837c7c55815d50663068084ee26fe73bf9da51..12767c66a86fe5c527a78a2e75716b43e65de2db 100644 --- a/types.go +++ b/types.go @@ -3,7 +3,6 @@ package tabacco import ( "context" "fmt" - "io" "regexp" "strings" "time" @@ -11,6 +10,34 @@ import ( "git.autistici.org/ai3/tools/tabacco/jobs" ) +// Params are configurable parameters in a format friendly to YAML +// representation. +type Params map[string]interface{} + +// Get a string value for a parameter. 
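The updated testdata file above shows the minimal shape of the reworked source configuration. For reference, a somewhat fuller, hypothetical example of what the new SourceSpec/DatasetSpec yaml tags accept (all values invented; datasets_command, when used instead of datasets, must print a YAML list of dataset specs that runYAMLCommand can unmarshal):

name: mysql
handler: dbpipe
schedule: "35 3 * * *"
pre_backup_command: "/usr/local/bin/db-quiesce on"
post_backup_command: "/usr/local/bin/db-quiesce off"
params:
  compress: true
datasets:
  - atoms:
      - name: usersdb
      - name: invoicesdb
  - atoms_command: "/usr/local/bin/list-archived-dbs --yaml"
# Alternatively, generate all datasets dynamically (mutually exclusive
# with 'datasets', as Check() enforces):
#datasets_command: "/usr/local/bin/list-all-dbs --yaml"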
+func (p Params) Get(key string) string { + if s, ok := p[key].(string); ok { + return s + } + return "" +} + +// GetBool returns a boolean value for a parameter (may be a string). +// Returns value and presence. +func (p Params) GetBool(key string) (bool, bool) { + if b, ok := p[key].(bool); ok { + return b, true + } + if s, ok := p[key].(string); ok { + switch strings.ToLower(s) { + case "on", "yes", "true", "1": + return true, true + } + return false, true + } + return false, false +} + // Backup is the over-arching entity describing a high level backup // operation. Backups are initiated autonomously by individual hosts, // so each Backup belongs to a single Host. @@ -23,19 +50,22 @@ type Backup struct { // Host. Host string `json:"host"` + + // Datasets. + Datasets []*Dataset `json:"datasets"` } // An Atom is a bit of data that can be restored independently as part -// of a Dataset. +// of a Dataset. Atoms are identified uniquely by their absolute path +// in the global atom namespace: this path is built by concatenating +// the source name, the dataset name, and the atom name. type Atom struct { - // Name (path-like, not rooted). + // Name (path-like). Name string `json:"name"` - // Relative path with respect to the Dataset. - RelativePath string `json:"rel_path"` - - // Source path (used for restore). - SourcePath string `json:"source_path,omitempty"` + // Special attribute for the 'file' handler (path relative to + // source root path). + Path string `json:"path,omitempty"` } // A Dataset describes a data set as a high level structure containing @@ -45,13 +75,14 @@ type Atom struct { // databases (the atom we're interested in), which we might want to // restore independently. type Dataset struct { - // Name of the dataset. Will be prepended to target storage + // Name of the dataset (path-like). Will be prepended to atom // paths. - Name string `json:"name"` + //Name string `json:"name"` + ID string `json:"id"` - // Handler specifies the dataset type (which handler to use to - // backup/restore it). - Handler string `json:"handler"` + // Source is the name of the source that created this Dataset, + // stored so that the restore knows what to do. + Source string `json:"source"` // Atoms that are part of this dataset. Atoms []Atom `json:"atoms"` @@ -74,49 +105,39 @@ func (req *FindRequest) matchPattern(s string) bool { return req.patternRx.MatchString(s) } -// A Version ties together a Dataset and a Backup. -type Version struct { - Dataset Dataset `json:"dataset"` - Backup Backup `json:"backup"` -} - // MetadataStore is the client interface to the global metadata store. type MetadataStore interface { // Find the datasets that match a specific criteria. Only // atoms matching the criteria will be included in the Dataset // objects in the response. - FindAtoms(context.Context, FindRequest) ([][]Version, error) + FindAtoms(context.Context, *FindRequest) ([]*Backup, error) // Add a dataset entry (the Backup might already exist). - AddDataset(context.Context, Backup, Dataset) error + AddDataset(context.Context, *Backup, *Dataset) error } // Handler can backup and restore a specific class of datasets. 
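To make the naming scheme described above concrete: atoms end up in the metadata database under path-like names rooted at their source (the restic test, for instance, restores with the pattern "source1/*"), and FindAtoms matches FindRequest.Pattern against that name with '*' translated into a SQL LIKE wildcard; Host and NumVersions narrow the result further. A small illustrative fragment, with invented names:

// All of these would match an atom stored as "mysql/usersdb":
reqs := []*FindRequest{
	{Pattern: "mysql/usersdb"},           // exactly this atom
	{Pattern: "mysql/*", NumVersions: 3}, // whole source, 3 most recent versions per atom
	{Pattern: "*/usersdb", Host: "db1"},  // this atom, only in backups made by host "db1"
}
_ = reqs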
type Handler interface { - DatasetsForBackup(Dataset) []Dataset - DatasetsForRestore(Dataset) []Dataset - BackupJob(Repository, Backup, Dataset) jobs.Job - RestoreJob(Repository, Backup, Dataset, string) jobs.Job - Spec() HandlerSpec + BackupJob(RuntimeContext, *Backup, *Dataset) jobs.Job + RestoreJob(RuntimeContext, *Backup, *Dataset, string) jobs.Job } // Repository is the interface to a remote repository. type Repository interface { - Init(context.Context) error - Prepare(context.Context, Backup) error - Backup(context.Context, Backup, Dataset, string) error - Restore(context.Context, Backup, Dataset, string) error - BackupStream(context.Context, Backup, Dataset, io.Reader) error - RestoreStream(context.Context, Backup, Dataset, string, io.Writer) error + Init(context.Context, RuntimeContext) error + BackupCmd(*Backup, *Dataset, []string) string + RestoreCmd(context.Context, RuntimeContext, *Backup, *Dataset, []string, string) (string, error) + BackupStreamCmd(*Backup, *Dataset) string + RestoreStreamCmd(context.Context, RuntimeContext, *Backup, *Dataset, string) (string, error) Close() error } // Manager for backups and restores. type Manager interface { - BackupJob(context.Context, []SourceSpec) (Backup, jobs.Job, error) - Backup(context.Context, []SourceSpec) (Backup, error) - RestoreJob(context.Context, FindRequest, string) (jobs.Job, error) - Restore(context.Context, FindRequest, string) error + BackupJob(context.Context, *SourceSpec) (*Backup, jobs.Job, error) + Backup(context.Context, *SourceSpec) (*Backup, error) + RestoreJob(context.Context, *FindRequest, string) (jobs.Job, error) + Restore(context.Context, *FindRequest, string) error Close() error // Debug interface.