Commit c93018f2 authored by ale's avatar ale

Store the restic snapshot ID in the backup metadata

This will eventually allow us to use --previous and skip the expensive
remote snapshot lookup.
parent d728bfd2
Pipeline #4751 passed with stages
in 5 minutes and 40 seconds
...@@ -56,10 +56,10 @@ func (h *fileHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset ...@@ -56,10 +56,10 @@ func (h *fileHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset
return err return err
} }
cmd := rctx.Repo().BackupCmd(backup, ds, tmpf) // Invoke the backup command (path-based).
repo := rctx.Repo()
// Now pass those paths to the Backup method. cmd := repo.BackupCmd(backup, ds, tmpf)
return rctx.Shell().Run(ctx, cmd) return repo.RunBackup(ctx, rctx.Shell(), backup, ds, cmd)
}) })
} }
......
...@@ -61,14 +61,15 @@ func newPipeHandler(name string, params Params) (Handler, error) { ...@@ -61,14 +61,15 @@ func newPipeHandler(name string, params Params) (Handler, error) {
} }
func (h *pipeHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job { func (h *pipeHandler) BackupJob(rctx RuntimeContext, backup *Backup, ds *Dataset) jobs.Job {
repo := rctx.Repo()
cmd := fmt.Sprintf( cmd := fmt.Sprintf(
"(%s)%s | %s", "(%s)%s | %s",
expandVars(h.backupCmd, backup, ds), expandVars(h.backupCmd, backup, ds),
h.compressSuffix(), h.compressSuffix(),
rctx.Repo().BackupStreamCmd(backup, ds), repo.BackupStreamCmd(backup, ds),
) )
return jobs.JobFunc(func(ctx context.Context) error { return jobs.JobFunc(func(ctx context.Context) error {
return rctx.Shell().Run(ctx, cmd) return repo.RunBackup(ctx, rctx.Shell(), backup, ds, cmd)
}) })
} }
......
ALTER TABLE log DROP COLUMN dataset_snapshot_id;
ALTER TABLE log ADD COLUMN dataset_snapshot_id VARCHAR(64);
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// sources: // sources:
// migrations/1_initialize_schema.down.sql // migrations/1_initialize_schema.down.sql
// migrations/1_initialize_schema.up.sql // migrations/1_initialize_schema.up.sql
// migrations/2_add_snapshot_id.down.sql
// migrations/2_add_snapshot_id.up.sql
// DO NOT EDIT! // DO NOT EDIT!
package migrations package migrations
...@@ -63,7 +65,7 @@ func _1_initialize_schemaDownSql() (*asset, error) { ...@@ -63,7 +65,7 @@ func _1_initialize_schemaDownSql() (*asset, error) {
return nil, err return nil, err
} }
info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 121, mode: os.FileMode(420), modTime: time.Unix(1560930730, 0)} info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 121, mode: os.FileMode(420), modTime: time.Unix(1560983647, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -98,7 +100,45 @@ func _1_initialize_schemaUpSql() (*asset, error) { ...@@ -98,7 +100,45 @@ func _1_initialize_schemaUpSql() (*asset, error) {
return nil, err return nil, err
} }
info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 568, mode: os.FileMode(420), modTime: time.Unix(1560930732, 0)} info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 568, mode: os.FileMode(420), modTime: time.Unix(1560983647, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
// Raw contents of migrations/2_add_snapshot_id.down.sql.
var __2_add_snapshot_idDownSql = []byte("\nALTER TABLE log DROP COLUMN dataset_snapshot_id;\n")

// _2_add_snapshot_idDownSqlBytes returns the embedded down-migration SQL.
func _2_add_snapshot_idDownSqlBytes() ([]byte, error) {
	return __2_add_snapshot_idDownSql, nil
}
// _2_add_snapshot_idDownSql wraps the embedded down-migration bytes in
// an asset carrying synthetic file metadata.
func _2_add_snapshot_idDownSql() (*asset, error) {
	data, err := _2_add_snapshot_idDownSqlBytes()
	if err != nil {
		return nil, err
	}
	fi := bindataFileInfo{name: "2_add_snapshot_id.down.sql", size: 50, mode: os.FileMode(420), modTime: time.Unix(1571647185, 0)}
	return &asset{bytes: data, info: fi}, nil
}
// Raw contents of migrations/2_add_snapshot_id.up.sql.
var __2_add_snapshot_idUpSql = []byte("\nALTER TABLE log ADD COLUMN dataset_snapshot_id VARCHAR(64);\n")

// _2_add_snapshot_idUpSqlBytes returns the embedded up-migration SQL.
func _2_add_snapshot_idUpSqlBytes() ([]byte, error) {
	return __2_add_snapshot_idUpSql, nil
}
func _2_add_snapshot_idUpSql() (*asset, error) {
bytes, err := _2_add_snapshot_idUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "2_add_snapshot_id.up.sql", size: 61, mode: os.FileMode(420), modTime: time.Unix(1571647193, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -157,6 +197,8 @@ func AssetNames() []string { ...@@ -157,6 +197,8 @@ func AssetNames() []string {
var _bindata = map[string]func() (*asset, error){ var _bindata = map[string]func() (*asset, error){
"1_initialize_schema.down.sql": _1_initialize_schemaDownSql, "1_initialize_schema.down.sql": _1_initialize_schemaDownSql,
"1_initialize_schema.up.sql": _1_initialize_schemaUpSql, "1_initialize_schema.up.sql": _1_initialize_schemaUpSql,
"2_add_snapshot_id.down.sql": _2_add_snapshot_idDownSql,
"2_add_snapshot_id.up.sql": _2_add_snapshot_idUpSql,
} }
// AssetDir returns the file names below a certain // AssetDir returns the file names below a certain
...@@ -201,6 +243,8 @@ type bintree struct { ...@@ -201,6 +243,8 @@ type bintree struct {
var _bintree = &bintree{nil, map[string]*bintree{ var _bintree = &bintree{nil, map[string]*bintree{
"1_initialize_schema.down.sql": &bintree{_1_initialize_schemaDownSql, map[string]*bintree{}}, "1_initialize_schema.down.sql": &bintree{_1_initialize_schemaDownSql, map[string]*bintree{}},
"1_initialize_schema.up.sql": &bintree{_1_initialize_schemaUpSql, map[string]*bintree{}}, "1_initialize_schema.up.sql": &bintree{_1_initialize_schemaUpSql, map[string]*bintree{}},
"2_add_snapshot_id.down.sql": &bintree{_2_add_snapshot_idDownSql, map[string]*bintree{}},
"2_add_snapshot_id.up.sql": &bintree{_2_add_snapshot_idUpSql, map[string]*bintree{}},
}} }}
// RestoreAsset restores an asset under the given directory // RestoreAsset restores an asset under the given directory
......
...@@ -14,14 +14,15 @@ import ( ...@@ -14,14 +14,15 @@ import (
// An atom, as represented in the database, denormalized. // An atom, as represented in the database, denormalized.
type dbAtom struct { type dbAtom struct {
BackupID string BackupID string
BackupTimestamp time.Time BackupTimestamp time.Time
BackupHost string BackupHost string
DatasetID string DatasetID string
DatasetSource string DatasetSource string
AtomName string DatasetSnapshotID string
AtomFullPath string AtomName string
AtomPath string AtomFullPath string
AtomPath string
} }
func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom { func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
...@@ -34,14 +35,15 @@ func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom { ...@@ -34,14 +35,15 @@ func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
path := filepath.Join(ds.Source, atom.Name) path := filepath.Join(ds.Source, atom.Name)
out = append(out, dbAtom{ out = append(out, dbAtom{
BackupID: backup.ID, BackupID: backup.ID,
BackupTimestamp: backup.Timestamp, BackupTimestamp: backup.Timestamp,
BackupHost: backup.Host, BackupHost: backup.Host,
DatasetID: ds.ID, DatasetID: ds.ID,
DatasetSource: ds.Source, DatasetSnapshotID: ds.SnapshotID,
AtomName: atom.Name, DatasetSource: ds.Source,
AtomPath: atom.Path, AtomName: atom.Name,
AtomFullPath: path, AtomPath: atom.Path,
AtomFullPath: path,
}) })
} }
return out return out
...@@ -57,8 +59,9 @@ func (a *dbAtom) getBackup() *tabacco.Backup { ...@@ -57,8 +59,9 @@ func (a *dbAtom) getBackup() *tabacco.Backup {
func (a *dbAtom) getDataset() *tabacco.Dataset { func (a *dbAtom) getDataset() *tabacco.Dataset {
return &tabacco.Dataset{ return &tabacco.Dataset{
ID: a.DatasetID, ID: a.DatasetID,
Source: a.DatasetSource, SnapshotID: a.DatasetSnapshotID,
Source: a.DatasetSource,
} }
} }
...@@ -161,10 +164,10 @@ var statements = map[string]string{ ...@@ -161,10 +164,10 @@ var statements = map[string]string{
"insert_atom": ` "insert_atom": `
INSERT INTO log ( INSERT INTO log (
backup_id, backup_timestamp, backup_host, backup_id, backup_timestamp, backup_host,
dataset_id, dataset_source, dataset_id, dataset_snapshot_id, dataset_source,
atom_name, atom_path, atom_full_path atom_name, atom_path, atom_full_path
) VALUES ( ) VALUES (
?, ?, ?, ?, ?, ?, ?, ? ?, ?, ?, ?, ?, ?, ?, ?, ?
) )
`, `,
} }
...@@ -184,6 +187,7 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba ...@@ -184,6 +187,7 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba
dbAtom.BackupTimestamp, dbAtom.BackupTimestamp,
dbAtom.BackupHost, dbAtom.BackupHost,
dbAtom.DatasetID, dbAtom.DatasetID,
dbAtom.DatasetSnapshotID,
dbAtom.DatasetSource, dbAtom.DatasetSource,
dbAtom.AtomName, dbAtom.AtomName,
dbAtom.AtomPath, dbAtom.AtomPath,
...@@ -223,7 +227,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t ...@@ -223,7 +227,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
q := fmt.Sprintf( q := fmt.Sprintf(
`SELECT `SELECT
backup_id, backup_timestamp, backup_host, backup_id, backup_timestamp, backup_host,
dataset_id, dataset_source, dataset_id, dataset_snapshot_id, dataset_source,
atom_name, atom_path, atom_full_path atom_name, atom_path, atom_full_path
FROM log WHERE %s FROM log WHERE %s
ORDER BY backup_timestamp DESC`, ORDER BY backup_timestamp DESC`,
...@@ -240,7 +244,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t ...@@ -240,7 +244,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
var a dbAtom var a dbAtom
if err := rows.Scan( if err := rows.Scan(
&a.BackupID, &a.BackupTimestamp, &a.BackupHost, &a.BackupID, &a.BackupTimestamp, &a.BackupHost,
&a.DatasetID, &a.DatasetSource, &a.DatasetID, &a.DatasetSnapshotID, &a.DatasetSource,
&a.AtomName, &a.AtomPath, &a.AtomFullPath, &a.AtomName, &a.AtomPath, &a.AtomFullPath,
); err != nil { ); err != nil {
log.Printf("bad row: %v", err) log.Printf("bad row: %v", err)
......
...@@ -266,6 +266,18 @@ func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeCon ...@@ -266,6 +266,18 @@ func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeCon
), nil ), nil
} }
// resticSnapshotRx matches the "snapshot XXXXXXXX saved" line printed
// by 'restic backup', capturing the short (8 hex chars) snapshot ID.
var resticSnapshotRx = regexp.MustCompile("^snapshot ([0-9a-f]{8}) saved")
// RunBackup executes the backup command via the shell, scanning its
// standard output for the restic snapshot ID and storing it in
// ds.SnapshotID when found.
func (r *resticRepository) RunBackup(ctx context.Context, shell *Shell, backup *Backup, ds *Dataset, cmd string) error {
	scanLine := func(line string) {
		// A match guarantees a non-empty 8-hex-char capture group.
		if m := resticSnapshotRx.FindStringSubmatch(line); m != nil {
			ds.SnapshotID = m[1]
		}
	}
	return shell.RunWithStdoutCallback(ctx, cmd, scanLine)
}
// Data about a snapshot, obtained from 'restic snapshots --json'. // Data about a snapshot, obtained from 'restic snapshots --json'.
type resticSnapshot struct { type resticSnapshot struct {
ID string `json:"id"` ID string `json:"id"`
......
...@@ -114,8 +114,10 @@ func (s *Shell) command(ctx context.Context, arg string) *exec.Cmd { ...@@ -114,8 +114,10 @@ func (s *Shell) command(ctx context.Context, arg string) *exec.Cmd {
return c return c
} }
// Run a command. Log its standard output and error. // RunWithStdoutCallback executes a command and invokes a callback on
func (s *Shell) Run(ctx context.Context, arg string) error { // every line read from its standard output. Standard output and
// error are still logged normally as in Run().
func (s *Shell) RunWithStdoutCallback(ctx context.Context, arg string, stdoutCallback func(string)) error {
c := s.command(ctx, arg) c := s.command(ctx, arg)
stdout, err := c.StdoutPipe() stdout, err := c.StdoutPipe()
...@@ -138,7 +140,11 @@ func (s *Shell) Run(ctx context.Context, arg string) error { ...@@ -138,7 +140,11 @@ func (s *Shell) Run(ctx context.Context, arg string) error {
go func() { go func() {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
for scanner.Scan() { for scanner.Scan() {
log.Printf("%s%s", logPrefix, scanner.Text()) line := scanner.Text()
log.Printf("%s%s", logPrefix, line)
if stdoutCallback != nil {
stdoutCallback(line)
}
} }
}() }()
go func() { go func() {
...@@ -150,6 +156,11 @@ func (s *Shell) Run(ctx context.Context, arg string) error { ...@@ -150,6 +156,11 @@ func (s *Shell) Run(ctx context.Context, arg string) error {
return c.Wait() return c.Wait()
} }
// Run executes a command, logging its standard output and standard
// error; it is RunWithStdoutCallback with no line callback.
func (s *Shell) Run(ctx context.Context, arg string) error {
	return s.RunWithStdoutCallback(ctx, arg, nil)
}
// Output runs a command and returns the standard output. // Output runs a command and returns the standard output.
func (s *Shell) Output(ctx context.Context, arg string) ([]byte, error) { func (s *Shell) Output(ctx context.Context, arg string) ([]byte, error) {
c := s.command(ctx, arg) c := s.command(ctx, arg)
......
...@@ -84,6 +84,9 @@ type Dataset struct { ...@@ -84,6 +84,9 @@ type Dataset struct {
// Atoms that are part of this dataset. // Atoms that are part of this dataset.
Atoms []Atom `json:"atoms"` Atoms []Atom `json:"atoms"`
// Snapshot ID (repository-specific).
SnapshotID string `json:"snapshot_id"`
} }
// FindRequest specifies search criteria for atoms. // FindRequest specifies search criteria for atoms.
...@@ -127,6 +130,7 @@ type Repository interface { ...@@ -127,6 +130,7 @@ type Repository interface {
RestoreCmd(context.Context, RuntimeContext, *Backup, *Dataset, []string, string) (string, error) RestoreCmd(context.Context, RuntimeContext, *Backup, *Dataset, []string, string) (string, error)
BackupStreamCmd(*Backup, *Dataset) string BackupStreamCmd(*Backup, *Dataset) string
RestoreStreamCmd(context.Context, RuntimeContext, *Backup, *Dataset, string) (string, error) RestoreStreamCmd(context.Context, RuntimeContext, *Backup, *Dataset, string) (string, error)
RunBackup(context.Context, *Shell, *Backup, *Dataset, string) error
Close() error Close() error
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment