handler_pipe.go 2.99 KB
Newer Older
ale's avatar
ale committed
1 2 3 4 5
package tabacco

import (
	"context"
	"errors"
6
	"fmt"
ale's avatar
ale committed
7
	"os"
ale's avatar
ale committed
8
	"strings"
ale's avatar
ale committed
9

ale's avatar
ale committed
10
	"git.autistici.org/ai3/tools/tabacco/jobs"
ale's avatar
ale committed
11 12 13 14 15 16
)

// pipeHandler is a Handler that backs up a dataset by piping the output
// of a user-supplied shell command into the repository, and restores it
// by piping the repository stream into a second user-supplied command.
//
// The pipeHandler must work on a 1:1 dataset/atom mapping, because it
// generates a single file on the repository, and thus it can't
// distinguish multiple atoms inside it.
type pipeHandler struct {
	// backupCmd is the shell command whose output is sent to the
	// repository. It is passed through expandVars before use.
	backupCmd string

	// restoreCmd is the shell command that receives the stream read
	// back from the repository. It is passed through expandVars
	// before use.
	restoreCmd string

	// compress enables the compressCmd / decompressCmd pipeline
	// stages. When false, both commands are ignored.
	compress bool

	// compressCmd and decompressCmd are the filter commands inserted
	// in the backup and restore pipelines respectively.
	compressCmd   string
	decompressCmd string
}

ale's avatar
ale committed
24 25 26 27 28 29
// defaultCompress is the initial value of the "compress" setting that
// newPipeHandler uses unless the handler params override it:
// compression is off by default.
const defaultCompress = false

// Default filter commands that newPipeHandler uses for the compression
// and decompression pipeline stages unless the handler params override
// them.
const (
	defaultCompressCmd   = "lz4c -3z - -"
	defaultDecompressCmd = "lz4c -d - -"
)

ale's avatar
ale committed
30 31 32 33
func newPipeHandler(name string, params Params) (Handler, error) {
	backupCmd := params.Get("backup_command")
	if backupCmd == "" {
		return nil, errors.New("backup_command not set")
ale's avatar
ale committed
34
	}
ale's avatar
ale committed
35 36 37 38

	restoreCmd := params.Get("restore_command")
	if restoreCmd == "" {
		return nil, errors.New("restore_command not set")
ale's avatar
ale committed
39
	}
ale's avatar
ale committed
40

ale's avatar
ale committed
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
	// Create the pipeHandler with defaults, which can be
	// overriden from Params.
	h := &pipeHandler{
		backupCmd:     backupCmd,
		restoreCmd:    restoreCmd,
		compress:      defaultCompress,
		compressCmd:   defaultCompressCmd,
		decompressCmd: defaultDecompressCmd,
	}
	if b, ok := params.GetBool("compress"); ok {
		h.compress = b
	}
	if s := params.Get("compress_command"); s != "" {
		h.compressCmd = s
	}
	if s := params.Get("decompress_command"); s != "" {
		h.decompressCmd = s
	}

	return h, nil
ale's avatar
ale committed
61 62
}

ale's avatar
ale committed
63 64
func (h *pipeHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
	cmd := fmt.Sprintf(
ale's avatar
ale committed
65
		"(%s)%s | %s",
ale's avatar
ale committed
66
		expandVars(h.backupCmd, backup, ds),
ale's avatar
ale committed
67
		h.compressSuffix(),
ale's avatar
ale committed
68 69
		repo.BackupStreamCmd(backup, ds),
	)
ale's avatar
ale committed
70
	return jobs.JobFunc(func(ctx context.Context) error {
ale's avatar
ale committed
71
		return rctx.Shell().Run(ctx, cmd)
ale's avatar
ale committed
72
	})
ale's avatar
ale committed
73 74
}

ale's avatar
ale committed
75 76 77 78 79 80 81
func (h *pipeHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset, target string) jobs.Job {
	return jobs.JobFunc(func(ctx context.Context) error {
		restoreCmd, err := repo.RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
		if err != nil {
			return err
		}
		cmd := fmt.Sprintf(
ale's avatar
ale committed
82
			"%s | %s(%s)",
ale's avatar
ale committed
83
			restoreCmd,
ale's avatar
ale committed
84
			h.decompressPrefix(),
ale's avatar
ale committed
85 86 87 88
			expandVars(h.restoreCmd, backup, ds),
		)
		return rctx.Shell().Run(ctx, cmd)
	})
ale's avatar
ale committed
89 90
}

ale's avatar
ale committed
91 92 93 94 95 96 97 98 99 100 101 102 103 104
// compressSuffix returns the pipeline fragment (" | <compressCmd>")
// to append after the backup command, or the empty string when
// compression is disabled.
func (h *pipeHandler) compressSuffix() string {
	if h.compress {
		return fmt.Sprintf(" | %s", h.compressCmd)
	}
	return ""
}

// decompressPrefix returns the pipeline fragment ("<decompressCmd> | ")
// to insert before the restore command, or the empty string when
// compression is disabled.
func (h *pipeHandler) decompressPrefix() string {
	var prefix string
	if h.compress {
		prefix = fmt.Sprintf("%s | ", h.decompressCmd)
	}
	return prefix
}

ale's avatar
ale committed
105
func expandVars(s string, backup *Backup, ds *Dataset) string {
ale's avatar
ale committed
106 107
	return os.Expand(s, func(key string) string {
		switch key {
ale's avatar
ale committed
108 109
		case "$":
			return key
ale's avatar
ale committed
110 111 112 113
		case "backup.id":
			return backup.ID
		case "ds.name":
			return ds.Name
ale's avatar
ale committed
114 115 116 117 118 119 120 121 122 123 124 125
		case "atom.names":
			names := make([]string, 0, len(ds.Atoms))
			for _, a := range ds.Atoms {
				names = append(names, a.Name)
			}
			return strings.Join(names, " ")
		case "atom.paths":
			paths := make([]string, 0, len(ds.Atoms))
			for _, a := range ds.Atoms {
				paths = append(paths, a.Path)
			}
			return strings.Join(paths, " ")
ale's avatar
ale committed
126 127 128 129 130
		default:
			return os.Getenv(key)
		}
	})
}