Commit eaa119ca authored by ale's avatar ale

Add support for pre/post commands on handlers

parent 1861eeb8
......@@ -109,6 +109,7 @@ type MetadataStore interface {
// Handler implements the backup and restore logic for one type of
// data source (file, pipe, mysql, ...).
type Handler interface {
	// Backup dumps a Dataset to the given Repository, returning the
	// (possibly updated) Dataset on success.
	Backup(context.Context, Repository, Backup, Dataset) (Dataset, error)
	// Restore recovers a Dataset of a previous Backup from the
	// Repository; the string argument is the restore target path.
	Restore(context.Context, Repository, Backup, Dataset, string) error
	// Spec returns the HandlerSpec this handler was created from.
	Spec() HandlerSpec
}
// Repository is the interface to a remote repository.
......
......@@ -162,6 +162,7 @@ type ConfigManager struct {
handlerMap map[string]Handler
repo Repository
seed int64
shell *Shell
// Listeners are notified on every reload.
notifyCh chan struct{}
......@@ -222,6 +223,7 @@ func (m *ConfigManager) Reload(config *Config) error {
m.handlerMap = handlerMap
m.config = config
m.seed = seed
m.shell = shell
m.notifyCh <- struct{}{}
return nil
}
......@@ -243,10 +245,11 @@ func (m *ConfigManager) Notify(f func()) {
m.mx.Unlock()
}
func (m *ConfigManager) getHandlerMap() map[string]Handler {
func (m *ConfigManager) getHandler(name string) (Handler, bool) {
m.mx.Lock()
defer m.mx.Unlock()
return m.handlerMap
h, ok := m.handlerMap[name]
return h, ok
}
func (m *ConfigManager) getRepository() Repository {
......@@ -273,6 +276,12 @@ func (m *ConfigManager) getSeed() int64 {
return m.seed
}
// getShell returns the Shell from the currently loaded configuration.
func (m *ConfigManager) getShell() *Shell {
	m.mx.Lock()
	sh := m.shell
	m.mx.Unlock()
	return sh
}
func mustGetSeed(path string) int64 {
if data, err := ioutil.ReadFile(path); err == nil && len(data) == 8 { // nolint: gosec
if seed := binary.LittleEndian.Uint64(data); seed > 0 {
......
......@@ -8,14 +8,19 @@ import (
type fileHandler struct {
path string
spec HandlerSpec
}
func newFileHandler(params map[string]interface{}, _ *Shell) (Handler, error) {
path, ok := params["path"].(string)
func newFileHandler(spec HandlerSpec, _ *Shell) (Handler, error) {
path, ok := spec.Params["path"].(string)
if !ok || path == "" {
return nil, errors.New("missing path")
}
return &fileHandler{path}, nil
return &fileHandler{path: path, spec: spec}, nil
}
// Spec returns the HandlerSpec this handler was created from.
func (f *fileHandler) Spec() HandlerSpec {
	return f.spec
}
func (h *fileHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
......
......@@ -14,24 +14,30 @@ import (
type pipeHandler struct {
backupCmd, restoreCmd string
shell *Shell
spec HandlerSpec
}
func newPipeHandler(params map[string]interface{}, shell *Shell) (Handler, error) {
backupCmd, ok := params["backup_cmd"].(string)
func newPipeHandler(spec HandlerSpec, shell *Shell) (Handler, error) {
backupCmd, ok := spec.Params["backup_cmd"].(string)
if !ok || backupCmd == "" {
return nil, errors.New("missing backup_cmd")
}
restoreCmd, ok := params["restore_cmd"].(string)
restoreCmd, ok := spec.Params["restore_cmd"].(string)
if !ok || restoreCmd == "" {
return nil, errors.New("missing restore_cmd")
}
return &pipeHandler{
spec: spec,
backupCmd: backupCmd,
restoreCmd: restoreCmd,
shell: shell,
}, nil
}
// Spec returns the HandlerSpec this handler was created from.
func (p *pipeHandler) Spec() HandlerSpec {
	return p.spec
}
func (h *pipeHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
var atoms []Atom
for _, atom := range ds.Atoms {
......
......@@ -6,18 +6,19 @@ import (
)
type mysqlHandler struct {
spec HandlerSpec
addr string
user string
password string
}
func newMySQLHandler(params map[string]interface{}, _ *Shell) (Handler, error) {
addr, ok := params["host"].(string)
func newMySQLHandler(spec HandlerSpec, _ *Shell) (Handler, error) {
addr, ok := spec.Params["host"].(string)
if !ok || addr == "" {
return nil, errors.New("missing host")
}
user, _ := params["user"].(string)
password, _ := params["password"].(string)
user, _ := spec.Params["user"].(string)
password, _ := spec.Params["password"].(string)
return &mysqlHandler{
addr: addr,
user: user,
......@@ -25,6 +26,10 @@ func newMySQLHandler(params map[string]interface{}, _ *Shell) (Handler, error) {
}, nil
}
// Spec returns the HandlerSpec this handler was created from.
func (my *mysqlHandler) Spec() HandlerSpec {
	return my.spec
}
func (h *mysqlHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
return ds, nil
}
......
......@@ -14,17 +14,22 @@ type HandlerSpec struct {
Type string `yaml:"type"`
Params map[string]interface{} `yaml:"params"`
PreBackupCommand string `yaml:"pre_backup_command"`
PostBackupCommand string `yaml:"post_backup_command"`
PreRestoreCommand string `yaml:"pre_restore_command"`
PostRestoreCommand string `yaml:"post_restore_command"`
}
// Parse a HandlerSpec and return a Handler instance.
func (spec *HandlerSpec) Parse(shell *Shell) (Handler, error) {
switch spec.Type {
case "file":
return newFileHandler(spec.Params, shell)
return newFileHandler(*spec, shell)
case "pipe":
return newPipeHandler(spec.Params, shell)
return newPipeHandler(*spec, shell)
case "mysql":
return newMySQLHandler(spec.Params, shell)
return newMySQLHandler(*spec, shell)
default:
return nil, fmt.Errorf("%s: unknown handler type '%s'", spec.Name, spec.Type)
}
......
......@@ -236,6 +236,7 @@ func (s JobStatus) String() string {
// Status holds information on the current state of a job.
type Status struct {
ID string
Name string
Status JobStatus
StartedAt time.Time
CompletedAt time.Time
......@@ -268,34 +269,35 @@ func NewStateManager() *StateManager {
}
}
func (m *StateManager) setStatusPending(jobID string, j Job) {
func (m *StateManager) setStatusPending(id, name string, j Job) {
m.mx.Lock()
defer m.mx.Unlock()
m.pending[jobID] = &Status{
ID: jobID,
m.pending[id] = &Status{
ID: id,
Name: name,
Status: JobStatusPending,
Job: j,
}
}
func (m *StateManager) setStatusRunning(jobID string) {
func (m *StateManager) setStatusRunning(id string) {
m.mx.Lock()
defer m.mx.Unlock()
js := m.pending[jobID]
js := m.pending[id]
js.StartedAt = time.Now()
js.Status = JobStatusRunning
delete(m.pending, jobID)
m.running[jobID] = js
delete(m.pending, id)
m.running[id] = js
}
func (m *StateManager) setStatusDone(jobID string, err error) {
func (m *StateManager) setStatusDone(id string, err error) {
m.mx.Lock()
defer m.mx.Unlock()
js := m.running[jobID]
delete(m.running, jobID)
js := m.running[id]
delete(m.running, id)
js.CompletedAt = time.Now()
js.Status = JobStatusDone
js.Err = err
......@@ -306,14 +308,15 @@ func (m *StateManager) setStatusDone(jobID string, err error) {
}
}
// WithStatus tracks a job through its lifetime.
func (m *StateManager) WithStatus(j Job) Job {
// WithStatus tracks a job through its lifetime. The name is used for
// display purposes and needs not be unique.
func (m *StateManager) WithStatus(j Job, name string) Job {
sj := &statusJob{
Job: j,
id: util.RandomID(),
mgr: m,
}
m.setStatusPending(sj.id, sj)
m.setStatusPending(sj.id, name, sj)
return sj
}
......@@ -360,6 +363,9 @@ func (j *statusJob) RunContext(ctx context.Context) error {
// SyncGroup runs all the given jobs synchronously, aborting on the
// first error.
func SyncGroup(jobs []Job) Job {
if len(jobs) == 1 {
return jobs[0]
}
return JobFunc(func(ctx context.Context) error {
for _, j := range jobs {
if err := j.RunContext(ctx); err != nil {
......@@ -373,6 +379,9 @@ func SyncGroup(jobs []Job) Job {
// AsyncGroup runs all the given jobs asynchronously, and waits for
// all of them to terminate. It fails if any return an error.
func AsyncGroup(jobs []Job) Job {
if len(jobs) == 1 {
return jobs[0]
}
return JobFunc(func(ctx context.Context) error {
errCh := make(chan error)
defer close(errCh)
......
......@@ -56,29 +56,61 @@ func (m *tabaccoManager) prepareBackupJob(backup Backup) jobs.Job {
})
}
// Backup a single dataset to the repository.
func (m *tabaccoManager) backupDatasetJob(backup Backup, ds Dataset) jobs.Job {
jobID := fmt.Sprintf("backup_%s", ds.Name)
j := jobs.JobFunc(func(ctx context.Context) (err error) {
h, ok := m.configMgr.getHandlerMap()[ds.Handler]
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
func (m *tabaccoManager) doBackupDataset(ctx context.Context, backup Backup, ds Dataset) error {
h, ok := m.configMgr.getHandler(ds.Handler)
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
ds, err = h.Backup(ctx, m.configMgr.getRepository(), backup, ds)
if err != nil {
return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
}
if err = m.ms.AddDataset(ctx, backup, ds); err != nil {
return fmt.Errorf("%s: error saving metadata: %v", ds.Name, err)
}
return nil
})
return m.withDefaults(m.WithExclusiveLock(j, jobID, false), "backup")
var err error
ds, err = h.Backup(ctx, m.configMgr.getRepository(), backup, ds)
if err != nil {
return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
}
if err = m.ms.AddDataset(ctx, backup, ds); err != nil {
return fmt.Errorf("%s: error saving metadata: %v", ds.Name, err)
}
return nil
}
func (m *tabaccoManager) withDefaults(j jobs.Job, queue string) jobs.Job {
return m.WithQueue(m.WithStatus(jobs.WithTimeout(j, 24*time.Hour)), queue)
// backupDatasetJob assembles the backup pipeline for a single dataset:
// optional pre_backup_command, the backup itself (bounded by a 24h
// timeout), and optional post_backup_command. The resulting job gets a
// user-visible status name, an exclusive per-dataset lock, and runs in
// the "backup" queue for concurrency limiting.
func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) jobs.Job {
	var steps []jobs.Job

	// Optional pre-backup hook.
	if cmd := h.Spec().PreBackupCommand; cmd != "" {
		steps = append(steps, m.datasetCommandJob(cmd, backup, ds))
	}

	// The backup operation itself: a thin wrapper around
	// doBackupDataset() binding backup and ds via the closure.
	backupStep := jobs.JobFunc(func(ctx context.Context) error {
		return m.doBackupDataset(ctx, backup, ds)
	})
	steps = append(steps, jobs.WithTimeout(backupStep, 24*time.Hour))

	// Optional post-backup hook.
	if cmd := h.Spec().PostBackupCommand; cmd != "" {
		steps = append(steps, m.datasetCommandJob(cmd, backup, ds))
	}

	// Chain the steps sequentially. The exclusive lock (with a
	// leave-running policy) ensures at most one backup per
	// datasource runs at any given time.
	id := fmt.Sprintf("backup-dataset-%s", ds.Name)
	locked := m.WithExclusiveLock(jobs.SyncGroup(steps), id, false)
	return m.WithQueue(m.WithStatus(locked, id), "backup")
}
// BackupJob returns a Job that backs up all known sources to the
......@@ -102,18 +134,25 @@ func (m *tabaccoManager) BackupJob(ctx context.Context, sourceSpecs []SourceSpec
// Run all backup tasks, scheduling them using the executor.
var backupJobs []jobs.Job
for _, ds := range datasets {
// Bind 'ds' to the closure via an anonymous
// function. Required because of the loop.
func(ds Dataset) {
// TODO: set a timeout based on the SourceSpec.
backupJobs = append(backupJobs, m.backupDatasetJob(backup, ds))
}(ds)
}
return backup, m.WithStatus(jobs.SyncGroup([]jobs.Job{
prepJob,
m.WithStatus(jobs.AsyncGroup(backupJobs)),
})), nil
// Find the handler for this source.
h, ok := m.configMgr.getHandler(ds.Handler)
if !ok {
log.Printf("errors parsing source %s: unknown handler '%s'", ds.Name, ds.Handler)
continue
}
// Create the backup job and add it to our list.
backupJobs = append(backupJobs, m.backupDatasetJob(h, backup, ds))
}
// Run the job to initialize the repository before anything else.
j := m.WithStatus(
jobs.SyncGroup([]jobs.Job{
prepJob,
jobs.AsyncGroup(backupJobs),
}),
fmt.Sprintf("backup-%s", backup.ID),
)
return backup, j, nil
}
// Backup just runs the BackupJob synchronously.
......@@ -127,16 +166,54 @@ func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (
}
// Create a job to restore a single dataset.
func (m *tabaccoManager) restoreDatasetJob(backup Backup, ds Dataset, target string) jobs.Job {
j := jobs.JobFunc(func(ctx context.Context) error {
log.Printf("restoring %+v %+v", ds, backup)
h, ok := m.configMgr.getHandlerMap()[ds.Handler]
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
return h.Restore(ctx, m.configMgr.getRepository(), backup, ds, target)
})
return m.withDefaults(j, "restore")
// doRestoreDataset restores a single dataset from a backup to the
// given target path, via the dataset's handler.
func (m *tabaccoManager) doRestoreDataset(ctx context.Context, backup Backup, ds Dataset, target string) error {
	log.Printf("restoring %+v %+v", ds, backup)
	h, found := m.configMgr.getHandler(ds.Handler)
	if !found {
		return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
	}
	return h.Restore(ctx, m.configMgr.getRepository(), backup, ds, target)
}
// restoreDatasetJob assembles the restore pipeline for a single
// dataset: optional pre_restore_command, the restore itself (bounded
// by a 24h timeout), and optional post_restore_command.
func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset, target string) jobs.Job {
	var out []jobs.Job

	// Run pre_restore_command.
	if cmd := h.Spec().PreRestoreCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// The actual restore operation. Just a thin wrapper around
	// doRestoreDataset() that binds together the context, backup,
	// ds and target via the closure. (The original comment said
	// "backup operation ... doBackupDataset()", a copy-paste error.)
	out = append(out, jobs.WithTimeout(
		jobs.JobFunc(func(ctx context.Context) error {
			return m.doRestoreDataset(ctx, backup, ds, target)
		}),
		24*time.Hour,
	))

	// Run post_restore_command.
	if cmd := h.Spec().PostRestoreCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// Group the jobs (sequentially). Give the final job a status and
	// a user-visible name, for debugging purposes. Set an exclusive
	// lock with a leave-running policy, so no more than one restore
	// per datasource can run at any given time. Finally, the job
	// runs in the 'restore' queue for concurrency limiting.
	id := fmt.Sprintf("restore_%s", ds.Name)
	return m.WithQueue(
		m.WithStatus(
			m.WithExclusiveLock(jobs.SyncGroup(out), id, false),
			id,
		),
		"restore",
	)
}
// RestoreJob creates a job that restores the results of the
......@@ -151,12 +228,19 @@ func (m *tabaccoManager) RestoreJob(ctx context.Context, req FindRequest, target
var restoreJobs []jobs.Job
for _, vv := range versions {
func(backup Backup, ds Dataset) {
restoreJobs = append(restoreJobs, m.restoreDatasetJob(backup, ds, target))
}(vv[0].Backup, vv[0].Dataset)
ds := vv[0].Dataset
backup := vv[0].Backup
h, ok := m.configMgr.getHandler(ds.Handler)
if !ok {
log.Printf("%s: unknown handler '%s'", ds.Name, ds.Handler)
continue
}
restoreJobs = append(restoreJobs, m.restoreDatasetJob(h, backup, ds, target))
}
return m.WithStatus(jobs.AsyncGroup(restoreJobs)), nil
return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), nil
}
// Restore just runs the RestoreJob synchronously.
......@@ -180,3 +264,13 @@ func newBackup(host string) Backup {
Timestamp: time.Now(),
}
}
// datasetCommandJob wraps a pre/post hook shell command in a Job,
// exporting the backup ID and dataset name to the command through its
// environment.
func (m *tabaccoManager) datasetCommandJob(cmd string, backup Backup, ds Dataset) jobs.Job {
	extraEnv := map[string]string{
		"BACKUP_ID":    backup.ID,
		"DATASET_NAME": ds.Name,
	}
	return jobs.JobFunc(func(ctx context.Context) error {
		return m.configMgr.getShell().RunWithEnv(ctx, cmd, extraEnv)
	})
}
......@@ -2,6 +2,7 @@ package tabacco
import (
"context"
"fmt"
"io"
"log"
"os"
......@@ -109,6 +110,19 @@ func (s *Shell) Run(ctx context.Context, arg string) error {
return c.Run()
}
// RunWithEnv runs a command with additional environment variables.
// RunWithEnv runs a command with additional environment variables set
// on top of the current process environment.
func (s *Shell) RunWithEnv(ctx context.Context, arg string, envMap map[string]string) error {
	c := s.command(ctx, arg)
	// BUG FIX: the original did `var env []string; copy(env, os.Environ())`,
	// which copies into a nil slice (zero elements copied) and so silently
	// dropped the parent environment, running the command with only the
	// extra variables. Build the slice with append instead.
	parent := os.Environ()
	env := make([]string, 0, len(parent)+len(envMap))
	env = append(env, parent...)
	for k, v := range envMap {
		env = append(env, fmt.Sprintf("%s=%s", k, v))
	}
	c.Env = env
	c.Stdout = os.Stdout
	return c.Run()
}
// Output runs a command and returns the standard output.
func (s *Shell) Output(ctx context.Context, arg string) ([]byte, error) {
return s.command(ctx, arg).Output()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment