Commit 461b441a authored by ale

Make ConfigManager public

parent dc583566
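With `ConfigManager` and its constructor exported, the backup manager can now be wired up from outside the package. A minimal sketch of the intended call sequence, based only on the identifiers visible in this diff; the import path, the empty `Config`, and the nil `MetadataStore` are placeholder assumptions, not part of this commit:

```go
package main

import (
	"context"
	"log"

	// Assumed import path; adjust to wherever this package lives.
	tabacco "git.autistici.org/ale/tabacco"
)

func main() {
	// A real Config would carry Queue, Repository, HandlerSpecs and
	// SourceSpecs, normally parsed from the YAML configuration dir.
	configMgr, err := tabacco.NewConfigManager(&tabacco.Config{})
	if err != nil {
		log.Fatal(err)
	}
	defer configMgr.Close()

	mgr, err := tabacco.NewManager(context.Background(), configMgr, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Source specs must now be passed in explicitly, since the
	// getters on ConfigManager remain unexported.
	var sourceSpecs []tabacco.SourceSpec
	if _, err := mgr.Backup(context.Background(), sourceSpecs); err != nil {
		log.Fatal(err)
	}
}
```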
@@ -156,7 +156,7 @@ func TestBackup(t *testing.T) {
     }
 
     // Run the backup.
-    configMgr, err := newConfigManager(&Config{
+    configMgr, err := NewConfigManager(&Config{
         Queue:        queueSpec,
         Repository:   repoSpec,
         HandlerSpecs: handlerSpecs,
@@ -174,7 +174,7 @@ func TestBackup(t *testing.T) {
     }
     defer m.Close()
 
-    backup, err := m.Backup(context.TODO(), configMgr.SourceSpecs())
+    backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs())
     if err != nil {
         t.Fatal(err)
     }
......
@@ -7,6 +7,7 @@ import (
     "path/filepath"
     "sync"
 
     "git.autistici.org/ai3/go-common/clientutil"
+    "gopkg.in/yaml.v2"
 )
@@ -22,6 +23,8 @@ type Config struct {
     DefaultNiceLevel     int                       `yaml:"default_nice_level"`
     DefaultIOClass       int                       `yaml:"default_io_class"`
     MetadataStoreBackend *clientutil.BackendConfig `yaml:"metadb"`
+    HandlerSpecs         []HandlerSpec
+    SourceSpecs          []SourceSpec
 }
@@ -116,33 +119,33 @@ func foreachYAMLFile(dir string, f func(string) error) error {
     return merr.orNil()
 }
 
-// The config manager holds all runtime data derived from the
-// configuration itself, so it can be easily reloaded in a single
-// place.
-type configManager struct {
+// ConfigManager holds all runtime data derived from the configuration
+// itself, so it can be easily reloaded in a single place.
+type ConfigManager struct {
     mx         sync.Mutex
     config     *Config
     handlerMap map[string]Handler
     repo       Repository
     shell      *Shell
 }
 
-func newConfigManager(config *Config) (*configManager, error) {
-    m := new(configManager)
-    if err := m.reload(config); err != nil {
+// NewConfigManager creates a new ConfigManager.
+func NewConfigManager(config *Config) (*ConfigManager, error) {
+    m := new(ConfigManager)
+    if err := m.Reload(config); err != nil {
         return nil, err
     }
     return m, nil
 }
 
-func (m *configManager) reload(config *Config) error {
+// Reload the configuration (at least, the parts of it that can be
+// dynamically reloaded).
+func (m *ConfigManager) Reload(config *Config) error {
     m.mx.Lock()
     defer m.mx.Unlock()
 
     shell := NewShell(config.DryRun)
     shell.SetNiceLevel(config.DefaultNiceLevel)
     shell.SetIOClass(config.DefaultIOClass)
     m.shell = shell
 
     handlerMap, err := buildHandlerMap(config.HandlerSpecs, shell)
     if err != nil {
@@ -164,7 +167,8 @@ func (m *configManager) reload(config *Config) error {
     return nil
 }
 
-func (m *configManager) Close() {
+// Close the ConfigManager and all associated resources.
+func (m *ConfigManager) Close() {
     m.mx.Lock()
     if m.repo != nil {
         m.repo.Close() // nolint
@@ -172,31 +176,25 @@ func (m *configManager) Close() {
     m.mx.Unlock()
 }
 
-func (m *configManager) Shell() *Shell {
-    m.mx.Lock()
-    defer m.mx.Unlock()
-    return m.shell
-}
-
-func (m *configManager) HandlerMap() map[string]Handler {
+func (m *ConfigManager) getHandlerMap() map[string]Handler {
     m.mx.Lock()
     defer m.mx.Unlock()
     return m.handlerMap
 }
 
-func (m *configManager) Repository() Repository {
+func (m *ConfigManager) getRepository() Repository {
     m.mx.Lock()
     defer m.mx.Unlock()
     return m.repo
 }
 
-func (m *configManager) QueueSpec() MultiQueueSpec {
+func (m *ConfigManager) getQueueSpec() MultiQueueSpec {
     m.mx.Lock()
     defer m.mx.Unlock()
     return m.config.Queue
 }
 
-func (m *configManager) SourceSpecs() []SourceSpec {
+func (m *ConfigManager) getSourceSpecs() []SourceSpec {
     m.mx.Lock()
     defer m.mx.Unlock()
     return m.config.SourceSpecs
......
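Since `Reload` is now exported alongside the type, a long-running daemon can swap in a new configuration without restarting. A sketch of one plausible wiring, reloading on SIGHUP; the `loadConfig` helper and the signal handling are illustrative assumptions, not part of this commit:

```go
// reloadOnSIGHUP applies a freshly parsed configuration whenever the
// process receives SIGHUP. Requires "log", "os", "os/signal" and
// "syscall"; loadConfig is an assumed helper that parses the YAML
// configuration directory into a *Config.
func reloadOnSIGHUP(configMgr *ConfigManager, configDir string) {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP)
	go func() {
		for range sigCh {
			config, err := loadConfig(configDir)
			if err != nil {
				log.Printf("error reloading configuration: %v", err)
				continue
			}
			if err := configMgr.Reload(config); err != nil {
				log.Printf("error applying configuration: %v", err)
			}
		}
	}()
}
```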
@@ -184,27 +184,45 @@ const (
     jobStatusDone
 )
 
-type jobStatus struct {
-    id          string
-    status      int
-    startedAt   time.Time
-    completedAt time.Time
-    err         error
-    job         Job
+type jobStatusEnum int
+
+func (s jobStatusEnum) String() string {
+    switch s {
+    case jobStatusPending:
+        return "PENDING"
+    case jobStatusRunning:
+        return "RUNNING"
+    case jobStatusDone:
+        return "DONE"
+    default:
+        return "UNKNOWN"
+    }
+}
+
+// JobStatus represents the current state of a job.
+type JobStatus struct {
+    ID          string
+    Status      jobStatusEnum
+    StartedAt   time.Time
+    CompletedAt time.Time
+    Err         error
+    Job         Job
 }
 
-// Manager that adds a state to jobs and keeps track of it.
+// Manager that adds a state to jobs and keeps track of it. This is
+// basically a way to keep track of running goroutines at the level of
+// granularity that we desire.
 type stateManager struct {
     mx      sync.Mutex
-    pending map[string]*jobStatus
-    running map[string]*jobStatus
+    pending map[string]*JobStatus
+    running map[string]*JobStatus
     done    *list.List
 }
 
 func newStateManager() *stateManager {
     return &stateManager{
-        pending: make(map[string]*jobStatus),
-        running: make(map[string]*jobStatus),
+        pending: make(map[string]*JobStatus),
+        running: make(map[string]*JobStatus),
         done:    list.New(),
     }
 }
@@ -213,10 +231,10 @@ func (m *stateManager) setStatusPending(jobID string, j Job) {
     m.mx.Lock()
     defer m.mx.Unlock()
 
-    m.pending[jobID] = &jobStatus{
-        id:     jobID,
-        status: jobStatusPending,
-        job:    j,
+    m.pending[jobID] = &JobStatus{
+        ID:     jobID,
+        Status: jobStatusPending,
+        Job:    j,
     }
 }
@@ -225,8 +243,8 @@ func (m *stateManager) setStatusRunning(jobID string) {
     defer m.mx.Unlock()
 
     js := m.pending[jobID]
-    js.startedAt = time.Now()
-    js.status = jobStatusRunning
+    js.StartedAt = time.Now()
+    js.Status = jobStatusRunning
     delete(m.pending, jobID)
     m.running[jobID] = js
 }
@@ -239,9 +257,9 @@ func (m *stateManager) setStatusDone(jobID string, err error) {
     js := m.running[jobID]
     delete(m.running, jobID)
 
-    js.completedAt = time.Now()
-    js.status = jobStatusDone
-    js.err = err
+    js.CompletedAt = time.Now()
+    js.Status = jobStatusDone
+    js.Err = err
     m.done.PushFront(js)
 
     for m.done.Len() > logsToKeep {
@@ -259,24 +277,24 @@ func (m *stateManager) withTrackedStatus(j Job) Job {
     return sj
 }
 
-func jobStatusMapToSlice(m map[string]*jobStatus) []jobStatus {
-    out := make([]jobStatus, 0, len(m))
+func jobStatusMapToSlice(m map[string]*JobStatus) []JobStatus {
+    out := make([]JobStatus, 0, len(m))
     for _, js := range m {
         out = append(out, *js)
     }
     return out
 }
 
-func jobStatusListToSlice(l *list.List) []jobStatus {
-    out := make([]jobStatus, 0, l.Len())
+func jobStatusListToSlice(l *list.List) []JobStatus {
+    out := make([]JobStatus, 0, l.Len())
     for e := l.Front(); e != nil; e = e.Next() {
-        js := e.Value.(*jobStatus)
+        js := e.Value.(*JobStatus)
         out = append(out, *js)
     }
     return out
 }
 
-func (m *stateManager) getJobsStatus() ([]jobStatus, []jobStatus, []jobStatus) {
+func (m *stateManager) getJobsStatus() ([]JobStatus, []JobStatus, []JobStatus) {
     m.mx.Lock()
     defer m.mx.Unlock()
......
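Exporting the `JobStatus` fields, plus the `String` method on the status code, lets callers render state snapshots directly. A small in-package sketch (`getJobsStatus` itself stays unexported), assuming "log" and "time" are imported:

```go
// The return order (pending, running, done) is an assumption here.
pending, running, done := m.getJobsStatus()
for _, js := range running {
	// jobStatusEnum implements fmt.Stringer, so %s renders as
	// "PENDING", "RUNNING" or "DONE".
	log.Printf("job %s: %s (started %s)",
		js.ID, js.Status, js.StartedAt.Format(time.RFC3339))
}
log.Printf("%d pending, %d done", len(pending), len(done))
```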
@@ -15,16 +15,16 @@ type tabaccoManager struct {
     *multiQueueManager
     *stateManager
 
-    configMgr *configManager
+    configMgr *ConfigManager
     ms        MetadataStore
 }
 
 // NewManager creates a new Manager.
-func NewManager(ctx context.Context, configMgr *configManager, ms MetadataStore) (Manager, error) {
+func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) {
     // Note: the queue configuration won't be reloaded.
     return &tabaccoManager{
         exclusiveLockManager: newExclusiveLockManager(),
-        multiQueueManager:    newMultiQueueManager(configMgr.QueueSpec()),
+        multiQueueManager:    newMultiQueueManager(configMgr.getQueueSpec()),
         stateManager:         newStateManager(),
 
         configMgr: configMgr,
@@ -43,7 +43,7 @@ func (m *tabaccoManager) Close() error {
 // backup tasks too soon.
 func (m *tabaccoManager) prepareBackupJob(backup Backup) Job {
     return newJob("backup_prep", func(ctx context.Context) error {
-        repo := m.configMgr.Repository()
+        repo := m.configMgr.getRepository()
         if err := repo.Init(ctx); err != nil {
             log.Printf("repository init failed: %v", err)
             return err
@@ -57,12 +57,12 @@ func (m *tabaccoManager) prepareBackupJob(backup Backup) Job {
 func (m *tabaccoManager) backupDatasetJob(backup Backup, ds Dataset) Job {
     jobID := fmt.Sprintf("backup_%s", ds.Name)
     j := newJob(jobID, func(ctx context.Context) (err error) {
-        h, ok := m.configMgr.HandlerMap()[ds.Handler]
+        h, ok := m.configMgr.getHandlerMap()[ds.Handler]
         if !ok {
             return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
         }
 
-        ds, err = h.Backup(ctx, m.configMgr.Repository(), backup, ds)
+        ds, err = h.Backup(ctx, m.configMgr.getRepository(), backup, ds)
         if err != nil {
             return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
         }
@@ -127,11 +127,11 @@ func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (
 func (m *tabaccoManager) restoreDatasetJob(backup Backup, ds Dataset, target string) Job {
     j := newJob(fmt.Sprintf("restore_%s_%s", ds.Name, backup.ID), func(ctx context.Context) error {
         log.Printf("restoring %+v %+v", ds, backup)
 
-        h, ok := m.configMgr.HandlerMap()[ds.Handler]
+        h, ok := m.configMgr.getHandlerMap()[ds.Handler]
         if !ok {
             return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
         }
-        return h.Restore(ctx, m.configMgr.Repository(), backup, ds, target)
+        return h.Restore(ctx, m.configMgr.getRepository(), backup, ds, target)
     })
 
     return m.withDefaults(j, "restore")
 }
......
@@ -79,7 +79,7 @@ func TestRestic(t *testing.T) {
     }
 
     // Run the backup.
-    configMgr, err := newConfigManager(&Config{
+    configMgr, err := NewConfigManager(&Config{
         Queue:        queueSpec,
         Repository:   repoSpec,
         HandlerSpecs: handlerSpecs,
@@ -96,7 +96,7 @@ func TestRestic(t *testing.T) {
     }
     defer m.Close()
 
-    backup, err := m.Backup(context.TODO(), configMgr.SourceSpecs())
+    backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs())
     if err != nil {
         t.Fatal(err)
     }
......
@@ -118,9 +118,9 @@ func (s *Scheduler) Stop() {
     s.c.Stop()
 }
 
-// JobStatus represents the status of a job, either scheduled,
+// CronJobStatus represents the status of a job, either scheduled,
 // running, or terminated in the past.
-type JobStatus struct {
+type CronJobStatus struct {
     Name     string `json:"name"`
     Running  bool   `json:"running"`
     Schedule string `json:"schedule"`
@@ -130,17 +130,17 @@ type JobStatus struct {
     Error string `json:"error,omitempty"`
 }
 
-type jobStatusList struct {
-    list   []JobStatus
+type cronJobStatusList struct {
+    list   []CronJobStatus
     lessFn func(int, int) bool
 }
 
-func (l *jobStatusList) Len() int           { return len(l.list) }
-func (l *jobStatusList) Swap(i, j int)      { l.list[i], l.list[j] = l.list[j], l.list[i] }
-func (l *jobStatusList) Less(i, j int) bool { return l.lessFn(i, j) }
+func (l *cronJobStatusList) Len() int           { return len(l.list) }
+func (l *cronJobStatusList) Swap(i, j int)      { l.list[i], l.list[j] = l.list[j], l.list[i] }
+func (l *cronJobStatusList) Less(i, j int) bool { return l.lessFn(i, j) }
 
-func jobStatusListOrderByName(list []JobStatus) *jobStatusList {
-    return &jobStatusList{
+func cronJobStatusListOrderByName(list []CronJobStatus) *cronJobStatusList {
+    return &cronJobStatusList{
         list: list,
         lessFn: func(i, j int) bool {
             return list[i].Name < list[j].Name
@@ -151,7 +151,7 @@ func jobStatusListOrderByName(list []JobStatus) *jobStatusList {
 
 // SchedulerStatus holds information about the scheduler state, and
 // the past executions.
 type SchedulerStatus struct {
-    Scheduled []JobStatus `json:"scheduled"`
+    Scheduled []CronJobStatus `json:"scheduled"`
 }
 
 // GetStatus returns the current status of the scheduled jobs.
@@ -161,7 +161,7 @@ func (s *Scheduler) GetStatus() *SchedulerStatus {
     // Get status of scheduled jobs.
     for _, entry := range s.c.Entries() {
         if job, ok := entry.Job.(*startBackupCronJob); ok {
-            status.Scheduled = append(status.Scheduled, JobStatus{
+            status.Scheduled = append(status.Scheduled, CronJobStatus{
                 Name: job.spec.Name,
                 //Running: job.isRunning(),
                 Schedule: job.spec.Schedule,
@@ -172,7 +172,7 @@ func (s *Scheduler) GetStatus() *SchedulerStatus {
     }
 
     // Sort everything.
-    sort.Sort(jobStatusListOrderByName(status.Scheduled))
+    sort.Sort(cronJobStatusListOrderByName(status.Scheduled))
 
     return status
 }
......
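A side note on the scheduler half of the rename: `cronJobStatusList` keeps the existing pattern of a `sort.Interface` wrapper with an injected comparison function, so additional orderings can reuse the one list type. The pattern in isolation, as a self-contained sketch:

```go
package main

import (
	"fmt"
	"sort"
)

type CronJobStatus struct {
	Name string
}

// cronJobStatusList adapts a slice to sort.Interface, delegating the
// ordering decision to the injected lessFn.
type cronJobStatusList struct {
	list   []CronJobStatus
	lessFn func(i, j int) bool
}

func (l *cronJobStatusList) Len() int           { return len(l.list) }
func (l *cronJobStatusList) Swap(i, j int)      { l.list[i], l.list[j] = l.list[j], l.list[i] }
func (l *cronJobStatusList) Less(i, j int) bool { return l.lessFn(i, j) }

func main() {
	jobs := []CronJobStatus{{Name: "db"}, {Name: "apache"}, {Name: "mail"}}
	sort.Sort(&cronJobStatusList{
		list:   jobs,
		lessFn: func(i, j int) bool { return jobs[i].Name < jobs[j].Name },
	})
	fmt.Println(jobs) // [{apache} {db} {mail}]
}
```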