Commit 04055321 authored by ale

Merge branch 'v2' into 'master'

V2

See merge request !1

parents a277f63f 25f279d0
Pipeline #3491 failed with stages in 1 minute and 23 seconds
@@ -12,6 +12,7 @@ test:
   image: "ai/test:go"
   script:
     - "./install_restic_for_tests.sh"
+    - "apt-get install -y liblz4-tool"
     - "go-test-runner ."
   except:
     - master
@@ -35,7 +35,8 @@ func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (
 			case <-stopCh:
 				return
 			case <-notifyCh:
-				schedule, err := makeSchedule(ctx, mgr, configMgr.getSourceSpecs(), configMgr.getSeed())
+				config := configMgr.current()
+				schedule, err := makeSchedule(ctx, mgr, config.SourceSpecs(), config.Seed())
 				if err != nil {
 					log.Printf("error updating scheduler: %v", err)
 				}
@@ -63,7 +64,7 @@ func (a *Agent) Close() {
 // Create a new jobs.Schedule that will trigger a separate backup for
 // each configured data source that includes a 'schedule' attribute.
-func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
+func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
 	sched := jobs.NewSchedule(ctx, hostSeed)
 	merr := new(util.MultiError)
 	var good int
@@ -72,9 +73,9 @@ func makeSchedule(ctx context.Context, m Manager, sourceSpecs []SourceSpec, host
 			continue
 		}
 		// Bind spec to a new closure.
-		err := func(spec SourceSpec) error {
+		err := func(spec *SourceSpec) error {
 			return sched.Add(spec.Name, spec.Schedule, func() jobs.Job {
-				_, j, err := m.BackupJob(ctx, []SourceSpec{spec})
+				_, j, err := m.BackupJob(ctx, spec)
 				if err != nil {
 					log.Printf("%s: can't create backup job: %v", spec.Name, err)
 				}
@@ -9,19 +9,19 @@ import (
 type fakeManager struct{}

-func (m *fakeManager) BackupJob(context.Context, []SourceSpec) (Backup, jobs.Job, error) {
-	return Backup{}, nil, nil
+func (m *fakeManager) BackupJob(context.Context, *SourceSpec) (*Backup, jobs.Job, error) {
+	return &Backup{}, nil, nil
 }

-func (m *fakeManager) Backup(context.Context, []SourceSpec) (Backup, error) {
-	return Backup{}, nil
+func (m *fakeManager) Backup(context.Context, *SourceSpec) (*Backup, error) {
+	return &Backup{}, nil
 }

-func (m *fakeManager) RestoreJob(context.Context, FindRequest, string) (jobs.Job, error) {
+func (m *fakeManager) RestoreJob(context.Context, *FindRequest, string) (jobs.Job, error) {
 	return nil, nil
 }

-func (m *fakeManager) Restore(context.Context, FindRequest, string) error {
+func (m *fakeManager) Restore(context.Context, *FindRequest, string) error {
 	return nil
 }
@@ -34,27 +34,31 @@ func (m *fakeManager) GetStatus() ([]jobs.Status, []jobs.Status, []jobs.Status)
 }

 func TestMakeSchedule(t *testing.T) {
-	sourceSpecs := []SourceSpec{
-		{
-			Name: "source1",
+	sourceSpecs := []*SourceSpec{
+		&SourceSpec{
+			Name: "source1/users",
 			Handler: "file1",
 			Schedule: "@random_every 1d",
-			Atoms: []Atom{
-				{
-					Name: "user1",
-					RelativePath: "user1",
-				},
-				{
-					Name: "user2",
-					RelativePath: "user2",
+			Datasets: []*DatasetSpec{
+				&DatasetSpec{
+					Atoms: []Atom{
+						{
+							Name: "user1",
+							Path: "user1",
+						},
+						{
+							Name: "user2",
+							Path: "user2",
+						},
+					},
 				},
 			},
 		},
-		{
+		&SourceSpec{
 			Name: "source2",
 			Handler: "dbpipe",
 			Schedule: "35 3 * * *",
-			AtomsCommand: "echo user1 user1 ; echo user2 user2",
+			DatasetsCommand: "echo user1 user1 ; echo user2 user2",
 		},
 	}
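For orientation while reading the test changes above, the nesting they imply is roughly the following. These partial Go definitions are inferred from this diff alone and are illustrative, not the complete types: a SourceSpec now groups its atoms into datasets, either listed inline or produced by a command.

// Partial shapes inferred from the updated test (illustrative only).
type SourceSpec struct {
	Name            string
	Handler         string
	Schedule        string
	Datasets        []*DatasetSpec
	DatasetsCommand string // e.g. "echo user1 user1 ; echo user2 user2"
}

type DatasetSpec struct {
	Atoms []Atom
}

type Atom struct {
	Name string
	Path string // replaces the previous RelativePath field
}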
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"log"
	"os"
	"time"

	"github.com/google/subcommands"

	"git.autistici.org/ai3/tools/tabacco"
	mdbc "git.autistici.org/ai3/tools/tabacco/metadb/client"
)

var rpcTimeout = 120 * time.Second

type queryCommand struct {
	configPath  string
	host        string
	numVersions int
}

func (c *queryCommand) Name() string     { return "query" }
func (c *queryCommand) Synopsis() string { return "query the backup metadata database" }

func (c *queryCommand) Usage() string {
	return `query [<flags>] <atom_pattern>
Query the backup metadata database.
`
}

func (c *queryCommand) SetFlags(f *flag.FlagSet) {
	f.StringVar(&c.configPath, "config", "/etc/tabacco/agent.yml", "configuration `file`")
	f.StringVar(&c.host, "host", "", "filter by host")
	f.IntVar(&c.numVersions, "num-versions", 1, "return the most recent `N` versions")
}

func (c *queryCommand) buildRequest(f *flag.FlagSet) (*tabacco.FindRequest, error) {
	if f.NArg() != 1 {
		return nil, errors.New("error: wrong number of arguments")
	}
	return &tabacco.FindRequest{
		Pattern: f.Arg(0),
		Host: c.host,
		NumVersions: c.numVersions,
	}, nil
}

func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
	req, err := c.buildRequest(f)
	if err != nil {
		log.Printf("error in request: %v", err)
		return subcommands.ExitUsageError
	}

	// Parse configuration and connect to the metadata store.
	config, err := tabacco.ReadConfig(c.configPath)
	if err != nil {
		log.Printf("error reading config: %v", err)
		return subcommands.ExitFailure
	}
	store, err := mdbc.New(config.MetadataStoreBackend)
	if err != nil {
		log.Printf("error in metadata client config: %v", err)
		return subcommands.ExitFailure
	}

	// Make the RPC.
	rctx, cancel := context.WithTimeout(ctx, rpcTimeout)
	defer cancel()
	result, err := store.FindAtoms(rctx, req)
	if err != nil {
		log.Printf("FindAtoms() error: %v", err)
		return subcommands.ExitFailure
	}

	data, _ := json.MarshalIndent(result, "", " ")
	os.Stdout.Write(data)
	return subcommands.ExitSuccess
}

func init() {
	subcommands.Register(&queryCommand{}, "")
}
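Usage note: with this command registered, a metadata query from the command line would look something like `tabacco query -host myhost -num-versions 3 'user1'` (the binary name, host and pattern here are illustrative, not taken from this change); the matching atoms are written to standard output as indented JSON.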
@@ -77,8 +77,8 @@ func (c *restoreCommand) buildRestoreJob(ctx context.Context, mgr tabacco.Manage
 	return jobs.AsyncGroup(restoreJobs), nil
 }

-func (c *restoreCommand) newFindRequest(s string) tabacco.FindRequest {
-	return tabacco.FindRequest{
+func (c *restoreCommand) newFindRequest(s string) *tabacco.FindRequest {
+	return &tabacco.FindRequest{
 		Pattern: s,
 	}
 }
@@ -23,7 +23,7 @@ var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
 // holds it all together.
 type Config struct {
 	Hostname string `yaml:"hostname"`
-	Queue jobs.QueueSpec `yaml:"queue_config"`
+	Queue *jobs.QueueSpec `yaml:"queue_config"`
 	Repository RepositorySpec `yaml:"repository"`
 	DryRun bool `yaml:"dry_run"`
 	DefaultNiceLevel int `yaml:"default_nice_level"`
@@ -32,43 +32,83 @@ type Config struct {
 	RandomSeedFile string `yaml:"random_seed_file"`
 	MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"`

-	HandlerSpecs []HandlerSpec
-	SourceSpecs []SourceSpec
+	HandlerSpecs []*HandlerSpec
+	SourceSpecs []*SourceSpec
 }
-type runtimeAssets struct {
-	handlerMap map[string]Handler
-	repo Repository
-	seed int64
-	shell *Shell
-}
+// RuntimeContext provides access to runtime objects whose lifetime is
+// ultimately tied to the configuration. Configuration can change
+// during the lifetime of the process, but we want backup jobs to have
+// a consistent view of the configuration while they execute, so
+// access to the current version of the configuration is controlled by
+// the ConfigManager.
+type RuntimeContext interface {
+	Shell() *Shell
+	Repo() Repository
+	QueueSpec() *jobs.QueueSpec
+	Seed() int64
+	WorkDir() string
+	SourceSpecs() []*SourceSpec
+	FindSource(string) *SourceSpec
+	HandlerSpec(string) *HandlerSpec
+	Close()
+}

-func (a *runtimeAssets) Close() {
+// The set of objects that are created from a Config and that the main
+// code cares about.
+type parsedConfig struct {
+	handlerMap map[string]*HandlerSpec
+	sourceSpecs []*SourceSpec
+	sourceSpecsByName map[string]*SourceSpec
+	queue *jobs.QueueSpec
+	repo Repository
+	seed int64
+	shell *Shell
+	workDir string
+}
+
+func (a *parsedConfig) Close() {
 	a.repo.Close() // nolint
 }

-func buildHandlerMap(specs []HandlerSpec, shell *Shell) (map[string]Handler, error) {
-	m := make(map[string]Handler)
-	merr := new(util.MultiError)
-	for _, spec := range specs {
-		h, err := spec.Parse(shell)
-		if err != nil {
-			merr.Add(err)
-			continue
-		}
-		m[spec.Name] = h
+func (a *parsedConfig) Shell() *Shell { return a.shell }
+func (a *parsedConfig) Repo() Repository { return a.repo }
+func (a *parsedConfig) QueueSpec() *jobs.QueueSpec { return a.queue }
+func (a *parsedConfig) Seed() int64 { return a.seed }
+func (a *parsedConfig) WorkDir() string { return a.workDir }
+func (a *parsedConfig) SourceSpecs() []*SourceSpec { return a.sourceSpecs }
+
+func (a *parsedConfig) HandlerSpec(name string) *HandlerSpec {
+	return a.handlerMap[name]
+}
+
+func (a *parsedConfig) FindSource(name string) *SourceSpec {
+	return a.sourceSpecsByName[name]
+}
+
+func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec {
+	// Create a handler map with a default 'file' spec.
+	m := map[string]*HandlerSpec{
+		"file": &HandlerSpec{
+			Name: "file",
+			Type: "file",
+		},
+	}
+	for _, h := range specs {
+		m[h.Name] = h
 	}
-	return m, merr.OrNil()
+	return m
 }

-func (c *Config) parse() (*runtimeAssets, error) {
+func (c *Config) parse() (*parsedConfig, error) {
 	shell := NewShell(c.DryRun)
 	shell.SetNiceLevel(c.DefaultNiceLevel)
 	shell.SetIOClass(c.DefaultIOClass)

 	// Parse the repository config. An error here is fatal, as we
 	// don't have a way to operate without a repository.
-	repo, err := c.Repository.Parse(shell)
+	repo, err := c.Repository.Parse()
 	if err != nil {
 		return nil, err
 	}
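To make the intent of the new RuntimeContext interface concrete, here is a minimal sketch, not part of this change set, of how a caller might hold one consistent configuration snapshot for a whole backup run. NewRuntimeContext and the accessor names come from the interface above; the surrounding loop is purely illustrative.

	// Take one snapshot and keep using it, so a concurrent Reload()
	// cannot change the view mid-run.
	rc := configMgr.NewRuntimeContext()
	for _, spec := range rc.SourceSpecs() {
		handler := rc.HandlerSpec(spec.Handler)
		if handler == nil {
			log.Printf("%s: unknown handler %q", spec.Name, spec.Handler)
			continue
		}
		// ... build and run the backup job for spec, using rc.Repo(),
		// rc.Shell() and rc.WorkDir() as needed ...
	}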
@@ -76,21 +116,26 @@ func (c *Config) parse() (*runtimeAssets, error) {
 	merr := new(util.MultiError)

 	// Build the handlers.
-	handlerMap, err := buildHandlerMap(c.HandlerSpecs, shell)
-	if err != nil {
-		merr.Add(err)
-	}
+	handlerMap := buildHandlerMap(c.HandlerSpecs)

 	// Validate the sources (Parse is called later at runtime).
-	var srcs []SourceSpec
+	// Sources that fail the check are removed from the
+	// SourceSpecs array. We also check that sources have unique
+	// names.
+	srcMap := make(map[string]*SourceSpec)
+	var srcs []*SourceSpec
 	for _, spec := range c.SourceSpecs {
 		if err := spec.Check(handlerMap); err != nil {
-			merr.Add(err)
+			merr.Add(fmt.Errorf("source %s: %v", spec.Name, err))
 			continue
 		}
+		if _, ok := srcMap[spec.Name]; ok {
+			merr.Add(fmt.Errorf("duplicated source %s", spec.Name))
+			continue
+		}
+		srcMap[spec.Name] = spec
 		srcs = append(srcs, spec)
 	}
-	c.SourceSpecs = srcs

 	// Read (or create) the seed file.
 	seedFile := defaultSeedFile
@@ -99,28 +144,32 @@ func (c *Config) parse() (*runtimeAssets, error) {
 	}
 	seed := mustGetSeed(seedFile)

-	return &runtimeAssets{
-		shell: shell,
-		repo: repo,
-		handlerMap: handlerMap,
-		seed: seed,
+	return &parsedConfig{
+		handlerMap: handlerMap,
+		sourceSpecs: srcs,
+		sourceSpecsByName: srcMap,
+		queue: c.Queue,
+		shell: shell,
+		repo: repo,
+		seed: seed,
+		workDir: c.WorkDir,
 	}, merr.OrNil()
 }

 // The following functions read YAML files from .d-style directories. To be nice
 // to the user, each file can contain either a single object or a list of
 // multiple objects.
-func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
-	var out []HandlerSpec
+func readHandlersFromDir(dir string) ([]*HandlerSpec, error) {
+	var out []*HandlerSpec
 	err := foreachYAMLFile(dir, func(path string) error {
-		var specs []HandlerSpec
+		var specs []*HandlerSpec
 		log.Printf("reading handler: %s", path)
 		if err := readYAMLFile(path, &specs); err != nil {
 			var spec HandlerSpec
 			if err := readYAMLFile(path, &spec); err != nil {
 				return err
 			}
-			specs = []HandlerSpec{spec}
+			specs = append(specs, &spec)
 		}
 		out = append(out, specs...)
 		return nil
@@ -128,17 +177,17 @@ func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
 	return out, err
 }

-func readSourcesFromDir(dir string) ([]SourceSpec, error) {
-	var out []SourceSpec
+func readSourcesFromDir(dir string) ([]*SourceSpec, error) {
+	var out []*SourceSpec
 	err := foreachYAMLFile(dir, func(path string) error {
-		var specs []SourceSpec
+		var specs []*SourceSpec
 		log.Printf("reading source: %s", path)
 		if err := readYAMLFile(path, &specs); err != nil {
 			var spec SourceSpec
 			if err := readYAMLFile(path, &spec); err != nil {
 				return err
 			}
-			specs = []SourceSpec{spec}
+			specs = append(specs, &spec)
 		}
 		out = append(out, specs...)
 		return nil
@@ -215,6 +264,7 @@ func foreachYAMLFile(dir string, f func(string) error) error {
 	merr := new(util.MultiError)
 	for _, path := range files {
 		if err := f(path); err != nil {
+			log.Printf("error loading yaml file %s: %v", path, err)
 			merr.Add(err)
 		}
 	}
@@ -228,8 +278,7 @@ func foreachYAMLFile(dir string, f func(string) error) error {
 // unregister).
 type ConfigManager struct {
 	mx sync.Mutex
-	config *Config
-	assets *runtimeAssets
+	parsed *parsedConfig

 	// Listeners are notified on every reload.
 	notifyCh chan struct{}
@@ -246,12 +295,14 @@ func NewConfigManager(config *Config) (*ConfigManager, error) {
 	}
 	go func() {
 		for range m.notifyCh {
+			m.mx.Lock()
 			for _, lch := range m.listeners {
 				select {
 				case lch <- struct{}{}:
 				default:
 				}
 			}
+			m.mx.Unlock()
 		}
 	}()
 	return m, nil
@@ -260,22 +311,23 @@
 // Reload the configuration (at least, the parts of it that can be
 // dynamically reloaded).
 func (m *ConfigManager) Reload(config *Config) error {
-	assets, err := config.parse()
-	if assets == nil {
+	parsed, err := config.parse()
+	if parsed == nil {
 		return err
+	} else if err != nil {
+		log.Printf("warning: errors in configuration: %v", err)
 	}

 	// Update config and notify listeners (in a separate
 	// goroutine, that does not hold the lock).
 	m.mx.Lock()
 	defer m.mx.Unlock()
-	if m.assets != nil {
-		m.assets.Close() // nolint
+	if m.parsed != nil {
+		m.parsed.Close() // nolint
 	}
-	log.Printf("loaded new config: %d handlers, %d sources", len(assets.handlerMap), len(config.SourceSpecs))
-	m.assets = assets
-	m.config = config
+	log.Printf("loaded new config: %d handlers, %d sources", len(parsed.handlerMap), len(parsed.sourceSpecs))
+	m.parsed = parsed
 	m.notifyCh <- struct{}{}
 	return nil
 }
@@ -284,8 +336,8 @@
 func (m *ConfigManager) Close() {
 	m.mx.Lock()
 	close(m.notifyCh)
-	if m.assets != nil {
-		m.assets.Close()
+	if m.parsed != nil {
+		m.parsed.Close()
 	}
 	m.mx.Unlock()
 }
@@ -303,47 +355,16 @@ func (m *ConfigManager) Notify() <-chan struct{} {
 	return ch
 }

-func (m *ConfigManager) getHandler(name string) (Handler, bool) {
-	m.mx.Lock()
-	defer m.mx.Unlock()
-	h, ok := m.assets.handlerMap[name]
-	return h, ok
-}
-
-func (m *ConfigManager) getRepository() Repository {
-	m.mx.Lock()
-	defer m.mx.Unlock()
-	return m.assets.repo
+// NewRuntimeContext returns a new RuntimeContext, capturing current
+// configuration and runtime assets.
+func (m *ConfigManager) NewRuntimeContext() RuntimeContext {
+	return m.current()