Commit 124dfaf7 authored by ale

Move to a unified job abstraction

parent 920fe0a4
@@ -122,7 +122,9 @@ type Repository interface {

 // Manager for backups and restores.
 type Manager interface {
+    BackupJob(context.Context, []SourceSpec) (Backup, Job, error)
     Backup(context.Context, []SourceSpec) (Backup, error)
+    RestoreJob(context.Context, FindRequest, string) (Job, error)
     Restore(context.Context, FindRequest, string) error
     Close() error
 }
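The new Job-returning variants let callers track, cancel, and wait on long-running operations, while the plain Backup/Restore calls keep the old synchronous behavior. A minimal caller sketch, assuming a Manager value mgr and a source list srcs (hypothetical names); whether the caller must invoke RunContext itself or the manager schedules the returned Job internally is not visible in this hunk, and the sketch assumes the latter:

    // Hypothetical usage of the asynchronous variant.
    backup, job, err := mgr.BackupJob(ctx, srcs)
    if err != nil {
        return err
    }
    log.Printf("backup %+v started", backup)
    if err := job.Wait(); err != nil {
        log.Printf("backup failed: %v", err)
    }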
@@ -151,8 +151,9 @@ func TestBackup(t *testing.T) {
             AtomsScript: "echo user1 user1 ; echo user2 user2",
         },
     }
-    e := NewExecutor(2)
-    defer e.Stop()
+    queueSpec := MultiQueueSpec{
+        Workers: map[string]int{"backup": 2},
+    }

     // Run the backup.
     m, err := NewManager(
@@ -160,7 +161,7 @@ func TestBackup(t *testing.T) {
         handlerSpecs,
         repoSpec,
         store,
-        e,
+        queueSpec,
         NewShell(true),
     )
     if err != nil {
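For context, MultiQueueSpec maps queue names to worker counts; queues not listed are unlimited. A slightly fuller configuration sketch (the "default" fallback queue is an assumption based on the withQueue code below):

    queueSpec := MultiQueueSpec{
        Workers: map[string]int{
            "backup":  2, // at most two concurrent backup jobs
            "default": 4, // hypothetical fallback for unnamed queues
        },
    }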
package tabacco

import (
    "context"
    "io"
    "log"
    "sync"
    "time"
)

// Op is an operation (a func(Context) error) that can be run
// asynchronously by a worker thread and waited for.
type Op struct {
    id        string
    desc      string
    fn        func(context.Context) error
    timeout   time.Duration
    startedAt time.Time
    done      chan struct{}
    err       error
}
// NewOp wraps a function in an Op (with timeout).
func NewOp(desc string, fn func(context.Context) error, timeout time.Duration) *Op {
    return &Op{
        id:      randomID(),
        desc:    desc,
        fn:      fn,
        timeout: timeout,
        done:    make(chan struct{}),
    }
}
func (op *Op) run(ctx context.Context) {
    ctx, cancel := context.WithTimeout(ctx, op.timeout)
    op.startedAt = time.Now()
    op.err = op.fn(ctx)
    cancel()
    close(op.done)
}

// Wait blocks until the operation has run, and returns its error.
func (op *Op) Wait() error {
    <-op.done
    return op.err
}
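// A minimal usage sketch (illustrative only, not part of this file):
// wrap a function in an Op, enqueue it on an Executor (defined below),
// and block until a worker has run it.
//
//    op := NewOp("say hello", func(ctx context.Context) error {
//        log.Printf("hello")
//        return nil
//    }, time.Minute)
//    e := NewExecutor(1)
//    e.Enqueue(op, false)
//    err := op.Wait() // returns the error from the wrapped function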
// executorStatus tracks the currently running ops and a short list of
// recent failures, for debugging.
type executorStatus struct {
    mx      sync.Mutex
    running map[string]*Op
    failed  []Op
}

func newExecutorStatus() *executorStatus {
    return &executorStatus{
        running: make(map[string]*Op),
    }
}

func (s *executorStatus) push(op *Op) {
    s.mx.Lock()
    s.running[op.id] = op
    s.mx.Unlock()
}

const maxFailedLength = 20

func (s *executorStatus) pop(op *Op) {
    s.mx.Lock()
    delete(s.running, op.id)
    if op.err != nil {
        s.failed = append(s.failed, *op)
        if n := len(s.failed); n > maxFailedLength {
            s.failed = s.failed[n-maxFailedLength : n]
        }
    }
    s.mx.Unlock()
}

func (s *executorStatus) getStatus() ([]Op, []Op) {
    var out []Op
    s.mx.Lock()
    defer s.mx.Unlock()
    for _, op := range s.running {
        out = append(out, *op)
    }
    return out, s.failed
}
// Executor is a task scheduler that keeps a two-level priority queue
// (for high-priority and low-priority tasks respectively).
type Executor struct {
    queueCh   chan struct{}
    cancel    context.CancelFunc
    wg        sync.WaitGroup
    running   *executorStatus
    mx        sync.Mutex
    queueHigh []*Op
    queueLow  []*Op
}

// newUnstartedExecutor returns an executor without starting the worker threads.
func newUnstartedExecutor() *Executor {
    return &Executor{
        queueCh: make(chan struct{}, 1),
        running: newExecutorStatus(),
    }
}

// NewExecutor returns a new Executor with the given number of
// concurrent asynchronous workers.
func NewExecutor(numWorkers int) *Executor {
    e := newUnstartedExecutor()
    e.start(numWorkers)
    return e
}
func (e *Executor) start(numWorkers int) {
    // Create a context that can be canceled on Stop, which will
    // interrupt running processes.
    ctx, cancel := context.WithCancel(context.Background())
    e.cancel = cancel
    for i := 0; i < numWorkers; i++ {
        e.wg.Add(1)
        go func() {
            e.runWorker(ctx)
            e.wg.Done()
        }()
    }
}

func (e *Executor) runWorker(ctx context.Context) {
    for {
        op, err := e.Pop(ctx)
        if err != nil {
            return
        }
        log.Printf("executing op %s", op.id)
        e.running.push(op)
        op.run(ctx)
        e.running.pop(op)
    }
}
// Stop the workers and wait for them to terminate.
func (e *Executor) Stop() {
    e.cancel()
    e.wg.Wait()
}

// Enqueue an operation (possibly with high priority).
func (e *Executor) Enqueue(op *Op, hiPri bool) {
    e.EnqueueBatch([]*Op{op}, hiPri)
}

// EnqueueBatch schedules a batch of operations all at once (possibly
// with high priority), taking the queue lock only once to reduce
// contention.
func (e *Executor) EnqueueBatch(b []*Op, hiPri bool) {
    e.mx.Lock()
    if hiPri {
        e.queueHigh = append(e.queueHigh, b...)
    } else {
        e.queueLow = append(e.queueLow, b...)
    }
    e.mx.Unlock()
    select {
    case e.queueCh <- struct{}{}:
    default:
    }
}
// Pop an element from the queue. A Pop that is blocked on an empty
// queue can be interrupted by canceling ctx, in which case it returns
// io.EOF.
func (e *Executor) Pop(ctx context.Context) (op *Op, err error) {
    e.mx.Lock()
    for len(e.queueHigh) == 0 && len(e.queueLow) == 0 {
        e.mx.Unlock()
        select {
        case <-e.queueCh:
        case <-ctx.Done():
            err = io.EOF
            return
        }
        e.mx.Lock()
    }
    if len(e.queueHigh) > 0 {
        op = e.queueHigh[0]
        e.queueHigh = e.queueHigh[1:]
    } else {
        op = e.queueLow[0]
        e.queueLow = e.queueLow[1:]
    }
    e.mx.Unlock()
    return
}
// GetStatus returns the running ops, and a short list of recent
// failures.
func (e *Executor) GetStatus() ([]Op, []Op) {
    return e.running.getStatus()
}
package tabacco

import (
    "context"
    "errors"
    "sync"
    "testing"
    "time"
)
func TestExecutor_Priority(t *testing.T) {
    e := newUnstartedExecutor()

    var mx sync.Mutex
    var lowDone bool

    // Run 10 ops at low priority and 1 at high, and verify that
    // the latter gets invoked first.
    var ops []*Op
    for i := 0; i < 10; i++ {
        op := NewOp("lo", func(_ context.Context) error {
            mx.Lock()
            defer mx.Unlock()
            lowDone = true
            return nil
        }, 10*time.Second)
        ops = append(ops, op)
    }
    e.EnqueueBatch(ops, false)

    hiOp := NewOp("hi", func(_ context.Context) error {
        mx.Lock()
        defer mx.Unlock()
        if lowDone {
            return errors.New("called low-pri op before hi-pri")
        }
        return nil
    }, 10*time.Second)
    e.Enqueue(hiOp, true)

    e.start(1)
    defer e.Stop()

    for _, op := range ops {
        if err := op.Wait(); err != nil {
            t.Fatalf("op error: %v", err)
        }
    }
    if err := hiOp.Wait(); err != nil {
        t.Fatalf("hi-prio op error: %v", err)
    }
}
func TestExecutor_GetStatus(t *testing.T) {
    e := newUnstartedExecutor()
    op := NewOp("fail", func(_ context.Context) error {
        return errors.New("failure")
    }, 10*time.Second)
    e.Enqueue(op, false)
    e.start(1)
    defer e.Stop()
    op.Wait()

    running, failures := e.GetStatus()
    if len(running) != 0 {
        t.Fatalf("op still reported as running: %v", running)
    }
    if len(failures) != 1 {
        t.Fatalf("unexpected number of failures: %v", failures)
    }
    failedOp := failures[0]
    if op.id != failedOp.id {
        t.Fatalf("failed op is not the original op? got %+v, expected %+v", failedOp, op)
    }
}
package tabacco

import (
    "container/list"
    "context"
    "errors"
    "sync"
    "time"
)
// Job is a task that can be run and canceled, and that exposes a
// string Key identifying successive instances of the same task,
// which differ only in their execution time.
type Job interface {
    Key() string
    RunContext(context.Context) error
    Cancel()
    Wait() error
}
// A base job that is just a function. It can be canceled, and the
// result can be waited upon.
type baseJob struct {
    fn   func(context.Context) error
    key  string
    done chan struct{}
    err  error

    cancelMx sync.Mutex
    cancel   context.CancelFunc
}

func newJob(key string, fn func(context.Context) error) *baseJob {
    return &baseJob{
        fn:   fn,
        key:  key,
        done: make(chan struct{}),
    }
}

func (j *baseJob) Key() string { return j.key }

func (j *baseJob) RunContext(ctx context.Context) error {
    defer close(j.done)

    j.cancelMx.Lock()
    innerCtx, cancel := context.WithCancel(ctx)
    j.cancel = cancel
    defer cancel()
    j.cancelMx.Unlock()

    j.err = j.fn(innerCtx)
    return j.err
}

func (j *baseJob) Cancel() {
    j.cancelMx.Lock()
    if j.cancel != nil {
        j.cancel()
    }
    j.cancelMx.Unlock()
}

func (j *baseJob) Wait() error {
    <-j.done
    return j.err
}
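// A minimal usage sketch (illustrative only): run a job on one
// goroutine and wait for its result on another; Cancel aborts the
// inner context.
//
//    j := newJob("greet", func(ctx context.Context) error {
//        log.Printf("hello")
//        return nil
//    })
//    go j.RunContext(context.Background()) // nolint: errcheck
//    err := j.Wait() // same error that RunContext returned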
// A job that runs with a timeout.
type timeoutJob struct {
    Job
    timeout time.Duration
}

func withTimeout(j Job, timeout time.Duration) Job {
    return &timeoutJob{Job: j, timeout: timeout}
}

func (j *timeoutJob) RunContext(ctx context.Context) error {
    innerCtx, cancel := context.WithTimeout(ctx, j.timeout)
    defer cancel()
    return j.Job.RunContext(innerCtx)
}
// Manager that keeps an exclusive lock, by key, on running jobs, and
// can apply an overrun policy to them: when a new job starts while a
// previous job with the same key is still running, it can either
// evict the previous job or abort starting the new one.
type exclusiveLockManager struct {
    mx     sync.Mutex
    locked map[string]Job
}

func newExclusiveLockManager() *exclusiveLockManager {
    return &exclusiveLockManager{
        locked: make(map[string]Job),
    }
}

func (m *exclusiveLockManager) withExclusiveLock(j Job, lockKey string, killAndRun bool) Job {
    return &exclusiveJob{Job: j, mgr: m, lockKey: lockKey, killAndRun: killAndRun}
}

type exclusiveJob struct {
    Job
    mgr        *exclusiveLockManager
    lockKey    string
    killAndRun bool
}

func (j *exclusiveJob) RunContext(ctx context.Context) error {
    key := j.lockKey
    j.mgr.mx.Lock()
    defer j.mgr.mx.Unlock()
    if cur, ok := j.mgr.locked[key]; ok {
        if !j.killAndRun {
            // TODO: return nil?
            return errors.New("skipped (overrun)")
        }
        cur.Cancel()
        cur.Wait() // nolint
    }
    j.mgr.locked[key] = j

    j.mgr.mx.Unlock()
    err := j.Job.RunContext(ctx)
    j.mgr.mx.Lock()

    // Only release the entry if it still belongs to this job: a
    // killAndRun successor may have replaced it in the meantime.
    if cur, ok := j.mgr.locked[key]; ok && cur == Job(j) {
        delete(j.mgr.locked, key)
    }
    return err
}
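// Sketch of the overrun policy (illustrative only; slowFn is a
// hypothetical long-running function): with killAndRun the second job
// evicts the first, otherwise it fails with "skipped (overrun)".
//
//    lm := newExclusiveLockManager()
//    j1 := lm.withExclusiveLock(newJob("db", slowFn), "db-backup", false)
//    j2 := lm.withExclusiveLock(newJob("db", slowFn), "db-backup", true)
//    go j1.RunContext(ctx) // nolint: errcheck
//    err := j2.RunContext(ctx) // cancels j1, waits for it, then runs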
// Manager that can limit the number of jobs by tag. Each tag
// corresponds to a separate queue, limited to a certain number of
// parallel tasks. By default (or for unknown queues) there is no
// limit.
type multiQueueManager struct {
    queues map[string]chan struct{}
}

// MultiQueueSpec describes the configuration of named queues.
type MultiQueueSpec struct {
    Workers map[string]int `json:"workers"`
}

func newMultiQueueManager(spec MultiQueueSpec) *multiQueueManager {
    q := make(map[string]chan struct{})
    for name, n := range spec.Workers {
        q[name] = make(chan struct{}, n)
    }
    return &multiQueueManager{queues: q}
}

func (m *multiQueueManager) withQueue(j Job, queue string) Job {
    ch, ok := m.queues[queue]
    if !ok {
        // Fall back to the "default" queue, if configured.
        ch, ok = m.queues["default"]
        if !ok {
            return j
        }
    }
    return &queuedJob{Job: j, ch: ch}
}

type queuedJob struct {
    Job
    ch chan struct{}
}

func (j *queuedJob) RunContext(ctx context.Context) error {
    // Acquire a slot in the queue semaphore (or give up if the
    // context is canceled first).
    select {
    case j.ch <- struct{}{}:
    case <-ctx.Done():
        return ctx.Err()
    }
    err := j.Job.RunContext(ctx)
    <-j.ch
    return err
}
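// Sketch (illustrative only; fnA and fnB are hypothetical): with a
// single "backup" slot, the second job blocks in RunContext until the
// first releases the semaphore.
//
//    qm := newMultiQueueManager(MultiQueueSpec{
//        Workers: map[string]int{"backup": 1},
//    })
//    j1 := qm.withQueue(newJob("a", fnA), "backup")
//    j2 := qm.withQueue(newJob("b", fnB), "backup") // waits for j1's slot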
const (
    jobStatusPending = iota
    jobStatusRunning
    jobStatusDone
)

type jobStatus struct {
    id          string
    status      int
    startedAt   time.Time
    completedAt time.Time
    err         error
    job         Job
}
// Manager that tracks the state of jobs (pending, running, done).
type stateManager struct {
    mx      sync.Mutex
    pending map[string]*jobStatus
    running map[string]*jobStatus
    done    *list.List
}

func newStateManager() *stateManager {
    return &stateManager{
        pending: make(map[string]*jobStatus),
        running: make(map[string]*jobStatus),
        done:    list.New(),
    }
}

func (m *stateManager) setStatusPending(jobID string, j Job) {
    m.mx.Lock()
    defer m.mx.Unlock()
    m.pending[jobID] = &jobStatus{
        id:     jobID,
        status: jobStatusPending,
        job:    j,
    }
}

func (m *stateManager) setStatusRunning(jobID string) {
    m.mx.Lock()
    defer m.mx.Unlock()
    js := m.pending[jobID]
    js.startedAt = time.Now()
    js.status = jobStatusRunning
    delete(m.pending, jobID)
    m.running[jobID] = js
}

const logsToKeep = 100

func (m *stateManager) setStatusDone(jobID string, err error) {
    m.mx.Lock()
    defer m.mx.Unlock()
    js := m.running[jobID]
    delete(m.running, jobID)
    js.completedAt = time.Now()
    js.status = jobStatusDone
    js.err = err
    m.done.PushFront(js)
    for m.done.Len() > logsToKeep {
        m.done.Remove(m.done.Back())
    }
}

func (m *stateManager) withTrackedStatus(j Job) Job {
    sj := &statusJob{
        Job: j,
        id:  randomID(),
        mgr: m,
    }
    m.setStatusPending(sj.id, sj)
    return sj
}
func jobStatusMapToSlice(m map[string]*jobStatus) []jobStatus {
    out := make([]jobStatus, 0, len(m))
    for _, js := range m {
        out = append(out, *js)
    }
    return out
}

func jobStatusListToSlice(l *list.List) []jobStatus {
    out := make([]jobStatus, 0, l.Len())
    for e := l.Front(); e != nil; e = e.Next() {
        js := e.Value.(*jobStatus)
        out = append(out, *js)
    }
    return out
}

func (m *stateManager) getJobsStatus() ([]jobStatus, []jobStatus, []jobStatus) {
    m.mx.Lock()
    defer m.mx.Unlock()
    return jobStatusMapToSlice(m.pending), jobStatusMapToSlice(m.running), jobStatusListToSlice(m.done)
}

type statusJob struct {
    Job
    id  string
    mgr *stateManager
}

func (j *statusJob) RunContext(ctx context.Context) error {
    j.mgr.setStatusRunning(j.id)
    err := j.Job.RunContext(ctx)
    j.mgr.setStatusDone(j.id, err)
    return err
}
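// Sketch (illustrative only; fn is hypothetical): wrap a job so that
// its lifecycle shows up in getJobsStatus, which returns the pending,
// running and recently completed jobs.
//
//    sm := newStateManager()
//    j := sm.withTrackedStatus(newJob("nightly", fn))
//    go j.RunContext(ctx) // nolint: errcheck
//    pending, running, done := sm.getJobsStatus()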
// newSyncGroup returns a Job that runs the given jobs sequentially,
// stopping at the first error.
func newSyncGroup(key string, jobs []Job) Job {
    return newJob(key, func(ctx context.Context) error {
        for _, j := range jobs {
            if err := j.RunContext(ctx); err != nil {
                return err
            }
        }
        return nil
    })
}

// newAsyncGroup returns a Job that runs the given jobs in parallel,
// collecting all their errors.
func newAsyncGroup(key string, jobs []Job) Job {
    return newJob(key, func(ctx context.Context) error {
        errCh := make(chan error)
        defer close(errCh)
        for _, j := range jobs {
            go func(j Job) {
                errCh <- j.RunContext(ctx)
            }(j)
        }
        merr := new(multiError)
        for i := 0; i < len(jobs); i++ {
            if err := <-errCh; err != nil {
                merr.Add(err)
            }
        }
        return merr.orNil()
    })
}
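// Sketch (illustrative only; prepJob and dumpJobs are hypothetical):
// the group helpers compose with the wrappers above, e.g. a strictly
// sequential prepare step followed by parallel per-source backups.
//
//    all := newSyncGroup("backup", []Job{
//        prepJob,
//        newAsyncGroup("dumps", dumpJobs),
//    })
//    err := all.RunContext(ctx)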
@@ -9,29 +9,33 @@ import (
     "time"
 )

 var defaultOpTimeout = 6 * time.Hour

 // Manager for backups and restores.
 type tabaccoManager struct {
+    *exclusiveLockManager
+    *multiQueueManager
+    *stateManager
     handlerMap map[string]Handler
     repo       Repository
     ms         MetadataStore
-    exec       *Executor
     shell      *Shell
 }

 // NewManager creates a new Manager.
-func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, ms MetadataStore, exec *Executor, shell *Shell) (Manager, error) {
+func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, ms MetadataStore, queueSpec MultiQueueSpec, shell *Shell) (Manager, error) {
     handlerMap, repo, err := parseSpecs(ctx, handlerSpecs, repoSpec, shell)
     if err != nil {
         return nil, err
     }
     return &tabaccoManager{
+        exclusiveLockManager: newExclusiveLockManager(),
+        multiQueueManager:    newMultiQueueManager(queueSpec),
+        stateManager:         newStateManager(),
         handlerMap: handlerMap,
         repo:       repo,
         ms:         ms,
-        exec:       exec,
         shell:      shell,
     }, nil
 }
@@ -45,21 +49,45 @@ func (m *tabaccoManager) Close() error {
 // Prepare the repository for a new backup. This is a synchronous
 // operation: we need to wait for it to complete to avoid running the
 // backup tasks too soon.
-func (m *tabaccoManager) prepareBackup(ctx context.Context, backup Backup) error {
-    op := NewOp("prepare repository", func(ctx context.Context) error {
+func (m *tabaccoManager) prepareBackupJob(backup Backup) Job {
+    return newJob("backup_prep", func(ctx context.Context) error {
         if err := m.repo.Init(ctx); err != nil {
             log.Printf("repository init failed: %v", err)
             return err
         }
         log.Printf("preparing backup")
         return m.repo.Prepare(ctx, backup)
-    }, defaultOpTimeout)
-    m.exec.Enqueue(op, false)
-    return op.Wait()
+    })
 }
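The prep step now returns a Job instead of enqueueing an Op and waiting inline, so callers can compose it with the per-source backup jobs. The actual wiring is outside this hunk; a plausible sketch using the helpers introduced above (srcJobs is a hypothetical slice of per-source jobs):

    prep := withTimeout(m.prepareBackupJob(backup), defaultOpTimeout)
    job := m.withTrackedStatus(m.withQueue(
        newSyncGroup("backup", []Job{prep, newAsyncGroup("sources", srcJobs)}),
        "backup",
    ))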