manager_test.go 5.63 KB
Newer Older
ale's avatar
ale committed
1 2 3 4
package tabacco

import (
	"context"
ale's avatar
ale committed
5
	"log"
6
	"path/filepath"
7
	"sync"
ale's avatar
ale committed
8 9
	"testing"
	"time"
10

ale's avatar
ale committed
11
	"git.autistici.org/ai3/tools/tabacco/jobs"
ale's avatar
ale committed
12 13 14 15 16
)

type dummyMetadataEntry struct {
	backupID string
	backupTS time.Time
17
	dsID     string
ale's avatar
ale committed
18
	host     string
ale's avatar
ale committed
19
	source   string
20
	path     string
ale's avatar
ale committed
21 22 23
	atom     Atom
}

ale's avatar
ale committed
24
func (e dummyMetadataEntry) match(req *FindRequest) bool {
25 26 27 28
	if req.Pattern != "" {
		if !req.matchPattern(e.path) {
			return false
		}
ale's avatar
ale committed
29 30 31 32 33 34 35
	}
	if req.Host != "" && req.Host != e.host {
		return false
	}
	return true
}

ale's avatar
ale committed
36 37
func (e dummyMetadataEntry) toDataset() *Dataset {
	return &Dataset{
38
		ID:     e.dsID,
ale's avatar
ale committed
39
		Source: e.source,
ale's avatar
ale committed
40 41 42
	}
}

ale's avatar
ale committed
43 44
func (e dummyMetadataEntry) toBackup() *Backup {
	return &Backup{
ale's avatar
ale committed
45 46 47 48 49 50 51
		ID:        e.backupID,
		Timestamp: e.backupTS,
		Host:      e.host,
	}
}

type dummyMetadataStore struct {
52
	mx  sync.Mutex
ale's avatar
ale committed
53 54 55
	log []dummyMetadataEntry
}

56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Argh! This is copy&pasted from server/service.go, but with minor
// modifications due to the different types... terrible.
func keepNumVersions(dbAtoms []dummyMetadataEntry, numVersions int) []dummyMetadataEntry {
	// numVersions == 0 is remapped to 1.
	if numVersions < 1 {
		numVersions = 1
	}

	count := 0
	tmp := make(map[string][]dummyMetadataEntry)
	for _, a := range dbAtoms {
		l := tmp[a.path]
		if len(l) < numVersions {
			l = append(l, a)
			count++
ale's avatar
ale committed
71
		}
72 73 74 75 76 77 78 79
		tmp[a.path] = l
	}
	out := make([]dummyMetadataEntry, 0, count)
	for _, l := range tmp {
		out = append(out, l...)
	}
	return out
}
ale's avatar
ale committed
80

81 82 83 84 85 86 87 88
func groupByBackup(dbAtoms []dummyMetadataEntry) []*Backup {
	// As we scan through dbAtoms, aggregate into Backups and Datasets.
	backups := make(map[string]*Backup)
	dsm := make(map[string]map[string]*Dataset)

	for _, atom := range dbAtoms {
		// Create the Backup object if it does not exist.
		b, ok := backups[atom.backupID]
ale's avatar
ale committed
89
		if !ok {
90 91
			b = atom.toBackup()
			backups[atom.backupID] = b
ale's avatar
ale committed
92
		}
93 94 95 96 97 98 99 100 101

		// Create the Dataset object for this Backup in the
		// two-level map (creating the intermediate map if
		// necessary).
		tmp, ok := dsm[atom.backupID]
		if !ok {
			tmp = make(map[string]*Dataset)
			dsm[atom.backupID] = tmp
		}
102 103
		// Match datasets by their unique ID.
		ds, ok := tmp[atom.dsID]
104 105
		if !ok {
			ds = atom.toDataset()
106
			tmp[atom.dsID] = ds
107 108 109 110 111
			b.Datasets = append(b.Datasets, ds)
		}

		// Finally, add the atom to the dataset.
		ds.Atoms = append(ds.Atoms, atom.atom)
ale's avatar
ale committed
112 113
	}

114 115 116
	out := make([]*Backup, 0, len(backups))
	for _, b := range backups {
		out = append(out, b)
ale's avatar
ale committed
117
	}
118 119
	return out
}
ale's avatar
ale committed
120

121
func (d *dummyMetadataStore) FindAtoms(_ context.Context, req *FindRequest) ([]*Backup, error) {
122 123 124
	d.mx.Lock()
	defer d.mx.Unlock()

125 126 127 128
	var tmp []dummyMetadataEntry
	for _, l := range d.log {
		if !l.match(req) {
			continue
ale's avatar
ale committed
129
		}
130
		tmp = append(tmp, l)
ale's avatar
ale committed
131
	}
132 133

	return groupByBackup(keepNumVersions(tmp, req.NumVersions)), nil
ale's avatar
ale committed
134 135
}

ale's avatar
ale committed
136
func (d *dummyMetadataStore) AddDataset(_ context.Context, backup *Backup, ds *Dataset) error {
137 138 139
	d.mx.Lock()
	defer d.mx.Unlock()

ale's avatar
ale committed
140
	log.Printf("AddDataset: %+v", *ds)
ale's avatar
ale committed
141
	for _, atom := range ds.Atoms {
142
		path := filepath.Join(ds.Source, atom.Name)
ale's avatar
ale committed
143 144 145 146
		d.log = append(d.log, dummyMetadataEntry{
			backupID: backup.ID,
			backupTS: backup.Timestamp,
			host:     backup.Host,
147
			path:     path,
148
			dsID:     ds.ID,
ale's avatar
ale committed
149
			source:   ds.Source,
ale's avatar
ale committed
150 151 152 153 154 155
			atom:     atom,
		})
	}
	return nil
}

ale's avatar
ale committed
156
func TestManager_Backup(t *testing.T) {
ale's avatar
ale committed
157 158 159 160 161 162 163 164 165
	store := &dummyMetadataStore{}
	repoSpec := RepositorySpec{
		Name: "main",
		Type: "restic",
		Params: map[string]interface{}{
			"uri":      "/tmp/restic/repo",
			"password": "testpass",
		},
	}
ale's avatar
ale committed
166 167
	handlerSpecs := []*HandlerSpec{
		&HandlerSpec{
ale's avatar
ale committed
168 169 170 171 172
			Name: "file1",
			Type: "file",
			Params: map[string]interface{}{
				"path": "/source/of/file1",
			},
ale's avatar
ale committed
173
			//PreBackupCommand: "echo hello",
ale's avatar
ale committed
174
		},
ale's avatar
ale committed
175
		&HandlerSpec{
ale's avatar
ale committed
176 177 178
			Name: "dbpipe",
			Type: "pipe",
			Params: map[string]interface{}{
ale's avatar
ale committed
179
				"backup_command":  "echo ${backup.id} ${ds.name} ${atom.names}",
ale's avatar
ale committed
180
				"restore_command": "cat",
ale's avatar
ale committed
181 182 183
			},
		},
	}
ale's avatar
ale committed
184 185 186 187 188 189 190 191
	sourceSpecs := []*SourceSpec{
		&SourceSpec{
			Name:     "source1",
			Handler:  "file1",
			Schedule: "@random_every 1h",
			Datasets: []*DatasetSpec{
				&DatasetSpec{
					Atoms: []Atom{{Name: "user1"}},
ale's avatar
ale committed
192
				},
ale's avatar
ale committed
193 194
				&DatasetSpec{
					Atoms: []Atom{{Name: "user2"}},
ale's avatar
ale committed
195 196 197
				},
			},
		},
ale's avatar
ale committed
198 199 200 201 202
		&SourceSpec{
			Name:            "source2",
			Handler:         "dbpipe",
			Schedule:        "@random_every 1h",
			DatasetsCommand: "echo '[{name: users, atoms: [{name: user1}, {name: user2}]}]'",
ale's avatar
ale committed
203 204
		},
	}
ale's avatar
ale committed
205
	queueSpec := &jobs.QueueSpec{
ale's avatar
ale committed
206
		Concurrency: 2,
ale's avatar
ale committed
207
	}
ale's avatar
ale committed
208 209

	// Run the backup.
ale's avatar
ale committed
210
	configMgr, err := NewConfigManager(&Config{
ale's avatar
ale committed
211 212 213 214 215 216 217 218 219 220 221 222
		Queue:        queueSpec,
		Repository:   repoSpec,
		HandlerSpecs: handlerSpecs,
		SourceSpecs:  sourceSpecs,
		DryRun:       true,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer configMgr.Close()

	m, err := NewManager(context.TODO(), configMgr, store)
ale's avatar
ale committed
223 224 225 226 227
	if err != nil {
		t.Fatal(err)
	}
	defer m.Close()

ale's avatar
ale committed
228
	for _, src := range configMgr.current().SourceSpecs() {
ale's avatar
ale committed
229 230 231 232 233 234 235
		backup, err := m.Backup(context.TODO(), src)
		if err != nil {
			t.Fatal(err)
		}
		if backup.ID == "" || backup.Host == "" {
			t.Fatalf("empty fields in backup: %+v", backup)
		}
ale's avatar
ale committed
236 237 238 239
	}

	// Try to find atoms in the metadata store.
	// Let's try with a pattern first.
ale's avatar
ale committed
240
	resp, err := store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/*", NumVersions: 1})
ale's avatar
ale committed
241 242 243
	if err != nil {
		t.Fatal("FindAtoms", err)
	}
ale's avatar
ale committed
244 245 246 247 248
	if len(resp) != 1 {
		t.Fatalf("bad FindAtoms(source1/*) response: %+v", resp)
	}
	if l := len(resp[0].Datasets); l != 2 {
		t.Fatalf("bad number of datasets returned by FindAtoms(source1/*): got %d, expected 2", l)
ale's avatar
ale committed
249 250 251
	}

	// A pattern matching a single atom.
252
	resp, err = store.FindAtoms(context.TODO(), &FindRequest{Pattern: "source1/user2"})
ale's avatar
ale committed
253 254 255 256
	if err != nil {
		t.Fatal("FindAtoms", err)
	}
	if len(resp) != 1 {
257
		t.Fatalf("bad FindAtoms(source1/user2) response: %+v", resp)
ale's avatar
ale committed
258 259
	}
}