Commit dc583566 authored by ale

Add implementation of sql-based metadata server

parent 34d49feb
package metadb
//go:generate go-bindata --nocompress --pkg migrations --ignore \.go$ -o migrations/bindata.go -prefix migrations/ ./migrations
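// Running "go generate" in this package rebuilds migrations/bindata.go
// from the SQL files under ./migrations (it requires the go-bindata
// tool to be installed and in $PATH).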
DROP INDEX idx_log_backup_id_and_dataset_name;
DROP INDEX idx_log_backup_id;
DROP INDEX idx_log_primary;
DROP TABLE log;
--- Initialize the database schema.
--- Denormalized log table.
CREATE TABLE log (
    backup_id VARCHAR(128),
    backup_timestamp DATETIME,
    backup_host VARCHAR(128),
    dataset_name VARCHAR(128),
    dataset_handler VARCHAR(128),
    atom_name VARCHAR(255),
    atom_path VARCHAR(255),
    atom_source_path TEXT,
    atom_relative_path TEXT
);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_path);
CREATE INDEX idx_log_backup_id ON log (backup_id);
CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name);
// Code generated by go-bindata.
// sources:
// migrations/1_initialize_schema.down.sql
// migrations/1_initialize_schema.up.sql
// DO NOT EDIT!

package migrations

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"time"
)

type asset struct {
	bytes []byte
	info  os.FileInfo
}

type bindataFileInfo struct {
	name    string
	size    int64
	mode    os.FileMode
	modTime time.Time
}

func (fi bindataFileInfo) Name() string {
	return fi.name
}

func (fi bindataFileInfo) Size() int64 {
	return fi.size
}

func (fi bindataFileInfo) Mode() os.FileMode {
	return fi.mode
}

func (fi bindataFileInfo) ModTime() time.Time {
	return fi.modTime
}

func (fi bindataFileInfo) IsDir() bool {
	return false
}

func (fi bindataFileInfo) Sys() interface{} {
	return nil
}

var __1_initialize_schemaDownSql = []byte(`
DROP INDEX idx_log_backup_id_and_dataset_name;
DROP INDEX idx_log_backup_id;
DROP INDEX idx_log_primary;
DROP TABLE log;
`)

func _1_initialize_schemaDownSqlBytes() ([]byte, error) {
	return __1_initialize_schemaDownSql, nil
}

func _1_initialize_schemaDownSql() (*asset, error) {
	bytes, err := _1_initialize_schemaDownSqlBytes()
	if err != nil {
		return nil, err
	}
	info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 123, mode: os.FileMode(436), modTime: time.Unix(1532974389, 0)}
	a := &asset{bytes: bytes, info: info}
	return a, nil
}

var __1_initialize_schemaUpSql = []byte(`
--- Initialize the database schema.
--- Denormalized log table.
CREATE TABLE log (
    backup_id VARCHAR(128),
    backup_timestamp DATETIME,
    backup_host VARCHAR(128),
    dataset_name VARCHAR(128),
    dataset_handler VARCHAR(128),
    atom_name VARCHAR(255),
    atom_path VARCHAR(255),
    atom_source_path TEXT,
    atom_relative_path TEXT
);
CREATE UNIQUE INDEX idx_log_primary ON log (backup_id, dataset_name, atom_path);
CREATE INDEX idx_log_backup_id ON log (backup_id);
CREATE INDEX idx_log_backup_id_and_dataset_name ON log (backup_id, dataset_name);
`)

func _1_initialize_schemaUpSqlBytes() ([]byte, error) {
	return __1_initialize_schemaUpSql, nil
}

func _1_initialize_schemaUpSql() (*asset, error) {
	bytes, err := _1_initialize_schemaUpSqlBytes()
	if err != nil {
		return nil, err
	}
	info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 604, mode: os.FileMode(436), modTime: time.Unix(1532985344, 0)}
	a := &asset{bytes: bytes, info: info}
	return a, nil
}

// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
	cannonicalName := strings.Replace(name, "\\", "/", -1)
	if f, ok := _bindata[cannonicalName]; ok {
		a, err := f()
		if err != nil {
			return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
		}
		return a.bytes, nil
	}
	return nil, fmt.Errorf("Asset %s not found", name)
}

// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
	a, err := Asset(name)
	if err != nil {
		panic("asset: Asset(" + name + "): " + err.Error())
	}
	return a
}

// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
	cannonicalName := strings.Replace(name, "\\", "/", -1)
	if f, ok := _bindata[cannonicalName]; ok {
		a, err := f()
		if err != nil {
			return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
		}
		return a.info, nil
	}
	return nil, fmt.Errorf("AssetInfo %s not found", name)
}

// AssetNames returns the names of the assets.
func AssetNames() []string {
	names := make([]string, 0, len(_bindata))
	for name := range _bindata {
		names = append(names, name)
	}
	return names
}

// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
	"1_initialize_schema.down.sql": _1_initialize_schemaDownSql,
	"1_initialize_schema.up.sql":   _1_initialize_schemaUpSql,
}

// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
//     data/
//       foo.txt
//       img/
//         a.png
//         b.png
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
	node := _bintree
	if len(name) != 0 {
		cannonicalName := strings.Replace(name, "\\", "/", -1)
		pathList := strings.Split(cannonicalName, "/")
		for _, p := range pathList {
			node = node.Children[p]
			if node == nil {
				return nil, fmt.Errorf("Asset %s not found", name)
			}
		}
	}
	if node.Func != nil {
		return nil, fmt.Errorf("Asset %s not found", name)
	}
	rv := make([]string, 0, len(node.Children))
	for childName := range node.Children {
		rv = append(rv, childName)
	}
	return rv, nil
}

type bintree struct {
	Func     func() (*asset, error)
	Children map[string]*bintree
}

var _bintree = &bintree{nil, map[string]*bintree{
	"1_initialize_schema.down.sql": &bintree{_1_initialize_schemaDownSql, map[string]*bintree{}},
	"1_initialize_schema.up.sql":   &bintree{_1_initialize_schemaUpSql, map[string]*bintree{}},
}}

// RestoreAsset restores an asset under the given directory.
func RestoreAsset(dir, name string) error {
	data, err := Asset(name)
	if err != nil {
		return err
	}
	info, err := AssetInfo(name)
	if err != nil {
		return err
	}
	err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
	if err != nil {
		return err
	}
	err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
	if err != nil {
		return err
	}
	err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
	if err != nil {
		return err
	}
	return nil
}

// RestoreAssets restores an asset under the given directory recursively.
func RestoreAssets(dir, name string) error {
	children, err := AssetDir(name)
	// File
	if err != nil {
		return RestoreAsset(dir, name)
	}
	// Dir
	for _, child := range children {
		err = RestoreAssets(dir, filepath.Join(name, child))
		if err != nil {
			return err
		}
	}
	return nil
}

func _filePath(dir, name string) string {
	cannonicalName := strings.Replace(name, "\\", "/", -1)
	return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
}
package server

import (
	"log"
	"net/http"

	"git.autistici.org/ai3/go-common/serverutil"

	"git.autistici.org/ale/tabacco"
)

// httpServer wraps a Service and exposes it over HTTP.
type httpServer struct {
	*Service
}

// addDatasetRequest is the JSON request body for /api/add_dataset.
type addDatasetRequest struct {
	Backup  tabacco.Backup  `json:"backup"`
	Dataset tabacco.Dataset `json:"dataset"`
}

// handleAddDataset decodes an addDatasetRequest and records the
// dataset metadata via Service.AddDataset.
func (s *httpServer) handleAddDataset(w http.ResponseWriter, r *http.Request) {
	var req addDatasetRequest
	if !serverutil.DecodeJSONRequest(w, r, &req) {
		return
	}
	if err := s.AddDataset(r.Context(), req.Backup, req.Dataset); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		log.Printf("AddDataset(%+v) error: %v", req, err)
		return
	}
	serverutil.EncodeJSONResponse(w, struct{}{})
}

// handleFindAtoms decodes a tabacco.FindRequest and returns the
// matching versions as JSON.
func (s *httpServer) handleFindAtoms(w http.ResponseWriter, r *http.Request) {
	var req tabacco.FindRequest
	if !serverutil.DecodeJSONRequest(w, r, &req) {
		return
	}
	resp, err := s.FindAtoms(r.Context(), req)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		log.Printf("FindAtoms(%+v) error: %v", req, err)
		return
	}
	serverutil.EncodeJSONResponse(w, resp)
}

// Handler returns the http.Handler implementing the metadata server API.
func (s *httpServer) Handler() http.Handler {
	m := http.NewServeMux()
	m.HandleFunc("/api/add_dataset", s.handleAddDataset)
	m.HandleFunc("/api/find_atoms", s.handleFindAtoms)
	return m
}

// Serve the specified service over HTTP with the given config.
func Serve(svc *Service, config *serverutil.ServerConfig, addr string) error {
	httpSrv := &httpServer{svc}
	return serverutil.Serve(httpSrv.Handler(), config, addr)
}
package server

import (
	"context"
	"database/sql"
	"log"
	"path/filepath"
	"strings"
	"time"

	"git.autistici.org/ale/tabacco"
)

// An atom, as represented in the database, denormalized.
type dbAtom struct {
	BackupID         string
	BackupTimestamp  time.Time
	BackupHost       string
	DatasetName      string
	DatasetHandler   string
	AtomName         string
	AtomPath         string
	AtomSourcePath   string
	AtomRelativePath string
}
// makeAtoms flattens a Backup/Dataset pair into one denormalized row
// per atom. The source and relative paths are carried along so that
// getAtom can reconstruct the original tabacco.Atom.
func makeAtoms(backup tabacco.Backup, ds tabacco.Dataset) []dbAtom {
	var out []dbAtom
	for _, atom := range ds.Atoms {
		out = append(out, dbAtom{
			BackupID:         backup.ID,
			BackupTimestamp:  backup.Timestamp,
			BackupHost:       backup.Host,
			DatasetName:      ds.Name,
			DatasetHandler:   ds.Handler,
			AtomName:         atom.Name,
			AtomPath:         filepath.Join(ds.Name, atom.Name),
			AtomSourcePath:   atom.SourcePath,
			AtomRelativePath: atom.RelativePath,
		})
	}
	return out
}
func (a *dbAtom) getBackup() *tabacco.Backup {
	return &tabacco.Backup{
		ID:        a.BackupID,
		Timestamp: a.BackupTimestamp,
		Host:      a.BackupHost,
	}
}

func (a *dbAtom) getDataset() *tabacco.Dataset {
	return &tabacco.Dataset{
		Name:    a.DatasetName,
		Handler: a.DatasetHandler,
	}
}

func (a *dbAtom) getAtom() tabacco.Atom {
	return tabacco.Atom{
		Name:         a.AtomName,
		RelativePath: a.AtomRelativePath,
		SourcePath:   a.AtomSourcePath,
	}
}

// normalizeAtoms groups denormalized rows back into Versions: one
// inner slice per backup, one Version per dataset within that backup.
func normalizeAtoms(dbAtoms []dbAtom) [][]tabacco.Version {
	// Accumulate versions keyed by backup ID first, dataset name
	// next.
	backupMap := make(map[string]*tabacco.Backup)
	dsMap := make(map[string]map[string]*tabacco.Dataset)
	for _, atom := range dbAtoms {
		// Create the Backup object if it does not exist.
		if _, ok := backupMap[atom.BackupID]; !ok {
			backupMap[atom.BackupID] = atom.getBackup()
		}

		// Create the Dataset object for this Backup in the
		// two-level map (creating the intermediate map if
		// necessary).
		tmp, ok := dsMap[atom.BackupID]
		if !ok {
			tmp = make(map[string]*tabacco.Dataset)
			dsMap[atom.BackupID] = tmp
		}
		ds, ok := tmp[atom.DatasetName]
		if !ok {
			ds = atom.getDataset()
			tmp[atom.DatasetName] = ds
		}

		// Finally, add the atom to the dataset.
		ds.Atoms = append(ds.Atoms, atom.getAtom())
	}

	// Now dump the maps to a Version array.
	var out [][]tabacco.Version
	for backupID, tmp := range dsMap {
		backup := backupMap[backupID]
		var tmpv []tabacco.Version
		for _, ds := range tmp {
			tmpv = append(tmpv, tabacco.Version{Backup: *backup, Dataset: *ds})
		}
		out = append(out, tmpv)
	}
	return out
}
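A small sketch of the grouping behavior, using hypothetical values in the same package: two rows sharing a backup ID and dataset name collapse into a single Version carrying both atoms.

rows := []dbAtom{
	{BackupID: "b1", DatasetName: "ds1", AtomName: "a1"},
	{BackupID: "b1", DatasetName: "ds1", AtomName: "a2"},
}
versions := normalizeAtoms(rows)
// len(versions) == 1: one backup.
// len(versions[0]) == 1: one dataset in that backup.
// len(versions[0][0].Dataset.Atoms) == 2: both atoms grouped together.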
// Service implementation of the tabacco backup metadata server API.
type Service struct {
	db    *sql.DB
	stmts statementMap
}

// New creates a new service and returns it.
func New(dbDriver, dbURI string) (*Service, error) {
	db, err := openDB(dbDriver, dbURI)
	if err != nil {
		return nil, err
	}
	stmts, err := newStatementMap(db, statements)
	if err != nil {
		return nil, err
	}
	return &Service{db, stmts}, nil
}

// Close the underlying database and all associated resources.
func (s *Service) Close() {
	s.db.Close() // nolint
}
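// statements is the set of named SQL queries handed to
// newStatementMap (defined elsewhere in this package, not part of
// this commit) and fetched by name at query time via stmts.get.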
var statements = map[string]string{
	"insert_atom": `
		INSERT INTO log (
		    backup_id, backup_timestamp, backup_host,
		    dataset_name, dataset_handler,
		    atom_name, atom_path, atom_source_path, atom_relative_path
		) VALUES (
		    ?, ?, ?, ?, ?, ?, ?, ?, ?
		)
	`,
}
// AddDataset adds metadata about a successful backup of a Dataset,
// along with its parent Backup object.
func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds tabacco.Dataset) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // nolint

	for _, dbAtom := range makeAtoms(backup, ds) {
		if _, err := s.stmts.get(tx, "insert_atom").Exec(
			dbAtom.BackupID,
			dbAtom.BackupTimestamp,
			dbAtom.BackupHost,
			dbAtom.DatasetName,
			dbAtom.DatasetHandler,
			dbAtom.AtomName,
			dbAtom.AtomPath,
			dbAtom.AtomSourcePath,
			dbAtom.AtomRelativePath,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// FindAtoms searches for atoms meeting particular criteria and
// returns them grouped by backup and dataset (the atoms will be
// contained within the dataset).
func (s *Service) FindAtoms(ctx context.Context, req tabacco.FindRequest) ([][]tabacco.Version, error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback() // nolint

	// Build the WHERE clause from the optional request filters.
	var where []string
	var args []interface{}
	if req.Host != "" {
		where = append(where, "backup_host = ?")
		args = append(args, req.Host)
	}
	if req.Pattern != "" {
		where = append(where, "atom_path LIKE ?")
		args = append(args, strings.Replace(req.Pattern, "*", "%", -1))
	}

	q := `SELECT
		backup_id, backup_timestamp, backup_host,
		dataset_name, dataset_handler,
		atom_name, atom_path, atom_source_path, atom_relative_path
	FROM log`
	// Only add a WHERE clause when there is at least one filter,
	// otherwise the query would end with a dangling WHERE.
	if len(where) > 0 {
		q += " WHERE " + strings.Join(where, " AND ")
	}

	rows, err := tx.Query(q, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint

	var atoms []dbAtom
	for rows.Next() {
		var a dbAtom
		if err := rows.Scan(
			&a.BackupID, &a.BackupTimestamp, &a.BackupHost,
			&a.DatasetName, &a.DatasetHandler,
			&a.AtomName, &a.AtomPath, &a.AtomSourcePath, &a.AtomRelativePath,
		); err != nil {
			log.Printf("bad row: %v", err)
			continue
		}
		atoms = append(atoms, a)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return normalizeAtoms(atoms), nil
}
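A minimal end-to-end sketch of the Service API; the driver name and database URI are assumptions, since openDB is defined elsewhere in the package:

// (sketch, same package)
func exampleRoundTrip() error {
	svc, err := New("sqlite3", "metadata.db") // driver and URI are assumptions
	if err != nil {
		return err
	}
	defer svc.Close()

	ctx := context.Background()
	backup := tabacco.Backup{ID: "b1", Host: "host1", Timestamp: time.Now()}
	ds := tabacco.Dataset{
		Name:    "dataset1",
		Handler: "file",
		Atoms:   []tabacco.Atom{{Name: "sub1", SourcePath: "/path/dataset1/sub1"}},
	}
	if err := svc.AddDataset(ctx, backup, ds); err != nil {
		return err
	}

	// "*" in the pattern is translated to a SQL "%" wildcard by FindAtoms.
	versions, err := svc.FindAtoms(ctx, tabacco.FindRequest{Pattern: "dataset1/*"})
	if err != nil {
		return err
	}
	log.Printf("found %d backups", len(versions))
	return nil
}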
package server

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"git.autistici.org/ale/tabacco"
)

const DBFILE = ".service_test.db"

// addTestEntry adds a fixed two-atom dataset to the service, failing
// the test on error.
func addTestEntry(t *testing.T, svc *Service, backupID, host, dsName string) {
	err := svc.AddDataset(
		context.Background(),
		tabacco.Backup{
			ID:        backupID,
			Host:      host,
			Timestamp: time.Now(),
		},
		tabacco.Dataset{
			Name:    dsName,
			Handler: "file",
			Atoms: []tabacco.Atom{
				{
					Name:       "sub1",
					SourcePath: "/path/dataset1/sub1",
				},
				{
					Name:       "sub2",
					SourcePath: "/path/dataset1/sub2",
				},
			},
		},
	)
	if err != nil {
		t.Fatal("AddDataset", err)