Commit 1de863a6 authored by ale's avatar ale

Implement progressive user log compaction

Trim the number of log entries per user to 100 at most, but only run
the (relatively expensive) compaction for users that actually log in:
this is determined by keeping a small in memory cache of active
usernames hooked to 'add_log'.

Also comes with a simple implementation of cron-like background jobs.
parent 72123379
package server
import (
"database/sql"
"sync"
"time"
)
var (
// How often to run the user log compaction.
compactionInterval = 600 * time.Second
// How many log entries to keep when compacting.
userKeepLogCount = 100
// How often to run the log pruner.
pruneInterval = 24 * time.Hour
// Delete entries older than this many days.
pruneCutoffDays = 365
)
// Call a function at semi-regular intervals.
type cronJob struct {
f func()
d time.Duration
stopCh chan struct{}
doneCh chan struct{}
}
func newCron(f func(), d time.Duration) *cronJob {
j := &cronJob{
f: f,
d: d,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
go j.run()
return j
}
// Stop the job and wait for it to exit.
func (j *cronJob) Stop() {
close(j.stopCh)
<-j.doneCh
}
// Wait some time, or be interrupted by Stop().
func (j *cronJob) waitSomeTime() bool {
t := time.NewTimer(j.d)
defer t.Stop()
select {
case <-j.stopCh:
return false
case <-t.C:
return true
}
}
func (j *cronJob) run() {
defer close(j.doneCh)
for {
if !j.waitSomeTime() {
return
}
j.f()
}
}
type usernameSet struct {
mx sync.Mutex
usernames map[string]struct{}
}
func newUsernameSet() *usernameSet {
return &usernameSet{
usernames: make(map[string]struct{}),
}
}
func (s *usernameSet) add(username string) {
s.mx.Lock()
s.usernames[username] = struct{}{}
s.mx.Unlock()
}
func (s *usernameSet) foreach(f func(string)) {
s.mx.Lock()
for username := range s.usernames {
delete(s.usernames, username)
f(username)
}
s.mx.Unlock()
}
// Leisurely consolidate logs for users, but only when necessary, and
// proceeding at a slow pace. We wake up once per minute, and clean up
// the log history for users that have logged in since.
func compactLogs(db *sql.DB, stmts statementMap, username string, count int) error {
tx, err := db.Begin()
if err != nil {
return err
}
if _, err := stmts.get(tx, "consolidate_userlog").Exec(username, count); err != nil {
return err
}
return tx.Commit()
}
// Remove old entries (both logs and devices) from the database.
func pruneLogs(db *sql.DB, stmts statementMap) error {
// Run each batch deletion in its own transaction, just to be nice.
cutoff := time.Now().AddDate(0, 0, -pruneCutoffDays)
for _, stmt := range []string{"prune_userlog", "prune_device_info"} {
tx, err := db.Begin()
if err != nil {
return err
}
if _, err := stmts.get(tx, stmt).Exec(cutoff); err != nil {
tx.Rollback() // nolint
return err
}
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
package server
import (
"database/sql"
"os"
"testing"
)
func countLogs(db *sql.DB) int {
var count int
db.QueryRow("SELECT COUNT(*) FROM userlog").Scan(&count) // nolint
return count
}
func TestPruneLogs(t *testing.T) {
defer os.Remove("test.db")
db, err := openDB("test.db")
if err != nil {
t.Fatal(err)
}
defer db.Close()
stmts, err := newStatementMap(db, userlogDBStatements)
if err != nil {
t.Fatal(err)
}
bulkLoadTestLogs(t, db)
n := countLogs(db)
if n == 0 {
t.Fatal("no logs loaded?")
}
if err := pruneLogs(db, stmts); err != nil {
t.Fatal("pruneLogs():", err)
}
n2 := countLogs(db)
if n2 >= n {
t.Fatalf("unexpected log count after prune: %d (should be < %d)", n2, n)
}
}
func countLogsForUser(db *sql.DB, username string) int {
var count int
db.QueryRow("SELECT COUNT(*) FROM userlog WHERE username = ?", username).Scan(&count) // nolint
return count
}
func TestCompactLogs(t *testing.T) {
defer os.Remove("test.db")
db, err := openDB("test.db")
if err != nil {
t.Fatal(err)
}
defer db.Close()
stmts, err := newStatementMap(db, userlogDBStatements)
if err != nil {
t.Fatal(err)
}
bulkLoadTestLogs(t, db)
user1 := randomUsernames[0]
user2 := randomUsernames[1]
n1 := countLogsForUser(db, user1)
n2 := countLogsForUser(db, user2)
if n1 == 0 {
t.Fatal("no logs loaded?")
}
// Find a reasonable limit.
limit := n1 / 2
if err := compactLogs(db, stmts, user1, limit); err != nil {
t.Fatal("compactLogs():", err)
}
n1post := countLogsForUser(db, user1)
n2post := countLogsForUser(db, user2)
if n2post != n2 {
t.Errorf("log compaction removed logs for other users (%s: %d -> %d)", user2, n2, n2post)
}
if n1post == 0 {
t.Error("log compaction wiped all user logs")
}
if n1post != limit {
t.Errorf("unexpected log count after compaction: %d (should be %d)", n1post, limit)
}
}
......@@ -2,6 +2,7 @@ package server
import (
"database/sql"
"log"
"time"
"git.autistici.org/id/auth"
......@@ -51,6 +52,27 @@ var userlogDBStatements = map[string]string{
"prune_device_info": `
DELETE FROM devices WHERE last_seen < ?
`,
// Cut the log history for a specific user. This version is
// sqlite-specific (uses "rowid"), when version 3.25 reaches
// Debian stable we'll be able to switch to using a window
// function (as in the commented version below).
"consolidate_userlog": `
DELETE FROM userlog WHERE rowid IN (
SELECT rowid
FROM userlog
WHERE username = ?
ORDER BY timestamp DESC LIMIT -1 OFFSET ?
)
`,
// "consolidate_userlog": `
// WITH rows AS (
// SELECT rowid, ROW_NUMBER()
// OVER (PARTITION BY username ORDER BY timestamp DESC) n
// FROM userlog
// WHERE username = ? AND n > 100
// )
// DELETE FROM userlog WHERE rowid IN (SELECT n FROM rows)
// `,
// Retrieve logs for a specific user.
"get_user_logs": `
......@@ -85,8 +107,10 @@ var userlogDBStatements = map[string]string{
}
type userlogDB struct {
db *sql.DB
stmts statementMap
db *sql.DB
stmts statementMap
cronjobs []*cronJob
pendingCompaction *usernameSet
}
func newUserlogDB(db *sql.DB) (*userlogDB, error) {
......@@ -94,13 +118,34 @@ func newUserlogDB(db *sql.DB) (*userlogDB, error) {
if err != nil {
return nil, err
}
pendingCompaction := newUsernameSet()
cronjobs := []*cronJob{
newCron(func() {
pendingCompaction.foreach(func(username string) {
if err := compactLogs(db, stmts, username, userKeepLogCount); err != nil {
log.Printf("error cleaning up logs for user %s: %v", username, err)
}
})
}, compactionInterval),
newCron(func() {
if err := pruneLogs(db, stmts); err != nil {
log.Printf("error in log pruning: %v", err)
}
}, pruneInterval),
}
return &userlogDB{
db: db,
stmts: stmts,
db: db,
stmts: stmts,
cronjobs: cronjobs,
pendingCompaction: pendingCompaction,
}, nil
}
func (u *userlogDB) Close() {
for _, j := range u.cronjobs {
j.Stop()
}
u.stmts.Close()
}
......@@ -155,7 +200,7 @@ func (u *userlogDB) AddLog(entry *usermetadb.LogEntry) error {
if err != nil {
return err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
// If we're given a DeviceInfo entry, update it or create it
// if it does not exist yet.
......@@ -185,6 +230,10 @@ func (u *userlogDB) AddLog(entry *usermetadb.LogEntry) error {
return err
}
if entry.Username != "" {
u.pendingCompaction.add(entry.Username)
}
return tx.Commit()
}
......@@ -201,7 +250,7 @@ func (u *userlogDB) GetUserLogs(username string, maxDays, limit int) ([]*usermet
if err != nil {
return nil, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var out []*usermetadb.LogEntry
rows, err := u.stmts.get(tx, "get_user_logs").Query(username, cutoff, limit)
......@@ -243,7 +292,7 @@ func (u *userlogDB) GetUserDevices(username string) ([]*usermetadb.MetaDeviceInf
if err != nil {
return nil, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var out []*usermetadb.MetaDeviceInfo
rows, err := u.stmts.get(tx, "devices_with_counts").Query(username)
......@@ -271,27 +320,6 @@ func (u *userlogDB) GetUserDevices(username string) ([]*usermetadb.MetaDeviceInf
return out, rows.Err()
}
const pruneCutoffDays = 365
func (u *userlogDB) Prune() error {
// Run each batch deletion in its own transaction, just to be nice.
cutoff := time.Now().AddDate(0, 0, -pruneCutoffDays)
for _, stmt := range []string{"prune_userlog", "prune_device_info"} {
tx, err := u.db.Begin()
if err != nil {
return err
}
if _, err := u.stmts.get(tx, stmt).Exec(cutoff); err != nil {
tx.Rollback() // nolint
return err
}
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
var timestampRoundSeconds int64 = 7200
func roundTimestamp(t time.Time, r int64) time.Time {
......
......@@ -241,38 +241,3 @@ func TestUserlog_GetUserLogs(t *testing.T) {
t.Fatal("GetUserLogs(nonexisting) returned non-nil")
}
}
func countLogs(db *sql.DB) int {
var count int
db.QueryRow("SELECT COUNT(*) FROM userlog").Scan(&count)
return count
}
func TestUserlog_Prune(t *testing.T) {
defer os.Remove("test.db")
db, err := openDB("test.db")
if err != nil {
t.Fatal(err)
}
defer db.Close()
bulkLoadTestLogs(t, db)
n := countLogs(db)
if n == 0 {
t.Fatal("no logs loaded?")
}
ulog, err := newUserlogDB(db)
if err != nil {
t.Fatal(err)
}
defer ulog.Close()
if err := ulog.Prune(); err != nil {
t.Fatal("Prune():", err)
}
n2 := countLogs(db)
if n2 >= n {
t.Fatalf("unexpected log count after prune: %d (should be < %d)", n2, n)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment