Commit 9f419b84 authored by ale's avatar ale
Browse files

Refactor Repository to handle command execution

Command execution logic moves from the Handler type to Repository, to
allow us to introduce Repository-owned command-scoped contexts later.
parent ef39ae15
......@@ -61,9 +61,7 @@ func (h *fileHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset
}
// Invoke the backup command (path-based).
repo := rctx.Repo()
cmd := repo.BackupCmd(backup, ds, tmpf, h.exclude)
return repo.RunBackup(ctx, rctx.Shell(), backup, ds, cmd)
return rctx.Repo().RunBackup(ctx, rctx.Shell(), backup, ds, tmpf, h.exclude)
})
}
......@@ -77,10 +75,6 @@ func (h *fileHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Datase
// Call the repo Restore method.
return jobs.JobFunc(func(ctx context.Context) error {
cmd, err := rctx.Repo().RestoreCmd(ctx, rctx, backup, ds, paths, target)
if err != nil {
return err
}
return rctx.Shell().Run(ctx, cmd)
return rctx.Repo().RunRestore(ctx, rctx.Shell(), backup, ds, paths, target)
})
}
......@@ -3,7 +3,6 @@ package tabacco
import (
"context"
"errors"
"fmt"
"os"
"strings"
......@@ -61,46 +60,29 @@ func newPipeHandler(name string, params Params) (Handler, error) {
}
func (h *pipeHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job {
repo := rctx.Repo()
cmd := fmt.Sprintf(
"(%s)%s | %s",
expandVars(h.backupCmd, backup, ds),
h.compressSuffix(),
repo.BackupStreamCmd(backup, ds),
)
return jobs.JobFunc(func(ctx context.Context) error {
return repo.RunBackup(ctx, rctx.Shell(), backup, ds, cmd)
return rctx.Repo().RunStreamBackup(ctx, rctx.Shell(), backup, ds, h.backupCmd, h.compressCommand())
})
}
func (h *pipeHandler) RestoreJob(rctx RuntimeContext, backup *Backup, ds *Dataset, target string) jobs.Job {
return jobs.JobFunc(func(ctx context.Context) error {
restoreCmd, err := rctx.Repo().RestoreStreamCmd(ctx, rctx, backup, ds, getWorkDir(ctx))
if err != nil {
return err
}
cmd := fmt.Sprintf(
"%s | %s(%s)",
restoreCmd,
h.decompressPrefix(),
expandVars(h.restoreCmd, backup, ds),
)
return rctx.Shell().Run(ctx, cmd)
return rctx.Repo().RunStreamRestore(ctx, rctx.Shell(), backup, ds, h.restoreCmd, h.decompressCommand())
})
}
func (h *pipeHandler) compressSuffix() string {
func (h *pipeHandler) compressCommand() string {
if !h.compress {
return ""
}
return fmt.Sprintf(" | %s", h.compressCmd)
return h.compressCmd
}
func (h *pipeHandler) decompressPrefix() string {
func (h *pipeHandler) decompressCommand() string {
if !h.compress {
return ""
}
return fmt.Sprintf("%s | ", h.decompressCmd)
return h.decompressCmd
}
func expandVars(s string, backup *Backup, ds *Dataset) string {
......
......@@ -168,7 +168,7 @@ func (r *resticRepository) cacheArgs(ds *Dataset) string {
return fmt.Sprintf("--cache-dir %s", dir)
}
func (r *resticRepository) BackupCmd(backup *Backup, ds *Dataset, inputFile string, exclude []string) string {
func (r *resticRepository) backupCmd(backup *Backup, ds *Dataset, inputFile string, exclude []string) string {
cmd := fmt.Sprintf(
"%s backup %s --json --exclude-caches --one-file-system %s --files-from %s",
r.resticCmd(),
......@@ -182,13 +182,13 @@ func (r *resticRepository) BackupCmd(backup *Backup, ds *Dataset, inputFile stri
return cmd
}
func (r *resticRepository) getSnapshotID(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset) (string, error) {
func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset) (string, error) {
if ds.SnapshotID != "" {
return ds.SnapshotID, nil
}
// Legacy compatibility: query restic using the dataset ID.
data, err := rctx.Shell().Output(ctx, fmt.Sprintf(
data, err := shell.Output(ctx, fmt.Sprintf(
"%s snapshots %s --no-lock --json %s",
r.resticCmd(),
r.cacheArgs(ds),
......@@ -207,8 +207,8 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, rctx RuntimeContex
return snaps[0].ShortID, nil
}
func (r *resticRepository) RestoreCmd(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset, paths []string, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, rctx, backup, ds)
func (r *resticRepository) restoreCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, shell, backup, ds)
if err != nil {
return "", err
}
......@@ -243,7 +243,7 @@ func datasetStdinPath(ds *Dataset) string {
return fmt.Sprintf("/STDIN_%s", dsPath)
}
func (r *resticRepository) BackupStreamCmd(backup *Backup, ds *Dataset) string {
func (r *resticRepository) backupStreamCmd(backup *Backup, ds *Dataset) string {
fakePath := datasetStdinPath(ds)
return fmt.Sprintf(
// The --force prevents restic from trying to find a previous snapshot.
......@@ -255,8 +255,8 @@ func (r *resticRepository) BackupStreamCmd(backup *Backup, ds *Dataset) string {
)
}
func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, rctx, backup, ds)
func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, shell, backup, ds)
if err != nil {
return "", err
}
......@@ -274,6 +274,49 @@ func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeCon
), nil
}
func (r *resticRepository) RunBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, inputFile string, exclude []string) error {
cmd := r.backupCmd(backup, ds, inputFile, exclude)
return r.runBackupCmd(ctx, shell, backup, ds, cmd)
}
func (r *resticRepository) RunStreamBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, backupCmd, compressCmd string) error {
// Concatenate the backupCmd with restic (and an optional
// compression command) to build a shell pipeline.
pipe := []string{
fmt.Sprintf("(%s)", expandVars(backupCmd, backup, ds)),
}
if compressCmd != "" {
pipe = append(pipe, compressCmd)
}
pipe = append(pipe, r.backupStreamCmd(backup, ds))
cmd := strings.Join(pipe, " | ")
return r.runBackupCmd(ctx, shell, backup, ds, cmd)
}
func (r *resticRepository) RunRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) error {
cmd, err := r.restoreCmd(ctx, shell, backup, ds, paths, target)
if err != nil {
return err
}
return shell.Run(ctx, cmd)
}
func (r *resticRepository) RunStreamRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, restoreCmd, decompressCmd string) error {
resticCmd, err := r.restoreStreamCmd(ctx, shell, backup, ds, getWorkDir(ctx))
if err != nil {
return err
}
pipe := []string{resticCmd}
if decompressCmd != "" {
pipe = append(pipe, decompressCmd)
}
pipe = append(pipe, fmt.Sprintf("(%s)", expandVars(restoreCmd, backup, ds)))
cmd := strings.Join(pipe, " | ")
return shell.Run(ctx, cmd)
}
// Global map that keeps track of the currently running restic
// processes and their respective progress, for debugging purposes.
type activeBackupStatus struct {
......@@ -366,8 +409,10 @@ type resticMessage struct {
resticSummaryMessage
}
// Scan the output of 'restic backup' for the snapshot ID. Modifies backup/dataset objects.
func (r *resticRepository) RunBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, cmd string) error {
// Run a backup command. The output of 'restic backup' is examined for
// progress and to obtain the snapshot ID. Modifies the provided
// backup/dataset objects.
func (r *resticRepository) runBackupCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, cmd string) error {
return active.WithResticStatus(jobs.GetID(ctx), backup, ds, func(progressCh chan *resticStatusMessage) error {
return shell.RunWithStdoutCallback(ctx, cmd, func(line []byte) {
var msg resticMessage
......
......@@ -147,11 +147,13 @@ type Handler interface {
// Repository is the interface to a remote repository.
type Repository interface {
Init(context.Context, RuntimeContext) error
BackupCmd(*Backup, *Dataset, string, []string) string
RestoreCmd(context.Context, RuntimeContext, *Backup, *Dataset, []string, string) (string, error)
BackupStreamCmd(*Backup, *Dataset) string
RestoreStreamCmd(context.Context, RuntimeContext, *Backup, *Dataset, string) (string, error)
RunBackup(context.Context, *Shell, *Backup, *Dataset, string) error
RunBackup(context.Context, *Shell, *Backup, *Dataset, string, []string) error
RunStreamBackup(context.Context, *Shell, *Backup, *Dataset, string, string) error
RunRestore(context.Context, *Shell, *Backup, *Dataset, []string, string) error
RunStreamRestore(context.Context, *Shell, *Backup, *Dataset, string, string) error
Close() error
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment