manager.go 8.24 KB
Newer Older
ale's avatar
ale committed
1 2 3 4
package tabacco

import (
	"context"
ale's avatar
ale committed
5
	"errors"
ale's avatar
ale committed
6 7 8 9
	"fmt"
	"log"
	"os"
	"time"
10

ale's avatar
ale committed
11 12
	"git.autistici.org/ai3/tools/tabacco/jobs"
	"git.autistici.org/ai3/tools/tabacco/util"
ale's avatar
ale committed
13 14 15
)

// Manager for backups and restores.
//
// Job-infrastructure helpers (exclusive locking, queue-based
// concurrency limiting, job status tracking) and the working
// directory manager are embedded, so their methods are promoted onto
// the manager itself.
type tabaccoManager struct {
	*jobs.ExclusiveLockManager
	*jobs.QueueManager
	*jobs.StateManager
	*workdirManager

	// configMgr gives access to the current configuration and is
	// used to create per-operation RuntimeContexts.
	configMgr *ConfigManager

	// ms records dataset metadata after successful backups and is
	// queried to locate data to restore.
	ms MetadataStore
}

// NewManager creates a new Manager.
ale's avatar
ale committed
27
func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) {
28 29
	// If we can't create a workdirManager, it probably means we
	// don't have permissions to the WorkDir, which is bad.
ale's avatar
ale committed
30
	wm, err := newWorkdirManager(configMgr.current().WorkDir())
31 32 33 34
	if err != nil {
		return nil, err
	}

ale's avatar
ale committed
35
	// Note: the queue configuration won't be reloaded.
36
	return &tabaccoManager{
37
		ExclusiveLockManager: jobs.NewExclusiveLockManager(),
ale's avatar
ale committed
38
		QueueManager:         jobs.NewQueueManager(configMgr.current().QueueSpec()),
39
		StateManager:         jobs.NewStateManager(),
40
		workdirManager:       wm,
ale's avatar
ale committed
41

ale's avatar
ale committed
42 43
		configMgr: configMgr,
		ms:        ms,
ale's avatar
ale committed
44 45 46 47 48
	}, nil
}

// Close the Manager and free all associated resources (those owned by
// this object).
49
func (m *tabaccoManager) Close() error {
ale's avatar
ale committed
50 51
	m.workdirManager.Close()

ale's avatar
ale committed
52
	return nil
ale's avatar
ale committed
53 54
}

ale's avatar
ale committed
55 56 57
// metadataJob wraps a backup job so that, when the wrapped job
// succeeds, the dataset's metadata is saved to the MetadataStore.
type metadataJob struct {
	jobs.Job

	// ms receives the (backup, dataset) pair on success.
	ms     MetadataStore
	backup *Backup
	ds     *Dataset
}

func (j *metadataJob) RunContext(ctx context.Context) error {
	err := j.Job.RunContext(ctx)
	if err == nil {
		if merr := j.ms.AddDataset(ctx, j.backup, j.ds); merr != nil {
66
			log.Printf("%s@%s: error saving metadata: %v", j.ds.Source, j.ds.ID, merr)
ale's avatar
ale committed
67 68 69 70 71
		}
	}
	return err
}

ale's avatar
ale committed
72
func (m *tabaccoManager) withMetadata(j jobs.Job, backup *Backup, ds *Dataset) jobs.Job {
ale's avatar
ale committed
73 74 75 76 77 78 79 80
	return &metadataJob{
		Job:    j,
		ms:     m.ms,
		backup: backup,
		ds:     ds,
	}
}

ale's avatar
ale committed
81 82 83
// Prepare the repository for a new backup. This is a synchronous
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
ale's avatar
ale committed
84
func (m *tabaccoManager) prepareBackupJob(rctx RuntimeContext, backup *Backup) jobs.Job {
85
	return jobs.JobFunc(func(ctx context.Context) error {
ale's avatar
ale committed
86
		return rctx.Repo().Init(ctx, rctx)
ale's avatar
ale committed
87 88
		//log.Printf("preparing backup %s", backup.ID)
		//return repo.Prepare(ctx, backup)
ale's avatar
ale committed
89
	})
ale's avatar
ale committed
90 91
}

ale's avatar
ale committed
92
func (m *tabaccoManager) wrapWithCommands(rctx RuntimeContext, initJob, backupJob jobs.Job, pre, post string) jobs.Job {
93
	var out []jobs.Job
ale's avatar
ale committed
94 95 96 97 98 99 100 101 102 103
	if initJob != nil {
		out = append(out, initJob)
	}
	if pre != "" {
		out = append(out, m.commandJob(rctx, pre))
	}
	out = append(out, backupJob)
	if post != "" {
		out = append(out, m.commandJob(rctx, post))
	}
104

ale's avatar
ale committed
105 106 107 108 109
	if len(out) == 1 {
		return out[0]
	}
	return jobs.SyncGroup(out)
}
ale's avatar
ale committed
110

ale's avatar
ale committed
111 112 113 114 115 116
func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext, backup *Backup, src *SourceSpec) (jobs.Job, error) {
	// Compile the source and the associated Handler.
	dsl, err := src.Parse(ctx)
	if err != nil {
		return nil, err
	}
ale's avatar
ale committed
117 118
	hspec := rctx.HandlerSpec(src.Handler)
	if hspec == nil {
ale's avatar
ale committed
119 120 121 122 123
		return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
	}
	h, err := hspec.Parse(src)
	if err != nil {
		return nil, err
124 125
	}

ale's avatar
ale committed
126 127 128 129 130
	// The actual backup operation. Assemble all the backup jobs
	// for the datasets in 'dsl', and add them to an AsyncGroup.
	//
	// TODO: get the timeout from the SourceSpec.
	var backupJobs []jobs.Job
ale's avatar
ale committed
131
	for _, ds := range dsl {
ale's avatar
ale committed
132
		backupJobs = append(backupJobs, m.withMetadata(
ale's avatar
ale committed
133
			h.BackupJob(rctx, backup, ds),
ale's avatar
ale committed
134
			backup,
ale's avatar
ale committed
135
			ds,
ale's avatar
ale committed
136 137
		))
	}
ale's avatar
ale committed
138 139 140 141 142 143 144 145

	backupJob := m.wrapWithCommands(
		rctx,
		m.prepareBackupJob(rctx, backup),
		jobs.WithTimeout(jobs.AsyncGroup(backupJobs), src.Timeout),
		src.PreBackupCommand,
		src.PostBackupCommand,
	)
146 147 148 149 150 151 152

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one backup per
	// datasource can run at any given time. Finally, the job runs
	// in the 'backup' queue for concurrency limiting.
153 154
	//
	// Oh, and here is where we add per-dataset instrumentation.
ale's avatar
ale committed
155
	id := fmt.Sprintf("backup-source-%s", src.Name)
156 157
	return m.WithQueue(
		m.WithStatus(
158
			m.WithExclusiveLock(
159 160
				m.withWorkDir(
					withInstrumentation(
ale's avatar
ale committed
161 162
						backupJob,
						src.Name,
163
					),
164 165 166
				),
				id,
				false),
167 168 169
			id,
		),
		"backup",
ale's avatar
ale committed
170
	), nil
ale's avatar
ale committed
171 172
}

ale's avatar
ale committed
173 174 175
func (m *tabaccoManager) BackupJob(ctx context.Context, src *SourceSpec) (*Backup, jobs.Job, error) {
	// Create a new Backup.
	b := newBackup("")
ale's avatar
ale committed
176

ale's avatar
ale committed
177
	// Create a RuntimeContext.
ale's avatar
ale committed
178
	rctx := m.configMgr.NewRuntimeContext()
179

ale's avatar
ale committed
180 181
	j, err := m.makeBackupJob(ctx, rctx, b, src)
	return b, j, err
ale's avatar
ale committed
182 183 184
}

// Backup just runs the BackupJob synchronously.
ale's avatar
ale committed
185 186
func (m *tabaccoManager) Backup(ctx context.Context, src *SourceSpec) (*Backup, error) {
	backup, job, err := m.BackupJob(ctx, src)
ale's avatar
ale committed
187 188
	if err != nil {
		return backup, err
ale's avatar
ale committed
189
	}
ale's avatar
ale committed
190 191 192
	err = job.RunContext(ctx)
	return backup, err
}
ale's avatar
ale committed
193

