diff --git a/db/db.go b/db/db.go
new file mode 100644
index 0000000000000000000000000000000000000000..fc972521c08937f396fb8a49bbd8a4bf91cec666
--- /dev/null
+++ b/db/db.go
@@ -0,0 +1,35 @@
+package db
+
+import (
+	"fmt"
+	"net/url"
+	"time"
+
+	"git.autistici.org/ai3/tools/iprep/db/leveldb"
+	"git.autistici.org/ai3/tools/iprep/db/sqlite"
+	ippb "git.autistici.org/ai3/tools/iprep/proto"
+)
+
+type DB interface {
+	AddAggregate(*ippb.Aggregate) error
+	WipeOldData(time.Duration) error
+	ScanIP(time.Time, string) (ippb.Map, error)
+	ScanType(time.Time, string) (ippb.Map, error)
+	Close()
+}
+
+func Open(path string) (DB, error) {
+	u, err := url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	switch u.Scheme {
+	case "", "leveldb":
+		return leveldb.Open(u.Path)
+	case "sqlite":
+		return sqlite.Open(u.Path)
+	default:
+		return nil, fmt.Errorf("unsupported scheme %s", u.Scheme)
+	}
+}
diff --git a/db/leveldb_test.go b/db/db_test.go
similarity index 74%
rename from db/leveldb_test.go
rename to db/db_test.go
index f6979add099f6a1efdfe97025698d8e0f24af5d9..544d9bcf08f4f04189073c8efae7f8925f6cbd9c
--- a/db/leveldb_test.go
+++ b/db/db_test.go
@@ -9,8 +9,9 @@ import (
 	"testing"
 	"time"
 
+	"git.autistici.org/ai3/tools/iprep/db/leveldb"
+	"git.autistici.org/ai3/tools/iprep/db/sqlite"
 	ippb "git.autistici.org/ai3/tools/iprep/proto"
-	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 const manyEvents = 500000
@@ -39,14 +40,14 @@ func randomEvents(n int) *ippb.Aggregate {
 	return aggr
 }
 
-func TestWrite(t *testing.T) {
+func runRWTest(t *testing.T, driver string) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		t.Fatal(err)
 	}
	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		t.Fatalf("Open: %v", err)
 	}
@@ -89,6 +90,15 @@
 	// }
 }
 
+func TestReadWrite_LevelDB(t *testing.T) {
+	runRWTest(t, "leveldb")
+}
+
+func TestReadWrite_Sqlite(t *testing.T) {
+	sqlite.DebugMigrations = true
+	runRWTest(t, "sqlite")
+}
+
 func TestWipeOldData(t *testing.T) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
@@ -110,14 +120,14 @@
 	}
 }
 
-func BenchmarkWrite(b *testing.B) {
+func runWriteBench(b *testing.B, driver string) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		b.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		b.Fatalf("Open: %v", err)
 	}
@@ -137,22 +147,22 @@
 	}
 }
 
-func BenchmarkRead_SmallAggregate(b *testing.B) {
-	runReadBenchmark(b, 5)
+func BenchmarkWrite_LevelDB(b *testing.B) {
+	runWriteBench(b, "leveldb")
 }
 
-func BenchmarkRead_LargeAggregate(b *testing.B) {
-	runReadBenchmark(b, 1000)
+func BenchmarkWrite_Sqlite(b *testing.B) {
+	runWriteBench(b, "sqlite")
 }
 
-func runReadBenchmark(b *testing.B, eventsPerIP int) {
+func runReadBenchmark(b *testing.B, driver string, eventsPerIP int) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		b.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		b.Fatalf("Open: %v", err)
 	}
@@ -171,7 +181,13 @@
 	}
 	db.AddAggregate(a)
 
-	db.db.CompactRange(util.Range{})
+	// Compact the database if it's LevelDB.
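+	// Compaction is LevelDB-specific and not part of the db.DB
+	// interface, so we type-assert on the concrete driver type;
+	// doing this before b.ResetTimer() keeps it out of the timing.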
+	if ldb, ok := db.(*leveldb.DB); ok {
+		ldb.Compact()
+	}
 
 	deadline := time.Now().Add(-1 * time.Hour)
 	b.ResetTimer()
@@ -189,3 +205,19 @@
 		}
 	}
 }
+
+func BenchmarkRead_LevelDB_SmallAggregate(b *testing.B) {
+	runReadBenchmark(b, "leveldb", 5)
+}
+
+func BenchmarkRead_LevelDB_LargeAggregate(b *testing.B) {
+	runReadBenchmark(b, "leveldb", 1000)
+}
+
+func BenchmarkRead_Sqlite_SmallAggregate(b *testing.B) {
+	runReadBenchmark(b, "sqlite", 5)
+}
+
+func BenchmarkRead_Sqlite_LargeAggregate(b *testing.B) {
+	runReadBenchmark(b, "sqlite", 1000)
+}
diff --git a/db/leveldb.go b/db/leveldb/leveldb.go
similarity index 97%
rename from db/leveldb.go
rename to db/leveldb/leveldb.go
index 4410cb1e8235a5a3e7152730c2e46dad5478f80f..5420c67210ee2c5c02ef2fb29e8f4084465cfe3d
--- a/db/leveldb.go
+++ b/db/leveldb/leveldb.go
@@ -1,4 +1,4 @@
-package db
+package leveldb
 
 import (
 	"bytes"
@@ -124,6 +124,10 @@ func (db *DB) ScanType(startTime time.Time, t string) (ippb.Map, error) {
 	return db.scanIndex(startTime, typeIndexPrefix, []byte(t))
 }
 
+func (db *DB) Compact() error {
+	return db.db.CompactRange(util.Range{})
+}
+
 func eventKeyFromID(id []byte) []byte {
 	return append(eventPrefix, id...)
 }
diff --git a/db/monotonic_id.go b/db/leveldb/monotonic_id.go
similarity index 96%
rename from db/monotonic_id.go
rename to db/leveldb/monotonic_id.go
index 8e9ef3b4b69aeceaf6011f9d0f3664b80ba523d8..b1d0ea45695259bc880b8213c5ce01446e8f4967
--- a/db/monotonic_id.go
+++ b/db/leveldb/monotonic_id.go
@@ -1,4 +1,4 @@
-package db
+package leveldb
 
 import (
 	"encoding/binary"
diff --git a/db/sqlite/bindata.go b/db/sqlite/bindata.go
new file mode 100644
index 0000000000000000000000000000000000000000..2b3e4dd4e5b4f00efbd5d12864fb7d75622e943e
--- /dev/null
+++ b/db/sqlite/bindata.go
@@ -0,0 +1,243 @@
+// Code generated by go-bindata.
+// sources:
+// migrations/1_initialize_schema.down.sql
+// migrations/1_initialize_schema.up.sql
+// DO NOT EDIT!
+
+package sqlite
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+type asset struct {
+	bytes []byte
+	info  os.FileInfo
+}
+
+type bindataFileInfo struct {
+	name    string
+	size    int64
+	mode    os.FileMode
+	modTime time.Time
+}
+
+func (fi bindataFileInfo) Name() string {
+	return fi.name
+}
+func (fi bindataFileInfo) Size() int64 {
+	return fi.size
+}
+func (fi bindataFileInfo) Mode() os.FileMode {
+	return fi.mode
+}
+func (fi bindataFileInfo) ModTime() time.Time {
+	return fi.modTime
+}
+func (fi bindataFileInfo) IsDir() bool {
+	return false
+}
+func (fi bindataFileInfo) Sys() interface{} {
+	return nil
+}
+
+var __1_initialize_schemaDownSql = []byte(`-- DROP INDEX idx_events_timestamp;
+DROP INDEX idx_events_type;
+DROP INDEX idx_events_ip;
+DROP TABLE events;
+`)
+
+func _1_initialize_schemaDownSqlBytes() ([]byte, error) {
+	return __1_initialize_schemaDownSql, nil
+}
+
+func _1_initialize_schemaDownSql() (*asset, error) {
+	bytes, err := _1_initialize_schemaDownSqlBytes()
+	if err != nil {
+		return nil, err
+	}
+
+	info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 109, mode: os.FileMode(420), modTime: time.Unix(1572462519, 0)}
+	a := &asset{bytes: bytes, info: info}
+	return a, nil
+}
+
+var __1_initialize_schemaUpSql = []byte(`
+CREATE TABLE events (
+  ip TEXT,
+  event_type TEXT,
+  timestamp DATETIME,
+  count INTEGER
+);
+
+CREATE INDEX idx_events_ip ON events (ip);
+CREATE INDEX idx_events_type ON events (event_type);
+-- CREATE INDEX idx_events_timestamp ON events (timestamp);
+`)
+
+func _1_initialize_schemaUpSqlBytes() ([]byte, error) {
+	return __1_initialize_schemaUpSql, nil
+}
+
+func _1_initialize_schemaUpSql() (*asset, error) {
+	bytes, err := _1_initialize_schemaUpSqlBytes()
+	if err != nil {
+		return nil, err
+	}
+
+	info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 254, mode: os.FileMode(420), modTime: time.Unix(1572465612, 0)}
+	a := &asset{bytes: bytes, info: info}
+	return a, nil
+}
+
+// Asset loads and returns the asset for the given name.
+// It returns an error if the asset could not be found or
+// could not be loaded.
+func Asset(name string) ([]byte, error) {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	if f, ok := _bindata[cannonicalName]; ok {
+		a, err := f()
+		if err != nil {
+			return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
+		}
+		return a.bytes, nil
+	}
+	return nil, fmt.Errorf("Asset %s not found", name)
+}
+
+// MustAsset is like Asset but panics when Asset would return an error.
+// It simplifies safe initialization of global variables.
+func MustAsset(name string) []byte {
+	a, err := Asset(name)
+	if err != nil {
+		panic("asset: Asset(" + name + "): " + err.Error())
+	}
+
+	return a
+}
+
+// AssetInfo loads and returns the asset info for the given name.
+// It returns an error if the asset could not be found or
+// could not be loaded.
+func AssetInfo(name string) (os.FileInfo, error) {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	if f, ok := _bindata[cannonicalName]; ok {
+		a, err := f()
+		if err != nil {
+			return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
+		}
+		return a.info, nil
+	}
+	return nil, fmt.Errorf("AssetInfo %s not found", name)
+}
+
+// AssetNames returns the names of the assets.
+func AssetNames() []string {
+	names := make([]string, 0, len(_bindata))
+	for name := range _bindata {
+		names = append(names, name)
+	}
+	return names
+}
+
+// _bindata is a table, holding each asset generator, mapped to its name.
+var _bindata = map[string]func() (*asset, error){
+	"1_initialize_schema.down.sql": _1_initialize_schemaDownSql,
+	"1_initialize_schema.up.sql":   _1_initialize_schemaUpSql,
+}
+
+// AssetDir returns the file names below a certain
+// directory embedded in the file by go-bindata.
+// For example if you run go-bindata on data/... and data contains the
+// following hierarchy:
+//     data/
+//       foo.txt
+//       img/
+//         a.png
+//         b.png
+// then AssetDir("data") would return []string{"foo.txt", "img"}
+// AssetDir("data/img") would return []string{"a.png", "b.png"}
+// AssetDir("foo.txt") and AssetDir("notexist") would return an error
+// AssetDir("") will return []string{"data"}.
+func AssetDir(name string) ([]string, error) {
+	node := _bintree
+	if len(name) != 0 {
+		cannonicalName := strings.Replace(name, "\\", "/", -1)
+		pathList := strings.Split(cannonicalName, "/")
+		for _, p := range pathList {
+			node = node.Children[p]
+			if node == nil {
+				return nil, fmt.Errorf("Asset %s not found", name)
+			}
+		}
+	}
+	if node.Func != nil {
+		return nil, fmt.Errorf("Asset %s not found", name)
+	}
+	rv := make([]string, 0, len(node.Children))
+	for childName := range node.Children {
+		rv = append(rv, childName)
+	}
+	return rv, nil
+}
+
+type bintree struct {
+	Func     func() (*asset, error)
+	Children map[string]*bintree
+}
+var _bintree = &bintree{nil, map[string]*bintree{
+	"1_initialize_schema.down.sql": &bintree{_1_initialize_schemaDownSql, map[string]*bintree{}},
+	"1_initialize_schema.up.sql":   &bintree{_1_initialize_schemaUpSql, map[string]*bintree{}},
+}}
+
+// RestoreAsset restores an asset under the given directory
+func RestoreAsset(dir, name string) error {
+	data, err := Asset(name)
+	if err != nil {
+		return err
+	}
+	info, err := AssetInfo(name)
+	if err != nil {
+		return err
+	}
+	err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
+	if err != nil {
+		return err
+	}
+	err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
+	if err != nil {
+		return err
+	}
+	err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// RestoreAssets restores an asset under the given directory recursively
+func RestoreAssets(dir, name string) error {
+	children, err := AssetDir(name)
+	// File
+	if err != nil {
+		return RestoreAsset(dir, name)
+	}
+	// Dir
+	for _, child := range children {
+		err = RestoreAssets(dir, filepath.Join(name, child))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func _filePath(dir, name string) string {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
+}
+
diff --git a/db/sqlite/driver.go b/db/sqlite/driver.go
new file mode 100644
index 0000000000000000000000000000000000000000..aacaa48ebb48c94b5a2e85ee04e16caf2751a523
--- /dev/null
+++ b/db/sqlite/driver.go
@@ -0,0 +1,115 @@
+package sqlite
+
+import (
+	"database/sql"
+	"time"
+
+	ippb "git.autistici.org/ai3/tools/iprep/proto"
+)
+
+var statements = map[string]string{
+	"insert_event": `
+INSERT INTO events (ip, event_type, count, timestamp) VALUES (?, ?, ?, ?)
+`,
+	"scan_by_ip": `
+SELECT
+  event_type, SUM(count) AS sum
+FROM
+  events
+WHERE
+  ip = ?
+  AND timestamp > ?
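+  -- Parameters are bound positionally in ScanIP below: the IP
+  -- first, then the start of the scan time window.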
+GROUP BY
+  event_type
+`,
+}
+
+type DB struct {
+	db    *sql.DB
+	stmts StatementMap
+}
+
+func Open(path string) (*DB, error) {
+	db, err := sqlOpen(path)
+	if err != nil {
+		return nil, err
+	}
+
+	stmts, err := NewStatementMap(db, statements)
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+
+	return &DB{
+		db:    db,
+		stmts: stmts,
+	}, nil
+}
+
+func (db *DB) Close() {
+	db.db.Close()
+}
+
+func (db *DB) AddAggregate(aggr *ippb.Aggregate) error {
+	tx, err := db.db.Begin()
+	if err != nil {
+		return err
+	}
+	stmt := db.stmts.Get(tx, "insert_event")
+	now := time.Now()
+
+	for _, bt := range aggr.ByType {
+		for _, item := range bt.ByIp {
+			if _, err := stmt.Exec(item.Ip, bt.Type, item.Count, now); err != nil {
+				tx.Rollback() // nolint
+				return err
+			}
+		}
+	}
+
+	return tx.Commit()
+}
+
+func (db *DB) WipeOldData(age time.Duration) error {
+	_, err := db.db.Exec(`
+DELETE FROM events WHERE timestamp < ?
+`, time.Now().Add(-age))
+	return err
+}
+
+func (db *DB) ScanIP(startTime time.Time, ip string) (ippb.Map, error) {
+	tx, err := db.db.Begin()
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback() // nolint
+
+	rows, err := db.stmts.Get(tx, "scan_by_ip").Query(ip, startTime)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	m := make(ippb.Map)
+	for rows.Next() {
+		var evType string
+		var count int64
+		if err := rows.Scan(&evType, &count); err != nil {
+			return nil, err
+		}
+		m.Incr(evType, ip, count)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return m, nil
+}
+
+// ScanType is not implemented for the SQLite driver yet.
+func (db *DB) ScanType(startTime time.Time, t string) (ippb.Map, error) {
+	return nil, nil
+}
diff --git a/db/sqlite/migrations/1_initialize_schema.down.sql b/db/sqlite/migrations/1_initialize_schema.down.sql
new file mode 100644
index 0000000000000000000000000000000000000000..99470285fa99ce4be0a4790d6760b9ec37a918b2
--- /dev/null
+++ b/db/sqlite/migrations/1_initialize_schema.down.sql
@@ -0,0 +1,4 @@
+-- DROP INDEX idx_events_timestamp;
+DROP INDEX idx_events_type;
+DROP INDEX idx_events_ip;
+DROP TABLE events;
diff --git a/db/sqlite/migrations/1_initialize_schema.up.sql b/db/sqlite/migrations/1_initialize_schema.up.sql
new file mode 100644
index 0000000000000000000000000000000000000000..7a728532e5c84a5c714e098fbd58b97905eb71ef
--- /dev/null
+++ b/db/sqlite/migrations/1_initialize_schema.up.sql
@@ -0,0 +1,11 @@
+
+CREATE TABLE events (
+  ip TEXT,
+  event_type TEXT,
+  timestamp DATETIME,
+  count INTEGER
+);
+
+CREATE INDEX idx_events_ip ON events (ip);
+CREATE INDEX idx_events_type ON events (event_type);
+-- CREATE INDEX idx_events_timestamp ON events (timestamp);
diff --git a/db/sqlite/sql.go b/db/sqlite/sql.go
new file mode 100644
index 0000000000000000000000000000000000000000..8b9472f51a20d0c01f4ac3ecb7559746eff68f91
--- /dev/null
+++ b/db/sqlite/sql.go
@@ -0,0 +1,105 @@
+//go:generate go-bindata --nocompress --pkg sqlite --ignore \.go$ -o bindata.go -prefix migrations/ ./migrations
+package sqlite
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+
+	"github.com/mattes/migrate"
+	"github.com/mattes/migrate/database/sqlite3"
+	bindata "github.com/mattes/migrate/source/go-bindata"
+)
+
+const dbDriver = "sqlite3"
+
+var DebugMigrations = false
+
+// Open a SQLite database and run the database migrations.
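+// The "sqlite3" driver used by sql.Open is registered as a side
+// effect of importing the migrate sqlite3 backend above, which
+// depends on github.com/mattn/go-sqlite3.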
+func sqlOpen(dburi string) (*sql.DB, error) {
+	db, err := sql.Open(dbDriver, dburi)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = runDatabaseMigrations(db); err != nil {
+		db.Close() // nolint
+		return nil, err
+	}
+
+	return db, nil
+}
+
+type migrateLogger struct{}
+
+func (l migrateLogger) Printf(format string, v ...interface{}) {
+	log.Printf("db: "+format, v...)
+}
+
+func (l migrateLogger) Verbose() bool { return true }
+
+func runDatabaseMigrations(db *sql.DB) error {
+	si, err := bindata.WithInstance(bindata.Resource(
+		AssetNames(),
+		func(name string) ([]byte, error) {
+			return Asset(name)
+		}))
+	if err != nil {
+		return err
+	}
+
+	di, err := sqlite3.WithInstance(db, &sqlite3.Config{
+		MigrationsTable: sqlite3.DefaultMigrationsTable,
+		DatabaseName:    "usermetadb",
+	})
+	if err != nil {
+		return err
+	}
+
+	m, err := migrate.NewWithInstance("bindata", si, dbDriver, di)
+	if err != nil {
+		return err
+	}
+	if DebugMigrations {
+		log.Printf("running database migrations")
+		m.Log = &migrateLogger{}
+	}
+
+	if err := m.Up(); err != nil && err != migrate.ErrNoChange {
+		return err
+	}
+	return nil
+}
+
+// A StatementMap holds named compiled statements.
+type StatementMap map[string]*sql.Stmt
+
+// NewStatementMap compiles the given named statements and returns a
+// new StatementMap.
+func NewStatementMap(db *sql.DB, statements map[string]string) (StatementMap, error) {
+	stmts := make(map[string]*sql.Stmt)
+	for name, qstr := range statements {
+		stmt, err := db.Prepare(qstr)
+		if err != nil {
+			return nil, fmt.Errorf("error compiling statement '%s': %v", name, err)
+		}
+		stmts[name] = stmt
+	}
+	return StatementMap(stmts), nil
+}
+
+// Close all resources associated with the StatementMap.
+func (m StatementMap) Close() {
+	for _, s := range m {
+		s.Close() // nolint
+	}
+}
+
+// Get a named compiled statement (or nil if not found), and associate
+// it with the given transaction.
+func (m StatementMap) Get(tx *sql.Tx, name string) *sql.Stmt {
+	return tx.Stmt(m[name])
+}
diff --git a/server/server.go b/server/server.go
index 75d5a27ad256e36894679ff110b18b23c914531c..86f0ef93b184f1ae56b4e475163b4dfe1b455452 100644
--- a/server/server.go
+++ b/server/server.go
@@ -17,7 +17,7 @@ import (
 type Server struct {
 	*script.Manager
 	horizon time.Duration
-	db      *db.DB
+	db      db.DB
 
 	stop chan struct{}
 }