Commit 46dcd9ea authored by ale

Add support for compression

Use LZ4 by default.
parent 7f59ad7f
......@@ -14,9 +14,19 @@ import (
// generates a single file on the repository, and thus it can't
// distinguish multiple atoms inside it.
type pipeHandler struct {
backupCmd, restoreCmd string
backupCmd string
restoreCmd string
compress bool
compressCmd string
decompressCmd string
}
// Defaults for the optional compression parameters of the pipe
// handler. Compression is disabled unless explicitly requested; when
// enabled, lz4 is used by default (fast, streaming-friendly). The
// "- -" arguments make lz4c read from stdin and write to stdout, so
// the commands can sit in the middle of a shell pipeline.
const (
defaultCompress = false
defaultCompressCmd = "lz4c -3z - -"
defaultDecompressCmd = "lz4c -d - -"
)
func newPipeHandler(name string, params Params) (Handler, error) {
backupCmd := params.Get("backup_command")
if backupCmd == "" {
......@@ -28,16 +38,33 @@ func newPipeHandler(name string, params Params) (Handler, error) {
return nil, errors.New("restore_command not set")
}
return &pipeHandler{
backupCmd: backupCmd,
restoreCmd: restoreCmd,
}, nil
// Create the pipeHandler with defaults, which can be
// overridden from Params.
h := &pipeHandler{
backupCmd: backupCmd,
restoreCmd: restoreCmd,
compress: defaultCompress,
compressCmd: defaultCompressCmd,
decompressCmd: defaultDecompressCmd,
}
if b, ok := params.GetBool("compress"); ok {
h.compress = b
}
if s := params.Get("compress_command"); s != "" {
h.compressCmd = s
}
if s := params.Get("decompress_command"); s != "" {
h.decompressCmd = s
}
return h, nil
}
func (h *pipeHandler) BackupJob(rctx RuntimeContext, repo Repository, backup *Backup, ds *Dataset) jobs.Job {
cmd := fmt.Sprintf(
"(%s) | %s",
"(%s)%s | %s",
expandVars(h.backupCmd, backup, ds),
h.compressSuffix(),
repo.BackupStreamCmd(backup, ds),
)
return jobs.JobFunc(func(ctx context.Context) error {
......@@ -52,14 +79,29 @@ func (h *pipeHandler) RestoreJob(rctx RuntimeContext, repo Repository, backup *B
return err
}
cmd := fmt.Sprintf(
"%s | (%s)",
"%s | %s(%s)",
restoreCmd,
h.decompressPrefix(),
expandVars(h.restoreCmd, backup, ds),
)
return rctx.Shell().Run(ctx, cmd)
})
}
// compressSuffix returns the shell pipeline fragment (" | <cmd>") to
// append to the backup command when compression is enabled, or the
// empty string when it is not.
func (h *pipeHandler) compressSuffix() string {
	if h.compress {
		return " | " + h.compressCmd
	}
	return ""
}
// decompressPrefix returns the shell pipeline fragment ("<cmd> | ") to
// prepend to the restore command when compression is enabled, or the
// empty string when it is not.
func (h *pipeHandler) decompressPrefix() string {
	if h.compress {
		return h.decompressCmd + " | "
	}
	return ""
}
func expandVars(s string, backup *Backup, ds *Dataset) string {
return os.Expand(s, func(key string) string {
switch key {
......
......@@ -39,7 +39,7 @@ func createTempDirWithData(t *testing.T) string {
}
// nolint: gocyclo
func TestRestic(t *testing.T) {
func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePattern string, checkFn func(testing.TB, string)) {
// Check that we can actually run restic.
if err := checkResticVersion("restic"); err != nil {
t.Skip("can't run restic: ", err)
......@@ -47,9 +47,6 @@ func TestRestic(t *testing.T) {
store := &dummyMetadataStore{}
tmpdir := createTempDirWithData(t)
defer os.RemoveAll(tmpdir)
repoSpec := RepositorySpec{
Name: "main",
Type: "restic",
......@@ -59,39 +56,21 @@ func TestRestic(t *testing.T) {
},
}
handlerSpecs := []*HandlerSpec{
// 'file' is predefined.
&HandlerSpec{
Name: "data",
Type: "file",
},
}
sourceSpecs := []*SourceSpec{
&SourceSpec{
Name: "source1",
Handler: "data",
Schedule: "@random_every 1h",
Type: "pipe",
Params: map[string]interface{}{
"path": filepath.Join(tmpdir, "data"),
},
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "files",
Atoms: []Atom{
{
Name: "f1",
Path: "file1",
},
{
Name: "f2",
Path: "file2",
},
},
},
"backup_command": "echo data",
// The restore command also verifies the data.
"restore_command": "read row ; test \"x$$row\" = xdata",
},
},
}
queueSpec := jobs.QueueSpec{
Workers: map[string]int{"backup": 2, "restore": 1},
}
sourceSpecs := []*SourceSpec{source}
// Run the backup.
configMgr, err := NewConfigManager(&Config{
......@@ -139,12 +118,19 @@ func TestRestic(t *testing.T) {
// Now try to restore.
err = m.Restore(
context.TODO(),
&FindRequest{Pattern: "source1/*"},
&FindRequest{Pattern: restorePattern},
tmpdir+"/restore",
)
if err != nil {
t.Fatal("Restore", err)
}
if checkFn != nil {
checkFn(t, tmpdir)
}
}
func checkRestoredData(t testing.TB, tmpdir string) {
data, err := ioutil.ReadFile(filepath.Join(tmpdir, "restore", tmpdir, "data", "file1"))
if err != nil {
t.Fatalf("data/file1 has not been restored: %v", err)
......@@ -154,37 +140,46 @@ func TestRestic(t *testing.T) {
}
}
// nolint: gocyclo
func TestRestic_Stream(t *testing.T) {
// Check that we can actually run restic.
if err := checkResticVersion("restic"); err != nil {
t.Skip("can't run restic: ", err)
}
store := &dummyMetadataStore{}
func TestRestic(t *testing.T) {
tmpdir := createTempDirWithData(t)
defer os.RemoveAll(tmpdir)
repoSpec := RepositorySpec{
Name: "main",
Type: "restic",
Params: map[string]interface{}{
"uri": tmpdir + "/repo",
"password": "testpass",
},
}
handlerSpecs := []*HandlerSpec{
&HandlerSpec{
Name: "data",
Type: "pipe",
runResticTest(
t, tmpdir,
&SourceSpec{
Name: "source1",
Handler: "file",
Schedule: "@random_every 1h",
Params: map[string]interface{}{
"backup_command": "echo data",
"restore_command": "read row ; test \"x$$row\" = xdata",
"path": filepath.Join(tmpdir, "data"),
},
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "files",
Atoms: []Atom{
{
Name: "f1",
Path: "file1",
},
{
Name: "f2",
Path: "file2",
},
},
},
},
},
}
sourceSpecs := []*SourceSpec{
"source1/*",
checkRestoredData,
)
}
func TestRestic_Stream(t *testing.T) {
tmpdir := createTempDirWithData(t)
defer os.RemoveAll(tmpdir)
runResticTest(
t, tmpdir,
&SourceSpec{
Name: "source1",
Handler: "data",
......@@ -200,61 +195,36 @@ func TestRestic_Stream(t *testing.T) {
},
},
},
}
queueSpec := jobs.QueueSpec{
Workers: map[string]int{"backup": 2, "restore": 1},
}
// Run the backup.
configMgr, err := NewConfigManager(&Config{
Queue: queueSpec,
Repository: repoSpec,
HandlerSpecs: handlerSpecs,
SourceSpecs: sourceSpecs,
})
if err != nil {
t.Fatal(err)
}
defer configMgr.Close()
m, err := NewManager(context.TODO(), configMgr, store)
if err != nil {
t.Fatal(err)
}
defer m.Close()
backup, err := m.Backup(context.TODO(), configMgr.getSourceSpecs()[0])
if err != nil {
t.Fatal(err)
}
if backup.ID == "" || backup.Host == "" {
t.Fatalf("empty fields in backup: %+v", backup)
}
"source1/*",
nil,
)
}
// Check the 'restic snapshots' output.
output, err := exec.Command("env", "RESTIC_REPOSITORY=", "RESTIC_PASSWORD_FILE=", "RESTIC_PASSWORD=testpass", "restic", "-r", tmpdir+"/repo", "snapshots", "--json").Output()
if err != nil {
t.Fatalf("'restic snapshots' failed: %v", err)
}
snaps, err := parseResticSnapshots(output)
if err != nil {
t.Fatalf("parsing restic snaphots output: %v, output:\n%s", err, string(output))
}
if len(snaps) != 1 {
t.Fatalf("wrong number of snapshots: %+v", snaps)
}
snap := snaps[0]
if len(snap.Tags) != 2 {
t.Fatalf("woops, bad number of tags: %+v", snap)
}
func TestRestic_Stream_Compress(t *testing.T) {
tmpdir := createTempDirWithData(t)
defer os.RemoveAll(tmpdir)
// Now try to restore.
err = m.Restore(
context.TODO(),
&FindRequest{Pattern: "source1/*"},
tmpdir+"/restore",
runResticTest(
t, tmpdir,
&SourceSpec{
Name: "source1",
Handler: "data",
Schedule: "@random_every 1h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "f1",
Atoms: []Atom{
{
Name: "f1",
},
},
},
},
Params: map[string]interface{}{
"compress": true,
},
},
"source1/*",
nil,
)
if err != nil {
t.Fatal("Restore", err)
}
}
......@@ -22,6 +22,22 @@ func (p Params) Get(key string) string {
return ""
}
// GetBool returns a boolean value for a parameter (may be a string).
// Returns value and presence: the second result is false when the key
// is absent or holds neither a bool nor a string.
func (p Params) GetBool(key string) (bool, bool) {
	switch v := p[key].(type) {
	case bool:
		return v, true
	case string:
		// Any string is "present"; only the recognized affirmative
		// spellings count as true.
		switch strings.ToLower(v) {
		case "on", "yes", "true", "1":
			return true, true
		default:
			return false, true
		}
	default:
		return false, false
	}
}
// Backup is the over-arching entity describing a high level backup
// operation. Backups are initiated autonomously by individual hosts,
// so each Backup belongs to a single Host.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment