Commit e5a46ad2 authored by ale

Refactor database layer

Handle sqlite concurrency issues properly, retrying on BUSY
errors. Use proper transaction wrappers.
parent 5d50b4e7
Pipeline #7155 passed with stages in 1 minute and 53 seconds
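
The change follows a single pattern throughout: every write is wrapped in a transaction helper, and the whole transaction is retried while sqlite reports a BUSY error. A minimal usage sketch of the two helpers introduced further down (the insertEntry method and the entries table are hypothetical, for illustration only):

// Hypothetical usage sketch of the helpers added in this commit:
// retryBusy re-runs the whole transaction while sqlite returns
// SQLITE_BUSY, withTX handles Begin/Commit/Rollback.
func (s *Service) insertEntry(ctx context.Context, value string) error {
	return retryBusy(ctx, func() error {
		return withTX(ctx, s.db, func(tx *sql.Tx) error {
			_, err := tx.ExecContext(ctx, "INSERT INTO entries (value) VALUES (?)", value)
			return err
		})
	})
}
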
@@ -175,41 +175,32 @@ var statements = map[string]string{
// AddDataset adds metadata about a successful backup of a Dataset,
// along with its parent Backup object.
func (s *Service) AddDataset(ctx context.Context, backup tabacco.Backup, ds tabacco.Dataset) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() // nolint
for _, dbAtom := range makeAtoms(backup, ds) {
if _, err := s.stmts.get(tx, "insert_atom").Exec(
dbAtom.BackupID,
dbAtom.BackupTimestamp,
dbAtom.BackupHost,
dbAtom.DatasetID,
dbAtom.DatasetSnapshotID,
dbAtom.DatasetSource,
dbAtom.AtomName,
dbAtom.AtomPath,
dbAtom.AtomFullPath,
); err != nil {
return err
}
}
return tx.Commit()
return retryBusy(ctx, func() error {
return withTX(ctx, s.db, func(tx *sql.Tx) error {
for _, dbAtom := range makeAtoms(backup, ds) {
if _, err := s.stmts.get(tx, "insert_atom").Exec(
dbAtom.BackupID,
dbAtom.BackupTimestamp,
dbAtom.BackupHost,
dbAtom.DatasetID,
dbAtom.DatasetSnapshotID,
dbAtom.DatasetSource,
dbAtom.AtomName,
dbAtom.AtomPath,
dbAtom.AtomFullPath,
); err != nil {
return err
}
}
return nil
})
})
}
// FindAtoms searches for atoms meeting a particular criteria and
// returns them grouped by backup and dataset (the atoms will be
// contained within the dataset).
func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*tabacco.Backup, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback() // nolint
// Build the SQL query. Assemble the WHERE clauses and their
// respective arguments first.
var where []string
@@ -237,26 +228,30 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
ORDER BY backup_timestamp DESC`,
strings.Join(where, " AND "),
)
rows, err := tx.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close() // nolint
var atoms []*dbAtom
for rows.Next() {
var a dbAtom
if err := rows.Scan(
&a.BackupID, &a.BackupTimestamp, &a.BackupHost,
&a.DatasetID, &a.DatasetSnapshotID, &a.DatasetSource,
&a.AtomName, &a.AtomPath, &a.AtomFullPath,
); err != nil {
log.Printf("bad row: %v", err)
continue
}
atoms = append(atoms, &a)
}
if err := rows.Err(); err != nil {
return nil, err
}
var atoms []*dbAtom
err := withReadonlyTX(ctx, s.db, func(tx *sql.Tx) error {
rows, err := tx.Query(q, args...)
if err != nil {
return err
}
defer rows.Close() // nolint
for rows.Next() {
var a dbAtom
if err := rows.Scan(
&a.BackupID, &a.BackupTimestamp, &a.BackupHost,
&a.DatasetID, &a.DatasetSnapshotID, &a.DatasetSource,
&a.AtomName, &a.AtomPath, &a.AtomFullPath,
); err != nil {
log.Printf("bad row: %v", err)
continue
}
atoms = append(atoms, &a)
}
return rows.Err()
})
if err != nil {
return nil, err
}
......
@@ -73,10 +73,10 @@ func TestService_FindAtoms(t *testing.T) {
},
)
if err != nil {
t.Fatal("FindAtoms", err)
t.Fatal("FindAtoms(1)", err)
}
if len(vv) != 10 {
t.Fatalf("bad result: %+v", vv)
t.Fatalf("bad result(1): %+v", vv)
}
// Augmenting the previous search with a host filter should
@@ -89,9 +89,9 @@ func TestService_FindAtoms(t *testing.T) {
},
)
if err != nil {
t.Fatal("FindAtoms", err)
t.Fatal("FindAtoms(2)", err)
}
if len(vv) != 1 {
t.Fatalf("bad result: %+v", vv)
t.Fatalf("bad result(2): %+v", vv)
}
}
package server
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"math/rand"
"strings"
"time"
"github.com/mattes/migrate"
"github.com/mattes/migrate/database/sqlite3"
msqlite3 "github.com/mattes/migrate/database/sqlite3"
bindata "github.com/mattes/migrate/source/go-bindata"
"github.com/mattn/go-sqlite3"
"git.autistici.org/ai3/tools/tabacco/metadb/migrations"
)
func openDB(dbDriver, dbURI string) (*sql.DB, error) {
// For sqlite3, apply default DSN options unless the caller already
// set some: shared cache, a 10s busy timeout, WAL journaling, and a
// fully serialized mutex.
if dbDriver == "sqlite3" && !strings.Contains(dbURI, "?") {
dbURI += "?cache=shared&_busy_timeout=10000&_journal=WAL&_mutex=full"
}
db, err := sql.Open(dbDriver, dbURI)
if err != nil {
return nil, err
}
// Serialize connections for the sqlite driver, to
// prevent locking issues.
if dbDriver == "sqlite3" {
db.SetMaxOpenConns(1)
}
if err = runDatabaseMigrations(db, dbDriver); err != nil {
db.Close() // nolint
return nil, err
@@ -50,8 +54,8 @@ func runDatabaseMigrations(db *sql.DB, dbDriver string) error {
return err
}
di, err := sqlite3.WithInstance(db, &sqlite3.Config{
MigrationsTable: sqlite3.DefaultMigrationsTable,
di, err := msqlite3.WithInstance(db, &msqlite3.Config{
MigrationsTable: msqlite3.DefaultMigrationsTable,
DatabaseName: "usermetadb",
})
if err != nil {
@@ -94,3 +98,52 @@ func (m statementMap) Close() {
func (m statementMap) get(tx *sql.Tx, name string) *sql.Stmt {
return tx.Stmt(m[name])
}
func withTX(ctx context.Context, db *sql.DB, f func(*sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
if err := f(tx); err != nil {
tx.Rollback() // nolint
return err
}
return tx.Commit()
}
func withReadonlyTX(ctx context.Context, db *sql.DB, f func(*sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() // nolint
return f(tx)
}
func isBusy(err error) bool {
switch e := err.(type) {
case sqlite3.Error:
// Error code 5 is SQLITE_BUSY ("database is locked").
return e.Code == 5
default:
return false
}
}
var defaultQueryTimeout = 3 * time.Second
func retryBusy(ctx context.Context, f func() error) error {
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(defaultQueryTimeout)
}
for time.Now().Before(deadline) {
if err := f(); !isBusy(err) {
return err
}
// Random sleep between attempts, up to 1ms.
time.Sleep(time.Duration(rand.Float64() * float64(time.Millisecond)))
}
return errors.New("query timed out waiting to lock the database")
}
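
For illustration only (not part of the commit), a sketch of how retryBusy behaves when the callback keeps reporting a locked database:

// Hypothetical illustration: the first two attempts simulate a locked
// database; retryBusy keeps calling the function until it stops
// returning SQLITE_BUSY or the deadline (default 3s) expires.
attempts := 0
err := retryBusy(context.Background(), func() error {
	attempts++
	if attempts < 3 {
		return sqlite3.Error{Code: sqlite3.ErrBusy}
	}
	return nil
})
// err == nil here, after three attempts.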