package tabacco

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"git.autistici.org/ale/tabacco/jobs"
	"git.autistici.org/ale/tabacco/util"
)

// Manager for backups and restores.
type tabaccoManager struct {
	*jobs.ExclusiveLockManager
	*jobs.QueueManager
	*jobs.StateManager

	configMgr *ConfigManager
	ms        MetadataStore
}

// NewManager creates a new Manager.
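//
// A minimal usage sketch (hypothetical caller; ctx, configMgr and ms
// are assumed to be already set up):
//
//	mgr, err := NewManager(ctx, configMgr, ms)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer mgr.Close()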
func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) {
	// Note: the queue configuration won't be reloaded.
	return &tabaccoManager{
		ExclusiveLockManager: jobs.NewExclusiveLockManager(),
		QueueManager:         jobs.NewQueueManager(configMgr.getQueueSpec()),
		StateManager:         jobs.NewStateManager(),

		configMgr: configMgr,
		ms:        ms,
	}, nil
}

// Close the Manager and free all associated resources (those owned by
// this object).
func (m *tabaccoManager) Close() error {
	return nil
}

// Prepare the repository for a new backup. This is a synchronous
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
func (m *tabaccoManager) prepareBackupJob(backup Backup) jobs.Job {
	return jobs.JobFunc(func(ctx context.Context) error {
		repo := m.configMgr.getRepository()
		if err := repo.Init(ctx); err != nil {
			log.Printf("repository init failed: %v", err)
			return err
		}
		log.Printf("preparing backup %s", backup.ID)
		return repo.Prepare(ctx, backup)
	})
}

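// Back up a single dataset with its configured handler, then record
// the resulting metadata in the metadata store.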
func (m *tabaccoManager) doBackupDataset(ctx context.Context, backup Backup, ds Dataset) error {
	h, ok := m.configMgr.getHandler(ds.Handler)
	if !ok {
		return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
	}

	var err error
	ds, err = h.Backup(ctx, m.configMgr.getRepository(), backup, ds)
	if err != nil {
		return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
	}

	if err = m.ms.AddDataset(ctx, backup, ds); err != nil {
		return fmt.Errorf("%s: error saving metadata: %v", ds.Name, err)
	}
	return nil
}

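// Build the backup job for a single dataset: the optional pre/post
// backup commands around the backup operation itself, which is capped
// at 24 hours by a timeout.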
func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) jobs.Job {
	var out []jobs.Job

	// Run pre_backup_command.
	if cmd := h.Spec().PreBackupCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// The actual backup operation. Just a thin wrapper around
	// doBackupDataset() that binds together the context, backup
	// and ds via the closure.
	out = append(out, jobs.WithTimeout(
		jobs.JobFunc(func(ctx context.Context) error {
			return m.doBackupDataset(ctx, backup, ds)
		}),
		24*time.Hour,
	))

	// Run post_backup_command.
	if cmd := h.Spec().PostBackupCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one backup per
	// datasource can run at any given time. Finally, the job runs
	// in the 'backup' queue for concurrency limiting.
	id := fmt.Sprintf("backup-dataset-%s", ds.Name)
	return m.WithQueue(
		m.WithStatus(
			m.WithExclusiveLock(jobs.SyncGroup(out), id, false),
			id,
		),
		"backup",
	)
}

// BackupJob returns a Job that backs up all known sources to the
// configured destination repository.
func (m *tabaccoManager) BackupJob(ctx context.Context, sourceSpecs []SourceSpec) (Backup, jobs.Job, error) {
	// Parse the source specs and obtain Datasets. Errors here are
	// logged but *not* fatal, unless every source fails to parse
	// and no datasets are left.
	datasets, err := parseSourceSpecs(ctx, sourceSpecs)
	if err != nil {
		log.Printf("errors parsing sources: %v", err)
		if len(datasets) == 0 {
			return Backup{}, nil, errors.New("all sources have errors")
		}
	}

	// Create a new backup and initialize it.
	backup := newBackup("")
	prepJob := m.prepareBackupJob(backup)

	// Run all backup tasks, scheduling them using the executor.
	var backupJobs []jobs.Job
	for _, ds := range datasets {
		// Find the handler for this source.
		h, ok := m.configMgr.getHandler(ds.Handler)
		if !ok {
			log.Printf("errors parsing source %s: unknown handler '%s'", ds.Name, ds.Handler)
			continue
		}
		// Create the backup job and add it to our list.
		backupJobs = append(backupJobs, m.backupDatasetJob(h, backup, ds))
	}

	// Run the job to initialize the repository before anything else.
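	// The per-dataset jobs then run concurrently as one group.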
	j := m.WithStatus(
		jobs.SyncGroup([]jobs.Job{
			prepJob,
			jobs.AsyncGroup(backupJobs),
		}),
		fmt.Sprintf("backup-%s", backup.ID),
	)
	return backup, j, nil
}

// Backup just runs the BackupJob synchronously.
func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup, error) {
	backup, job, err := m.BackupJob(ctx, sourceSpecs)
	if err != nil {
		return backup, err
	}
	err = job.RunContext(ctx)
	return backup, err
}

// Restore a single dataset to the given target, using the handler
// configured for it.
func (m *tabaccoManager) doRestoreDataset(ctx context.Context, backup Backup, ds Dataset, target string) error {
	log.Printf("restoring %+v %+v", ds, backup)

	h, ok := m.configMgr.getHandler(ds.Handler)
	if !ok {
		return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
	}

	return h.Restore(ctx, m.configMgr.getRepository(), backup, ds, target)
}

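// Create the job to restore a single dataset: the optional pre/post
// restore commands around the restore operation itself, capped at 24
// hours by a timeout.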
func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset, target string) jobs.Job {
	var out []jobs.Job

	// Run pre_restore_command.
	if cmd := h.Spec().PreRestoreCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// The actual restore operation. Just a thin wrapper around
	// doRestoreDataset() that binds together the context, backup,
	// ds and target via the closure.
	out = append(out, jobs.WithTimeout(
		jobs.JobFunc(func(ctx context.Context) error {
			return m.doRestoreDataset(ctx, backup, ds, target)
		}),
		24*time.Hour,
	))

	// Run post_restore_command.
	if cmd := h.Spec().PostRestoreCommand; cmd != "" {
		out = append(out, m.datasetCommandJob(cmd, backup, ds))
	}

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one restore per
	// datasource can run at any given time. Finally, the job runs
	// in the 'restore' queue for concurrency limiting.
	id := fmt.Sprintf("restore_%s", ds.Name)
	return m.WithQueue(
		m.WithStatus(
			m.WithExclusiveLock(jobs.SyncGroup(out), id, false),
			id,
		),
		"restore",
	)
}

// RestoreJob creates a job that restores the results of the
// FindRequest (with NumVersions=1) onto the given target directory.
func (m *tabaccoManager) RestoreJob(ctx context.Context, req FindRequest, target string) (jobs.Job, error) {
	// Find the atoms relevant to this restore.
	req.NumVersions = 1
	versions, err := m.ms.FindAtoms(ctx, req)
	if err != nil {
		return nil, err
	}

	var restoreJobs []jobs.Job
	for _, vv := range versions {
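		// With NumVersions=1 each entry holds exactly one
		// version, so vv[0] is the one to restore.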
		ds := vv[0].Dataset
		backup := vv[0].Backup

		h, ok := m.configMgr.getHandler(ds.Handler)
		if !ok {
			log.Printf("%s: unknown handler '%s'", ds.Name, ds.Handler)
			continue
		}

		restoreJobs = append(restoreJobs, m.restoreDatasetJob(h, backup, ds, target))
	}

	return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), nil
}

// Restore just runs the RestoreJob synchronously.
func (m *tabaccoManager) Restore(ctx context.Context, req FindRequest, target string) error {
	job, err := m.RestoreJob(ctx, req, target)
	if err != nil {
		return err
	}
	return job.RunContext(ctx)
}

// Create a new Backup object with its own unique ID (which actually
// consists of 16 random bytes, hex-encoded).
func newBackup(host string) Backup {
	if host == "" {
		host, _ = os.Hostname() // nolint
	}
	return Backup{
		ID:        util.RandomID(),
		Host:      host,
		Timestamp: time.Now(),
	}
}

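// Wrap a pre/post hook command in a job that runs it through the
// configured shell, with the backup ID and dataset name exposed in
// the environment.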
func (m *tabaccoManager) datasetCommandJob(cmd string, backup Backup, ds Dataset) jobs.Job {
	env := map[string]string{
		"BACKUP_ID":    backup.ID,
		"DATASET_NAME": ds.Name,
	}
	return jobs.JobFunc(func(ctx context.Context) error {
		return m.configMgr.getShell().RunWithEnv(ctx, cmd, env)
	})
}