Commit a7b33a75 authored by ale's avatar ale

Initial commit

parents
tabacco
===
A data backup manager, modeling backups of high-level *datasets* and
maintaining the association with low-level hosts and files.
The idea is to describe the data to be backed up in terms that make
sense to the application layer: for instance, a web hosting provider
may have datasets corresponding to data and sql databases for each
individual user (e.g. *data/user1*, *data/user2*, *sql/user1* etc).
The software will then map these dataset names to hosts and files; it
will back up the data and allow you to retrieve it on those terms.
The following scenarios / use cases for retrieval are supported:
* retrieve specific datasets, identified by their name
* restore an entire host or dataset group, for administrative or
maintenance-related reasons
In order to do this, tabacco must allow you to restore datasets based
on either high-level or low-level identifiers.
package tabacco
import (
"context"
"fmt"
"io"
"regexp"
"strings"
"time"
)
// Backup is the over-arching entity describing a high level backup
// operation. Backups are initiated autonomously by individual hosts,
// so each Backup belongs to a single Host.
type Backup struct {
	// Backup ID (globally unique identifier).
	ID string `json:"id"`
	// Timestamp (backup start).
	Timestamp time.Time `json:"timestamp"`
	// Host that initiated this backup.
	Host string `json:"host"`
}
// An Atom is a bit of data that can be restored independently as part
// of a Dataset.
type Atom struct {
	// Name (path-like, not rooted).
	Name string `json:"name"`
	// Relative path with respect to the Dataset.
	RelativePath string `json:"rel_path"`
	// Source path, filled in by handlers at backup time and used
	// for restore. Omitted from JSON when empty.
	SourcePath string `json:"source_path,omitempty"`
}
// A Dataset describes a data set as a high level structure containing
// one or more atoms. The 1-to-many scenario is justified by the
// following use case: imagine a sql database server, we may want to
// back it up as a single operation, but it contains multiple
// databases (the atom we're interested in), which we might want to
// restore independently.
type Dataset struct {
	// Name of the dataset. Will be prepended to target storage
	// paths.
	Name string `json:"name"`
	// Handler specifies the dataset type (which handler to use to
	// backup/restore it).
	Handler string `json:"handler"`
	// Atoms that are part of this dataset.
	Atoms []Atom `json:"atoms"`
}
// Directory metadata. A bunch of files that are copied or restored
// together. A Directory contains implicitly all of its subdirectories.
//
// Actual datasets are identified by symbolic names called *tags*.
// NOTE(review): this type is not referenced elsewhere in this file;
// confirm how it relates to Dataset/Atom before relying on it.
type Directory struct {
	// Path of the directory (whether rooted or relative is not
	// established here — verify against callers).
	Path string `json:"path"`
}
// FindRequest specifies search criteria for atoms.
type FindRequest struct {
	// Glob-like pattern ('*' wildcards) matched against the full
	// atom name.
	Pattern string `json:"pattern"`
	// Lazily-compiled regexp form of Pattern (see matchPattern).
	patternRx *regexp.Regexp
	// If non-empty, restrict results to this host.
	Host string `json:"host"`
	// Number of versions to return per result (implementations
	// treat values < 1 as 1).
	NumVersions int `json:"num_versions"`
}
// matchPattern reports whether s matches the request's glob-like
// Pattern, where '*' matches any (possibly empty) substring and every
// other character matches literally. The compiled regexp is cached on
// first use.
//
// Fix: the previous version substituted "*" -> ".*" without quoting
// the rest of the pattern, so regexp metacharacters (e.g. '.') were
// interpreted instead of matched literally.
//
// NOTE(review): the lazy compilation mutates req without locking;
// callers appear to be single-goroutine — confirm.
func (req *FindRequest) matchPattern(s string) bool {
	if req.patternRx == nil {
		// Quote each literal segment, then rejoin the segments
		// with ".*" and anchor the whole expression.
		parts := strings.Split(req.Pattern, "*")
		for i, p := range parts {
			parts[i] = regexp.QuoteMeta(p)
		}
		req.patternRx = regexp.MustCompile(
			fmt.Sprintf("^%s$", strings.Join(parts, ".*")))
	}
	return req.patternRx.MatchString(s)
}
// A Version ties together a Dataset and a Backup.
type Version struct {
	// The dataset (possibly restricted to matching atoms).
	Dataset Dataset `json:"dataset"`
	// The backup this version of the dataset belongs to.
	Backup Backup `json:"backup"`
}
// FindResponseEntry groups datasets and their versions.
type FindResponseEntry struct {
	// The dataset being described.
	Dataset Dataset `json:"dataset"`
	// Backups in which this dataset appears.
	Versions []Backup `json:"versions"`
}
// MetadataStore is the client interface to the global metadata store.
type MetadataStore interface {
	// Find the datasets that match the given criteria. Only
	// atoms matching the criteria will be included in the Dataset
	// objects in the response.
	FindAtoms(context.Context, FindRequest) ([][]Version, error)
	// Add a dataset entry (the Backup might already exist).
	AddDataset(context.Context, Backup, Dataset) error
}
// Handler can backup and restore a specific class of datasets.
type Handler interface {
	// Backup the dataset to the repository, returning the Dataset
	// possibly updated (e.g. with atom source paths filled in).
	Backup(context.Context, Repository, Backup, Dataset) (Dataset, error)
	// Restore the dataset from the repository to the given target
	// path.
	Restore(context.Context, Repository, Backup, Dataset, string) error
}
// Repository is the interface to a remote repository.
type Repository interface {
	// Init initializes the repository.
	Init(context.Context) error
	// Prepare the repository for the given backup run.
	Prepare(context.Context, Backup) error
	// Backup a dataset rooted at the given local source path.
	Backup(context.Context, Backup, Dataset, string) error
	// Restore a dataset into the given local target path.
	Restore(context.Context, Backup, Dataset, string) error
	// BackupStream stores a dataset read from the given stream.
	BackupStream(context.Context, Backup, Dataset, io.Reader) error
	// RestoreStream retrieves a dataset stored as a stream,
	// writing its contents to the given writer.
	RestoreStream(context.Context, Backup, Dataset, string, io.Writer) error
	// Close releases any resources held by the repository.
	Close() error
}
package tabacco
import (
"context"
"fmt"
"testing"
"time"
)
// dummyMetadataEntry is a single flattened (backup, dataset, atom)
// record in the in-memory test metadata store.
type dummyMetadataEntry struct {
	backupID string
	backupTS time.Time
	// name is the fully qualified atom name ("dataset/atom").
	name string
	dsName string
	host string
	handler string
	atom Atom
}
// match reports whether this entry satisfies every non-empty search
// criterion in req (glob pattern on the atom name, exact host).
func (e dummyMetadataEntry) match(req FindRequest) bool {
	if req.Pattern != "" {
		if !req.matchPattern(e.name) {
			return false
		}
	}
	return req.Host == "" || req.Host == e.host
}
// toDataset rebuilds a Dataset (without atoms) from this log entry.
func (e dummyMetadataEntry) toDataset() Dataset {
	var ds Dataset
	ds.Name = e.dsName
	ds.Handler = e.handler
	return ds
}
// toBackup rebuilds the Backup this log entry belongs to.
func (e dummyMetadataEntry) toBackup() Backup {
	var b Backup
	b.ID = e.backupID
	b.Timestamp = e.backupTS
	b.Host = e.host
	return b
}
// dummyMetadataStore is an in-memory MetadataStore implementation for
// tests, backed by a flat append-only log of entries.
type dummyMetadataStore struct {
	log []dummyMetadataEntry
}
// FindAtoms scans the log and groups matching entries first by atom
// name, then by backup ID. Each (name, backupID) group becomes one
// Version carrying the matching atoms; at most req.NumVersions
// (minimum 1) trailing entries are kept per group.
func (d *dummyMetadataStore) FindAtoms(_ context.Context, req FindRequest) ([][]Version, error) {
	// tmp maps atom name -> backup ID -> matching log entries.
	tmp := make(map[string]map[string][]dummyMetadataEntry)
	for _, l := range d.log {
		if !l.match(req) {
			continue
		}
		m, ok := tmp[l.name]
		if !ok {
			m = make(map[string][]dummyMetadataEntry)
			tmp[l.name] = m
		}
		// Bug fix: append to the list keyed by backup ID. The
		// previous code appended to m[l.name] (always empty, as
		// keys here are backup IDs), so every new entry silently
		// replaced earlier ones for the same backup.
		m[l.backupID] = append(m[l.backupID], l)
	}
	count := req.NumVersions
	if count < 1 {
		count = 1
	}
	var out [][]Version
	for _, dsmap := range tmp {
		var dsVersions []Version
		for _, dslog := range dsmap {
			// All entries in dslog share dataset/backup metadata,
			// so the first entry is representative.
			v := Version{
				Dataset: dslog[0].toDataset(),
				Backup:  dslog[0].toBackup(),
			}
			// Keep only the last `count` entries.
			if len(dslog) > count {
				dslog = dslog[len(dslog)-count:]
			}
			for _, l := range dslog {
				v.Dataset.Atoms = append(v.Dataset.Atoms, l.atom)
			}
			dsVersions = append(dsVersions, v)
		}
		out = append(out, dsVersions)
	}
	return out, nil
}
// AddDataset records one log entry per atom in the dataset,
// qualifying each atom name with the dataset name.
func (d *dummyMetadataStore) AddDataset(_ context.Context, backup Backup, ds Dataset) error {
	for _, a := range ds.Atoms {
		entry := dummyMetadataEntry{
			backupID: backup.ID,
			backupTS: backup.Timestamp,
			host:     backup.Host,
			name:     fmt.Sprintf("%s/%s", ds.Name, a.Name),
			dsName:   ds.Name,
			handler:  ds.Handler,
			atom:     a,
		}
		d.log = append(d.log, entry)
	}
	return nil
}
// TestBackup runs an end-to-end backup through the Manager with two
// sources (one file-based, one pipe-based) against the in-memory
// metadata store, then verifies the stored atoms can be found again.
func TestBackup(t *testing.T) {
	metaStore := &dummyMetadataStore{}

	repoSpec := RepositorySpec{
		Name: "main",
		Type: "restic",
		Params: map[string]interface{}{
			"uri":      "/tmp/restic/repo",
			"password": "testpass",
		},
	}
	handlerSpecs := []HandlerSpec{
		{
			Name: "file1",
			Type: "file",
			Params: map[string]interface{}{
				"path": "/source/of/file1",
			},
		},
		{
			Name: "dbpipe",
			Type: "pipe",
			Params: map[string]interface{}{
				"backup_cmd":  "echo ${backup.id} ${ds.name} ${atom.name}",
				"restore_cmd": "cat",
			},
		},
	}
	sourceSpecs := []SourceSpec{
		{
			Name:    "source1",
			Handler: "file1",
			Atoms: []Atom{
				{
					Name:         "user1",
					RelativePath: "user1",
				},
				{
					Name:         "user2",
					RelativePath: "user2",
				},
			},
		},
		{
			Name:        "source2",
			Handler:     "dbpipe",
			AtomsScript: "echo user1 user1 ; echo user2 user2",
		},
	}

	exec := NewExecutor(2)
	defer exec.Stop()

	mgr, err := NewManager(
		context.TODO(),
		handlerSpecs,
		repoSpec,
		metaStore,
		exec,
		NewShell(true),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer mgr.Close()

	// Run the backup and sanity-check the returned metadata.
	backup, err := mgr.Backup(context.TODO(), sourceSpecs)
	if err != nil {
		t.Fatal(err)
	}
	if backup.ID == "" || backup.Host == "" {
		t.Fatalf("empty fields in backup: %+v", backup)
	}

	// A glob pattern should match both atoms of source1.
	found, err := metaStore.FindAtoms(context.TODO(), FindRequest{Pattern: "source1/*", NumVersions: 1})
	if err != nil {
		t.Fatal("FindAtoms", err)
	}
	if len(found) != 2 {
		t.Fatalf("bad response: %+v", found)
	}

	// An exact name should match exactly one atom.
	found, err = metaStore.FindAtoms(context.TODO(), FindRequest{Pattern: "source1/user2"})
	if err != nil {
		t.Fatal("FindAtoms", err)
	}
	if len(found) != 1 {
		t.Fatalf("bad response: %+v", found)
	}
}
package tabacco
import (
"context"
"io"
"log"
"sync"
"time"
)
// ExecOp is an operation (a func(Context) error) that can be run
// asynchronously by a worker thread and waited for.
type ExecOp struct {
	// Unique identifier, used for logging.
	id string
	// The function to execute.
	fn func(context.Context) error
	// Per-run execution timeout applied via context.WithTimeout.
	timeout time.Duration
	// Closed when the operation has finished; Wait blocks on it.
	done chan struct{}
	// Result of fn; valid only after done is closed.
	err error
}
// NewExecOp wraps a function in an ExecOp (with timeout).
func NewExecOp(fn func(context.Context) error, timeout time.Duration) *ExecOp {
	op := &ExecOp{
		id:      randomID(),
		fn:      fn,
		timeout: timeout,
		done:    make(chan struct{}),
	}
	return op
}
// run executes the wrapped function under the operation's timeout and
// signals completion by closing the done channel. The deferred calls
// fire in LIFO order (cancel first, then close), preserving the
// original sequencing.
func (op *ExecOp) run(ctx context.Context) {
	opCtx, cancel := context.WithTimeout(ctx, op.timeout)
	defer close(op.done)
	defer cancel()
	op.err = op.fn(opCtx)
}
// Wait blocks until the operation has run, then returns its error.
func (op *ExecOp) Wait() error {
	<-op.done
	return op.err
}
// Executor is a task scheduler that keeps a two-level priority queue
// (for high-priority and low-priority tasks respectively).
type Executor struct {
	// Wakeup channel (buffered, size 1): a token is sent on every
	// enqueue so a sleeping Pop re-checks the queues.
	queueCh chan struct{}
	// Cancels the workers' context (set by start, called by Stop).
	cancel context.CancelFunc
	// Tracks running workers so Stop can wait for them.
	wg sync.WaitGroup
	// mx guards queueHigh and queueLow.
	mx sync.Mutex
	queueHigh []*ExecOp
	queueLow []*ExecOp
}
// newUnstartedExecutor returns an executor without starting the worker threads.
func newUnstartedExecutor() *Executor {
	e := new(Executor)
	// A buffer of one is sufficient: a single pending token is
	// enough to make Pop re-check the queues.
	e.queueCh = make(chan struct{}, 1)
	return e
}
// NewExecutor returns a new Executor with the given number of
// concurrent asynchronous workers.
func NewExecutor(numWorkers int) *Executor {
	exec := newUnstartedExecutor()
	exec.start(numWorkers)
	return exec
}
// start spawns numWorkers goroutines consuming the operation queues.
func (e *Executor) start(numWorkers int) {
	// A cancelable context lets Stop interrupt workers blocked in
	// Pop as well as running operations.
	ctx, cancel := context.WithCancel(context.Background())
	e.cancel = cancel
	e.wg.Add(numWorkers)
	for n := 0; n < numWorkers; n++ {
		go func() {
			defer e.wg.Done()
			e.runWorker(ctx)
		}()
	}
}
// runWorker consumes and runs operations until ctx is canceled (which
// makes Pop return an error).
func (e *Executor) runWorker(ctx context.Context) {
	for {
		next, err := e.Pop(ctx)
		if err != nil {
			return
		}
		log.Printf("executing op %s", next.id)
		next.run(ctx)
	}
}
// Stop the workers and wait for them to terminate. Operations still
// queued when Stop is called are never run, so their done channel is
// never closed and Wait on them would block forever.
func (e *Executor) Stop() {
	e.cancel()
	e.wg.Wait()
}
// Enqueue an operation (possibly with high priority).
func (e *Executor) Enqueue(op *ExecOp, hiPri bool) {
	// Single-element batch; see EnqueueBatch.
	e.EnqueueBatch([]*ExecOp{op}, hiPri)
}
// EnqueueBatch schedules a batch of operations all at once (possibly
// with high priority). Easier on lock contention.
func (e *Executor) EnqueueBatch(batch []*ExecOp, hiPri bool) {
	e.mx.Lock()
	if hiPri {
		e.queueHigh = append(e.queueHigh, batch...)
	} else {
		e.queueLow = append(e.queueLow, batch...)
	}
	e.mx.Unlock()

	// Non-blocking send: one pending token is enough to wake a
	// sleeping Pop, and dropping extras keeps producers from
	// blocking.
	select {
	case e.queueCh <- struct{}{}:
	default:
	}
}
// Pop blocks until an operation is available, always serving the
// high-priority queue first. If ctx is canceled while waiting, it
// returns io.EOF.
func (e *Executor) Pop(ctx context.Context) (op *ExecOp, err error) {
	e.mx.Lock()
	// Sleep-and-recheck loop: release the lock while waiting for a
	// wakeup token from EnqueueBatch or for cancellation, then
	// re-acquire it before inspecting the queues again.
	for len(e.queueHigh) == 0 && len(e.queueLow) == 0 {
		e.mx.Unlock()
		select {
		case <-e.queueCh:
		case <-ctx.Done():
			err = io.EOF
			return
		}
		e.mx.Lock()
	}
	// Take the head of the highest-priority non-empty queue.
	if len(e.queueHigh) > 0 {
		op = e.queueHigh[0]
		e.queueHigh = e.queueHigh[1:]
	} else {
		op = e.queueLow[0]
		e.queueLow = e.queueLow[1:]
	}
	e.mx.Unlock()
	return
}
package tabacco
import (
"context"
"errors"
"sync"
"testing"
"time"
)
// TestExecutor_Priority verifies that a high-priority operation jumps
// ahead of already-queued low-priority ones.
func TestExecutor_Priority(t *testing.T) {
	exec := newUnstartedExecutor()

	var mx sync.Mutex
	var lowDone bool

	// Queue ten low-priority ops (each records that a low-priority
	// op has run) before any worker is started.
	var lowOps []*ExecOp
	for n := 0; n < 10; n++ {
		lowOps = append(lowOps, NewExecOp(func(_ context.Context) error {
			mx.Lock()
			defer mx.Unlock()
			lowDone = true
			return nil
		}, 10*time.Second))
	}
	exec.EnqueueBatch(lowOps, false)

	// The high-priority op fails if any low-priority op ran first.
	highOp := NewExecOp(func(_ context.Context) error {
		mx.Lock()
		defer mx.Unlock()
		if lowDone {
			return errors.New("called low-pri op before hi-pri")
		}
		return nil
	}, 10*time.Second)
	exec.Enqueue(highOp, true)

	// Only now start a single worker, so execution order is
	// deterministic.
	exec.start(1)
	defer exec.Stop()

	for _, op := range lowOps {
		if err := op.Wait(); err != nil {
			t.Fatalf("op error: %v", err)
		}
	}
	if err := highOp.Wait(); err != nil {
		t.Fatalf("hi-prio op error: %v", err)
	}
}
package tabacco
import (
"context"
"errors"
"path/filepath"
)
// fileHandler backs up datasets consisting of plain files below a
// common root directory.
type fileHandler struct {
	// Root path; atom RelativePaths are joined onto it.
	path string
}
// newFileHandler builds a Handler for file-based datasets rooted at
// the mandatory "path" parameter.
func newFileHandler(params map[string]interface{}, _ *Shell) (Handler, error) {
	root, ok := params["path"].(string)
	if !ok || root == "" {
		return nil, errors.New("missing path")
	}
	h := &fileHandler{path: root}
	return h, nil
}
// Backup fills in the absolute SourcePath of every atom (handler root
// + relative path) and hands the dataset to the repository.
func (h *fileHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
	var withPaths []Atom
	for _, a := range ds.Atoms {
		a.SourcePath = filepath.Join(h.path, a.RelativePath)
		withPaths = append(withPaths, a)
	}
	ds.Atoms = withPaths
	return ds, repo.Backup(ctx, backup, ds, h.path /* UNUSED */)
}
// Restore delegates directly to the repository, restoring the dataset
// under the given target path.
func (h *fileHandler) Restore(ctx context.Context, repo Repository, backup Backup, ds Dataset, target string) error {
	return repo.Restore(ctx, backup, ds, target)
}
package tabacco
import (
"context"
"errors"
"io"
"os"
"path/filepath"
)
// The pipeHandler must work on a 1:1 dataset/atom mapping, because it
// generates a single file on the repository, and thus it can't
// distinguish multiple atoms inside it.
type pipeHandler struct {
	// Shell commands run to produce (backup) and consume (restore)
	// the data stream; subject to expandVars substitution.
	backupCmd, restoreCmd string
	// Shell used to execute the commands.
	shell *Shell
}
// newPipeHandler builds a Handler that moves data through
// user-supplied shell commands ("backup_cmd" and "restore_cmd", both
// mandatory parameters).
func newPipeHandler(params map[string]interface{}, shell *Shell) (Handler, error) {
	bcmd, ok := params["backup_cmd"].(string)
	if !ok || bcmd == "" {
		return nil, errors.New("missing backup_cmd")
	}
	rcmd, ok := params["restore_cmd"].(string)
	if !ok || rcmd == "" {
		return nil, errors.New("missing restore_cmd")
	}
	h := &pipeHandler{
		shell:      shell,
		backupCmd:  bcmd,
		restoreCmd: rcmd,
	}
	return h, nil
}
// Backup streams every atom through the backup command, one at a
// time, setting its SourcePath to "<dataset>/<atom>". On the first
// failure the dataset is returned with its atoms unchanged.
func (h *pipeHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
	var saved []Atom
	for _, a := range ds.Atoms {
		a.SourcePath = filepath.Join(ds.Name, a.Name)
		if err := h.backupAtom(ctx, repo, backup, ds, a); err != nil {
			return ds, err
		}
		saved = append(saved, a)
	}
	ds.Atoms = saved
	return ds, nil
}
// backupAtom runs the (variable-expanded) backup command and streams
// its stdout to the repository as a single-atom dataset.
func (h *pipeHandler) backupAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom) error {
	return h.shell.RunStdoutPipe(
		ctx,
		expandVars(h.backupCmd, backup, ds, atom),
		func(stdout io.Reader) error {
			return repo.BackupStream(ctx, backup, singleAtomDataset(ds, atom), stdout)
		},
	)
}
// Restore replays every atom through the restore command, stopping at
// the first error.
func (h *pipeHandler) Restore(ctx context.Context, repo Repository, backup Backup, ds Dataset, target string) error {
	for _, a := range ds.Atoms {
		err := h.restoreAtom(ctx, repo, backup, ds, a, target)
		if err != nil {
			return err
		}
	}
	return nil
}
// restoreAtom runs the (variable-expanded) restore command, feeding
// its stdin from the repository's stored stream for this atom.
func (h *pipeHandler) restoreAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom, target string) error {
	return h.shell.RunStdinPipe(
		ctx,
		expandVars(h.restoreCmd, backup, ds, atom),
		func(stdin io.Writer) error {
			return repo.RestoreStream(ctx, backup, singleAtomDataset(ds, atom), target, stdin)
		},
	)
}