Commit 6a65647c authored by ale

Remove the name from Dataset and replace it with an id

This makes it easier to generate application-level atom paths that are
manageable. Datasets are given unique IDs at generation time.
parent ad361dfa
Pipeline #3483 failed with stage in 42 seconds
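
The core of the change, as a minimal illustrative sketch (not the actual tabacco sources; randomID below stands in for util.RandomID, whose implementation is not part of this diff): a Dataset now carries a generated ID instead of a user-supplied name, and atom paths are built from the source name and the atom name only.

package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"path/filepath"
)

// randomID stands in for util.RandomID(): any reasonably unique,
// path-safe string works here.
func randomID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

type Atom struct{ Name string }

// Dataset no longer has a Name; it gets a unique ID when it is
// generated from its DatasetSpec.
type Dataset struct {
	ID     string
	Source string
	Atoms  []Atom
}

func newDataset(source string, atoms []Atom) *Dataset {
	return &Dataset{ID: randomID(), Source: source, Atoms: atoms}
}

func main() {
	ds := newDataset("source1", []Atom{{Name: "user1"}, {Name: "user2"}})
	for _, a := range ds.Atoms {
		// Atom paths drop the dataset name: source/atom instead of
		// source/dataset/atom.
		fmt.Println(filepath.Join(ds.Source, a.Name)) // source1/user1, source1/user2
	}
}
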
......@@ -36,12 +36,11 @@ func (m *fakeManager) GetStatus() ([]jobs.Status, []jobs.Status, []jobs.Status)
func TestMakeSchedule(t *testing.T) {
sourceSpecs := []*SourceSpec{
&SourceSpec{
Name: "source1",
Name: "source1/users",
Handler: "file1",
Schedule: "@random_every 1d",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "users",
Atoms: []Atom{
{
Name: "user1",
......
......@@ -83,13 +83,11 @@ func TestConfig_Parse(t *testing.T) {
Schedule: "@random_every 24h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "account1",
Atoms: []Atom{
{Name: "account1"},
},
},
{
Name: "account2",
Atoms: []Atom{
{Name: "account2"},
},
......@@ -117,7 +115,6 @@ func TestConfig_Parse(t *testing.T) {
Schedule: "@random_every 24h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "users",
Atoms: []Atom{
{Name: "account1"},
{Name: "account2"},
......@@ -144,7 +141,7 @@ func TestConfig_Parse(t *testing.T) {
Name: "users",
Handler: "file",
Schedule: "@random_every 24h",
DatasetsCommand: "echo '[{name: account1, atoms: [{name: account1}, {name: account2}]}]'",
DatasetsCommand: "echo '[{atoms: [{name: account1}, {name: account2}]}]'",
},
},
HandlerSpecs: []*HandlerSpec{
......@@ -206,8 +203,8 @@ func parseAllSources(ra *runtimeAssets, specs []*SourceSpec) ([]*Dataset, error)
func checkTwoUserAccountsAtoms(ra *runtimeAssets, datasets []*Dataset) error {
var numAtoms int
for _, ds := range datasets {
if ds.Name == "" {
return errors.New("empty dataset name")
if ds.ID == "" {
return errors.New("empty dataset ID")
}
for _, atom := range ds.Atoms {
switch atom.Name {
......
......@@ -109,8 +109,6 @@ func expandVars(s string, backup *Backup, ds *Dataset) string {
return key
case "backup.id":
return backup.ID
case "ds.name":
return ds.Name
case "atom.names":
names := make([]string, 0, len(ds.Atoms))
for _, a := range ds.Atoms {
......
......@@ -63,7 +63,7 @@ func (j *metadataJob) RunContext(ctx context.Context) error {
err := j.Job.RunContext(ctx)
if err == nil {
if merr := j.ms.AddDataset(ctx, j.backup, j.ds); merr != nil {
log.Printf("%s: error saving metadata: %v", j.ds.Name, merr)
log.Printf("%s@%s: error saving metadata: %v", j.ds.Source, j.ds.ID, merr)
}
}
return err
......
......@@ -14,7 +14,7 @@ import (
type dummyMetadataEntry struct {
backupID string
backupTS time.Time
dsName string
dsID string
host string
source string
path string
......@@ -35,7 +35,7 @@ func (e dummyMetadataEntry) match(req *FindRequest) bool {
func (e dummyMetadataEntry) toDataset() *Dataset {
return &Dataset{
Name: e.dsName,
ID: e.dsID,
Source: e.source,
}
}
......@@ -99,12 +99,11 @@ func groupByBackup(dbAtoms []dummyMetadataEntry) []*Backup {
tmp = make(map[string]*Dataset)
dsm[atom.backupID] = tmp
}
// Match datasets by their full path.
dsPath := filepath.Join(atom.source, atom.dsName)
ds, ok := tmp[dsPath]
// Match datasets by their unique ID.
ds, ok := tmp[atom.dsID]
if !ok {
ds = atom.toDataset()
tmp[dsPath] = ds
tmp[atom.dsID] = ds
b.Datasets = append(b.Datasets, ds)
}
......@@ -140,13 +139,13 @@ func (d *dummyMetadataStore) AddDataset(_ context.Context, backup *Backup, ds *D
log.Printf("AddDataset: %+v", *ds)
for _, atom := range ds.Atoms {
path := filepath.Join(ds.Source, ds.Name, atom.Name)
path := filepath.Join(ds.Source, atom.Name)
d.log = append(d.log, dummyMetadataEntry{
backupID: backup.ID,
backupTS: backup.Timestamp,
host: backup.Host,
path: path,
dsName: ds.Name,
dsID: ds.ID,
source: ds.Source,
atom: atom,
})
......@@ -189,11 +188,9 @@ func TestManager_Backup(t *testing.T) {
Schedule: "@random_every 1h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "user1",
Atoms: []Atom{{Name: "user1"}},
},
&DatasetSpec{
Name: "user2",
Atoms: []Atom{{Name: "user2"}},
},
},
......@@ -252,11 +249,11 @@ func TestManager_Backup(t *testing.T) {
}
// A pattern matching a single atom.
resp, err = store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/user2/user2"})
resp, err = store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/user2"})
if err != nil {
t.Fatal("FindAtoms", err)
}
if len(resp) != 1 {
t.Fatalf("bad FindAtoms(source1/user2/user2) response: %+v", resp)
t.Fatalf("bad FindAtoms(source1/user2) response: %+v", resp)
}
}
DROP INDEX idx_log_backup_id_and_dataset_name;
DROP INDEX idx_log_backup_id_and_dataset_id;
DROP INDEX idx_log_backup_id;
DROP INDEX idx_log_primary;
DROP TABLE log;
......
......@@ -6,13 +6,13 @@ CREATE TABLE log (
backup_id VARCHAR(128),
backup_timestamp DATETIME,
backup_host VARCHAR(128),
dataset_name VARCHAR(128),
dataset_id VARCHAR(128),
dataset_source VARCHAR(128),
atom_name VARCHAR(255),
atom_full_path VARCHAR(255),
atom_path VARCHAR(255)
);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_name);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_id, atom_name);
CREATE INDEX idx_log_backup_id ON log (backup_id);
CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name);
CREATE INDEX idx_log_backup_id_and_dataset_id ON log (backup_id, dataset_id);
......@@ -46,7 +46,7 @@ func (fi bindataFileInfo) Sys() interface{} {
}
var __1_initialize_schemaDownSql = []byte(`
DROP INDEX idx_log_backup_id_and_dataset_name;
DROP INDEX idx_log_backup_id_and_dataset_id;
DROP INDEX idx_log_backup_id;
DROP INDEX idx_log_primary;
DROP TABLE log;
......@@ -63,7 +63,7 @@ func _1_initialize_schemaDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 123, mode: os.FileMode(420), modTime: time.Unix(1535012987, 0)}
info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 121, mode: os.FileMode(420), modTime: time.Unix(1560930730, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
......@@ -76,16 +76,16 @@ CREATE TABLE log (
backup_id VARCHAR(128),
backup_timestamp DATETIME,
backup_host VARCHAR(128),
dataset_name VARCHAR(128),
dataset_id VARCHAR(128),
dataset_source VARCHAR(128),
atom_name VARCHAR(255),
atom_full_path VARCHAR(255),
atom_path VARCHAR(255)
);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_name);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_id, atom_name);
CREATE INDEX idx_log_backup_id ON log (backup_id);
CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name);
CREATE INDEX idx_log_backup_id_and_dataset_id ON log (backup_id, dataset_id);
`)
func _1_initialize_schemaUpSqlBytes() ([]byte, error) {
......@@ -98,7 +98,7 @@ func _1_initialize_schemaUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 576, mode: os.FileMode(420), modTime: time.Unix(1560809647, 0)}
info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 568, mode: os.FileMode(420), modTime: time.Unix(1560930732, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
......
......@@ -17,7 +17,7 @@ type dbAtom struct {
BackupID string
BackupTimestamp time.Time
BackupHost string
DatasetName string
DatasetID string
DatasetSource string
AtomName string
AtomFullPath string
......@@ -31,13 +31,13 @@ func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
// It is here that we 'materialize' the concept of Atom names
// as paths, by concatenating source/atom and storing it as
// the atom name.
path := filepath.Join(ds.Source, ds.Name, atom.Name)
path := filepath.Join(ds.Source, atom.Name)
out = append(out, dbAtom{
BackupID: backup.ID,
BackupTimestamp: backup.Timestamp,
BackupHost: backup.Host,
DatasetName: ds.Name,
DatasetID: ds.ID,
DatasetSource: ds.Source,
AtomName: atom.Name,
AtomPath: atom.Path,
......@@ -57,7 +57,7 @@ func (a *dbAtom) getBackup() *tabacco.Backup {
func (a *dbAtom) getDataset() *tabacco.Dataset {
return &tabacco.Dataset{
Name: a.DatasetName,
ID: a.DatasetID,
Source: a.DatasetSource,
}
}
......@@ -113,10 +113,10 @@ func groupByBackup(dbAtoms []*dbAtom) []*tabacco.Backup {
tmp = make(map[string]*tabacco.Dataset)
dsm[atom.BackupID] = tmp
}
ds, ok := tmp[atom.DatasetName]
ds, ok := tmp[atom.DatasetID]
if !ok {
ds = atom.getDataset()
tmp[atom.DatasetName] = ds
tmp[atom.DatasetID] = ds
b.Datasets = append(b.Datasets, ds)
}
......@@ -161,7 +161,7 @@ var statements = map[string]string{
"insert_atom": `
INSERT INTO log (
backup_id, backup_timestamp, backup_host,
dataset_name, dataset_source,
dataset_id, dataset_source,
atom_name, atom_path, atom_full_path
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?
......@@ -183,7 +183,7 @@ func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds taba
dbAtom.BackupID,
dbAtom.BackupTimestamp,
dbAtom.BackupHost,
dbAtom.DatasetName,
dbAtom.DatasetID,
dbAtom.DatasetSource,
dbAtom.AtomName,
dbAtom.AtomPath,
......@@ -223,7 +223,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
q := fmt.Sprintf(
`SELECT
backup_id, backup_timestamp, backup_host,
dataset_name, dataset_source,
dataset_id, dataset_source,
atom_name, atom_path
FROM log WHERE %s
ORDER BY backup_timestamp DESC`,
......@@ -240,7 +240,7 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
var a dbAtom
if err := rows.Scan(
&a.BackupID, &a.BackupTimestamp, &a.BackupHost,
&a.DatasetName, &a.DatasetSource,
&a.DatasetID, &a.DatasetSource,
&a.AtomName, &a.AtomPath,
); err != nil {
log.Printf("bad row: %v", err)
......
......@@ -21,7 +21,6 @@ func addTestEntry(t *testing.T, svc *Service, backupID, host, dsName string) {
Timestamp: time.Now(),
},
tabacco.Dataset{
Name: dsName,
Source: "file",
Atoms: []tabacco.Atom{
{
......
......@@ -148,22 +148,24 @@ func (r *resticRepository) Prepare(ctx context.Context, rctx RuntimeContext, bac
))
}
func resticBackupTags(backup *Backup, ds *Dataset) string {
return fmt.Sprintf("--tag dataset_id=%s --tag backup_id=%s", ds.ID, backup.ID)
}
func (r *resticRepository) BackupCmd(backup *Backup, ds *Dataset, sourcePaths []string) string {
return fmt.Sprintf(
"%s backup --cleanup-cache --exclude-caches --one-file-system --tag %s --tag backup_id=%s %s",
"%s backup --cleanup-cache --exclude-caches --one-file-system %s %s",
r.resticCmd(),
ds.Name,
backup.ID,
resticBackupTags(backup, ds),
strings.Join(sourcePaths, " "),
)
}
func (r *resticRepository) getSnapshotID(ctx context.Context, rctx RuntimeContext, backup *Backup, ds *Dataset) (string, error) {
data, err := rctx.Shell().Output(ctx, fmt.Sprintf(
"%s snapshots --json --tag backup_id=%s --tag %s",
"%s snapshots --json %s",
r.resticCmd(),
backup.ID,
ds.Name,
resticBackupTags(backup, ds),
))
if err != nil {
return "", err
......@@ -200,17 +202,16 @@ func (r *resticRepository) RestoreCmd(ctx context.Context, rctx RuntimeContext,
// A special path for stdin datasets that is likely to be unused by the
// rest of the filesystem (the path namespace in Restic is global).
func datasetStdinPath(ds *Dataset) string {
dsPath := filepath.Join(ds.Source, ds.Name)
dsPath := filepath.Join(ds.Source, ds.ID)
return fmt.Sprintf("/STDIN_%s", strings.Replace(dsPath, "/", "_", -1))
}
func (r *resticRepository) BackupStreamCmd(backup *Backup, ds *Dataset) string {
fakePath := datasetStdinPath(ds)
return fmt.Sprintf(
"%s backup --cleanup-cache --exclude-caches --tag %s --tag backup_id=%s --stdin --stdin-filename %s",
"%s backup --cleanup-cache --exclude-caches %s --stdin --stdin-filename %s",
r.resticCmd(),
ds.Name,
backup.ID,
resticBackupTags(backup, ds),
fakePath,
)
}
......
......@@ -155,7 +155,6 @@ func TestRestic(t *testing.T) {
},
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "files",
Atoms: []Atom{
{
Name: "f1",
......@@ -186,7 +185,6 @@ func TestRestic_Stream(t *testing.T) {
Schedule: "@random_every 1h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "f1",
Atoms: []Atom{
{
Name: "f1",
......@@ -212,7 +210,6 @@ func TestRestic_Stream_Compress(t *testing.T) {
Schedule: "@random_every 1h",
Datasets: []*DatasetSpec{
&DatasetSpec{
Name: "f1",
Atoms: []Atom{
{
Name: "f1",
......
......@@ -14,7 +14,7 @@ import (
// DatasetSpec describes a dataset in the configuration.
type DatasetSpec struct {
Name string `yaml:"name"`
//Name string `yaml:"name"`
Atoms []Atom `yaml:"atoms"`
AtomsCommand string `yaml:"atoms_command"`
......@@ -28,13 +28,13 @@ func (spec *DatasetSpec) Parse(ctx context.Context, src *SourceSpec) (*Dataset,
if spec.AtomsCommand != "" {
var cmdAtoms []Atom
if err := runYAMLCommand(ctx, spec.AtomsCommand, &cmdAtoms); err != nil {
return nil, fmt.Errorf("source %s: dataset %s: error in atoms command: %v", src.Name, spec.Name, err)
return nil, fmt.Errorf("source %s: error in atoms command: %v", src.Name, err)
}
atoms = append(atoms, cmdAtoms...)
}
return &Dataset{
Name: spec.Name,
ID: util.RandomID(),
Source: src.Name,
Atoms: atoms,
}, nil
......@@ -42,9 +42,6 @@ func (spec *DatasetSpec) Parse(ctx context.Context, src *SourceSpec) (*Dataset,
// Check syntactical validity of the DatasetSpec.
func (spec *DatasetSpec) Check() error {
if spec.Name == "" {
return errors.New("dataset name is not set")
}
if len(spec.Atoms) > 0 && spec.AtomsCommand != "" {
return errors.New("can't specify both 'atoms' and 'atoms_command'")
}
......@@ -102,7 +99,7 @@ func (spec *SourceSpec) Parse(ctx context.Context) ([]*Dataset, error) {
for _, dspec := range dspecs {
ds, err := dspec.Parse(ctx, spec)
if err != nil {
return nil, fmt.Errorf("error parsing dataset %s: %v", dspec.Name, err)
return nil, fmt.Errorf("error parsing dataset: %v", err)
}
datasets = append(datasets, ds)
}
......@@ -142,7 +139,7 @@ func (spec *SourceSpec) Check(handlers map[string]*HandlerSpec) error {
merr := new(util.MultiError)
for _, ds := range spec.Datasets {
if err := ds.Check(); err != nil {
merr.Add(fmt.Errorf("dataset %s: %v", ds.Name, err))
merr.Add(err)
}
}
return merr.OrNil()
......
......@@ -4,5 +4,4 @@ schedule: "@random_every 2m"
params:
path: /usr/share/misc
datasets:
- name: magic
atoms: [{name: magic}]
- atoms: [{name: magic}]
......@@ -77,7 +77,8 @@ type Atom struct {
type Dataset struct {
// Unique ID of the dataset, generated when the dataset is created
// from its spec (replaces the old path-like Name).
Name string `json:"name"`
//Name string `json:"name"`
ID string `json:"id"`
// Source is the name of the source that created this Dataset,
// stored so that the restore knows what to do.
......
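
The effect on the restic repository layer, sketched below with made-up ID values (illustrative only; the two helpers mirror resticBackupTags and datasetStdinPath from the diff above): snapshots are selected by dataset_id and backup_id tags instead of the dataset name, and stdin datasets get a fake path derived from the source and dataset ID.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

type Backup struct{ ID string }

type Dataset struct {
	ID     string
	Source string
}

// Mirrors the resticBackupTags helper added in this commit.
func resticBackupTags(b *Backup, ds *Dataset) string {
	return fmt.Sprintf("--tag dataset_id=%s --tag backup_id=%s", ds.ID, b.ID)
}

// Mirrors datasetStdinPath: a unique, path-safe location in restic's
// global path namespace for stdin-based datasets.
func datasetStdinPath(ds *Dataset) string {
	dsPath := filepath.Join(ds.Source, ds.ID)
	return fmt.Sprintf("/STDIN_%s", strings.Replace(dsPath, "/", "_", -1))
}

func main() {
	b := &Backup{ID: "bk123"}
	ds := &Dataset{ID: "a1b2c3", Source: "source1"}
	fmt.Printf("restic backup --one-file-system %s /data\n", resticBackupTags(b, ds))
	// restic backup --one-file-system --tag dataset_id=a1b2c3 --tag backup_id=bk123 /data
	fmt.Println(datasetStdinPath(ds))
	// /STDIN_source1_a1b2c3
}
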