Commit b35324a5 authored by ale

Add status handlers and an overrun policy manager for scheduled jobs

parent a7b33a75
......@@ -119,3 +119,10 @@ type Repository interface {
RestoreStream(context.Context, Backup, Dataset, string, io.Writer) error
Close() error
}
// Manager for backups and restores.
type Manager interface {
Backup(context.Context, []SourceSpec) (Backup, error)
Restore(context.Context, FindRequest, string) error
Close() error
}
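For context, a minimal caller-side sketch of the new Manager interface; it is an assumption (not part of this diff) that the caller sits in or imports this package and obtains the Manager from NewManager with configuration loaded elsewhere:

// Sketch only: drive a backup run through the Manager interface and
// release repository resources afterwards.
func backupAndClose(ctx context.Context, mgr Manager, specs []SourceSpec) (Backup, error) {
	backup, err := mgr.Backup(ctx, specs)
	if err != nil {
		return Backup{}, err
	}
	// Close frees the resources owned by the manager (e.g. the repository).
	return backup, mgr.Close()
}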
......@@ -8,55 +8,104 @@ import (
"time"
)
// ExecOp is an operation (a func(Context) error) that can be run
// Op is an operation (a func(Context) error) that can be run
// asynchronously by a worker thread and waited for.
type ExecOp struct {
id string
fn func(context.Context) error
timeout time.Duration
done chan struct{}
err error
type Op struct {
id string
desc string
fn func(context.Context) error
timeout time.Duration
startedAt time.Time
done chan struct{}
err error
}
// NewExecOp wraps a function in an ExecOp (with timeout).
func NewExecOp(fn func(context.Context) error, timeout time.Duration) *ExecOp {
return &ExecOp{
// NewOp wraps a function in an Op (with timeout).
func NewOp(desc string, fn func(context.Context) error, timeout time.Duration) *Op {
return &Op{
id: randomID(),
desc: desc,
fn: fn,
timeout: timeout,
done: make(chan struct{}),
}
}
func (op *ExecOp) run(ctx context.Context) {
func (op *Op) run(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, op.timeout)
op.startedAt = time.Now()
op.err = op.fn(ctx)
cancel()
close(op.done)
}
// Wait for the operation to run, return its error.
func (op *ExecOp) Wait() error {
func (op *Op) Wait() error {
<-op.done
return op.err
}
// executorStatus holds the currently running jobs and a short list of
// recent failures, for debugging.
type executorStatus struct {
mx sync.Mutex
running map[string]*Op
failed []Op
}
func newExecutorStatus() *executorStatus {
return &executorStatus{
running: make(map[string]*Op),
}
}
func (s *executorStatus) push(op *Op) {
s.mx.Lock()
s.running[op.id] = op
s.mx.Unlock()
}
const maxFailedLength = 20
func (s *executorStatus) pop(op *Op) {
s.mx.Lock()
delete(s.running, op.id)
if op.err != nil {
s.failed = append(s.failed, *op)
if n := len(s.failed); n > maxFailedLength {
s.failed = s.failed[n-maxFailedLength : n]
}
}
s.mx.Unlock()
}
func (s *executorStatus) getStatus() ([]Op, []Op) {
var out []Op
s.mx.Lock()
defer s.mx.Unlock()
for _, op := range s.running {
out = append(out, *op)
}
return out, s.failed
}
// Executor is a task scheduler that keeps a two-level priority queue
// (for high-priority and low-priority tasks respectively).
type Executor struct {
queueCh chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup
running *executorStatus
mx sync.Mutex
queueHigh []*ExecOp
queueLow []*ExecOp
queueHigh []*Op
queueLow []*Op
}
// newUnstartedExecutor returns an executor without starting the worker threads.
func newUnstartedExecutor() *Executor {
return &Executor{
queueCh: make(chan struct{}, 1),
running: newExecutorStatus(),
}
}
......@@ -90,7 +139,9 @@ func (e *Executor) runWorker(ctx context.Context) {
}
log.Printf("executing op %s", op.id)
e.running.push(op)
op.run(ctx)
e.running.pop(op)
}
}
......@@ -101,13 +152,13 @@ func (e *Executor) Stop() {
}
// Enqueue an operation (possibly with high priority).
func (e *Executor) Enqueue(op *ExecOp, hiPri bool) {
e.EnqueueBatch([]*ExecOp{op}, hiPri)
func (e *Executor) Enqueue(op *Op, hiPri bool) {
e.EnqueueBatch([]*Op{op}, hiPri)
}
// EnqueueBatch schedules a batch of operations all at once (possibly
// with high priority). Easier on lock contention.
func (e *Executor) EnqueueBatch(b []*ExecOp, hiPri bool) {
func (e *Executor) EnqueueBatch(b []*Op, hiPri bool) {
e.mx.Lock()
if hiPri {
e.queueHigh = append(e.queueHigh, b...)
......@@ -124,7 +175,7 @@ func (e *Executor) EnqueueBatch(b []*ExecOp, hiPri bool) {
// Pop an element from the queue (if it has to sleep, it can be
// interrupted by cancelling the context).
func (e *Executor) Pop(ctx context.Context) (op *ExecOp, err error) {
func (e *Executor) Pop(ctx context.Context) (op *Op, err error) {
e.mx.Lock()
for len(e.queueHigh) == 0 && len(e.queueLow) == 0 {
e.mx.Unlock()
......@@ -146,3 +197,9 @@ func (e *Executor) Pop(ctx context.Context) (op *ExecOp, err error) {
e.mx.Unlock()
return
}
// GetStatus returns the running jobs, and a short list of recent
// failures.
func (e *Executor) GetStatus() ([]Op, []Op) {
return e.running.getStatus()
}
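The commit message mentions status handlers, but their wiring is not visible in these hunks. The following is therefore only a hypothetical sketch of exposing Executor.GetStatus over HTTP (handler name, output format and the net/http usage are assumptions, not taken from this commit); since it reads unexported Op fields it would have to live in this package, with fmt, net/http and time imported:

// statusHandler is a hypothetical debug endpoint that renders the
// output of Executor.GetStatus as plain text.
func statusHandler(e *Executor) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		running, failed := e.GetStatus()
		fmt.Fprintf(w, "running ops: %d\n", len(running))
		for _, op := range running {
			fmt.Fprintf(w, "  %s %s (started %s)\n", op.id, op.desc, op.startedAt.Format(time.RFC3339))
		}
		fmt.Fprintf(w, "recent failures: %d\n", len(failed))
		for _, op := range failed {
			fmt.Fprintf(w, "  %s %s: %v\n", op.id, op.desc, op.err)
		}
	}
}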
......@@ -16,9 +16,9 @@ func TestExecutor_Priority(t *testing.T) {
// Run 10 ops at low priority and 1 at high, and verify that
// the latter gets invoked first.
var ops []*ExecOp
var ops []*Op
for i := 0; i < 10; i++ {
op := NewExecOp(func(_ context.Context) error {
op := NewOp("lo", func(_ context.Context) error {
mx.Lock()
defer mx.Unlock()
lowDone = true
......@@ -27,7 +27,7 @@ func TestExecutor_Priority(t *testing.T) {
ops = append(ops, op)
}
e.EnqueueBatch(ops, false)
hiOp := NewExecOp(func(_ context.Context) error {
hiOp := NewOp("hi", func(_ context.Context) error {
mx.Lock()
defer mx.Unlock()
if lowDone {
......@@ -49,3 +49,28 @@ func TestExecutor_Priority(t *testing.T) {
t.Fatalf("hi-prio op error: %v", err)
}
}
func TestExecutor_GetStatus(t *testing.T) {
e := newUnstartedExecutor()
op := NewOp("fail", func(_ context.Context) error {
return errors.New("failure")
}, 10*time.Second)
e.Enqueue(op, false)
e.start(1)
defer e.Stop()
op.Wait()
running, failures := e.GetStatus()
if len(running) != 0 {
t.Fatalf("op still reported as running: %v", running)
}
if len(failures) != 1 {
t.Fatalf("too many/few failures: %v", failures)
}
failedOp := failures[0]
if op.id != failedOp.id {
t.Fatalf("failed op is not original op? got %+v, expected %+v", failedOp, op)
}
}
......@@ -2,20 +2,17 @@ package tabacco
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"log"
"os"
"strings"
"time"
)
var defaultOpTimeout = 6 * time.Hour
// Manager for backups and restores.
type Manager struct {
type tabaccoManager struct {
handlerMap map[string]Handler
repo Repository
ms MetadataStore
......@@ -24,13 +21,13 @@ type Manager struct {
}
// NewManager creates a new Manager.
func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, ms MetadataStore, exec *Executor, shell *Shell) (*Manager, error) {
func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec RepositorySpec, ms MetadataStore, exec *Executor, shell *Shell) (Manager, error) {
handlerMap, repo, err := parseSpecs(ctx, handlerSpecs, repoSpec, shell)
if err != nil {
return nil, err
}
return &Manager{
return &tabaccoManager{
handlerMap: handlerMap,
repo: repo,
ms: ms,
......@@ -41,15 +38,15 @@ func NewManager(ctx context.Context, handlerSpecs []HandlerSpec, repoSpec Reposi
// Close the Manager and free all associated resources (those owned by
// this object).
func (m *Manager) Close() error {
func (m *tabaccoManager) Close() error {
return m.repo.Close()
}
// Prepare the repository for a new backup. This is a synchronous
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
func (m *Manager) prepareBackup(ctx context.Context, backup Backup) error {
op := NewExecOp(func(ctx context.Context) error {
func (m *tabaccoManager) prepareBackup(ctx context.Context, backup Backup) error {
op := NewOp("prepare repository", func(ctx context.Context) error {
if err := m.repo.Init(ctx); err != nil {
log.Printf("repository init failed: %v", err)
return err
......@@ -62,7 +59,7 @@ func (m *Manager) prepareBackup(ctx context.Context, backup Backup) error {
}
// Backup all known sources to the configured destination.
func (m *Manager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup, error) {
func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup, error) {
// Parse the source specs and obtain Datasets. Errors here are
// logged but *not* fatal, unless every source fails to parse
// and the list of usable sources ends up empty.
......@@ -81,12 +78,12 @@ func (m *Manager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup,
}
// Run all backup tasks, scheduling them using the executor.
var ops []*ExecOp
var ops []*Op
for _, ds := range datasets {
// Bind 'ds' to the closure via an anonymous function,
// since the loop variable is reused across iterations.
func(ds Dataset) {
ops = append(ops, NewExecOp(func(ctx context.Context) (err error) {
ops = append(ops, NewOp(fmt.Sprintf("backup dataset %s", ds.Name), func(ctx context.Context) (err error) {
h, ok := m.handlerMap[ds.Handler]
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
......@@ -117,17 +114,18 @@ func (m *Manager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (Backup,
// Restore the results of the FindRequest (with NumVersions=1) onto
// the given target directory.
func (m *Manager) Restore(ctx context.Context, req FindRequest, target string) error {
func (m *tabaccoManager) Restore(ctx context.Context, req FindRequest, target string) error {
// Find the atoms relevant to this restore.
req.NumVersions = 1
versions, err := m.ms.FindAtoms(ctx, req)
if err != nil {
return err
}
var ops []*ExecOp
var ops []*Op
for _, vv := range versions {
func(ds Dataset, backup Backup) {
ops = append(ops, NewExecOp(func(ctx context.Context) error {
ops = append(ops, NewOp(fmt.Sprintf("restore dataset %s", ds.Name), func(ctx context.Context) error {
log.Printf("restoring %+v %+v", ds, backup)
h, ok := m.handlerMap[ds.Handler]
if !ok {
......@@ -148,49 +146,11 @@ func (m *Manager) Restore(ctx context.Context, req FindRequest, target string) e
return merr.orNil()
}
type multiError struct {
errors []error
}
func (m *multiError) Add(err error) {
m.errors = append(m.errors, err)
}
func (m *multiError) Error() string {
var tmp []string
for _, e := range m.errors {
tmp = append(tmp, e.Error())
}
return strings.Join(tmp, ", ")
}
func (m *multiError) Errors() []error {
return m.errors
}
func (m *multiError) orNil() error {
if len(m.errors) > 0 {
return m
}
return nil
}
// Generate a random unique ID. It will return an identifier
// consisting of 32 ascii-friendly bytes (16 random bytes,
// hex-encoded).
func randomID() string {
var b [16]byte
if _, err := rand.Read(b[:]); err != nil {
panic(err)
}
return hex.EncodeToString(b[:])
}
// Create a new Backup object with its own unique ID (which actually
// consists of 16 random bytes, hex-encoded).
func newBackup(host string) Backup {
if host == "" {
host, _ = os.Hostname()
host, _ = os.Hostname() // nolint
}
return Backup{
ID: randomID(),
......
......@@ -54,11 +54,11 @@ func newResticRepository(params map[string]interface{}, shell *Shell) (Repositor
return nil, err
}
if _, err := io.WriteString(tmpf, password); err != nil {
os.Remove(tmpf.Name()) // nolint: errcheck
os.Remove(tmpf.Name()) // nolint
return nil, err
}
if err := tmpf.Close(); err != nil {
os.Remove(tmpf.Name()) // nolint: errcheck
os.Remove(tmpf.Name()) // nolint
return nil, err
}
......
This diff is collapsed.
package tabacco
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
"time"
)
type fakeManager struct {
counts map[string]int
}
func newFakeManager() *fakeManager {
return &fakeManager{
counts: make(map[string]int),
}
}
func (f *fakeManager) Backup(_ context.Context, specs []SourceSpec) (Backup, error) {
for _, spec := range specs {
f.counts[spec.Name] = f.counts[spec.Name] + 1
}
return Backup{}, nil
}
func (f *fakeManager) Restore(_ context.Context, _ FindRequest, _ string) error {
return nil
}
func (f *fakeManager) Close() error { return nil }
func TestScheduler(t *testing.T) {
tmpf, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
tmpf.Close()
defer os.Remove(tmpf.Name()) // nolint
m := newFakeManager()
sourceSpecs := []SourceSpec{
{
Name: "source1",
Handler: "file1",
Schedule: "@every 1s",
Atoms: []Atom{
{
Name: "user1",
RelativePath: "user1",
},
{
Name: "user2",
RelativePath: "user2",
},
},
},
{
Name: "source2",
Handler: "dbpipe",
Schedule: "@random_every 3s",
AtomsScript: "echo user1 user1 ; echo user2 user2",
},
}
s, err := NewScheduler(context.Background(), m, sourceSpecs, tmpf.Name())
if err != nil {
t.Fatal(err)
}
time.Sleep(4 * time.Second)
status := s.GetStatus()
s.Stop()
src1n := m.counts["source1"]
if src1n == 0 {
t.Error("count of source1 backups is zero")
}
if src1n > 10 {
t.Errorf("source1 ran too many times (%d)", src1n)
}
src2n := m.counts["source2"]
if src2n == 0 {
t.Error("count of source2 backups is zero")
}
if src2n > 10 {
t.Errorf("source2 ran too many times (%d)", src2n)
}
if len(status.Scheduled) != 2 {
t.Errorf("bad status.Scheduled (expected 2 values): %+v", status.Scheduled)
}
if len(status.Completed) != (src1n + src2n) {
t.Errorf("bad status.Completed (expected %d values, got %d): %+v", src1n+src2n, len(status.Completed), status.Completed)
}
}
func TestRandomPeriodicSchedule(t *testing.T) {
// To improve the statistical odds of finding something wrong,
// run a large number of identical tests, with different
// random seeds, in parallel.
numTests := 1000
var errcount int32
var sampleErr error
var wg sync.WaitGroup
wg.Add(numTests)
for i := 0; i < numTests; i++ {
go func(i int) {
if err := runRandomPeriodicScheduleTest(i); err != nil {
if atomic.AddInt32(&errcount, 1) == 1 {
sampleErr = err
}
}
wg.Done()
}(i)
}
wg.Wait()
if errcount > 0 {
t.Fatalf("found %d errors (on %d tests), sample: %v", errcount, numTests, sampleErr)
}
}
func runRandomPeriodicScheduleTest(i int) error {
rnd := rand.New(rand.NewSource(int64(i)))
period := 30 * time.Second
s := randomScheduleEvery(rnd, period)
now := time.Now()
next := s.Next(now)
if next.Before(now) {
return fmt.Errorf("next before now: next=%s, now=%s", next, now)
}
delta := next.Sub(now)
if delta > period {
return fmt.Errorf("delta greater than period: next=%s, now=%s, delta=%s", next, now, delta)
}
return nil
}
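The two properties checked above (Next never returns a time before now, and never more than one period ahead) pin down the contract of randomScheduleEvery. Its implementation is not shown in these hunks; below is only a minimal sketch of a schedule meeting that contract, assuming the cron-style interface with a Next(time.Time) time.Time method:

// randomPeriodicSchedule is a sketch, not the code from this commit:
// it fires once per period at a uniformly random offset, spreading
// "@random_every" jobs out instead of firing them in lockstep.
type randomPeriodicSchedule struct {
	rnd    *rand.Rand
	period time.Duration
}

func (s randomPeriodicSchedule) Next(t time.Time) time.Time {
	// Random delay in (0, period]: always in the future, never more
	// than one full period away.
	delta := time.Duration(s.rnd.Int63n(int64(s.period))) + 1
	return t.Add(delta)
}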
package tabacco
import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"strings"
)
type multiError struct {
errors []error
}
func (m *multiError) Add(err error) {
m.errors = append(m.errors, err)
}
func (m *multiError) Error() string {
var tmp []string
for _, e := range m.errors {
tmp = append(tmp, e.Error())
}
return strings.Join(tmp, ", ")
}
func (m *multiError) Errors() []error {
return m.errors
}
func (m *multiError) orNil() error {
if len(m.errors) > 0 {
return m
}
return nil
}
// Generate a random unique ID. It will return an identifier
// consisting of 32 ascii-friendly bytes (16 random bytes,
// hex-encoded).
func randomID() string {
var b [16]byte
if _, err := rand.Read(b[:]); err != nil { // nolint: gosec
panic(err)
}
return hex.EncodeToString(b[:])
}
// Generate a random uint64, and return it along with its byte
// representation (encoding/binary, little-endian).
func randomSeed() (uint64, []byte) {
// Initialize the seed from a secure source.
var b [8]byte
if _, err := rand.Read(b[:]); err != nil { // nolint: gosec
panic(err)
}
seed := binary.LittleEndian.Uint64(b[:])
return seed, b[:]
}
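randomSeed returns both the numeric seed and its byte encoding, presumably so the seed can be persisted and restored across restarts; the scheduler-side code is not shown here, so the following is only an assumed usage sketch (mathrand stands for math/rand, aliased to avoid clashing with the crypto/rand import above):

// newSeededRand is a sketch (not from this commit): reuse a saved
// 8-byte seed when one exists, otherwise generate a fresh one and
// hand its byte form back to the caller for persisting.
func newSeededRand(saved []byte) (*mathrand.Rand, []byte) {
	if len(saved) == 8 {
		seed := binary.LittleEndian.Uint64(saved)
		return mathrand.New(mathrand.NewSource(int64(seed))), saved
	}
	seed, raw := randomSeed()
	return mathrand.New(mathrand.NewSource(int64(seed))), raw
}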