Commit 3d39ca56 authored by ale's avatar ale

Small refactor of handler/manager

Handlers can now modify Datasets on-the-fly and influence the
structure of the generated backup jobs. Backup job creation is also
delegated to Handlers.
parent 14cf3f68
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"path/filepath"
"regexp"
"strings"
"time"
......@@ -57,12 +58,20 @@ type Dataset struct {
Atoms []Atom `json:"atoms"`
}
// Directory metadata. A bunch of files that are copied or restored
// together. A Directory contains implicitly all of its subdirectories.
//
// Actual datasets are identified by symbolic names called *tags*.
type Directory struct {
Path string `json:"path"`
// makeSingleAtomDataset derives from ds a new Dataset holding only the
// given atom. The new dataset is named "<dataset-name>/<atom-name>" and
// keeps the parent's handler.
func makeSingleAtomDataset(ds Dataset, atom Atom) Dataset {
	out := Dataset{
		Name:    filepath.Join(ds.Name, atom.Name),
		Handler: ds.Handler,
	}
	out.Atoms = append(out.Atoms, atom)
	return out
}
// normalizeDataset ensures a Dataset is well-formed for further
// processing: a dataset with no atoms gets a single, empty one.
func normalizeDataset(ds Dataset) Dataset {
	if len(ds.Atoms) > 0 {
		return ds
	}
	ds.Atoms = []Atom{{}}
	return ds
}
// FindRequest specifies search criteria for atoms.
......@@ -107,8 +116,10 @@ type MetadataStore interface {
// Handler can backup and restore a specific class of datasets.
type Handler interface {
Backup(context.Context, Repository, Backup, Dataset) (Dataset, error)
Restore(context.Context, Repository, Backup, Dataset, string) error
DatasetsForBackup(Dataset) []Dataset
DatasetsForRestore(Dataset) []Dataset
BackupJob(Repository, Backup, Dataset) jobs.Job
RestoreJob(Repository, Backup, Dataset, string) jobs.Job
Spec() HandlerSpec
}
......
......@@ -127,8 +127,8 @@ func TestBackup(t *testing.T) {
Name: "dbpipe",
Type: "pipe",
Params: map[string]interface{}{
"backup_cmd": "echo ${backup.id} ${ds.name} ${atom.name}",
"restore_cmd": "cat",
"backup_command": "echo ${backup.id} ${ds.name} ${atom.name}",
"restore_command": "cat",
},
},
}
......
......@@ -4,6 +4,8 @@ import (
"context"
"errors"
"path/filepath"
"git.autistici.org/ale/tabacco/jobs"
)
type fileHandler struct {
......@@ -23,17 +25,33 @@ func (h *fileHandler) Spec() HandlerSpec {
return h.spec
}
func (h *fileHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
// Set SourcePaths for all atoms.
func (h *fileHandler) DatasetsForBackup(ds Dataset) []Dataset {
// Set SourcePath on all atoms.
var atoms []Atom
for _, atom := range ds.Atoms {
atom.SourcePath = filepath.Join(h.path, atom.RelativePath)
relPath := atom.RelativePath
if relPath == "" {
relPath = atom.Name
}
atom.SourcePath = filepath.Join(h.path, relPath)
atoms = append(atoms, atom)
}
ds.Atoms = atoms
return ds, repo.Backup(ctx, backup, ds, h.path /* UNUSED */)
return []Dataset{ds}
}
// BackupJob returns a job that backs up the entire dataset through the
// repository.
func (h *fileHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job {
	run := func(ctx context.Context) error {
		return repo.Backup(ctx, backup, ds, h.path /* UNUSED */)
	}
	return jobs.JobFunc(run)
}
// DatasetsForRestore returns the dataset unchanged: file restores
// operate on the dataset as a whole.
func (h *fileHandler) DatasetsForRestore(ds Dataset) []Dataset {
	out := make([]Dataset, 0, 1)
	out = append(out, ds)
	return out
}
func (h *fileHandler) Restore(ctx context.Context, repo Repository, backup Backup, ds Dataset, target string) error {
return repo.Restore(ctx, backup, ds, target)
func (h *fileHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job {
return jobs.JobFunc(func(ctx context.Context) error {
return repo.Restore(ctx, backup, ds, target)
})
}
......@@ -6,6 +6,8 @@ import (
"io"
"os"
"path/filepath"
"git.autistici.org/ale/tabacco/jobs"
)
// The pipeHandler must work on a 1:1 dataset/atom mapping, because it
......@@ -18,13 +20,13 @@ type pipeHandler struct {
}
func newPipeHandler(spec HandlerSpec, shell *Shell) (Handler, error) {
backupCmd, ok := spec.Params["backup_cmd"].(string)
backupCmd, ok := spec.Params["backup_command"].(string)
if !ok || backupCmd == "" {
return nil, errors.New("missing backup_cmd")
return nil, errors.New("missing backup_command")
}
restoreCmd, ok := spec.Params["restore_cmd"].(string)
restoreCmd, ok := spec.Params["restore_command"].(string)
if !ok || restoreCmd == "" {
return nil, errors.New("missing restore_cmd")
return nil, errors.New("missing restore_command")
}
return &pipeHandler{
spec: spec,
......@@ -38,17 +40,23 @@ func (h *pipeHandler) Spec() HandlerSpec {
return h.spec
}
func (h *pipeHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
var atoms []Atom
func (h *pipeHandler) DatasetsForBackup(ds Dataset) []Dataset {
var dsl []Dataset
for _, atom := range ds.Atoms {
atom.SourcePath = filepath.Join(ds.Name, atom.Name)
if err := h.backupAtom(ctx, repo, backup, ds, atom); err != nil {
return ds, err
}
atoms = append(atoms, atom)
dsl = append(dsl, makeSingleAtomDataset(ds, atom))
}
return dsl
}
func (h *pipeHandler) BackupJob(repo Repository, backup Backup, ds Dataset) jobs.Job {
if len(ds.Atoms) > 1 {
panic("more than 1 atom in pipe source")
}
ds.Atoms = atoms
return ds, nil
return jobs.JobFunc(func(ctx context.Context) error {
return h.backupAtom(ctx, repo, backup, ds, ds.Atoms[0])
})
}
func (h *pipeHandler) backupAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom) error {
......@@ -61,13 +69,24 @@ func (h *pipeHandler) backupAtom(ctx context.Context, repo Repository, backup Ba
)
}
func (h *pipeHandler) Restore(ctx context.Context, repo Repository, backup Backup, ds Dataset, target string) error {
func (h *pipeHandler) DatasetsForRestore(ds Dataset) []Dataset {
var dsl []Dataset
for _, atom := range ds.Atoms {
if err := h.restoreAtom(ctx, repo, backup, ds, atom, target); err != nil {
return err
}
dsl = append(dsl, makeSingleAtomDataset(ds, atom))
}
return dsl
}
func (h *pipeHandler) RestoreJob(repo Repository, backup Backup, ds Dataset, target string) jobs.Job {
var restoreJobs []jobs.Job
for _, atom := range ds.Atoms {
func(atom Atom) {
restoreJobs = append(restoreJobs, jobs.JobFunc(func(ctx context.Context) error {
return h.restoreAtom(ctx, repo, backup, ds, atom, target)
}))
}(atom)
}
return nil
return jobs.AsyncGroup(restoreJobs)
}
func (h *pipeHandler) restoreAtom(ctx context.Context, repo Repository, backup Backup, ds Dataset, atom Atom, target string) error {
......
package tabacco
import (
"context"
"errors"
)
// mysqlHandler is the handler for MySQL sources. It stores the
// connection parameters read from the HandlerSpec ("host", "user",
// "password"). NOTE(review): the Backup/Restore methods below are
// stubs, so this handler does not yet perform any real work.
type mysqlHandler struct {
spec HandlerSpec
addr string
user string
password string
}
// newMySQLHandler builds a MySQL Handler from the given HandlerSpec.
// The "host" parameter is required; "user" and "password" are optional.
func newMySQLHandler(spec HandlerSpec, _ *Shell) (Handler, error) {
	addr, ok := spec.Params["host"].(string)
	if !ok || addr == "" {
		return nil, errors.New("missing host")
	}
	user, _ := spec.Params["user"].(string)
	password, _ := spec.Params["password"].(string)
	return &mysqlHandler{
		// Bug fix: 'spec' was previously never stored, so Spec()
		// returned a zero HandlerSpec.
		spec:     spec,
		addr:     addr,
		user:     user,
		password: password,
	}, nil
}
// Spec returns the handler's configuration.
func (h *mysqlHandler) Spec() HandlerSpec {
return h.spec
}
// Backup is an unimplemented stub: it returns the dataset unchanged
// and no error, so callers will treat the dataset as backed up even
// though nothing was done. NOTE(review): implement or remove.
func (h *mysqlHandler) Backup(ctx context.Context, repo Repository, backup Backup, ds Dataset) (Dataset, error) {
return ds, nil
}
// Restore is an unimplemented stub: it reports success without doing
// any work. NOTE(review): implement or remove.
func (h *mysqlHandler) Restore(ctx context.Context, repo Repository, backup Backup, ds Dataset, target string) error {
return nil
}
......@@ -28,8 +28,6 @@ func (spec *HandlerSpec) Parse(shell *Shell) (Handler, error) {
return newFileHandler(*spec, shell)
case "pipe":
return newPipeHandler(*spec, shell)
case "mysql":
return newMySQLHandler(*spec, shell)
default:
return nil, fmt.Errorf("%s: unknown handler type '%s'", spec.Name, spec.Type)
}
......
......@@ -72,7 +72,7 @@ func TestScheduler(t *testing.T) {
s.SetSchedule(sched)
time.Sleep(4 * time.Second)
status := s.getStatus()
status := s.GetStatus()
s.Stop()
src1n := jc.counter["job1"]
......
......@@ -2,7 +2,6 @@ package tabacco
import (
"context"
"errors"
"fmt"
"log"
"os"
......@@ -41,6 +40,32 @@ func (m *tabaccoManager) Close() error {
return nil
}
// metadataJob wraps another Job so that, when the wrapped job
// succeeds, the dataset's metadata is recorded in the MetadataStore.
type metadataJob struct {
jobs.Job
ms MetadataStore
backup Backup
ds Dataset
}
// RunContext runs the wrapped job and, on success, stores the
// dataset's metadata. Metadata-store failures are logged but do not
// fail the job; the wrapped job's error (if any) is returned as-is.
func (j *metadataJob) RunContext(ctx context.Context) error {
	if err := j.Job.RunContext(ctx); err != nil {
		return err
	}
	if merr := j.ms.AddDataset(ctx, j.backup, j.ds); merr != nil {
		log.Printf("%s: error saving metadata: %v", j.ds.Name, merr)
	}
	return nil
}
// withMetadata decorates j so that a successful run also records the
// dataset's metadata in the manager's MetadataStore.
func (m *tabaccoManager) withMetadata(j jobs.Job, backup Backup, ds Dataset) jobs.Job {
	mj := metadataJob{
		Job:    j,
		ms:     m.ms,
		backup: backup,
		ds:     ds,
	}
	return &mj
}
// Prepare the repository for a new backup. This is a synchronous
// operation: we need to wait for it to complete to avoid running the
// backup tasks too soon.
......@@ -56,39 +81,32 @@ func (m *tabaccoManager) prepareBackupJob(backup Backup) jobs.Job {
})
}
func (m *tabaccoManager) doBackupDataset(ctx context.Context, backup Backup, ds Dataset) error {
h, ok := m.configMgr.getHandler(ds.Handler)
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
var err error
ds, err = h.Backup(ctx, m.configMgr.getRepository(), backup, ds)
if err != nil {
return fmt.Errorf("%s: backup failed: %v", ds.Name, err)
}
if err = m.ms.AddDataset(ctx, backup, ds); err != nil {
return fmt.Errorf("%s: error saving metadata: %v", ds.Name, err)
}
return nil
}
func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset) jobs.Job {
var out []jobs.Job
// Let Handlers modify the Dataset if necessary, or generate
// more than one.
dsl := h.DatasetsForBackup(ds)
// Run pre_backup_command.
if cmd := h.Spec().PreBackupCommand; cmd != "" {
out = append(out, m.datasetCommandJob(cmd, backup, ds))
}
// The actual backup operation. Just a thin wrapper around
// doBackupDataset() that binds together the context, backup
// and ds via the closure.
// The actual backup operation. Assemble all the backup jobs
// for the datasets in 'dsl', and add them to an AsyncGroup.
//
// TODO: get the timeout from the SourceSpec.
var backupJobs []jobs.Job
for _, realDS := range dsl {
backupJobs = append(backupJobs, m.withMetadata(
h.BackupJob(m.configMgr.getRepository(), backup, realDS),
backup,
realDS,
))
}
out = append(out, jobs.WithTimeout(
jobs.JobFunc(func(ctx context.Context) (err error) {
return m.doBackupDataset(ctx, backup, ds)
}),
jobs.AsyncGroup(backupJobs),
24*time.Hour,
))
......@@ -121,33 +139,30 @@ func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset)
)
}
// BackupJob returns a Job that backs up all known sources to the
// configured destination repository.
// BackupJob returns a single Job that backs up one or more sources to
// the configured destination repository.
func (m *tabaccoManager) BackupJob(ctx context.Context, sourceSpecs []SourceSpec) (Backup, jobs.Job, error) {
// Parse the source specs and obtain Datasets. Errors here are
// logged but *not* fatal, unless there are errors and the
// list of non-erroring sources is nil.
datasets, err := parseSourceSpecs(ctx, sourceSpecs)
if err != nil {
log.Printf("errors parsing sources: %v", err)
if len(datasets) == 0 {
return Backup{}, nil, errors.New("all sources have errors")
}
}
// Create a new backup and initialize it .
backup := newBackup("")
prepJob := m.prepareBackupJob(backup)
// Run all backup tasks, scheduling them using the executor.
var backupJobs []jobs.Job
for _, ds := range datasets {
// Find the handler for this source.
h, ok := m.configMgr.getHandler(ds.Handler)
merr := new(util.MultiError)
for _, spec := range sourceSpecs {
h, ok := m.configMgr.getHandler(spec.Handler)
if !ok {
log.Printf("errors parsing source %s: unknown handler '%s'", ds.Name, ds.Handler)
return Backup{}, nil, fmt.Errorf("inconsistency: no '%s' handler", spec.Handler)
}
ds, err := spec.Parse(ctx)
if err != nil {
merr.Add(err)
continue
}
// Create the backup job and add it to our list.
backupJobs = append(backupJobs, m.backupDatasetJob(h, backup, ds))
}
......@@ -173,21 +188,11 @@ func (m *tabaccoManager) Backup(ctx context.Context, sourceSpecs []SourceSpec) (
return backup, err
}
// doRestoreDataset restores a single dataset by looking up its handler
// and delegating the restore to it. NOTE(review): this is the old
// synchronous implementation removed by this commit in favor of
// handler-created restore jobs.
func (m *tabaccoManager) doRestoreDataset(ctx context.Context, backup Backup, ds Dataset, target string) error {
log.Printf("restoring %+v %+v", ds, backup)
h, ok := m.configMgr.getHandler(ds.Handler)
if !ok {
return fmt.Errorf("%s: unknown handler '%s'", ds.Name, ds.Handler)
}
// Delegate the actual restore to the dataset's handler.
return h.Restore(ctx, m.configMgr.getRepository(), backup, ds, target)
}
func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset, target string) jobs.Job {
var out []jobs.Job
dsl := h.DatasetsForRestore(ds)
// Run pre_restore_command.
if cmd := h.Spec().PreRestoreCommand; cmd != "" {
out = append(out, m.datasetCommandJob(cmd, backup, ds))
......@@ -196,12 +201,11 @@ func (m *tabaccoManager) restoreDatasetJob(h Handler, backup Backup, ds Dataset,
// The actual backup operation. Just a thin wrapper around
// doBackupDataset() that binds together the context, backup,
// ds and target via the closure.
out = append(out, jobs.WithTimeout(
jobs.JobFunc(func(ctx context.Context) (err error) {
return m.doRestoreDataset(ctx, backup, ds, target)
}),
24*time.Hour,
))
var restoreJobs []jobs.Job
for _, realDS := range dsl {
restoreJobs = append(restoreJobs, h.RestoreJob(m.configMgr.getRepository(), backup, realDS, target))
}
out = append(out, jobs.AsyncGroup(restoreJobs))
// Run post_restore_command.
if cmd := h.Spec().PostRestoreCommand; cmd != "" {
......
......@@ -7,8 +7,6 @@ import (
"errors"
"fmt"
"os/exec"
"git.autistici.org/ale/tabacco/util"
)
// SourceSpec defines the configuration for a data source.
......@@ -50,11 +48,11 @@ func (spec *SourceSpec) Parse(ctx context.Context) (ds Dataset, err error) {
}
}
ds = Dataset{
ds = normalizeDataset(Dataset{
Name: spec.Name,
Handler: spec.Handler,
Atoms: atoms,
}
})
return
}
......@@ -69,29 +67,12 @@ func (spec *SourceSpec) Check(handlers map[string]struct{}) error {
if _, ok := handlers[spec.Handler]; !ok {
return fmt.Errorf("unknown handler '%s'", spec.Handler)
}
if len(spec.Atoms) == 0 && spec.AtomsCommand == "" {
return errors.New("no atoms specified (either directly or via atoms_command)")
if len(spec.Atoms) > 0 && spec.AtomsCommand != "" {
return errors.New("can't specify both 'atoms' and 'atoms_command'")
}
return nil
}
// parseSourceSpecs parses multiple SourceSpec objects into Datasets.
// Each source can fail independently: errors are accumulated, so the
// returned Dataset list may be non-empty even when err != nil.
func parseSourceSpecs(ctx context.Context, sourceSpecs []SourceSpec) ([]Dataset, error) {
	merr := new(util.MultiError)
	var datasets []Dataset
	for i := range sourceSpecs {
		ds, err := sourceSpecs[i].Parse(ctx) // nolint: vetshadow
		if err != nil {
			merr.Add(err)
			continue
		}
		datasets = append(datasets, ds)
	}
	return datasets, merr.OrNil()
}
func runAtomsCommand(ctx context.Context, cmd string) ([]Atom, error) {
c := exec.Command("/bin/sh", "-c", cmd) // #nosec
stdout, err := c.StdoutPipe()
......@@ -107,12 +88,11 @@ func runAtomsCommand(ctx context.Context, cmd string) ([]Atom, error) {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
parts := bytes.Fields(scanner.Bytes())
atom := Atom{Name: string(parts[0])}
if len(parts) == 2 {
atoms = append(atoms, Atom{
Name: string(parts[0]),
RelativePath: string(parts[1]),
})
atom.RelativePath = string(parts[1])
}
atoms = append(atoms, atom)
}
return atoms, scanner.Err()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment