Commit 0a281cb6 authored by ale
Browse files

Merge branch 'fewer-caches' into 'master'

Reuse a pool of cache directories

Closes #6

See merge request !22
parents 344d0ead 2aeb1ade
Pipeline #22365 passed with stages
in 2 minutes and 50 seconds
......@@ -27,7 +27,7 @@ type resticRepository struct {
excludes []string
excludeFiles []string
autoPrune bool
cacheDirPath string
cacheMgr cacheManager
initialized sync.Once
}
......@@ -106,6 +106,11 @@ func newResticRepository(params Params) (Repository, error) {
return nil, err
}
var cmgr cacheManager = new(nullCacheManager)
if s := params.Get("cache_dir"); s != "" {
cmgr = newResticCacheManager(s)
}
autoPrune := true
if b, ok := params.GetBool("autoprune"); ok {
autoPrune = b
......@@ -117,7 +122,7 @@ func newResticRepository(params Params) (Repository, error) {
excludes: params.GetList("exclude"),
excludeFiles: params.GetList("exclude_files"),
autoPrune: autoPrune,
cacheDirPath: params.Get("cache_dir"),
cacheMgr: cmgr,
}, nil
}
......@@ -154,25 +159,11 @@ func resticBackupTags(backup *Backup, ds *Dataset) string {
return fmt.Sprintf("--tag dataset_id=%s --tag backup_id=%s --tag dataset_source=%s", ds.ID, backup.ID, ds.Source)
}
func (r *resticRepository) cacheArgs(ds *Dataset) string {
if r.cacheDirPath == "" {
return "--no-cache"
}
dir := filepath.Join(r.cacheDirPath, strings.Replace(ds.Source, "/", "_", -1))
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0700); err != nil {
log.Printf("warning: could not create cache directory (%s), running without a local cache", err)
return "--no-cache"
}
}
return fmt.Sprintf("--cache-dir %s", dir)
}
func (r *resticRepository) backupCmd(backup *Backup, ds *Dataset, inputFile string, exclude []string) string {
func (r *resticRepository) backupCmd(cc cache, backup *Backup, ds *Dataset, inputFile string, exclude []string) string {
cmd := fmt.Sprintf(
"%s backup %s --json --exclude-caches --one-file-system %s --files-from %s",
r.resticCmd(),
r.cacheArgs(ds),
cc.Args(),
resticBackupTags(backup, ds),
inputFile,
)
......@@ -182,7 +173,7 @@ func (r *resticRepository) backupCmd(backup *Backup, ds *Dataset, inputFile stri
return cmd
}
func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset) (string, error) {
func (r *resticRepository) getSnapshotID(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset) (string, error) {
if ds.SnapshotID != "" {
return ds.SnapshotID, nil
}
......@@ -191,7 +182,7 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, back
data, err := shell.Output(ctx, fmt.Sprintf(
"%s snapshots %s --no-lock --json %s",
r.resticCmd(),
r.cacheArgs(ds),
cc.Args(),
resticBackupTags(backup, ds),
))
if err != nil {
......@@ -207,8 +198,8 @@ func (r *resticRepository) getSnapshotID(ctx context.Context, shell *Shell, back
return snaps[0].ShortID, nil
}
func (r *resticRepository) restoreCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, shell, backup, ds)
func (r *resticRepository) restoreCmd(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, cc, shell, backup, ds)
if err != nil {
return "", err
}
......@@ -222,6 +213,7 @@ func (r *resticRepository) restoreCmd(ctx context.Context, shell *Shell, backup
}
cmd = append(cmd, fmt.Sprintf("--target %s", target))
cmd = append(cmd, cc.Args())
cmd = append(cmd, snap)
return strings.Join(cmd, " "), nil
}
......@@ -243,20 +235,20 @@ func datasetStdinPath(ds *Dataset) string {
return fmt.Sprintf("/STDIN_%s", dsPath)
}
func (r *resticRepository) backupStreamCmd(backup *Backup, ds *Dataset) string {
func (r *resticRepository) backupStreamCmd(cc cache, backup *Backup, ds *Dataset) string {
fakePath := datasetStdinPath(ds)
return fmt.Sprintf(
// The --force prevents restic from trying to find a previous snapshot.
"%s backup %s %s --json --force --stdin --stdin-filename %s",
r.resticCmd(),
r.cacheArgs(ds),
cc.Args(),
resticBackupTags(backup, ds),
fakePath,
)
}
func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, shell, backup, ds)
func (r *resticRepository) restoreStreamCmd(ctx context.Context, cc cache, shell *Shell, backup *Backup, ds *Dataset, target string) (string, error) {
snap, err := r.getSnapshotID(ctx, cc, shell, backup, ds)
if err != nil {
return "", err
}
......@@ -266,8 +258,9 @@ func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, b
// Restore the file to a temporary directory, then pipe it.
return fmt.Sprintf(
"(%s restore --target %s %s 1>&2 && cat %s)",
"(%s restore %s --target %s %s 1>&2 && cat %s)",
r.resticCmd(),
cc.Args(),
target,
snap,
filepath.Join(target, targetPath),
......@@ -275,11 +268,23 @@ func (r *resticRepository) restoreStreamCmd(ctx context.Context, shell *Shell, b
}
func (r *resticRepository) RunBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, inputFile string, exclude []string) error {
cmd := r.backupCmd(backup, ds, inputFile, exclude)
cc, err := r.cacheMgr.Acquire()
if err != nil {
return err
}
defer r.cacheMgr.Release(cc)
cmd := r.backupCmd(cc, backup, ds, inputFile, exclude)
return r.runBackupCmd(ctx, shell, backup, ds, cmd)
}
func (r *resticRepository) RunStreamBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, backupCmd, compressCmd string) error {
cc, err := r.cacheMgr.Acquire()
if err != nil {
return err
}
defer r.cacheMgr.Release(cc)
// Concatenate the backupCmd with restic (and an optional
// compression command) to build a shell pipeline.
pipe := []string{
......@@ -288,14 +293,20 @@ func (r *resticRepository) RunStreamBackup(ctx context.Context, shell *Shell, ba
if compressCmd != "" {
pipe = append(pipe, compressCmd)
}
pipe = append(pipe, r.backupStreamCmd(backup, ds))
pipe = append(pipe, r.backupStreamCmd(cc, backup, ds))
cmd := strings.Join(pipe, " | ")
return r.runBackupCmd(ctx, shell, backup, ds, cmd)
}
func (r *resticRepository) RunRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, paths []string, target string) error {
cmd, err := r.restoreCmd(ctx, shell, backup, ds, paths, target)
cc, err := r.cacheMgr.Acquire()
if err != nil {
return err
}
defer r.cacheMgr.Release(cc)
cmd, err := r.restoreCmd(ctx, cc, shell, backup, ds, paths, target)
if err != nil {
return err
}
......@@ -303,7 +314,13 @@ func (r *resticRepository) RunRestore(ctx context.Context, shell *Shell, backup
}
func (r *resticRepository) RunStreamRestore(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, restoreCmd, decompressCmd string) error {
resticCmd, err := r.restoreStreamCmd(ctx, shell, backup, ds, getWorkDir(ctx))
cc, err := r.cacheMgr.Acquire()
if err != nil {
return err
}
defer r.cacheMgr.Release(cc)
resticCmd, err := r.restoreStreamCmd(ctx, cc, shell, backup, ds, getWorkDir(ctx))
if err != nil {
return err
}
......
......@@ -45,14 +45,22 @@ func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePatte
t.Skip("can't run restic: ", err)
}
// Temporary cache dir.
cacheDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(cacheDir)
store := &dummyMetadataStore{}
repoSpec := RepositorySpec{
Name: "main",
Type: "restic",
Params: map[string]interface{}{
"uri": tmpdir + "/repo",
"password": "testpass",
"uri": tmpdir + "/repo",
"password": "testpass",
"cache_dir": cacheDir,
},
}
handlerSpecs := []*HandlerSpec{
......
package tabacco
import (
"errors"
"fmt"
"path/filepath"
"sync"
)
// maxConcurrentCaches is the number of slots in the restic cache
// directory pool, and therefore the maximum number of caches that can
// be acquired from a resticCacheManager at the same time.
const maxConcurrentCaches = 20
// cacheManager hands out caches to be used by restic invocations.
// Callers pair every successful Acquire with a Release of the cache
// they were given.
type cacheManager interface {
	// Acquire reserves a cache for the caller, or returns an error
	// if none can be provided.
	Acquire() (cache, error)
	// Release gives back a cache previously returned by Acquire.
	Release(cache)
}
// cache represents the local cache setup for a single restic
// invocation.
type cache interface {
	// Args returns the command-line arguments to splice into a
	// restic command so that it uses this cache.
	Args() string
}
// resticCacheManager maintains a pool of restic cache directories
// below a common root. Caches can take quite a lot of disk space, so
// slots are handed out lowest-index-first: more than one directory is
// only ever used when jobs are running in parallel.
type resticCacheManager struct {
	root string // directory under which the cache-NNN directories live

	mx   sync.Mutex // guards busy
	busy []bool     // busy[i] reports whether slot i is currently handed out
}

// newResticCacheManager returns a manager for a pool of
// maxConcurrentCaches cache directories below root, all initially
// free.
func newResticCacheManager(root string) *resticCacheManager {
	return &resticCacheManager{
		root: root,
		busy: make([]bool, maxConcurrentCaches),
	}
}

// next marks the lowest-numbered free slot as busy and returns its
// index. The boolean result is false when every slot is taken.
// Callers must hold m.mx.
func (m *resticCacheManager) next() (int, bool) {
	for i, used := range m.busy {
		if !used {
			m.busy[i] = true
			return i, true
		}
	}
	return 0, false
}

// Acquire reserves a free slot and returns the cache bound to its
// directory (<root>/cache-NNN). It only computes the path; it does
// not create the directory. An error is returned when all
// maxConcurrentCaches slots are in use.
func (m *resticCacheManager) Acquire() (cache, error) {
	m.mx.Lock()
	defer m.mx.Unlock()

	index, ok := m.next()
	if !ok {
		return nil, errors.New("no more caches available")
	}
	return &resticCache{
		index: index,
		path:  filepath.Join(m.root, fmt.Sprintf("cache-%03d", index)),
	}, nil
}

// Release marks the slot held by c as free again so that a later
// Acquire can reuse its directory.
//
// Values that cannot belong to this pool (nil, a cache of another
// type, or an index outside the pool) are ignored. Release runs from
// deferred calls, where a panic would mask the error being returned
// and, if raised while the mutex is held, would leave the manager
// locked forever.
func (m *resticCacheManager) Release(c cache) {
	// Validate before taking the lock; the comma-ok form never panics,
	// not even on a nil interface value.
	rc, ok := c.(*resticCache)
	if !ok || rc == nil {
		return
	}

	m.mx.Lock()
	defer m.mx.Unlock()

	if rc.index < 0 || rc.index >= len(m.busy) {
		return
	}
	m.busy[rc.index] = false
}
// resticCache is a single cache directory handed out by the restic
// cache manager.
type resticCache struct {
	index int    // slot number within the manager's pool
	path  string // cache directory passed to restic
}

// Args returns the restic option that selects this cache directory.
func (rc *resticCache) Args() string {
	flags := fmt.Sprintf("--cache-dir %s", rc.path)
	return flags
}
// nullCacheManager is the cache manager that does no caching at all:
// every cache it hands out is a nullCache, whose arguments are
// always --no-cache.
type nullCacheManager struct{}

// Acquire always succeeds and returns a fresh nullCache.
func (*nullCacheManager) Acquire() (cache, error) {
	var c cache = &nullCache{}
	return c, nil
}

// Release does nothing: a nullCache holds no resources.
func (*nullCacheManager) Release(cache) {}
// nullCache is the cache used when no cache directory is configured.
type nullCache struct{}

// Args returns the restic option that disables the local cache.
func (*nullCache) Args() string {
	const flag = "--no-cache"
	return flag
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment