config.go 8.89 KB
Newer Older
ale's avatar
ale committed
1 2 3
package tabacco

import (
4
	"encoding/binary"
ale's avatar
ale committed
5
	"errors"
ale's avatar
ale committed
6
	"fmt"
ale's avatar
ale committed
7 8 9 10 11
	"io/ioutil"
	"log"
	"path/filepath"
	"sync"

ale's avatar
ale committed
12
	"git.autistici.org/ai3/go-common/clientutil"
ale's avatar
ale committed
13 14
	"gopkg.in/yaml.v2"

ale's avatar
ale committed
15 16
	"git.autistici.org/ai3/tools/tabacco/jobs"
	"git.autistici.org/ai3/tools/tabacco/util"
ale's avatar
ale committed
17 18
)

19 20
var defaultSeedFile = "/var/tmp/.tabacco_scheduler_seed"

ale's avatar
ale committed
21 22 23 24
// Config is the global configuration object. While the actual
// configuration is spread over multiple files and directories, this
// holds it all together.
type Config struct {
ale's avatar
ale committed
25 26 27 28 29 30 31 32
	Hostname             string                    `yaml:"hostname"`
	Queue                jobs.QueueSpec            `yaml:"queue_config"`
	Repository           RepositorySpec            `yaml:"repository"`
	DryRun               bool                      `yaml:"dry_run"`
	DefaultNiceLevel     int                       `yaml:"default_nice_level"`
	DefaultIOClass       int                       `yaml:"default_io_class"`
	WorkDir              string                    `yaml:"work_dir"`
	RandomSeedFile       string                    `yaml:"random_seed_file"`
ale's avatar
ale committed
33 34
	MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"`

ale's avatar
ale committed
35 36 37 38
	HandlerSpecs []HandlerSpec
	SourceSpecs  []SourceSpec
}

ale's avatar
ale committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
type runtimeAssets struct {
	handlerMap map[string]Handler
	repo       Repository
	seed       int64
	shell      *Shell
}

func (a *runtimeAssets) Close() {
	a.repo.Close() // nolint
}

func buildHandlerMap(specs []HandlerSpec, shell *Shell) (map[string]Handler, error) {
	m := make(map[string]Handler)
	merr := new(util.MultiError)
	for _, spec := range specs {
		h, err := spec.Parse(shell)
		if err != nil {
			merr.Add(err)
			continue
		}
		m[spec.Name] = h
	}
	return m, merr.OrNil()
}

func (c *Config) parse() (*runtimeAssets, error) {
	shell := NewShell(c.DryRun)
	shell.SetNiceLevel(c.DefaultNiceLevel)
	shell.SetIOClass(c.DefaultIOClass)

	// Parse the repository config. An error here is fatal, as we
	// don't have a way to operate without a repository.
	repo, err := c.Repository.Parse(shell)
	if err != nil {
		return nil, err
	}

	merr := new(util.MultiError)

	// Build the handlers.
	handlerMap, err := buildHandlerMap(c.HandlerSpecs, shell)
	if err != nil {
		merr.Add(err)
	}

	// Validate the sources (Parse is called later at runtime).
	var srcs []SourceSpec
	for _, spec := range c.SourceSpecs {
		if err := spec.Check(handlerMap); err != nil {
			merr.Add(err)
			continue
		}
		srcs = append(srcs, spec)
	}
	c.SourceSpecs = srcs

	// Read (or create) the seed file.
	seedFile := defaultSeedFile
	if c.RandomSeedFile != "" {
		seedFile = c.RandomSeedFile
	}
	seed := mustGetSeed(seedFile)

	return &runtimeAssets{
		shell:      shell,
		repo:       repo,
		handlerMap: handlerMap,
		seed:       seed,
	}, merr.OrNil()
}

110 111 112
// The following functions read YAML files from .d-style directories. To be nice
// to the user, each file can contain either a single object or a list of
// multiple objects.
ale's avatar
ale committed
113 114 115
func readHandlersFromDir(dir string) ([]HandlerSpec, error) {
	var out []HandlerSpec
	err := foreachYAMLFile(dir, func(path string) error {
116
		var specs []HandlerSpec
ale's avatar
ale committed
117
		log.Printf("reading handler: %s", path)
118 119 120 121 122 123
		if err := readYAMLFile(path, &specs); err != nil {
			var spec HandlerSpec
			if err := readYAMLFile(path, &spec); err != nil {
				return err
			}
			specs = []HandlerSpec{spec}
ale's avatar
ale committed
124
		}
125
		out = append(out, specs...)
ale's avatar
ale committed
126 127 128 129 130
		return nil
	})
	return out, err
}

ale's avatar
ale committed
131
func readSourcesFromDir(dir string) ([]SourceSpec, error) {
ale's avatar
ale committed
132 133
	var out []SourceSpec
	err := foreachYAMLFile(dir, func(path string) error {
134
		var specs []SourceSpec
ale's avatar
ale committed
135
		log.Printf("reading source: %s", path)
136 137 138 139 140 141
		if err := readYAMLFile(path, &specs); err != nil {
			var spec SourceSpec
			if err := readYAMLFile(path, &spec); err != nil {
				return err
			}
			specs = []SourceSpec{spec}
ale's avatar
ale committed
142
		}
143
		out = append(out, specs...)
ale's avatar
ale committed
144 145 146 147 148 149 150 151
		return nil
	})
	return out, err
}

// ReadConfig reads the configuration from the given path. Sources and
// handlers are read from the 'sources' and 'handlers' subdirectories
// of the directory containing the main configuration file.
ale's avatar
ale committed
152 153
//
// Performs a first level of static validation.
ale's avatar
ale committed
154
func ReadConfig(path string) (*Config, error) {
ale's avatar
ale committed
155
	// Read and validate the main configuration from 'path'.
ale's avatar
ale committed
156
	var config Config
ale's avatar
ale committed
157
	log.Printf("reading config: %s", path)
ale's avatar
ale committed
158 159 160 161
	if err := readYAMLFile(path, &config); err != nil {
		return nil, err
	}

ale's avatar
ale committed
162 163
	// Read handlers and sources from subdirectories of the
	// directory containing 'path'.
ale's avatar
ale committed
164 165 166 167 168 169 170 171 172 173 174
	dir := filepath.Dir(path)

	handlerSpecs, err := readHandlersFromDir(filepath.Join(dir, "handlers"))
	if err != nil {
		logMultiError("warning: handler configuration error: ", err)
		if len(handlerSpecs) == 0 {
			return nil, errors.New("no configured handlers")
		}
	}
	config.HandlerSpecs = handlerSpecs

ale's avatar
ale committed
175
	sourceSpecs, err := readSourcesFromDir(filepath.Join(dir, "sources"))
ale's avatar
ale committed
176 177 178 179 180 181 182 183
	if err != nil {
		logMultiError("warning: source configuration error: ", err)
		if len(sourceSpecs) == 0 {
			return nil, errors.New("no configured sources")
		}
	}
	config.SourceSpecs = sourceSpecs

ale's avatar
ale committed
184 185 186 187
	return &config, nil
}

func logMultiError(prefix string, err error) {
188
	if merr, ok := err.(*util.MultiError); ok {
ale's avatar
ale committed
189 190 191 192 193 194 195 196 197 198 199 200 201
		for _, e := range merr.Errors() {
			log.Printf("%s%v", prefix, e)
		}
	} else {
		log.Printf("%s%v", prefix, err)
	}
}

func readYAMLFile(path string, obj interface{}) error {
	data, err := ioutil.ReadFile(path) // nolint: gosec
	if err != nil {
		return err
	}
ale's avatar
ale committed
202 203 204 205 206
	// Slightly improve the user experience (you get a weird error
	// otherwise).
	if len(data) == 0 {
		return fmt.Errorf("%s: empty file", path)
	}
ale's avatar
ale committed
207 208 209 210 211 212 213 214
	return yaml.UnmarshalStrict(data, obj)
}

func foreachYAMLFile(dir string, f func(string) error) error {
	files, err := filepath.Glob(filepath.Join(dir, "*.yml"))
	if err != nil {
		return err
	}
215
	merr := new(util.MultiError)
ale's avatar
ale committed
216 217 218 219 220
	for _, path := range files {
		if err := f(path); err != nil {
			merr.Add(err)
		}
	}
221
	return merr.OrNil()
ale's avatar
ale committed
222 223
}

ale's avatar
ale committed
224
// ConfigManager holds all runtime data derived from the configuration
225 226 227 228
// itself, so it can be easily reloaded by calling Reload(). Listeners
// should register themselves with Notify() in order to be updated
// when the configuration changes (there is currently no way to
// unregister).
ale's avatar
ale committed
229
type ConfigManager struct {
ale's avatar
ale committed
230 231 232
	mx     sync.Mutex
	config *Config
	assets *runtimeAssets
ale's avatar
ale committed
233 234 235

	// Listeners are notified on every reload.
	notifyCh  chan struct{}
236
	listeners []chan struct{}
ale's avatar
ale committed
237 238
}

ale's avatar
ale committed
239 240
// NewConfigManager creates a new ConfigManager.
func NewConfigManager(config *Config) (*ConfigManager, error) {
ale's avatar
ale committed
241 242 243
	m := &ConfigManager{
		notifyCh: make(chan struct{}, 1),
	}
ale's avatar
ale committed
244
	if err := m.Reload(config); err != nil {
ale's avatar
ale committed
245 246
		return nil, err
	}
ale's avatar
ale committed
247 248
	go func() {
		for range m.notifyCh {
249 250 251 252 253
			for _, lch := range m.listeners {
				select {
				case lch <- struct{}{}:
				default:
				}
ale's avatar
ale committed
254 255 256
			}
		}
	}()
ale's avatar
ale committed
257 258 259
	return m, nil
}

ale's avatar
ale committed
260 261 262
// Reload the configuration (at least, the parts of it that can be
// dynamically reloaded).
func (m *ConfigManager) Reload(config *Config) error {
ale's avatar
ale committed
263 264
	assets, err := config.parse()
	if assets == nil {
ale's avatar
ale committed
265 266
		return err
	}
ale's avatar
ale committed
267 268 269 270 271

	// Update config and notify listeners (in a separate
	// goroutine, that does not hold the lock).
	m.mx.Lock()
	defer m.mx.Unlock()
ale's avatar
ale committed
272 273
	if m.assets != nil {
		m.assets.Close() // nolint
ale's avatar
ale committed
274
	}
ale's avatar
ale committed
275 276 277

	log.Printf("loaded new config: %d handlers, %d sources", len(assets.handlerMap), len(config.SourceSpecs))
	m.assets = assets
ale's avatar
ale committed
278
	m.config = config
ale's avatar
ale committed
279
	m.notifyCh <- struct{}{}
ale's avatar
ale committed
280 281 282
	return nil
}

ale's avatar
ale committed
283 284
// Close the ConfigManager and all associated resources.
func (m *ConfigManager) Close() {
ale's avatar
ale committed
285
	m.mx.Lock()
ale's avatar
ale committed
286
	close(m.notifyCh)
ale's avatar
ale committed
287 288
	if m.assets != nil {
		m.assets.Close()
ale's avatar
ale committed
289 290 291 292
	}
	m.mx.Unlock()
}

ale's avatar
ale committed
293
// Notify the caller when the configuration is reloaded.
294
func (m *ConfigManager) Notify() <-chan struct{} {
ale's avatar
ale committed
295
	m.mx.Lock()
296 297
	defer m.mx.Unlock()

298 299
	// Create the channel and prime it with a value so the
	// listener loads its initial configuration.
300
	ch := make(chan struct{}, 1)
301
	ch <- struct{}{}
302 303
	m.listeners = append(m.listeners, ch)
	return ch
ale's avatar
ale committed
304 305
}

306
func (m *ConfigManager) getHandler(name string) (Handler, bool) {
ale's avatar
ale committed
307 308
	m.mx.Lock()
	defer m.mx.Unlock()
ale's avatar
ale committed
309
	h, ok := m.assets.handlerMap[name]
310
	return h, ok
ale's avatar
ale committed
311 312
}

ale's avatar
ale committed
313
func (m *ConfigManager) getRepository() Repository {
ale's avatar
ale committed
314 315
	m.mx.Lock()
	defer m.mx.Unlock()
ale's avatar
ale committed
316
	return m.assets.repo
ale's avatar
ale committed
317 318
}

319
func (m *ConfigManager) getQueueSpec() jobs.QueueSpec {
ale's avatar
ale committed
320 321 322 323 324
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.config.Queue
}

ale's avatar
ale committed
325
func (m *ConfigManager) getSourceSpecs() []SourceSpec {
ale's avatar
ale committed
326 327 328 329
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.config.SourceSpecs
}
ale's avatar
ale committed
330

331
func (m *ConfigManager) getSeed() int64 {
ale's avatar
ale committed
332 333
	m.mx.Lock()
	defer m.mx.Unlock()
ale's avatar
ale committed
334
	return m.assets.seed
335 336
}

337 338 339
func (m *ConfigManager) getShell() *Shell {
	m.mx.Lock()
	defer m.mx.Unlock()
ale's avatar
ale committed
340 341 342 343 344 345 346
	return m.assets.shell
}

func (m *ConfigManager) getWorkDir() string {
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.config.WorkDir
347 348
}

349 350 351 352 353 354
func mustGetSeed(path string) int64 {
	if data, err := ioutil.ReadFile(path); err == nil && len(data) == 8 { // nolint: gosec
		if seed := binary.LittleEndian.Uint64(data); seed > 0 {
			return int64(seed)
		}
	}
ale's avatar
ale committed
355
	log.Printf("generating new random seed for this host")
ale's avatar
ale committed
356
	seed, data := util.RandomSeed()
357 358 359
	if err := ioutil.WriteFile(path, data, 0600); err != nil {
		log.Printf("warning: can't write random seed file: %v", err)
	}
ale's avatar
ale committed
360
	return seed
ale's avatar
ale committed
361
}