Commit 5aff704b authored by ale

Merge branch 'log-consolidation' into 'master'

Log compaction

See merge request !2
parents 11678523 643c757d
Pipeline #2477 passed with stages
in 3 minutes
......@@ -35,7 +35,7 @@ func (d *analysisService) CheckDevice(username string, deviceInfo *auth.DeviceIn
if err != nil {
return false, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var seen bool
err = d.stmts.get(tx, "check_device_info").QueryRow(username, deviceInfo.ID).Scan(&seen)
......
......@@ -65,7 +65,7 @@ func (l *lastloginDB) AddLastLogin(entry *usermetadb.LastLoginEntry) error {
if err != nil {
return err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
stmt := "insert_or_replace_last_login"
args := []interface{}{
......@@ -87,7 +87,7 @@ func (l *lastloginDB) GetLastLogin(username string, service string) ([]*usermeta
if err != nil {
return nil, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var stmt string
var args []interface{}
......
package server
import (
"database/sql"
"sync"
"time"
)
var (
// compactionInterval is the delay between runs of the per-user log
// compaction job (600 seconds, i.e. ten minutes).
compactionInterval = 600 * time.Second
// userKeepLogCount is how many log entries are kept for a user when
// that user's log history is compacted.
userKeepLogCount = 100
// pruneInterval is the delay between runs of the log pruner.
pruneInterval = 24 * time.Hour
// pruneCutoffDays is the retention period: the pruner deletes entries
// older than this many days.
pruneCutoffDays = 365
)
// cronJob calls a function at semi-regular intervals from a background
// goroutine. Each cycle waits for the delay d and then runs f to
// completion, so the time between two invocations is d plus however
// long f itself takes. The first invocation happens only after the
// first delay has elapsed.
type cronJob struct {
	f      func()
	d      time.Duration
	stopCh chan struct{} // closed by Stop to interrupt the wait
	doneCh chan struct{} // closed by run when the goroutine exits

	// stopOnce guards the close of stopCh so that Stop may safely be
	// called more than once.
	stopOnce sync.Once
}

// newCron creates a cronJob that invokes f every d and immediately
// starts its background goroutine. The caller must call Stop to
// terminate it.
func newCron(f func(), d time.Duration) *cronJob {
	j := &cronJob{
		f:      f,
		d:      d,
		stopCh: make(chan struct{}),
		doneCh: make(chan struct{}),
	}
	go j.run()
	return j
}

// Stop the job and wait for it to exit. If f is currently running, Stop
// blocks until it returns. Calling Stop again is harmless: the stop
// channel is closed only once, and later calls return as soon as the
// goroutine has exited.
func (j *cronJob) Stop() {
	j.stopOnce.Do(func() { close(j.stopCh) })
	<-j.doneCh
}

// waitSomeTime sleeps for the configured delay. It returns true when
// the delay elapsed and false when the wait was interrupted by Stop.
func (j *cronJob) waitSomeTime() bool {
	t := time.NewTimer(j.d)
	defer t.Stop()
	select {
	case <-j.stopCh:
		return false
	case <-t.C:
		return true
	}
}

// run is the body of the background goroutine: it alternates waiting
// and calling f until Stop is requested, then signals completion by
// closing doneCh.
func (j *cronJob) run() {
	defer close(j.doneCh)
	for j.waitSomeTime() {
		j.f()
	}
}
// usernameSet is a mutex-protected set of usernames. It collects the
// users whose logs are waiting for compaction: producers call add, and
// a consumer periodically drains the set with foreach.
type usernameSet struct {
	mx        sync.Mutex
	usernames map[string]struct{}
}

// newUsernameSet returns an empty, ready to use usernameSet.
func newUsernameSet() *usernameSet {
	return &usernameSet{
		usernames: make(map[string]struct{}),
	}
}

// add inserts username into the set. Adding a name that is already
// present has no effect.
func (s *usernameSet) add(username string) {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.usernames[username] = struct{}{}
}

// foreach removes every username currently in the set and calls f once
// for each of them, in unspecified order.
//
// The set is swapped for a fresh one while holding the lock, and f is
// invoked only after the lock has been released. This way a slow f
// (e.g. one running database transactions) never blocks concurrent
// add calls, f may itself call add without deadlocking, and a panic in
// f cannot leave the mutex locked. Usernames added while foreach is
// running are kept for the next call.
func (s *usernameSet) foreach(f func(string)) {
	s.mx.Lock()
	pending := s.usernames
	s.usernames = make(map[string]struct{})
	s.mx.Unlock()

	for username := range pending {
		f(username)
	}
}
// compactLogs cuts the log history of a single user by running the
// "consolidate_userlog" statement with (username, count) inside its own
// transaction; count is the number of most recent entries to keep.
//
// It is meant to be run leisurely and only when necessary: the
// compaction cron job wakes up every compactionInterval and calls it
// for the users that have logged in since the previous run.
//
// It returns any error from starting the transaction, executing the
// statement, or committing.
func compactLogs(db *sql.DB, stmts statementMap, username string, count int) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Make sure the transaction is released when the statement fails.
	// After a successful Commit this is a no-op (it just returns
	// sql.ErrTxDone, which we ignore).
	defer tx.Rollback() // nolint

	if _, err := stmts.get(tx, "consolidate_userlog").Exec(username, count); err != nil {
		return err
	}
	return tx.Commit()
}
// pruneLogs removes old entries (both logs and devices) from the
// database: everything older than pruneCutoffDays days, counted back
// from the current time, is deleted. It stops at the first error and
// returns it.
func pruneLogs(db *sql.DB, stmts statementMap) error {
	threshold := time.Now().AddDate(0, 0, -pruneCutoffDays)

	// Every batch deletion gets a transaction of its own, just to be
	// nice to other users of the database.
	runOne := func(name string) error {
		tx, err := db.Begin()
		if err != nil {
			return err
		}
		if _, err = stmts.get(tx, name).Exec(threshold); err != nil {
			tx.Rollback() // nolint
			return err
		}
		return tx.Commit()
	}

	for _, name := range [...]string{"prune_userlog", "prune_device_info"} {
		if err := runOne(name); err != nil {
			return err
		}
	}
	return nil
}
package server
import (
"database/sql"
"os"
"testing"
)
// countLogs returns the total number of rows in the userlog table.
// Query errors are deliberately ignored, in which case the result is 0.
func countLogs(db *sql.DB) int {
	var total int
	row := db.QueryRow("SELECT COUNT(*) FROM userlog")
	_ = row.Scan(&total) // nolint
	return total
}
// TestPruneLogs loads the bulk test fixture into a scratch database and
// verifies that pruneLogs succeeds and leaves strictly fewer log
// entries than there were before.
func TestPruneLogs(t *testing.T) {
	const dbPath = "test.db"
	defer os.Remove(dbPath)

	db, err := openDB(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	stmts, err := newStatementMap(db, userlogDBStatements)
	if err != nil {
		t.Fatal(err)
	}

	bulkLoadTestLogs(t, db)

	before := countLogs(db)
	if before == 0 {
		t.Fatal("no logs loaded?")
	}

	if err := pruneLogs(db, stmts); err != nil {
		t.Fatal("pruneLogs():", err)
	}

	if after := countLogs(db); after >= before {
		t.Fatalf("unexpected log count after prune: %d (should be < %d)", after, before)
	}
}
// countLogsForUser returns the number of userlog rows that belong to
// the given username. Query errors are deliberately ignored, in which
// case the result is 0.
func countLogsForUser(db *sql.DB, username string) int {
	var total int
	row := db.QueryRow("SELECT COUNT(*) FROM userlog WHERE username = ?", username)
	_ = row.Scan(&total) // nolint
	return total
}
// TestCompactLogs loads the bulk test fixture, compacts the logs of one
// user down to half of their entries, and verifies that exactly that
// many entries remain while another user's logs are left untouched.
func TestCompactLogs(t *testing.T) {
	const dbPath = "test.db"
	defer os.Remove(dbPath)

	db, err := openDB(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	stmts, err := newStatementMap(db, userlogDBStatements)
	if err != nil {
		t.Fatal(err)
	}

	bulkLoadTestLogs(t, db)

	// target is the user being compacted, bystander must not be affected.
	target, bystander := randomUsernames[0], randomUsernames[1]
	targetBefore := countLogsForUser(db, target)
	bystanderBefore := countLogsForUser(db, bystander)
	if targetBefore == 0 {
		t.Fatal("no logs loaded?")
	}

	// Keep half of the entries, so that compaction has work to do.
	keep := targetBefore / 2
	if err := compactLogs(db, stmts, target, keep); err != nil {
		t.Fatal("compactLogs():", err)
	}

	targetAfter := countLogsForUser(db, target)
	bystanderAfter := countLogsForUser(db, bystander)
	if bystanderAfter != bystanderBefore {
		t.Errorf("log compaction removed logs for other users (%s: %d -> %d)", bystander, bystanderBefore, bystanderAfter)
	}
	if targetAfter == 0 {
		t.Error("log compaction wiped all user logs")
	}
	if targetAfter != keep {
		t.Errorf("unexpected log count after compaction: %d (should be %d)", targetAfter, keep)
	}
}
......@@ -68,13 +68,13 @@ func TestServer_AddLastLogin(t *testing.T) {
}
}
exp_resp := *entries[0]
resp, err := c.GetLastLogin(context.Background(), "", exp_resp.Username, exp_resp.Service)
expResp := *entries[0]
resp, err := c.GetLastLogin(context.Background(), "", expResp.Username, expResp.Service)
if err != nil {
t.Fatal(err)
}
if *resp[0] != exp_resp {
t.Fatalf("Last login entries differ:\nSet %v\nGot %v", exp_resp, *resp[0])
if *resp[0] != expResp {
t.Fatalf("Last login entries differ:\nSet %v\nGot %v", expResp, *resp[0])
}
}
......@@ -2,6 +2,7 @@ package server
import (
"database/sql"
"log"
"time"
"git.autistici.org/id/auth"
......@@ -51,6 +52,27 @@ var userlogDBStatements = map[string]string{
"prune_device_info": `
DELETE FROM devices WHERE last_seen < ?
`,
// Cut the log history for a specific user, keeping only the most
// recent entries. This version is sqlite-specific (it uses "rowid").
// Once sqlite 3.25 reaches Debian stable we will be able to switch
// to a window function (as in the commented version below).
"consolidate_userlog": `
DELETE FROM userlog WHERE rowid IN (
SELECT rowid
FROM userlog
WHERE username = ?
ORDER BY timestamp DESC LIMIT -1 OFFSET ?
)
`,
// "consolidate_userlog": `
// WITH ranked AS (
// SELECT rowid AS id, ROW_NUMBER()
// OVER (PARTITION BY username ORDER BY timestamp DESC) AS n
// FROM userlog
// WHERE username = ?
// )
// DELETE FROM userlog WHERE rowid IN (SELECT id FROM ranked WHERE n > ?)
// `,
// Retrieve logs for a specific user.
"get_user_logs": `
......@@ -85,8 +107,10 @@ var userlogDBStatements = map[string]string{
}
type userlogDB struct {
db *sql.DB
stmts statementMap
db *sql.DB
stmts statementMap
cronjobs []*cronJob
pendingCompaction *usernameSet
}
func newUserlogDB(db *sql.DB) (*userlogDB, error) {
......@@ -94,13 +118,34 @@ func newUserlogDB(db *sql.DB) (*userlogDB, error) {
if err != nil {
return nil, err
}
pendingCompaction := newUsernameSet()
cronjobs := []*cronJob{
newCron(func() {
pendingCompaction.foreach(func(username string) {
if err := compactLogs(db, stmts, username, userKeepLogCount); err != nil {
log.Printf("error cleaning up logs for user %s: %v", username, err)
}
})
}, compactionInterval),
newCron(func() {
if err := pruneLogs(db, stmts); err != nil {
log.Printf("error in log pruning: %v", err)
}
}, pruneInterval),
}
return &userlogDB{
db: db,
stmts: stmts,
db: db,
stmts: stmts,
cronjobs: cronjobs,
pendingCompaction: pendingCompaction,
}, nil
}
// Close shuts down the userlogDB: it first stops every background cron
// job, waiting for each one to exit, and only then closes the prepared
// statements those jobs were using.
func (u *userlogDB) Close() {
	for i := range u.cronjobs {
		u.cronjobs[i].Stop()
	}
	u.stmts.Close()
}
......@@ -155,7 +200,7 @@ func (u *userlogDB) AddLog(entry *usermetadb.LogEntry) error {
if err != nil {
return err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
// If we're given a DeviceInfo entry, update it or create it
// if it does not exist yet.
......@@ -185,6 +230,10 @@ func (u *userlogDB) AddLog(entry *usermetadb.LogEntry) error {
return err
}
if entry.Username != "" {
u.pendingCompaction.add(entry.Username)
}
return tx.Commit()
}
......@@ -201,7 +250,7 @@ func (u *userlogDB) GetUserLogs(username string, maxDays, limit int) ([]*usermet
if err != nil {
return nil, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var out []*usermetadb.LogEntry
rows, err := u.stmts.get(tx, "get_user_logs").Query(username, cutoff, limit)
......@@ -243,7 +292,7 @@ func (u *userlogDB) GetUserDevices(username string) ([]*usermetadb.MetaDeviceInf
if err != nil {
return nil, err
}
defer tx.Rollback()
defer tx.Rollback() // nolint
var out []*usermetadb.MetaDeviceInfo
rows, err := u.stmts.get(tx, "devices_with_counts").Query(username)
......@@ -271,27 +320,6 @@ func (u *userlogDB) GetUserDevices(username string) ([]*usermetadb.MetaDeviceInf
return out, rows.Err()
}
const pruneCutoffDays = 365
func (u *userlogDB) Prune() error {
// Run each batch deletion in its own transaction, just to be nice.
cutoff := time.Now().AddDate(0, 0, -pruneCutoffDays)
for _, stmt := range []string{"prune_userlog", "prune_device_info"} {
tx, err := u.db.Begin()
if err != nil {
return err
}
if _, err := u.stmts.get(tx, stmt).Exec(cutoff); err != nil {
tx.Rollback() // nolint
return err
}
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
var timestampRoundSeconds int64 = 7200
func roundTimestamp(t time.Time, r int64) time.Time {
......
......@@ -33,7 +33,7 @@ func choose(l []string) string {
func generateRandomDeviceInfo() *auth.DeviceInfo {
var buf [8]byte
crand.Read(buf[:])
crand.Read(buf[:]) // nolint
id := hex.EncodeToString(buf[:])
return &auth.DeviceInfo{
ID: id,
......@@ -101,6 +101,7 @@ func bulkLoadTestLogs(t testing.TB, db *sql.DB) *usermetadb.LogEntry {
now := time.Now()
for username, devs := range userDevices {
for _, dev := range devs {
// nolint
insertDeviceInfoStmt.Exec(
username,
dev.ID,
......@@ -241,38 +242,3 @@ func TestUserlog_GetUserLogs(t *testing.T) {
t.Fatal("GetUserLogs(nonexisting) returned non-nil")
}
}
func countLogs(db *sql.DB) int {
var count int
db.QueryRow("SELECT COUNT(*) FROM userlog").Scan(&count)
return count
}
func TestUserlog_Prune(t *testing.T) {
defer os.Remove("test.db")
db, err := openDB("test.db")
if err != nil {
t.Fatal(err)
}
defer db.Close()
bulkLoadTestLogs(t, db)
n := countLogs(db)
if n == 0 {
t.Fatal("no logs loaded?")
}
ulog, err := newUserlogDB(db)
if err != nil {
t.Fatal(err)
}
defer ulog.Close()
if err := ulog.Prune(); err != nil {
t.Fatal("Prune():", err)
}
n2 := countLogs(db)
if n2 >= n {
t.Fatalf("unexpected log count after prune: %d (should be < %d)", n2, n)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment