package jobs

import (
	"container/list"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"git.autistici.org/ale/tabacco/util"
)

// Job is a task that can be run and canceled. Its ID identifies
// successive instances of the same task, which differ only in their
// execution time, so IDs do not have to be unique. It's basically a
// glorified goroutine wrapper with a cancelable Context.
type Job interface {
	ID() string
	RunContext(context.Context) error
	Cancel()
	Wait() error
}

// Give a job its own random unique ID.
type idJob struct {
	Job
	id string
}

func (j *idJob) ID() string { return j.id }

// WithID gives a job a random unique ID.
func WithID(j Job) Job {
	return &idJob{
		Job: j,
		id:  util.RandomID(),
	}
}

// cancelableJob adds working Cancel and Wait methods to a job.
type cancelableJob struct {
	Job
	done     chan struct{}
	err      error
	cancelMx sync.Mutex
	cancel   context.CancelFunc
}

// WithCancel wraps a job with a Context that can be canceled by
// calling the Cancel() method. It also implements its own Wait()
// method, so you don't have to.
func WithCancel(j Job) Job {
	return &cancelableJob{
		Job:  j,
		done: make(chan struct{}),
	}
}

func (j *cancelableJob) RunContext(ctx context.Context) error {
	defer close(j.done)
	j.cancelMx.Lock()
	innerCtx, cancel := context.WithCancel(ctx)
	j.cancel = cancel
	defer cancel()
	j.cancelMx.Unlock()

	j.err = j.Job.RunContext(innerCtx)
	return j.err
}

func (j *cancelableJob) Cancel() {
	j.cancelMx.Lock()
	if j.cancel != nil {
		j.cancel()
	}
	j.cancelMx.Unlock()
}

func (j *cancelableJob) Wait() error {
	<-j.done
	return j.err
}

// A base job that simply wraps a function. Its own Cancel and Wait
// methods are stubs: JobFunc composes it with WithCancel, which
// provides the real implementations.
type funcJob struct {
	fn func(context.Context) error
}

// JobFunc creates a new cancelable Job that wraps a function call.
func JobFunc(fn func(context.Context) error) Job {
	return WithCancel(WithID(&funcJob{
		fn: fn,
	}))
}

func (j *funcJob) ID() string  { return "" }
func (j *funcJob) Cancel()     {}
func (j *funcJob) Wait() error { return errors.New("Wait not implemented") }

func (j *funcJob) RunContext(ctx context.Context) error {
	return j.fn(ctx)
}
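
// Illustrative sketch, not part of the original file: a typical use of
// JobFunc. The wrapped function honors cancellation through its
// Context, so the Cancel and Wait methods provided by WithCancel work
// as described above.
func exampleJobFunc() {
	j := JobFunc(func(ctx context.Context) error {
		// Simulate work that can be interrupted.
		select {
		case <-time.After(time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	// Run asynchronously; Wait blocks until RunContext has returned
	// and reports its error.
	go j.RunContext(context.Background()) // nolint: errcheck
	_ = j.Wait()
}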

// A job that runs with a timeout.
type timeoutJob struct {
	Job
	timeout time.Duration
}

// WithTimeout wraps a job with a timeout, after which the job is
// canceled.
func WithTimeout(j Job, timeout time.Duration) Job {
	return &timeoutJob{Job: j, timeout: timeout}
}

func (j *timeoutJob) RunContext(ctx context.Context) error {
	innerCtx, cancel := context.WithTimeout(ctx, j.timeout)
	defer cancel()
	return j.Job.RunContext(innerCtx)
}
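
// Illustrative sketch, not part of the original file: giving a job a
// fixed time budget. When the timeout expires the inner Context is
// canceled, and a well-behaved job returns context.DeadlineExceeded.
func exampleWithTimeout() {
	j := WithTimeout(JobFunc(func(ctx context.Context) error {
		<-ctx.Done() // Block until the deadline (or outer cancellation).
		return ctx.Err()
	}), time.Minute)
	_ = j.RunContext(context.Background()) // Returns after ~1 minute.
}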

// ExclusiveLockManager keeps an exclusive lock on running jobs by
// key, and can apply an overrun policy to them: upon starting a new
// job, when a previous job with the same key exists, it can either
// evict the previous job or abort starting the new one.
type ExclusiveLockManager struct {
	mx     sync.Mutex
	locked map[string]Job
}

// NewExclusiveLockManager returns a new ExclusiveLockManager.
func NewExclusiveLockManager() *ExclusiveLockManager {
	return &ExclusiveLockManager{
		locked: make(map[string]Job),
	}
}

// WithExclusiveLock wraps a Job with an exclusive lock, so that no
// more than one job with this key may be running at any given time.
// The killAndRun flag selects the overrun policy when a second job is
// started with the same key: if true, the running job is canceled and
// replaced; if false, the new job aborts with an error.
func (m *ExclusiveLockManager) WithExclusiveLock(j Job, lockKey string, killAndRun bool) Job {
	return &exclusiveJob{Job: j, mgr: m, lockKey: lockKey, killAndRun: killAndRun}
}

type exclusiveJob struct {
	Job
	mgr        *ExclusiveLockManager
	lockKey    string
	killAndRun bool
}

func (j *exclusiveJob) RunContext(ctx context.Context) error {
	key := j.lockKey

	j.mgr.mx.Lock()
	if cur, ok := j.mgr.locked[key]; ok {
		if !j.killAndRun {
			// TODO: return nil?
			j.mgr.mx.Unlock()
			return fmt.Errorf("job %s skipped (overrun)", j.ID())
		}
		// Evict the current lock holder and wait for it to
		// terminate. Holding the manager lock while waiting is
		// safe: Wait() returns as soon as the evicted job's inner
		// RunContext does, before that job tries to reacquire the
		// lock for its own cleanup below.
		cur.Cancel()
		cur.Wait() // nolint
	}
	j.mgr.locked[key] = j
	j.mgr.mx.Unlock()

	err := j.Job.RunContext(ctx)

	j.mgr.mx.Lock()
	// Only clear the entry if this job still owns it: an evicting
	// job may have already replaced it with itself.
	if j.mgr.locked[key] == Job(j) {
		delete(j.mgr.locked, key)
	}
	j.mgr.mx.Unlock()
	return err
}
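
// Illustrative sketch, not part of the original file: serializing jobs
// on a lock key (the key "backup/main" is a made-up placeholder).
// While this job runs, a second job wrapped with the same key and
// killAndRun=false fails immediately with an overrun error; with
// killAndRun=true it would instead cancel the first job, wait for it,
// and take over the lock.
func exampleExclusiveLock() {
	mgr := NewExclusiveLockManager()
	j := mgr.WithExclusiveLock(JobFunc(func(ctx context.Context) error {
		// Long-running work would go here.
		return nil
	}), "backup/main", false)
	_ = j.RunContext(context.Background())
}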

// QueueManager can limit the number of concurrent jobs by
// user-defined tag. Each tag corresponds to a separate queue, limited
// to a certain number of parallel tasks. Jobs with an unknown tag fall
// back to the queue named "default" if one is configured; otherwise
// they run with no limit.
type QueueManager struct {
	queues map[string]chan struct{}
}

// QueueSpec describes the configuration of named queues.
type QueueSpec struct {
	Workers map[string]int `yaml:"workers"`
}

// NewQueueManager returns a new QueueManager with the provided
// configuration.
func NewQueueManager(spec QueueSpec) *QueueManager {
	q := make(map[string]chan struct{})
	for name, n := range spec.Workers {
		q[name] = make(chan struct{}, n)
	}
	return &QueueManager{queues: q}
}

// WithQueue wraps a job with a concurrency limit controller.
func (m *QueueManager) WithQueue(j Job, queue string) Job {
	ch, ok := m.queues[queue]
	if !ok {
		ch, ok = m.queues["default"]
		if !ok {
			return j
		}
	}
	return &queuedJob{Job: j, ch: ch}
}

type queuedJob struct {
	Job
	ch chan struct{}
}

func (j *queuedJob) RunContext(ctx context.Context) error {
	select {
	case j.ch <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	err := j.Job.RunContext(ctx)
	<-j.ch
	return err
}
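
// Illustrative sketch, not part of the original file: limiting a queue
// named "backup" (a made-up name) to two concurrent jobs. The
// equivalent YAML configuration for this QueueSpec would be:
//
//	workers:
//	  backup: 2
func exampleQueue() {
	qm := NewQueueManager(QueueSpec{Workers: map[string]int{"backup": 2}})
	j := qm.WithQueue(JobFunc(func(ctx context.Context) error {
		return nil // At most two jobs in this queue run at once.
	}), "backup")
	_ = j.RunContext(context.Background())
}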

// Enum for possible job states as tracked by the StateManager.
const (
	JobStatusPending = iota
	JobStatusRunning
	JobStatusDone
)

// JobStatus is an enum representing the state of a job.
type JobStatus int

// String returns a text representation of the job state.
func (s JobStatus) String() string {
	switch s {
	case JobStatusPending:
		return "PENDING"
	case JobStatusRunning:
		return "RUNNING"
	case JobStatusDone:
		return "DONE"
	default:
		return "UNKNOWN"
	}
}

// Status holds information on the current state of a job.
type Status struct {
	ID          string
	Name        string
	Status      JobStatus
	StartedAt   time.Time
	CompletedAt time.Time
	Err         error
	Job         Job
}

// StateManager adds a state and ID to jobs and keeps track of them
// after they have run. This is basically a way to keep track of
// running goroutines at the level of granularity that we desire.
//
// It has no practical effect on the jobs themselves, it's just a way
// to provide the user with debugging and auditing information.
//
// Note that it must use its own random ID, and not job.ID, otherwise
// nesting WithStatus handlers on the same job will cause panics (due
// to duplicated transitions from state to state).
type StateManager struct {
	mx      sync.Mutex
	pending map[string]*Status
	running map[string]*Status
	done    *list.List
}

// How many past executions to keep in memory.
const logsToKeep = 100

// NewStateManager creates a new StateManager.
func NewStateManager() *StateManager {
	return &StateManager{
		pending: make(map[string]*Status),
		running: make(map[string]*Status),
		done:    list.New(),
	}
}

func (m *StateManager) setStatusPending(id, name string, j Job) {
	m.mx.Lock()
	defer m.mx.Unlock()

	m.pending[id] = &Status{
		ID:     id,
		Name:   name,
		Status: JobStatusPending,
		Job:    j,
	}
}

func (m *StateManager) setStatusRunning(id string) {
	m.mx.Lock()
	defer m.mx.Unlock()

	js := m.pending[id]
	js.StartedAt = time.Now()
	js.Status = JobStatusRunning
	delete(m.pending, id)
	m.running[id] = js
}

func (m *StateManager) setStatusDone(id string, err error) {
	m.mx.Lock()
	defer m.mx.Unlock()

	js := m.running[id]
	delete(m.running, id)
	js.CompletedAt = time.Now()
	js.Status = JobStatusDone
	js.Err = err

	m.done.PushFront(js)
	for m.done.Len() > logsToKeep {
		m.done.Remove(m.done.Back())
	}
}

// WithStatus tracks a job through its lifetime. The name is used for
// display purposes and need not be unique.
func (m *StateManager) WithStatus(j Job, name string) Job {
	sj := &statusJob{
		Job: j,
		mgr: m,
		id:  util.RandomID(),
	}
	m.setStatusPending(sj.id, name, sj)
	return sj
}

func jobStatusMapToSlice(m map[string]*Status) []Status {
	out := make([]Status, 0, len(m))
	for _, js := range m {
		out = append(out, *js)
	}
	return out
}

func jobStatusListToSlice(l *list.List) []Status {
	out := make([]Status, 0, l.Len())
	for e := l.Front(); e != nil; e = e.Next() {
		js := e.Value.(*Status)
		out = append(out, *js)
	}
	return out
}

// GetStatus returns three lists of Status objects, representing
// respectively pending jobs (waiting to run), running jobs, and
// completed jobs (ordered by decreasing timestamp).
func (m *StateManager) GetStatus() ([]Status, []Status, []Status) {
	m.mx.Lock()
	defer m.mx.Unlock()

	return jobStatusMapToSlice(m.pending), jobStatusMapToSlice(m.running), jobStatusListToSlice(m.done)
}

type statusJob struct {
	Job
	mgr *StateManager
	id  string
}

func (j *statusJob) RunContext(ctx context.Context) error {
	j.mgr.setStatusRunning(j.id)
	err := j.Job.RunContext(ctx)
	j.mgr.setStatusDone(j.id, err)
	return err
}
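
// Illustrative sketch, not part of the original file: tracking a job's
// lifecycle (the name "nightly-backup" is a made-up placeholder). The
// name is purely informational; the StateManager assigns its own
// random ID.
func exampleStateManager() {
	sm := NewStateManager()
	j := sm.WithStatus(JobFunc(func(ctx context.Context) error {
		return nil
	}), "nightly-backup")
	_ = j.RunContext(context.Background())
	pending, running, done := sm.GetStatus()
	_ = pending // Jobs registered but not yet started.
	_ = running // Jobs currently inside RunContext.
	_ = done    // Up to logsToKeep completed jobs, newest first.
}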

// SyncGroup runs all the given jobs synchronously, aborting on the
// first error.
func SyncGroup(jobs []Job) Job {
	if len(jobs) == 1 {
		return jobs[0]
	}
	return JobFunc(func(ctx context.Context) error {
		for _, j := range jobs {
			if err := j.RunContext(ctx); err != nil {
				return err
			}
		}
		return nil
	})
}

// AsyncGroup runs all the given jobs asynchronously, and waits for
// all of them to terminate. If any of them fail, it collects their
// errors into a single util.MultiError.
func AsyncGroup(jobs []Job) Job {
	if len(jobs) == 1 {
		return jobs[0]
	}
	return JobFunc(func(ctx context.Context) error {
		errCh := make(chan error)
		defer close(errCh)
		for _, j := range jobs {
			go func(j Job) {
				errCh <- j.RunContext(ctx)
			}(j)
		}

		merr := new(util.MultiError)
		for i := 0; i < len(jobs); i++ {
			if err := <-errCh; err != nil {
				merr.Add(err)
			}
		}

		return merr.OrNil()
	})
}
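
// Illustrative sketch, not part of the original file: composing groups
// (fetch, store and report are placeholder jobs). The two SyncGroup
// members run one after the other, while the group and report run in
// parallel under AsyncGroup.
func exampleGroups(fetch, store, report Job) {
	j := AsyncGroup([]Job{
		SyncGroup([]Job{fetch, store}),
		report,
	})
	_ = j.RunContext(context.Background())
}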