handler_pipe.go 2.08 KB
Newer Older
ale's avatar
ale committed
1 2 3 4 5
package tabacco

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"

	"git.autistici.org/ai3/tools/tabacco/jobs"
)

// pipeHandler is a handler that moves data through shell pipelines: on
// backup the output of backupCmd is piped into the repository's stream
// command, and on restore the repository's stream is piped into
// restoreCmd.
//
// It must work on a 1:1 dataset/atom mapping, because it generates a
// single file on the repository, and thus it can't distinguish
// multiple atoms inside it.
type pipeHandler struct {
	// backupCmd is the shell command template used as the producer
	// side of the backup pipeline.
	backupCmd string

	// restoreCmd is the shell command template used as the consumer
	// side of the restore pipeline.
	restoreCmd string
}

ale's avatar
ale committed
20 21 22 23
func newPipeHandler(name string, params Params) (Handler, error) {
	backupCmd := params.Get("backup_command")
	if backupCmd == "" {
		return nil, errors.New("backup_command not set")
ale's avatar
ale committed
24
	}
ale's avatar
ale committed
25 26 27 28

	restoreCmd := params.Get("restore_command")
	if restoreCmd == "" {
		return nil, errors.New("restore_command not set")
ale's avatar
ale committed
29
	}
ale's avatar
ale committed
30

ale's avatar
ale committed
31 32 33 34 35 36
	return &pipeHandler{
		backupCmd:  backupCmd,
		restoreCmd: restoreCmd,
	}, nil
}

ale's avatar
ale committed
37 38 39 40 41 42
func (h *pipeHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
	cmd := fmt.Sprintf(
		"(%s) | %s",
		expandVars(h.backupCmd, backup, ds),
		repo.BackupStreamCmd(backup, ds),
	)
ale's avatar
ale committed
43
	return jobs.JobFunc(func(ctx context.Context) error {
ale's avatar
ale committed
44
		return rctx.Shell().Run(ctx, cmd)
ale's avatar
ale committed
45
	})
ale's avatar
ale committed
46 47
}

ale's avatar
ale committed
48 49 50 51 52 53 54 55 56 57 58 59 60
func (h *pipeHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
	return jobs.JobFunc(func(ctx context.Context) error {
		restoreCmd, err := repo.RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
		if err != nil {
			return err
		}
		cmd := fmt.Sprintf(
			"%s | (%s)",
			restoreCmd,
			expandVars(h.restoreCmd, backup, ds),
		)
		return rctx.Shell().Run(ctx, cmd)
	})
ale's avatar
ale committed
61 62
}

ale's avatar
ale committed
63
func expandVars(s string, backup *Backup, ds *Dataset) string {
ale's avatar
ale committed
64 65
	return os.Expand(s, func(key string) string {
		switch key {
ale's avatar
ale committed
66 67
		case "$":
			return key
ale's avatar
ale committed
68 69 70 71
		case "backup.id":
			return backup.ID
		case "ds.name":
			return ds.Name
ale's avatar
ale committed
72 73 74 75 76 77 78 79 80 81 82 83
		case "atom.names":
			names := make([]string, 0, len(ds.Atoms))
			for _, a := range ds.Atoms {
				names = append(names, a.Name)
			}
			return strings.Join(names, " ")
		case "atom.paths":
			paths := make([]string, 0, len(ds.Atoms))
			for _, a := range ds.Atoms {
				paths = append(paths, a.Path)
			}
			return strings.Join(paths, " ")
ale's avatar
ale committed
84 85 86 87 88
		default:
			return os.Getenv(key)
		}
	})
}