Commit cf0fc44a authored by ale's avatar ale
Browse files

Add a bunch of config validation

parent 6e2c354b
......@@ -2,6 +2,7 @@ package tabacco
import (
"errors"
"fmt"
"io/ioutil"
"log"
"path/filepath"
......@@ -25,6 +26,8 @@ type Config struct {
MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"`
RandomSeedFile string `yaml:"random_seed_file"`
HandlerSpecs []HandlerSpec
SourceSpecs []SourceSpec
}
......@@ -36,19 +39,32 @@ func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
if err := readYAMLFile(path, &spec); err != nil {
return err
}
if err := spec.Check(); err != nil {
return fmt.Errorf("%s: %v", path, err)
}
out = append(out, spec)
return nil
})
return out, err
}
func readSourcesFromDir(dir string) ([]SourceSpec, error) {
func readSourcesFromDir(dir string, handlerSpecs []HandlerSpec) ([]SourceSpec, error) {
// Build a temporary set of handlers, so we can check for
// non-existing ones.
tmp := make(map[string]struct{})
for _, h := range handlerSpecs {
tmp[h.Name] = struct{}{}
}
var out []SourceSpec
err := foreachYAMLFile(dir, func(path string) error {
var spec SourceSpec
if err := readYAMLFile(path, &spec); err != nil {
return err
}
if err := spec.Check(tmp); err != nil {
return fmt.Errorf("%s: %v", path, err)
}
out = append(out, spec)
return nil
})
......@@ -58,23 +74,22 @@ func readSourcesFromDir(dir string) ([]SourceSpec, error) {
// ReadConfig reads the configuration from the given path. Sources and
// handlers are read from the 'sources' and 'handlers' subdirectories
// of the directory containing the main configuration file.
//
// Performs a first level of static validation.
func ReadConfig(path string) (*Config, error) {
// Read and validate the main configuration from 'path'.
var config Config
if err := readYAMLFile(path, &config); err != nil {
return nil, err
}
if err := config.Repository.Check(); err != nil {
return nil, err
}
// Read handlers and sources from subdirectories of the
// directory containing 'path'.
dir := filepath.Dir(path)
sourceSpecs, err := readSourcesFromDir(filepath.Join(dir, "sources"))
if err != nil {
logMultiError("warning: source configuration error: ", err)
if len(sourceSpecs) == 0 {
return nil, errors.New("no configured sources")
}
}
config.SourceSpecs = sourceSpecs
handlerSpecs, err := readHandlersFromDir(filepath.Join(dir, "handlers"))
if err != nil {
logMultiError("warning: handler configuration error: ", err)
......@@ -84,6 +99,15 @@ func ReadConfig(path string) (*Config, error) {
}
config.HandlerSpecs = handlerSpecs
sourceSpecs, err := readSourcesFromDir(filepath.Join(dir, "sources"), handlerSpecs)
if err != nil {
logMultiError("warning: source configuration error: ", err)
if len(sourceSpecs) == 0 {
return nil, errors.New("no configured sources")
}
}
config.SourceSpecs = sourceSpecs
return &config, nil
}
......@@ -102,6 +126,11 @@ func readYAMLFile(path string, obj interface{}) error {
if err != nil {
return err
}
// Slightly improve the user experience (you get a weird error
// otherwise).
if len(data) == 0 {
return fmt.Errorf("%s: empty file", path)
}
return yaml.UnmarshalStrict(data, obj)
}
......@@ -126,56 +155,80 @@ type ConfigManager struct {
config *Config
handlerMap map[string]Handler
repo Repository
// Listeners are notified on every reload.
notifyCh chan struct{}
listeners []func()
}
// NewConfigManager creates a new ConfigManager.
func NewConfigManager(config *Config) (*ConfigManager, error) {
m := new(ConfigManager)
m := &ConfigManager{
notifyCh: make(chan struct{}, 1),
}
if err := m.Reload(config); err != nil {
return nil, err
}
go func() {
for range m.notifyCh {
for _, f := range m.listeners {
f()
}
}
}()
return m, nil
}
// Reload the configuration (at least, the parts of it that can be
// dynamically reloaded).
func (m *ConfigManager) Reload(config *Config) error {
m.mx.Lock()
defer m.mx.Unlock()
shell := NewShell(config.DryRun)
shell.SetNiceLevel(config.DefaultNiceLevel)
shell.SetIOClass(config.DefaultIOClass)
// First, parse the bits that need to be parsed, so that we
// can return an error early and config reloads are atomic.
handlerMap, err := buildHandlerMap(config.HandlerSpecs, shell)
if err != nil {
return err
}
m.handlerMap = handlerMap
repo, err := config.Repository.Parse(shell)
if err != nil {
return err
}
// Update config and notify listeners (in a separate
// goroutine, that does not hold the lock).
m.mx.Lock()
defer m.mx.Unlock()
if m.repo != nil {
m.repo.Close() // nolint
}
m.repo = repo
m.handlerMap = handlerMap
m.config = config
m.notifyCh <- struct{}{}
return nil
}
// Close the ConfigManager and all associated resources.
func (m *ConfigManager) Close() {
m.mx.Lock()
close(m.notifyCh)
if m.repo != nil {
m.repo.Close() // nolint
}
m.mx.Unlock()
}
// Notify the caller when the configuration is reloaded.
func (m *ConfigManager) Notify(f func()) {
m.mx.Lock()
m.listeners = append(m.listeners, f)
m.mx.Unlock()
}
func (m *ConfigManager) getHandlerMap() map[string]Handler {
m.mx.Lock()
defer m.mx.Unlock()
......@@ -199,3 +252,9 @@ func (m *ConfigManager) getSourceSpecs() []SourceSpec {
defer m.mx.Unlock()
return m.config.SourceSpecs
}
func (m *ConfigManager) getSeedFile() string {
m.mx.Lock()
defer m.mx.Unlock()
return m.config.RandomSeedFile
}
......@@ -116,7 +116,7 @@ func (j *stateManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pending, running, done := j.getJobsStatus()
w.Header().Set("Content-Type", "text/html")
debugTpl.Lookup("state_manager_debug_page").Execute(w, map[string]interface{}{
_ = debugTpl.Lookup("state_manager_debug_page").Execute(w, map[string]interface{}{
"Pending": pending,
"NumPending": len(pending),
"Running": running,
......@@ -130,7 +130,7 @@ func (j *stateManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// stateManager object match the http.Handler interface.
func (s *Scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
debugTpl.Lookup("scheduler_debug_page").Execute(w, map[string]interface{}{
_ = debugTpl.Lookup("scheduler_debug_page").Execute(w, map[string]interface{}{
"Schedule": s.getStatus(),
})
}
package tabacco
import (
"errors"
"fmt"
)
......@@ -29,6 +30,16 @@ func (spec *HandlerSpec) Parse(shell *Shell) (Handler, error) {
}
}
// Check that the configuration is valid. Not an alternative to
// validation at usage time, but it provides an early warning to the
// user.
func (spec *HandlerSpec) Check() error {
if spec.Name == "" {
return errors.New("name is empty")
}
return nil
}
func buildHandlerMap(specs []HandlerSpec, shell *Shell) (map[string]Handler, error) {
m := make(map[string]Handler)
for _, spec := range specs {
......
package tabacco
import (
"errors"
"fmt"
)
......@@ -21,3 +22,13 @@ func (spec *RepositorySpec) Parse(shell *Shell) (Repository, error) {
return nil, fmt.Errorf("unknown repository type '%s'", spec.Type)
}
}
// Check that the configuration is valid. Not an alternative to
// validation at usage time, but it provides an early warning to the
// user.
func (spec *RepositorySpec) Check() error {
if spec.Name == "" {
return errors.New("name is empty")
}
return nil
}
......@@ -16,6 +16,8 @@ import (
"github.com/robfig/cron"
)
var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"
// The Scheduler runs backup jobs periodically, according to the
// schedule specified in the source spec.
//
......@@ -41,7 +43,7 @@ type Scheduler struct {
// NewScheduler creates a new Scheduler.
func NewScheduler(ctx context.Context, m Manager, sourceSpecs []SourceSpec, seedFile string) (*Scheduler, error) {
if seedFile == "" {
seedFile = "/var/tmp/.tabacco_scheduler_seed"
seedFile = defaultSeedFile
}
hostSeed := mustGetSeed(seedFile)
......@@ -66,6 +68,7 @@ func (s *Scheduler) updateSchedule(sourceSpecs []SourceSpec) error {
merr := new(multiError)
var tmp []scheduleAndJob
for _, spec := range sourceSpecs {
// Only schedule sources that have a 'schedule' attribute defined.
if spec.Schedule != "" {
sched, err := parseSchedule(spec.Schedule, s.hostSeed)
if err != nil {
......
......@@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os/exec"
)
......@@ -55,6 +56,23 @@ func (spec *SourceSpec) Parse(ctx context.Context) (ds Dataset, err error) {
return
}
// Check that the configuration is valid. Not an alternative to
// validation at usage time, but it provides an early warning to the
// user. Checks the handler name against a string set of handler
// names.
func (spec *SourceSpec) Check(handlers map[string]struct{}) error {
if spec.Name == "" {
return errors.New("name is empty")
}
if _, ok := handlers[spec.Handler]; !ok {
return fmt.Errorf("unknown handler '%s'", spec.Handler)
}
if len(spec.Atoms) == 0 && spec.AtomsScript == "" {
return errors.New("no atoms specified (either directly or via atoms_script)")
}
return nil
}
// Parse multiple SourceSpec objects. Parsing can fail for each source
// independently, so it's possible that this function returns a
// non-nil Dataset list even when err != nil.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment