Commit 1db91b00 authored by ale

Move the atom full paths completely within the metadata API

Paths (for atoms and datasets) only exist within the API itself now.
parent 35f8a72e
Pipeline #3473 passed with stage in 35 seconds
@@ -64,7 +64,13 @@ func (a *runtimeAssets) Shell() *Shell {
 }
 
 func buildHandlerMap(specs []*HandlerSpec) map[string]*HandlerSpec {
-    m := make(map[string]*HandlerSpec)
+    // Create a handler map with a default 'file' spec.
+    m := map[string]*HandlerSpec{
+        "file": &HandlerSpec{
+            Name: "file",
+            Type: "file",
+        },
+    }
     for _, h := range specs {
         m[h.Name] = h
     }
...
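With the default spec in place, a configuration that declares no handlers still resolves the built-in 'file' handler, and an explicit spec named "file" overrides the default because the specs loop runs after the map literal. A minimal usage sketch (hypothetical caller, not part of this commit):

    m := buildHandlerMap(nil)
    fmt.Println(m["file"].Type) // prints "file": the built-in default spec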
@@ -6,7 +6,6 @@ import (
     "fmt"
     "log"
     "os"
-    "strings"
     "testing"
     "time"
 )
@@ -211,7 +210,9 @@ func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []*Dataset) error {
             return errors.New("empty dataset name")
         }
         for _, atom := range ds.Atoms {
-            if !strings.HasPrefix(atom.Name, "users/") {
+            switch atom.Name {
+            case "account1", "account2":
+            default:
                 return fmt.Errorf("bad atom name: %s", atom.Name)
             }
             numAtoms++
...
@@ -3,6 +3,7 @@ package tabacco
 import (
     "context"
     "log"
+    "path/filepath"
     "testing"
     "time"
@@ -12,16 +13,18 @@ import (
 type dummyMetadataEntry struct {
     backupID string
     backupTS time.Time
-    name     string
     dsName   string
     host     string
     source   string
+    path     string
     atom     Atom
 }
 
 func (e dummyMetadataEntry) match(req *FindRequest) bool {
-    if req.Pattern != "" && !req.matchPattern(e.name) {
-        return false
+    if req.Pattern != "" {
+        if !req.matchPattern(e.path) {
+            return false
+        }
     }
     if req.Host != "" && req.Host != e.host {
         return false
@@ -48,65 +51,93 @@ type dummyMetadataStore struct {
     log []dummyMetadataEntry
 }
 
-func (d *dummyMetadataStore) FindAtoms(_ context.Context, req *FindRequest) ([]*Backup, error) {
-    tmp := make(map[string]map[string][]dummyMetadataEntry)
-    for _, l := range d.log {
-        if !l.match(req) {
-            continue
-        }
-        m, ok := tmp[l.name]
-        if !ok {
-            m = make(map[string][]dummyMetadataEntry)
-            tmp[l.name] = m
-        }
-        m[l.backupID] = append(m[l.name], l)
-    }
-    count := req.NumVersions
-    if count < 1 {
-        count = 1
-    }
-    // Accumulate output into Backups
-    btmp := make(map[string]*Backup)
-    for _, dsmap := range tmp {
-        for _, dslog := range dsmap {
-            ds := dslog[0].toDataset()
-            b := dslog[0].toBackup()
-            bb, ok := btmp[b.ID]
-            if !ok {
-                btmp[b.ID] = b
-                bb = b
-            }
-            bb.Datasets = append(bb.Datasets, ds)
-            ds.Atoms = nil
-            if len(dslog) > count {
-                dslog = dslog[len(dslog)-count:]
-            }
-            for _, l := range dslog {
-                ds.Atoms = append(ds.Atoms, l.atom)
-            }
-        }
-    }
-    var out []*Backup
-    for _, b := range btmp {
-        out = append(out, b)
-    }
-    return out, nil
-}
+// Argh! This is copy&pasted from server/service.go, but with minor
+// modifications due to the different types... terrible.
+func keepNumVersions(dbAtoms []dummyMetadataEntry, numVersions int) []dummyMetadataEntry {
+    // numVersions == 0 is remapped to 1.
+    if numVersions < 1 {
+        numVersions = 1
+    }
+    count := 0
+    tmp := make(map[string][]dummyMetadataEntry)
+    for _, a := range dbAtoms {
+        l := tmp[a.path]
+        if len(l) < numVersions {
+            l = append(l, a)
+            count++
+        }
+        tmp[a.path] = l
+    }
+    out := make([]dummyMetadataEntry, 0, count)
+    for _, l := range tmp {
+        out = append(out, l...)
+    }
+    return out
+}
+
+func groupByBackup(dbAtoms []dummyMetadataEntry) []*Backup {
+    // As we scan through dbAtoms, aggregate into Backups and Datasets.
+    backups := make(map[string]*Backup)
+    dsm := make(map[string]map[string]*Dataset)
+    for _, atom := range dbAtoms {
+        // Create the Backup object if it does not exist.
+        b, ok := backups[atom.backupID]
+        if !ok {
+            b = atom.toBackup()
+            backups[atom.backupID] = b
+        }
+        // Create the Dataset object for this Backup in the
+        // two-level map (creating the intermediate map if
+        // necessary).
+        tmp, ok := dsm[atom.backupID]
+        if !ok {
+            tmp = make(map[string]*Dataset)
+            dsm[atom.backupID] = tmp
+        }
+        // Match datasets by their full path.
+        dsPath := filepath.Join(atom.source, atom.dsName)
+        ds, ok := tmp[dsPath]
+        if !ok {
+            ds = atom.toDataset()
+            tmp[dsPath] = ds
+            b.Datasets = append(b.Datasets, ds)
+        }
+        // Finally, add the atom to the dataset.
+        ds.Atoms = append(ds.Atoms, atom.atom)
+    }
+    out := make([]*Backup, 0, len(backups))
+    for _, b := range backups {
+        out = append(out, b)
+    }
+    return out
+}
+
+func (d *dummyMetadataStore) FindAtoms(_ context.Context, req *FindRequest) ([]*Backup, error) {
+    var tmp []dummyMetadataEntry
+    for _, l := range d.log {
+        if !l.match(req) {
+            continue
+        }
+        tmp = append(tmp, l)
+    }
+    return groupByBackup(keepNumVersions(tmp, req.NumVersions)), nil
+}
 
 func (d *dummyMetadataStore) AddDataset(_ context.Context, backup *Backup, ds *Dataset) error {
     log.Printf("AddDataset: %+v", *ds)
     for _, atom := range ds.Atoms {
-        //name := fmt.Sprintf("%s/%s", ds.Name, atom.Name)
-        name := atom.Name
+        path := filepath.Join(ds.Source, ds.Name, atom.Name)
         d.log = append(d.log, dummyMetadataEntry{
             backupID: backup.ID,
             backupTS: backup.Timestamp,
             host:     backup.Host,
-            name:     name,
+            path:     path,
             dsName:   ds.Name,
             source:   ds.Source,
             atom:     atom,
...
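The test-only keepNumVersions above keys entries by their full atom path and keeps at most numVersions entries per path, in log order (numVersions == 0 is treated as 1). A minimal sketch with hypothetical values:

    entries := []dummyMetadataEntry{
        {backupID: "b1", path: "src/ds/a1"},
        {backupID: "b2", path: "src/ds/a1"},
        {backupID: "b3", path: "src/ds/a1"},
    }
    kept := keepNumVersions(entries, 2) // keeps only the "b1" and "b2" entries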
@@ -9,6 +9,7 @@ CREATE TABLE log (
     dataset_name VARCHAR(128),
     dataset_source VARCHAR(128),
     atom_name VARCHAR(255),
+    atom_full_path VARCHAR(255),
     atom_path VARCHAR(255)
 );
...
@@ -79,6 +79,7 @@ CREATE TABLE log (
     dataset_name VARCHAR(128),
     dataset_source VARCHAR(128),
     atom_name VARCHAR(255),
+    atom_full_path VARCHAR(255),
     atom_path VARCHAR(255)
 );
@@ -97,7 +98,7 @@ func _1_initialize_schemaUpSql() (*asset, error) {
         return nil, err
     }
-    info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 539, mode: os.FileMode(420), modTime: time.Unix(1560765826, 0)}
+    info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 576, mode: os.FileMode(420), modTime: time.Unix(1560809647, 0)}
     a := &asset{bytes: bytes, info: info}
     return a, nil
 }
...
@@ -5,6 +5,7 @@ import (
     "database/sql"
     "fmt"
     "log"
+    "path/filepath"
     "strings"
     "time"
@@ -19,12 +20,19 @@ type dbAtom struct {
     DatasetName   string
     DatasetSource string
     AtomName      string
+    AtomFullPath  string
     AtomPath      string
 }
 
 func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
     var out []dbAtom
     for _, atom := range ds.Atoms {
+        // It is here that we 'materialize' the concept of Atom names
+        // as paths, by concatenating source/dataset/atom and storing
+        // it as the atom name.
+        path := filepath.Join(ds.Source, ds.Name, atom.Name)
         out = append(out, dbAtom{
             BackupID:        backup.ID,
             BackupTimestamp: backup.Timestamp,
@@ -33,6 +41,7 @@ func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
             DatasetSource: ds.Source,
             AtomName:      atom.Name,
             AtomPath:      atom.Path,
+            AtomFullPath:  path,
         })
     }
     return out
@@ -69,12 +78,12 @@ func keepNumVersions(dbAtoms []*dbAtom, numVersions int) []*dbAtom {
     count := 0
     tmp := make(map[string][]*dbAtom)
     for _, a := range dbAtoms {
-        l := tmp[a.AtomName]
+        l := tmp[a.AtomFullPath]
         if len(l) < numVersions {
            l = append(l, a)
            count++
         }
-        tmp[a.AtomName] = l
+        tmp[a.AtomFullPath] = l
     }
     out := make([]*dbAtom, 0, count)
     for _, l := range tmp {
@@ -153,9 +162,9 @@ var statements = map[string]string{
     INSERT INTO log (
       backup_id, backup_timestamp, backup_host,
       dataset_name, dataset_source,
-      atom_name, atom_path
+      atom_name, atom_path, atom_full_path
     ) VALUES (
-      ?, ?, ?, ?, ?, ?, ?
+      ?, ?, ?, ?, ?, ?, ?, ?
     )
     `,
 }
@@ -178,6 +187,7 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba
             dbAtom.DatasetSource,
             dbAtom.AtomName,
             dbAtom.AtomPath,
+            dbAtom.AtomFullPath,
         ); err != nil {
             return err
         }
...
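As the comment in makeAtoms says, this is now the single point where the full atom path gets materialized. A sketch of the resulting values, with illustrative names not taken from the commit:

    // ds.Source = "mysql", ds.Name = "db1", atom.Name = "users"
    path := filepath.Join(ds.Source, ds.Name, atom.Name)
    // path == "mysql/db1/users" goes into the new atom_full_path column,
    // while atom_name keeps the bare "users".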
@@ -198,8 +198,15 @@ func (r *resticRepository) RestoreCmd(ctx context.Context, rctx RuntimeContext,
     return strings.Join(cmd, " "), nil
 }
 
+// A special path for stdin datasets that is likely to be unused by the
+// rest of the filesystem (the path namespace in Restic is global).
+func datasetStdinPath(ds *Dataset) string {
+    dsPath := filepath.Join(ds.Source, ds.Name)
+    return fmt.Sprintf("/STDIN_%s", strings.Replace(dsPath, "/", "_", -1))
+}
+
 func (r *resticRepository) BackupStreamCmd(backup *Backup, ds *Dataset) string {
-    fakePath := fmt.Sprintf("/STDIN%s", strings.Replace(ds.Name, "/", "_", -1))
+    fakePath := datasetStdinPath(ds)
     return fmt.Sprintf(
         "%s backup --cleanup-cache --exclude-caches --tag %s --tag backup_id=%s --stdin --stdin-filename %s",
         r.resticCmd(),
@@ -215,7 +222,7 @@ func (r *resticRepository) RestoreStreamCmd(ctx context.Context, rctx RuntimeCon
         return "", err
     }
 
-    fakePath := fmt.Sprintf("/STDIN%s", strings.Replace(ds.Name, "/", "_", -1))
+    fakePath := datasetStdinPath(ds)
     targetPath := filepath.Base(fakePath)
 
     // Restore the file to a temporary directory, then pipe it.
...
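Because the stdin path now includes the source as well as the dataset name, two stdin datasets that share a name under different sources no longer collide in Restic's global path namespace. An illustrative evaluation (hypothetical field values):

    ds := &Dataset{Source: "sql", Name: "mydb"}
    fmt.Println(datasetStdinPath(ds)) // "/STDIN_sql_mydb"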
@@ -6,9 +6,9 @@ import (
     "fmt"
     "os"
     "os/exec"
-    "path/filepath"
     "time"
 
+    "git.autistici.org/ai3/tools/tabacco/util"
     "gopkg.in/yaml.v2"
 )
@@ -22,25 +22,19 @@ type DatasetSpec struct {
 
 // Parse a DatasetSpec and return a Dataset.
 func (spec *DatasetSpec) Parse(ctx context.Context, src *SourceSpec) (*Dataset, error) {
-    // Build the atoms list, invoking the atoms_command if
-    // necessary, and creating actual atoms with absolute names.
-    name := filepath.Join(src.Name, spec.Name)
+    // Build the atoms list, invoking the atoms_command if necessary.
     var atoms []Atom
-    for _, a := range spec.Atoms {
-        atoms = append(atoms, a.withPrefix(name))
-    }
+    atoms = append(atoms, spec.Atoms...)
     if spec.AtomsCommand != "" {
         var cmdAtoms []Atom
         if err := runYAMLCommand(ctx, spec.AtomsCommand, &cmdAtoms); err != nil {
             return nil, fmt.Errorf("source %s: dataset %s: error in atoms command: %v", src.Name, spec.Name, err)
         }
-        for _, a := range cmdAtoms {
-            atoms = append(atoms, a.withPrefix(name))
-        }
+        atoms = append(atoms, cmdAtoms...)
     }
     return &Dataset{
-        Name:   name,
+        Name:   spec.Name,
         Source: src.Name,
         Atoms:  atoms,
     }, nil
@@ -142,7 +136,16 @@ func (spec *SourceSpec) Check(handlers map[string]*HandlerSpec) error {
     if len(spec.Datasets) == 0 && spec.DatasetsCommand == "" {
         return errors.New("must specify one of 'datasets' or 'datasets_command'")
     }
-    return nil
+
+    // Check the datasets, at least those that are provided
+    // statically.
+    merr := new(util.MultiError)
+    for _, ds := range spec.Datasets {
+        if err := ds.Check(); err != nil {
+            merr.Add(fmt.Errorf("dataset %s: %v", ds.Name, err))
+        }
+    }
+    return merr.OrNil()
 }
 
 func runYAMLCommand(ctx context.Context, cmd string, obj interface{}) error {
...
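The net effect of the Parse change is that names stay relative in the returned structure and only the metadata layer joins them into paths. A before/after sketch with illustrative values:

    // Before: a source "mysql" with dataset "db1" and atom "users" parsed to
    //   Dataset{Name: "mysql/db1", Source: "mysql", Atoms: []Atom{{Name: "mysql/db1/users"}}}
    // After this commit:
    //   Dataset{Name: "db1", Source: "mysql", Atoms: []Atom{{Name: "users"}}}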
@@ -3,7 +3,6 @@ package tabacco
 import (
     "context"
     "fmt"
-    "path/filepath"
     "regexp"
     "strings"
     "time"
@@ -41,8 +40,9 @@ type Backup struct {
 }
 
 // An Atom is a bit of data that can be restored independently as part
-// of a Dataset. The atom Name is an absolute path in the global atom
-// namespace, so it is prefixed with the container Dataset name.
+// of a Dataset. Atoms are identified uniquely by their absolute path
+// in the global atom namespace: this path is built by concatenating
+// the source name, the dataset name, and the atom name.
 type Atom struct {
     // Name (path-like).
     Name string `json:"name"`
@@ -52,13 +52,6 @@ type Atom struct {
     Path string `json:"path,omitempty"`
 }
 
-func (a Atom) withPrefix(pfx string) Atom {
-    return Atom{
-        Name: filepath.Join(pfx, a.Name),
-        Path: a.Path,
-    }
-}
-
 // A Dataset describes a data set as a high level structure containing
 // one or more atoms. The 1-to-many scenario is justified by the
 // following use case: imagine a sql database server, we may want to
...