Commit ad30ec3d authored by ale's avatar ale

Allow metadata expiration

Adds the max_metadata_age_days parameter to the metadb configuration,
and runs a periodic thread to expire the old metadata.
parent 3ebc4afd
Pipeline #8008 failed with stages
in 58 seconds
......@@ -35,6 +35,7 @@ type metadbConfig struct {
Driver string `yaml:"db_driver"`
DBURI string `yaml:"db_uri"`
ServerConfig *serverutil.ServerConfig `yaml:"http_server"`
MaxAgeDays int `yaml:"max_metadata_age_days"`
}
func loadMetadbConfig(path string) (*metadbConfig, error) {
......
......@@ -10,7 +10,7 @@ import (
func TestDebugPage(t *testing.T) {
defer os.Remove(DBFILE)
svc, err := New("sqlite3", DBFILE)
svc, err := New("sqlite3", DBFILE, 0)
if err != nil {
t.Fatal(err)
}
......
......@@ -136,12 +136,14 @@ func groupByBackup(dbAtoms []*dbAtom) []*tabacco.Backup {
// Service implementation of the tabacco backup metadata server API.
type Service struct {
db *sql.DB
stmts statementMap
db *sql.DB
stmts statementMap
maxAge int
stop chan struct{}
}
// New creates a new service and returns it.
func New(dbDriver, dbURI string) (*Service, error) {
func New(dbDriver, dbURI string, maxMetadataAgeDays int) (*Service, error) {
db, err := openDB(dbDriver, dbURI)
if err != nil {
return nil, err
......@@ -152,11 +154,35 @@ func New(dbDriver, dbURI string) (*Service, error) {
return nil, err
}
return &Service{db, stmts}, nil
s := &Service{
db: db,
stmts: stmts,
maxAge: maxMetadataAgeDays,
stop: make(chan struct{}),
}
if s.maxAge > 0 {
go func() {
t := time.NewTicker(12 * time.Hour)
defer t.Stop()
for {
select {
case <-t.C:
if err := s.purge(); err != nil {
log.Printf("purge() error: %v", err)
}
case <-s.stop:
return
}
}
}()
}
return s, nil
}
// Close the underlying database and all associated resources.
func (s *Service) Close() {
close(s.stop)
s.db.Close() // nolint
}
......@@ -284,3 +310,18 @@ func (s *Service) FindAtoms(ctx context.Context, req *tabacco.FindRequest) ([]*t
return groupByBackup(keepNumVersions(atoms, req.NumVersions)), nil
}
func (s *Service) purge() error {
cutoff := time.Now().Add(time.Duration(-s.maxAge) * 24 * time.Hour)
ctx := context.Background()
return retryBusy(ctx, func() error {
return withTX(ctx, s.db, func(tx *sql.Tx) error {
_, err := tx.Exec(
`DELETE FROM log WHERE backup_timestamp < ?`,
cutoff,
)
return err
})
})
}
......@@ -41,7 +41,7 @@ func addTestEntry(t *testing.T, svc *Service, backupID, host, dsName string) {
func TestService_AddDataset(t *testing.T) {
defer os.Remove(DBFILE)
svc, err := New("sqlite3", DBFILE)
svc, err := New("sqlite3", DBFILE, 0)
if err != nil {
t.Fatal(err)
}
......@@ -52,7 +52,7 @@ func TestService_AddDataset(t *testing.T) {
func TestService_FindAtoms(t *testing.T) {
defer os.Remove(DBFILE)
svc, err := New("sqlite3", DBFILE)
svc, err := New("sqlite3", DBFILE, 0)
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment