diff --git a/repository_restic.go b/repository_restic.go index c721531081aaee40e312fd4dbafc3e87c8f20411..65a0e312f59828d00fe5cb66756e6cdd4e6a8663 100644 --- a/repository_restic.go +++ b/repository_restic.go @@ -27,7 +27,7 @@ type resticRepository struct { excludes []string excludeFiles []string autoPrune bool - cacheDirPath string + cacheMgr cacheManager initialized sync.Once } @@ -106,6 +106,11 @@ func newResticRepository(params Params) (Repository, error) { return nil, err } + var cmgr cacheManager = new(nullCacheManager) + if s := params.Get("cache_dir"); s != "" { + cmgr = newResticCacheManager(s) + } + autoPrune := true if b, ok := params.GetBool("autoprune"); ok { autoPrune = b @@ -117,7 +122,7 @@ func newResticRepository(params Params) (Repository, error) { excludes: params.GetList("exclude"), excludeFiles: params.GetList("exclude_files"), autoPrune: autoPrune, - cacheDirPath: params.Get("cache_dir"), + cacheMgr: cmgr, }, nil } @@ -154,25 +159,11 @@ func resticBackupTags(backup *Backup, ds *Dataset) string { return fmt.Sprintf("--tag dataset_id=%s --tag backup_id=%s --tag dataset_source=%s", ds.ID, backup.ID, ds.Source) } -func (r *resticRepository) cacheArgs(ds *Dataset) string { - if r.cacheDirPath == "" { - return "--no-cache" - } - dir := filepath.Join(r.cacheDirPath, strings.Replace(ds.Source, "/", "_", -1)) - if _, err := os.Stat(dir); os.IsNotExist(err) { - if err := os.MkdirAll(dir, 0700); err != nil { - log.Printf("warning: could not create cache directory (%s), running without a local cache", err) - return "--no-cache" - } - } - return fmt.Sprintf("--cache-dir %s", dir) -} - -func (r *resticRepository) backupCmd(backup *Backup, ds *Dataset, inputFile string, exclude []string) string { +func (r *resticRepository) backupCmd(cc cache, backup *Backup, ds *Dataset, inputFile string, exclude []string) string { cmd := fmt.Sprintf( "%s backup %s --json --exclude-caches --one-file-system %s --files-from %s", r.resticCmd(), - r.cacheArgs(ds), + cc.Args(), resticBackupTags(backup, ds), inputFile, ) @@ -182,7 +173,7 @@ func (r *resticRepository) backupCmd(backup *Backup, ds *Dataset, inputFile stri return cmd } -func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset) (string, error) { +func (r *resticRepository) getSnapshotID(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset) (string, error) { if ds.SnapshotID != "" { return ds.SnapshotID, nil } @@ -191,7 +182,7 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, back data, err := shell.Output(ctx, fmt.Sprintf( "%s snapshots %s --no-lock --json %s", r.resticCmd(), - r.cacheArgs(ds), + cc.Args(), resticBackupTags(backup, ds), )) if err != nil { @@ -207,8 +198,8 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, back return snaps[0].ShortID, nil } -func (r *resticRepository) restoreCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) (string, error) { - snap, err := r.getSnapshotID(ctx, shell, backup, ds) +func (r *resticRepository) restoreCmd(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) (string, error) { + snap, err := r.getSnapshotID(ctx, cc, shell, backup, ds) if err != nil { return "", err } @@ -222,6 +213,7 @@ func (r *resticRepository) restoreCmd(ctx context.Context, shell *Shell, backup } cmd = append(cmd, fmt.Sprintf("--target %s", target)) + cmd = append(cmd, cc.Args()) cmd = append(cmd, snap) return strings.Join(cmd, " "), nil } @@ -243,20 +235,20 @@ func datasetStdinPath(ds *Dataset) string { return fmt.Sprintf("/STDIN_%s", dsPath) } -func (r *resticRepository) backupStreamCmd(backup *Backup, ds *Dataset) string { +func (r *resticRepository) backupStreamCmd(cc cache, backup *Backup, ds *Dataset) string { fakePath := datasetStdinPath(ds) return fmt.Sprintf( // The --force prevents restic from trying to find a previous snapshot. "%s backup %s %s --json --force --stdin --stdin-filename %s", r.resticCmd(), - r.cacheArgs(ds), + cc.Args(), resticBackupTags(backup, ds), fakePath, ) } -func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, target string) (string, error) { - snap, err := r.getSnapshotID(ctx, shell, backup, ds) +func (r *resticRepository) restoreStreamCmd(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset, target string) (string, error) { + snap, err := r.getSnapshotID(ctx, cc, shell, backup, ds) if err != nil { return "", err } @@ -266,8 +258,9 @@ func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, b // Restore the file to a temporary directory, then pipe it. return fmt.Sprintf( - "(%s restore --target %s %s 1>&2 && cat %s)", + "(%s restore %s --target %s %s 1>&2 && cat %s)", r.resticCmd(), + cc.Args(), target, snap, filepath.Join(target, targetPath), @@ -275,11 +268,23 @@ func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, b } func (r *resticRepository) RunBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, inputFile string, exclude []string) error { - cmd := r.backupCmd(backup, ds, inputFile, exclude) + cc, err := r.cacheMgr.Acquire() + if err != nil { + return err + } + defer r.cacheMgr.Release(cc) + + cmd := r.backupCmd(cc, backup, ds, inputFile, exclude) return r.runBackupCmd(ctx, shell, backup, ds, cmd) } func (r *resticRepository) RunStreamBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, backupCmd, compressCmd string) error { + cc, err := r.cacheMgr.Acquire() + if err != nil { + return err + } + defer r.cacheMgr.Release(cc) + // Concatenate the backupCmd with restic (and an optional // compression command) to build a shell pipeline. pipe := []string{ @@ -288,14 +293,20 @@ func (r *resticRepository) RunStreamBackup(ctx context.Context, shell *Shell, ba if compressCmd != "" { pipe = append(pipe, compressCmd) } - pipe = append(pipe, r.backupStreamCmd(backup, ds)) + pipe = append(pipe, r.backupStreamCmd(cc, backup, ds)) cmd := strings.Join(pipe, " | ") return r.runBackupCmd(ctx, shell, backup, ds, cmd) } func (r *resticRepository) RunRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) error { - cmd, err := r.restoreCmd(ctx, shell, backup, ds, paths, target) + cc, err := r.cacheMgr.Acquire() + if err != nil { + return err + } + defer r.cacheMgr.Release(cc) + + cmd, err := r.restoreCmd(ctx, cc, shell, backup, ds, paths, target) if err != nil { return err } @@ -303,7 +314,13 @@ func (r *resticRepository) RunRestore(ctx context.Context, shell *Shell, backup } func (r *resticRepository) RunStreamRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, restoreCmd, decompressCmd string) error { - resticCmd, err := r.restoreStreamCmd(ctx, shell, backup, ds, getWorkDir(ctx)) + cc, err := r.cacheMgr.Acquire() + if err != nil { + return err + } + defer r.cacheMgr.Release(cc) + + resticCmd, err := r.restoreStreamCmd(ctx, cc, shell, backup, ds, getWorkDir(ctx)) if err != nil { return err } diff --git a/repository_restic_test.go b/repository_restic_test.go index 817134735d5be1436c6597f527fa37ec8b066061..b0d847ec7de34f873d6ab45ec0623a6a3dbb28b2 100644 --- a/repository_restic_test.go +++ b/repository_restic_test.go @@ -45,14 +45,22 @@ func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePatte t.Skip("can't run restic: ", err) } + // Temporary cache dir. + cacheDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(cacheDir) + store := &dummyMetadataStore{} repoSpec := RepositorySpec{ Name: "main", Type: "restic", Params: map[string]interface{}{ - "uri": tmpdir + "/repo", - "password": "testpass", + "uri": tmpdir + "/repo", + "password": "testpass", + "cache_dir": cacheDir, }, } handlerSpecs := []*HandlerSpec{ diff --git a/restic_cache.go b/restic_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..bcd2194222317799d8f7b72fe810a5234c7e2acc --- /dev/null +++ b/restic_cache.go @@ -0,0 +1,91 @@ +package tabacco + +import ( + "errors" + "fmt" + "path/filepath" + "sync" +) + +const maxConcurrentCaches = 20 + +type cacheManager interface { + Acquire() (cache, error) + Release(cache) +} + +type cache interface { + Args() string +} + +// The restic cache manager maintains a pool of cache +// directories. These can take quite a lot of disk space, so we only +// create more than one when jobs are running in parallel. +type resticCacheManager struct { + root string + + mx sync.Mutex + busy []bool +} + +func newResticCacheManager(root string) *resticCacheManager { + return &resticCacheManager{ + root: root, + busy: make([]bool, maxConcurrentCaches), + } +} + +func (m *resticCacheManager) next() (int, bool) { + for i := 0; i < len(m.busy); i++ { + if !m.busy[i] { + m.busy[i] = true + return i, true + } + } + return 0, false +} + +func (m *resticCacheManager) Acquire() (cache, error) { + m.mx.Lock() + defer m.mx.Unlock() + + index, ok := m.next() + if !ok { + return nil, errors.New("no more caches available") + } + return &resticCache{ + index: index, + path: filepath.Join(m.root, fmt.Sprintf("cache-%03d", index)), + }, nil +} + +func (m *resticCacheManager) Release(c cache) { + m.mx.Lock() + rc := c.(*resticCache) + m.busy[rc.index] = false + m.mx.Unlock() +} + +type resticCache struct { + index int + path string +} + +func (c *resticCache) Args() string { + return fmt.Sprintf("--cache-dir %s", c.path) +} + +// The null cache manager always returns --no-cache. +type nullCacheManager struct{} + +func (m *nullCacheManager) Acquire() (cache, error) { + return new(nullCache), nil +} + +func (m *nullCacheManager) Release(cache) {} + +type nullCache struct{} + +func (c *nullCache) Args() string { + return "--no-cache" +}