manager.go
package tabacco

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"git.autistici.org/ai3/tools/tabacco/jobs"
	"git.autistici.org/ai3/tools/tabacco/util"
)

// Manager for backups and restores.
type tabaccoManager struct {
	*jobs.ExclusiveLockManager
	*jobs.QueueManager
	*jobs.StateManager
	*workdirManager

	configMgr *ConfigManager
	ms        MetadataStore
}

// NewManager creates a new Manager.
func NewManager(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (Manager, error) {
	// If we can't create a workdirManager, it probably means we
	// don't have permissions to the WorkDir, which is bad.
	wm, err := newWorkdirManager(configMgr.getWorkDir())
	if err != nil {
		return nil, err
	}

	// Note: the queue configuration won't be reloaded.
	return &tabaccoManager{
		ExclusiveLockManager: jobs.NewExclusiveLockManager(),
		QueueManager:         jobs.NewQueueManager(configMgr.getQueueSpec()),
		StateManager:         jobs.NewStateManager(),
		workdirManager:       wm,

		configMgr: configMgr,
		ms:        ms,
	}, nil
}

// Close the Manager and free all associated resources (those owned by
// this object).
func (m *tabaccoManager) Close() error {
	m.workdirManager.Close()

	return nil
}

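// metadataJob wraps a jobs.Job so that, when the wrapped job
// completes successfully, the resulting dataset is recorded in the
// MetadataStore (metadata errors are logged but do not fail the job).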
type metadataJob struct {
	jobs.Job
	ms     MetadataStore
	backup *Backup
	ds     *Dataset
}

func (j *metadataJob) RunContext(ctx context.Context) error {
	err := j.Job.RunContext(ctx)
	if err == nil {
		if merr := j.ms.AddDataset(ctx, j.backup, j.ds); merr != nil {
			log.Printf("%s: error saving metadata: %v", j.ds.Name, merr)
		}
	}
	return err
}

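// withMetadata wraps a backup job so that the dataset is saved to
// the metadata store once the job succeeds.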
func (m *tabaccoManager) withMetadata(j jobs.Job, backup *Backup, ds *Dataset) jobs.Job {
	return &metadataJob{
		Job:    j,
		ms:     m.ms,
		backup: backup,
		ds:     ds,
	}
}

// Prepare the repository for a new backup. This is a synchronous
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
func (m *tabaccoManager) prepareBackupJob(rctx RuntimeContext, backup *Backup) jobs.Job {
	repo := m.configMgr.getRepository()
	return jobs.JobFunc(func(ctx context.Context) error {
		return repo.Init(ctx, rctx)
		//log.Printf("preparing backup %s", backup.ID)
		//return repo.Prepare(ctx, backup)
	})
}

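// wrapWithCommands surrounds the main job with an optional init job
// and optional pre/post shell commands, run in sequence. If no
// wrapping is needed, the main job is returned unchanged.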
func (m *tabaccoManager) wrapWithCommands(rctx RuntimeContext, initJob, backupJob jobs.Job, pre, post string) jobs.Job {
	var out []jobs.Job
	if initJob != nil {
		out = append(out, initJob)
	}
	if pre != "" {
		out = append(out, m.commandJob(rctx, pre))
	}
	out = append(out, backupJob)
	if post != "" {
		out = append(out, m.commandJob(rctx, post))
	}

	if len(out) == 1 {
		return out[0]
	}
	return jobs.SyncGroup(out)
}

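// makeBackupJob builds the job that backs up all datasets of a
// single source, wrapped with repository initialization, pre/post
// commands, per-source locking, instrumentation, status tracking and
// queueing.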
func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext, backup *Backup, src *SourceSpec) (jobs.Job, error) {
	// Compile the source and the associated Handler.
	dsl, err := src.Parse(ctx)
	if err != nil {
		return nil, err
	}
	hspec, ok := m.configMgr.getHandlerSpec(src.Handler)
	if !ok {
		return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
	}
	h, err := hspec.Parse(src)
	if err != nil {
		return nil, err
	}

	// The actual backup operation. Assemble all the backup jobs
	// for the datasets in 'dsl', and add them to an AsyncGroup.
	//
	// TODO: get the timeout from the SourceSpec.
	var backupJobs []jobs.Job
	repo := m.configMgr.getRepository()
	for _, ds := range dsl {
		backupJobs = append(backupJobs, m.withMetadata(
			h.BackupJob(rctx, repo, backup, ds),
			backup,
			ds,
		))
	}

	backupJob := m.wrapWithCommands(
		rctx,
		m.prepareBackupJob(rctx, backup),
		jobs.WithTimeout(jobs.AsyncGroup(backupJobs), src.Timeout),
		src.PreBackupCommand,
		src.PostBackupCommand,
	)

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one backup per
	// datasource can run at any given time. Finally, the job runs
	// in the 'backup' queue for concurrency limiting.
	//
	// Oh, and here is where we add per-dataset instrumentation.
	id := fmt.Sprintf("backup-source-%s", src.Name)
	return m.WithQueue(
		m.WithStatus(
			m.WithExclusiveLock(
				m.withWorkDir(
					withInstrumentation(
						backupJob,
						src.Name,
					),
				),
				id,
				false),
			id,
		),
		"backup",
	), nil
}

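// BackupJob creates a new Backup and returns it along with the job
// that will perform it for the given source.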
func (m *tabaccoManager) BackupJob(ctx context.Context, src *SourceSpec) (*Backup, jobs.Job, error) {
	// Create a new Backup.
	b := newBackup("")

	// Create a RuntimeContext.
	rctx := m.configMgr.newRuntimeContext()

	j, err := m.makeBackupJob(ctx, rctx, b, src)
	return b, j, err
}

// Backup just runs the BackupJob synchronously.
func (m *tabaccoManager) Backup(ctx context.Context, src *SourceSpec) (*Backup, error) {
	backup, job, err := m.BackupJob(ctx, src)
	if err != nil {
		return backup, err
	}
	err = job.RunContext(ctx)
	return backup, err
}

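// makeRestoreJob builds the job that restores the given datasets of
// a single source onto the target directory.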
func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src *SourceSpec, dsl []*Dataset, target string) (jobs.Job, error) {
	// Just need the Handler.
	hspec, ok := m.configMgr.getHandlerSpec(src.Handler)
	if !ok {
		return nil, fmt.Errorf("unknown handler '%s'", src.Handler)
	}
	h, err := hspec.Parse(src)
	if err != nil {
		return nil, err
	}

	// The actual restore operation: create one restore job per
	// dataset in 'dsl', to be run as an AsyncGroup.
	var restoreJobs []jobs.Job
	repo := m.configMgr.getRepository()
	for _, ds := range dsl {
		restoreJobs = append(
			restoreJobs,
			h.RestoreJob(rctx, repo, backup, ds, target),
		)
	}

	restoreJob := m.wrapWithCommands(
		rctx,
		nil,
		jobs.AsyncGroup(restoreJobs),
		src.PreRestoreCommand,
		src.PostRestoreCommand,
	)

	// Group the jobs (sequentially) if there's more than one of
	// them. Give the final job a status and a user-visible name,
	// for debugging purposes. Set an exclusive lock with a
	// leave-running policy, so no more than one restore per
	// datasource can run at any given time. Finally, the job runs
	// in the 'restore' queue for concurrency limiting.
	id := fmt.Sprintf("restore-source-%s", src.Name)
	return m.WithQueue(
		m.WithStatus(
			m.WithExclusiveLock(
				restoreJob,
				id,
				false),
			id,
		),
		"restore",
	), nil
}

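// groupDatasetsBySource indexes a list of datasets by their source
// name.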
func groupDatasetsBySource(dsl []*Dataset) map[string][]*Dataset {
	m := make(map[string][]*Dataset)
	for _, ds := range dsl {
		m[ds.Source] = append(m[ds.Source], ds)
	}
	return m
}

// RestoreJob creates a job that restores the results of the
// FindRequest (with NumVersions=1) onto the given target directory.
func (m *tabaccoManager) RestoreJob(ctx context.Context, req *FindRequest, target string) (jobs.Job, error) {
	// Find the atoms relevant to this restore. The results will
	// be grouped in Backups and Datasets that only include the
	// relevant Atoms.
	req.NumVersions = 1
	backups, err := m.ms.FindAtoms(ctx, req)
	if err != nil {
		return nil, err
	}
	if len(backups) == 0 {
		return nil, errors.New("no results found for query")
	}

	// Create a RuntimeContext.
	rctx := m.configMgr.newRuntimeContext()

	var restoreJobs []jobs.Job
	merr := new(util.MultiError)
	for _, b := range backups {
		// Group the datasets by source, find the source and create the restore jobs.
		for srcName, dsl := range groupDatasetsBySource(b.Datasets) {

			src := m.configMgr.findSource(srcName)
			if src == nil {
				merr.Add(fmt.Errorf("unknown source '%s'", srcName))
				continue
			}

			j, err := m.makeRestoreJob(rctx, b, src, dsl, target)
			if err != nil {
				merr.Add(fmt.Errorf("source %s: %v", srcName, err))
				continue
			}
			restoreJobs = append(restoreJobs, j)
		}
	}

	return m.WithStatus(jobs.AsyncGroup(restoreJobs), fmt.Sprintf("restore_%s", util.RandomID())), merr.OrNil()
}

// Restore just runs the RestoreJob synchronously.
func (m *tabaccoManager) Restore(ctx context.Context, req *FindRequest, target string) error {
	job, err := m.RestoreJob(ctx, req, target)
	if err != nil {
		return err
	}
	return job.RunContext(ctx)
}

// Create a new Backup object with its own unique ID (which actually
// consists of 16 random bytes, hex-encoded).
func newBackup(host string) *Backup {
	if host == "" {
		host, _ = os.Hostname() // nolint
	}
	return &Backup{
		ID:        util.RandomID(),
		Host:      host,
		Timestamp: time.Now(),
	}
}

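// commandJob returns a job that runs a shell command through the
// RuntimeContext's shell.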
func (m *tabaccoManager) commandJob(rctx RuntimeContext, cmd string) jobs.Job {
	return jobs.JobFunc(func(ctx context.Context) error {
		return rctx.Shell().Run(ctx, cmd)
	})
}