ale's avatar
ale committed
194 195
func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src *SourceSpec, dsl []*Dataset, target string) (jobs.Job, error) {
	// Just need the Handler.
ale's avatar
ale committed
196 197
	hspec := rctx.HandlerSpec(src.Handler)
	if hspec == nil {
ale's avatar
ale committed
198 199 200 201 202
		return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
	}
	h, err := hspec.Parse(src)
	if err != nil {
		return nil, err
203 204 205 206 207
	}

	// The actual backup operation. Just a thin wrapper around
	// doBackupDataset() that binds together the context, backup,
	// ds and target via the closure.
ale's avatar
ale committed
208
	var restoreJobs []jobs.Job
ale's avatar
ale committed
209 210 211
	for _, ds := range dsl {
		restoreJobs = append(
			restoreJobs,
ale's avatar
ale committed
212
			h.RestoreJob(rctx, backup, ds, target),
ale's avatar
ale committed
213
		)
ale's avatar
ale committed
214
	}
215

ale's avatar
ale committed
216 217 218 219 220 221 222
	restoreJob := m.wrapWithCommands(
		rctx,
		nil,
		jobs.AsyncGroup(restoreJobs),
		src.PreRestoreCommand,
		src.PostRestoreCommand,
	)
223 224 225 226 227 228 229

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one restore per
	// datasource can run at any given time. Finally, the job runs
	// in the 'restore' queue for concurrency limiting.
ale's avatar
ale committed
230
	id := fmt.Sprintf("restore-source-%s", src.Name)
231 232
	return m.WithQueue(
		m.WithStatus(
ale's avatar
ale committed
233 234 235 236
			m.WithExclusiveLock(
				restoreJob,
				id,
				false),
237 238 239
			id,
		),
		"restore",
ale's avatar
ale committed
240 241 242 243 244 245 246 247 248
	), nil
}

func groupDatasetsBySource(dsl []*Dataset) map[string][]*Dataset {
	m := make(map[string][]*Dataset)
	for _, ds := range dsl {
		m[ds.Source] = append(m[ds.Source], ds)
	}
	return m
ale's avatar
ale committed
249 250
}

ale's avatar
ale committed
251 252
// RestoreJob creates a job that restores the results of the
// FindRequest (with NumVersions=1) onto the given target directory.
ale's avatar
ale committed
253 254 255 256
func (m *tabaccoManager) RestoreJob(ctx context.Context, req *FindRequest, target string) (jobs.Job, error) {
	// Find the atoms relevant to this restore. The results will
	// be grouped in Backups and Datasets that only include the
	// relevant Atoms.
257
	req.NumVersions = 1
ale's avatar
ale committed
258
	backups, err := m.ms.FindAtoms(ctx, req)
ale's avatar
ale committed
259
	if err != nil {
ale's avatar
ale committed
260
		return nil, err
ale's avatar
ale committed
261
	}
ale's avatar
ale committed
262 263 264 265 266
	if len(backups) == 0 {
		return nil, errors.New("no results found for query")
	}

	// Create a RuntimeContext.
ale's avatar
ale committed
267
	rctx := m.configMgr.NewRuntimeContext()
ale's avatar
ale committed
268

269
	var restoreJobs []jobs.Job
ale's avatar
ale committed
270 271 272 273 274
	merr := new(util.MultiError)
	for _, b := range backups {
		// Group the datasets by source, find the source and create the restore jobs.
		for srcName, dsl := range groupDatasetsBySource(b.Datasets) {

ale's avatar
ale committed
275
			src := rctx.FindSource(srcName)
ale's avatar
ale committed
276 277 278 279 280 281 282 283 284 285 286
			if src == nil {
				merr.Add(fmt.Errorf("unknown source '%s'", srcName))
				continue
			}

			j, err := m.makeRestoreJob(rctx, b, src, dsl, target)
			if err != nil {
				merr.Add(fmt.Errorf("source %s: %v", srcName, err))
				continue
			}
			restoreJobs = append(restoreJobs, j)
287
		}
ale's avatar
ale committed
288 289
	}

ale's avatar
ale committed
290
	return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), merr.OrNil()
ale's avatar
ale committed
291 292 293
}

// Restore just runs the RestoreJob synchronously.
ale's avatar
ale committed
294
func (m *tabaccoManager) Restore(ctx context.Context, req *FindRequest, target string) error {
ale's avatar
ale committed
295 296 297
	job, err := m.RestoreJob(ctx, req, target)
	if err != nil {
		return err
ale's avatar
ale committed
298
	}
ale's avatar
ale committed
299
	return job.RunContext(ctx)
ale's avatar
ale committed
300 301 302 303
}

// Create a new Backup object with its own unique ID (which actually
// consists of 16 random bytes, hex-encoded).
ale's avatar
ale committed
304
func newBackup(host string) *Backup {
ale's avatar
ale committed
305
	if host == "" {
306
		host, _ = os.Hostname() // nolint
ale's avatar
ale committed
307
	}
ale's avatar
ale committed
308
	return &Backup{
309
		ID:        util.RandomID(),
ale's avatar
ale committed
310 311 312 313
		Host:      host,
		Timestamp: time.Now(),
	}
}
314

ale's avatar
ale committed
315
func (m *tabaccoManager) commandJob(rctx RuntimeContext, cmd string) jobs.Job {
316
	return jobs.JobFunc(func(ctx context.Context) error {
ale's avatar
ale committed
317
		return rctx.Shell().Run(ctx, cmd)
318 319
	})
}