From 320ab20e2cbb288ce3221726a224a49fe889a09b Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Wed, 30 Oct 2019 20:37:23 +0000
Subject: [PATCH] Add generic db interface, with sqlite implementation

The sqlite implementation is more or less as fast (or slow, depending
on how you see it) as the LevelDB one, but it is slightly simpler...
---
 db/db.go                                      |  35 +++
 db/{leveldb_test.go => db_test.go}            |  53 +++-
 db/{ => leveldb}/leveldb.go                   |   6 +-
 db/{ => leveldb}/monotonic_id.go              |   2 +-
 db/sqlite/bindata.go                          | 243 ++++++++++++++++++
 db/sqlite/driver.go                           | 109 ++++++++
 .../migrations/1_initialize_schema.down.sql   |   4 +
 .../migrations/1_initialize_schema.up.sql     |  11 +
 db/sqlite/sql.go                              | 102 ++++++++
 server/server.go                              |   2 +-
 10 files changed, 552 insertions(+), 15 deletions(-)
 create mode 100644 db/db.go
 rename db/{leveldb_test.go => db_test.go} (74%)
 rename db/{ => leveldb}/leveldb.go (97%)
 rename db/{ => leveldb}/monotonic_id.go (96%)
 create mode 100644 db/sqlite/bindata.go
 create mode 100644 db/sqlite/driver.go
 create mode 100644 db/sqlite/migrations/1_initialize_schema.down.sql
 create mode 100644 db/sqlite/migrations/1_initialize_schema.up.sql
 create mode 100644 db/sqlite/sql.go

diff --git a/db/db.go b/db/db.go
new file mode 100644
index 0000000..fc97252
--- /dev/null
+++ b/db/db.go
@@ -0,0 +1,35 @@
+package db
+
+import (
+	"fmt"
+	"net/url"
+	"time"
+
+	"git.autistici.org/ai3/tools/iprep/db/leveldb"
+	"git.autistici.org/ai3/tools/iprep/db/sqlite"
+	ippb "git.autistici.org/ai3/tools/iprep/proto"
+)
+
+type DB interface {
+	AddAggregate(*ippb.Aggregate) error
+	WipeOldData(time.Duration) error
+	ScanIP(time.Time, string) (ippb.Map, error)
+	ScanType(time.Time, string) (ippb.Map, error)
+	Close()
+}
+
+func Open(path string) (DB, error) {
+	u, err := url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	switch u.Scheme {
+	case "", "leveldb":
+		return leveldb.Open(u.Path)
+	case "sqlite":
+		return sqlite.Open(u.Path)
+	default:
+		return nil, fmt.Errorf("unsupported scheme %s", u.Scheme)
+	}
+}
diff --git a/db/leveldb_test.go b/db/db_test.go
similarity index 74%
rename from db/leveldb_test.go
rename to db/db_test.go
index f6979ad..544d9bc 100644
--- a/db/leveldb_test.go
+++ b/db/db_test.go
@@ -9,8 +9,9 @@ import (
 	"testing"
 	"time"
 
+	"git.autistici.org/ai3/tools/iprep/db/leveldb"
+	"git.autistici.org/ai3/tools/iprep/db/sqlite"
 	ippb "git.autistici.org/ai3/tools/iprep/proto"
-	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 const manyEvents = 500000
@@ -39,14 +40,14 @@ func randomEvents(n int) *ippb.Aggregate {
 	return aggr
 }
 
-func TestWrite(t *testing.T) {
+func runRWTest(t *testing.T, driver string) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		t.Fatalf("Open: %v", err)
 	}
@@ -89,6 +90,15 @@ func TestWrite(t *testing.T) {
 	// }
 }
 
+func TestReadWrite_LevelDB(t *testing.T) {
+	runRWTest(t, "leveldb")
+}
+
+func TestReadWrite_Sqlite(t *testing.T) {
+	sqlite.DebugMigrations = true
+	runRWTest(t, "sqlite")
+}
+
 func TestWipeOldData(t *testing.T) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
@@ -110,14 +120,14 @@ func TestWipeOldData(t *testing.T) {
 	}
 }
 
-func BenchmarkWrite(b *testing.B) {
+func runWriteBench(b *testing.B, driver string) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		b.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		b.Fatalf("Open: %v", err)
 	}
@@ -137,22 +147,22 @@ func BenchmarkWrite(b *testing.B) {
 	}
 }
 
-func BenchmarkRead_SmallAggregate(b *testing.B) {
-	runReadBenchmark(b, 5)
+func BenchmarkWrite_LevelDB(b *testing.B) {
+	runWriteBench(b, "leveldb")
 }
 
-func BenchmarkRead_LargeAggregate(b *testing.B) {
-	runReadBenchmark(b, 1000)
+func BenchmarkWrite_Sqlite(b *testing.B) {
+	runWriteBench(b, "sqlite")
 }
 
-func runReadBenchmark(b *testing.B, eventsPerIP int) {
+func runReadBenchmark(b *testing.B, driver string, eventsPerIP int) {
 	dir, err := ioutil.TempDir("", "")
 	if err != nil {
 		b.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := Open(dir + "/test.db")
+	db, err := Open(fmt.Sprintf("%s://%s", driver, dir+"/test.db"))
 	if err != nil {
 		b.Fatalf("Open: %v", err)
 	}
@@ -171,7 +181,10 @@ func runReadBenchmark(b *testing.B, eventsPerIP int) {
 	}
 	db.AddAggregate(a)
 
-	db.db.CompactRange(util.Range{})
+	// Compact the database if it's LevelDB.
+	if ldb, ok := db.(*leveldb.DB); ok {
+		ldb.Compact()
+	}
 
 	deadline := time.Now().Add(-1 * time.Hour)
 	b.ResetTimer()
@@ -189,3 +202,19 @@ func runReadBenchmark(b *testing.B, eventsPerIP int) {
 		}
 	}
 }
+
+func BenchmarkRead_LevelDB_SmallAggregate(b *testing.B) {
+	runReadBenchmark(b, "leveldb", 5)
+}
+
+func BenchmarkRead_LevelDB_LargeAggregate(b *testing.B) {
+	runReadBenchmark(b, "leveldb", 1000)
+}
+
+func BenchmarkRead_Sqlite_SmallAggregate(b *testing.B) {
+	runReadBenchmark(b, "sqlite", 5)
+}
+
+func BenchmarkRead_Sqlite_LargeAggregate(b *testing.B) {
+	runReadBenchmark(b, "sqlite", 1000)
+}
diff --git a/db/leveldb.go b/db/leveldb/leveldb.go
similarity index 97%
rename from db/leveldb.go
rename to db/leveldb/leveldb.go
index 4410cb1..5420c67 100644
--- a/db/leveldb.go
+++ b/db/leveldb/leveldb.go
@@ -1,4 +1,4 @@
-package db
+package leveldb
 
 import (
 	"bytes"
@@ -124,6 +124,10 @@ func (db *DB) ScanType(startTime time.Time, t string) (ippb.Map, error) {
 	return db.scanIndex(startTime, typeIndexPrefix, []byte(t))
 }
 
+func (db *DB) Compact() error {
+	return db.db.CompactRange(util.Range{})
+}
+
 func eventKeyFromID(id []byte) []byte {
 	return append(eventPrefix, id...)
 }
diff --git a/db/monotonic_id.go b/db/leveldb/monotonic_id.go
similarity index 96%
rename from db/monotonic_id.go
rename to db/leveldb/monotonic_id.go
index 8e9ef3b..b1d0ea4 100644
--- a/db/monotonic_id.go
+++ b/db/leveldb/monotonic_id.go
@@ -1,4 +1,4 @@
-package db
+package leveldb
 
 import (
 	"encoding/binary"
diff --git a/db/sqlite/bindata.go b/db/sqlite/bindata.go
new file mode 100644
index 0000000..2b3e4dd
--- /dev/null
+++ b/db/sqlite/bindata.go
@@ -0,0 +1,243 @@
+// Code generated by go-bindata.
+// sources:
+// migrations/1_initialize_schema.down.sql
+// migrations/1_initialize_schema.up.sql
+// DO NOT EDIT!
+
+package sqlite
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+type asset struct {
+	bytes []byte
+	info  os.FileInfo
+}
+
+type bindataFileInfo struct {
+	name    string
+	size    int64
+	mode    os.FileMode
+	modTime time.Time
+}
+
+func (fi bindataFileInfo) Name() string {
+	return fi.name
+}
+func (fi bindataFileInfo) Size() int64 {
+	return fi.size
+}
+func (fi bindataFileInfo) Mode() os.FileMode {
+	return fi.mode
+}
+func (fi bindataFileInfo) ModTime() time.Time {
+	return fi.modTime
+}
+func (fi bindataFileInfo) IsDir() bool {
+	return false
+}
+func (fi bindataFileInfo) Sys() interface{} {
+	return nil
+}
+
+var __1_initialize_schemaDownSql = []byte(`-- DROP INDEX idx_events_timestamp;
+DROP INDEX idx_events_type;
+DROP INDEX idx_events_ip;
+DROP TABLE events;
+`)
+
+func _1_initialize_schemaDownSqlBytes() ([]byte, error) {
+	return __1_initialize_schemaDownSql, nil
+}
+
+func _1_initialize_schemaDownSql() (*asset, error) {
+	bytes, err := _1_initialize_schemaDownSqlBytes()
+	if err != nil {
+		return nil, err
+	}
+
+	info := bindataFileInfo{name: "1_initialize_schema.down.sql", size: 109, mode: os.FileMode(420), modTime: time.Unix(1572462519, 0)}
+	a := &asset{bytes: bytes, info: info}
+	return a, nil
+}
+
+var __1_initialize_schemaUpSql = []byte(`
+CREATE TABLE events (
+	ip TEXT,
+	event_type TEXT,
+	timestamp DATETIME,
+        count INTEGER
+);
+
+CREATE INDEX idx_events_ip ON events (ip);
+CREATE INDEX idx_events_type ON events (event_type);
+-- CREATE INDEX idx_events_timestamp ON events (timestamp);
+`)
+
+func _1_initialize_schemaUpSqlBytes() ([]byte, error) {
+	return __1_initialize_schemaUpSql, nil
+}
+
+func _1_initialize_schemaUpSql() (*asset, error) {
+	bytes, err := _1_initialize_schemaUpSqlBytes()
+	if err != nil {
+		return nil, err
+	}
+
+	info := bindataFileInfo{name: "1_initialize_schema.up.sql", size: 254, mode: os.FileMode(420), modTime: time.Unix(1572465612, 0)}
+	a := &asset{bytes: bytes, info: info}
+	return a, nil
+}
+
+// Asset loads and returns the asset for the given name.
+// It returns an error if the asset could not be found or
+// could not be loaded.
+func Asset(name string) ([]byte, error) {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	if f, ok := _bindata[cannonicalName]; ok {
+		a, err := f()
+		if err != nil {
+			return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
+		}
+		return a.bytes, nil
+	}
+	return nil, fmt.Errorf("Asset %s not found", name)
+}
+
+// MustAsset is like Asset but panics when Asset would return an error.
+// It simplifies safe initialization of global variables.
+func MustAsset(name string) []byte {
+	a, err := Asset(name)
+	if err != nil {
+		panic("asset: Asset(" + name + "): " + err.Error())
+	}
+
+	return a
+}
+
+// AssetInfo loads and returns the asset info for the given name.
+// It returns an error if the asset could not be found or
+// could not be loaded.
+func AssetInfo(name string) (os.FileInfo, error) {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	if f, ok := _bindata[cannonicalName]; ok {
+		a, err := f()
+		if err != nil {
+			return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
+		}
+		return a.info, nil
+	}
+	return nil, fmt.Errorf("AssetInfo %s not found", name)
+}
+
+// AssetNames returns the names of the assets.
+func AssetNames() []string {
+	names := make([]string, 0, len(_bindata))
+	for name := range _bindata {
+		names = append(names, name)
+	}
+	return names
+}
+
+// _bindata is a table, holding each asset generator, mapped to its name.
+var _bindata = map[string]func() (*asset, error){
+	"1_initialize_schema.down.sql": _1_initialize_schemaDownSql,
+	"1_initialize_schema.up.sql": _1_initialize_schemaUpSql,
+}
+
+// AssetDir returns the file names below a certain
+// directory embedded in the file by go-bindata.
+// For example if you run go-bindata on data/... and data contains the
+// following hierarchy:
+//     data/
+//       foo.txt
+//       img/
+//         a.png
+//         b.png
+// then AssetDir("data") would return []string{"foo.txt", "img"}
+// AssetDir("data/img") would return []string{"a.png", "b.png"}
+// AssetDir("foo.txt") and AssetDir("notexist") would return an error
+// AssetDir("") will return []string{"data"}.
+func AssetDir(name string) ([]string, error) {
+	node := _bintree
+	if len(name) != 0 {
+		cannonicalName := strings.Replace(name, "\\", "/", -1)
+		pathList := strings.Split(cannonicalName, "/")
+		for _, p := range pathList {
+			node = node.Children[p]
+			if node == nil {
+				return nil, fmt.Errorf("Asset %s not found", name)
+			}
+		}
+	}
+	if node.Func != nil {
+		return nil, fmt.Errorf("Asset %s not found", name)
+	}
+	rv := make([]string, 0, len(node.Children))
+	for childName := range node.Children {
+		rv = append(rv, childName)
+	}
+	return rv, nil
+}
+
+type bintree struct {
+	Func     func() (*asset, error)
+	Children map[string]*bintree
+}
+var _bintree = &bintree{nil, map[string]*bintree{
+	"1_initialize_schema.down.sql": &bintree{_1_initialize_schemaDownSql, map[string]*bintree{}},
+	"1_initialize_schema.up.sql": &bintree{_1_initialize_schemaUpSql, map[string]*bintree{}},
+}}
+
+// RestoreAsset restores an asset under the given directory
+func RestoreAsset(dir, name string) error {
+	data, err := Asset(name)
+	if err != nil {
+		return err
+	}
+	info, err := AssetInfo(name)
+	if err != nil {
+		return err
+	}
+	err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
+	if err != nil {
+		return err
+	}
+	err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
+	if err != nil {
+		return err
+	}
+	err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// RestoreAssets restores an asset under the given directory recursively
+func RestoreAssets(dir, name string) error {
+	children, err := AssetDir(name)
+	// File
+	if err != nil {
+		return RestoreAsset(dir, name)
+	}
+	// Dir
+	for _, child := range children {
+		err = RestoreAssets(dir, filepath.Join(name, child))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func _filePath(dir, name string) string {
+	cannonicalName := strings.Replace(name, "\\", "/", -1)
+	return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
+}
+
diff --git a/db/sqlite/driver.go b/db/sqlite/driver.go
new file mode 100644
index 0000000..aacaa48
--- /dev/null
+++ b/db/sqlite/driver.go
@@ -0,0 +1,109 @@
+package sqlite
+
+import (
+	"database/sql"
+	"time"
+
+	ippb "git.autistici.org/ai3/tools/iprep/proto"
+)
+
+var statements = map[string]string{
+	"insert_event": `
+INSERT INTO events (ip, event_type, count, timestamp) VALUES (?, ?, ?, ?)
+`,
+	"scan_by_ip": `
+SELECT
+  event_type, SUM(count) AS sum
+FROM
+  events
+WHERE
+  ip = ?
+  AND timestamp > ?
+GROUP BY
+  event_type
+`,
+}
+
+type DB struct {
+	db    *sql.DB
+	stmts StatementMap
+}
+
+func Open(path string) (*DB, error) {
+	db, err := sqlOpen(path)
+	if err != nil {
+		return nil, err
+	}
+
+	stmts, err := NewStatementMap(db, statements)
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+
+	return &DB{
+		db:    db,
+		stmts: stmts,
+	}, nil
+}
+
+func (db *DB) Close() {
+	db.db.Close()
+}
+
+func (db *DB) AddAggregate(aggr *ippb.Aggregate) error {
+	tx, err := db.db.Begin()
+	if err != nil {
+		return err
+	}
+	stmt := db.stmts.Get(tx, "insert_event")
+	now := time.Now()
+
+	for _, bt := range aggr.ByType {
+		for _, item := range bt.ByIp {
+			if _, err := stmt.Exec(item.Ip, bt.Type, item.Count, now); err != nil {
+				tx.Rollback() // nolint
+				return err
+			}
+		}
+	}
+
+	return tx.Commit()
+}
+
+func (db *DB) WipeOldData(age time.Duration) error {
+	_, err := db.db.Exec(`
+DELETE FROM events WHERE timestamp < ?
+`, time.Now().Add(-age))
+	return err
+}
+
+func (db *DB) ScanIP(startTime time.Time, ip string) (ippb.Map, error) {
+	tx, err := db.db.Begin()
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback() // nolint
+
+	rows, err := db.stmts.Get(tx, "scan_by_ip").Query(ip, startTime)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	m := make(ippb.Map)
+	for rows.Next() {
+		var evType string
+		var count int64
+		if err := rows.Scan(&evType, &count); err != nil {
+			return nil, err
+		}
+		m.Incr(evType, ip, count)
+	}
+
+	return m, rows.Err()
+}
+
+func (db *DB) ScanType(startTime time.Time, t string) (ippb.Map, error) {
+	return nil, nil // TODO: implement type-based scan (needs a scan_by_type statement).
+}
diff --git a/db/sqlite/migrations/1_initialize_schema.down.sql b/db/sqlite/migrations/1_initialize_schema.down.sql
new file mode 100644
index 0000000..9947028
--- /dev/null
+++ b/db/sqlite/migrations/1_initialize_schema.down.sql
@@ -0,0 +1,4 @@
+-- DROP INDEX idx_events_timestamp;
+DROP INDEX idx_events_type;
+DROP INDEX idx_events_ip;
+DROP TABLE events;
diff --git a/db/sqlite/migrations/1_initialize_schema.up.sql b/db/sqlite/migrations/1_initialize_schema.up.sql
new file mode 100644
index 0000000..7a72853
--- /dev/null
+++ b/db/sqlite/migrations/1_initialize_schema.up.sql
@@ -0,0 +1,11 @@
+
+CREATE TABLE events (
+	ip TEXT,
+	event_type TEXT,
+	timestamp DATETIME,
+        count INTEGER
+);
+
+CREATE INDEX idx_events_ip ON events (ip);
+CREATE INDEX idx_events_type ON events (event_type);
+-- CREATE INDEX idx_events_timestamp ON events (timestamp);
diff --git a/db/sqlite/sql.go b/db/sqlite/sql.go
new file mode 100644
index 0000000..8b9472f
--- /dev/null
+++ b/db/sqlite/sql.go
@@ -0,0 +1,102 @@
+//go:generate go-bindata --nocompress --pkg sqlite --ignore \.go$ -o bindata.go -prefix migrations/ ./migrations
+package sqlite
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+
+	"github.com/mattes/migrate"
+	"github.com/mattes/migrate/database/sqlite3"
+	bindata "github.com/mattes/migrate/source/go-bindata"
+)
+
+const dbDriver = "sqlite3"
+
+var DebugMigrations = false
+
+// Open a SQLite database and run the database migrations.
+func sqlOpen(dburi string) (*sql.DB, error) {
+	db, err := sql.Open(dbDriver, dburi)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = runDatabaseMigrations(db); err != nil {
+		db.Close() // nolint
+		return nil, err
+	}
+
+	return db, nil
+}
+
+type migrateLogger struct{}
+
+func (l migrateLogger) Printf(format string, v ...interface{}) {
+	log.Printf("db: "+format, v...)
+}
+
+func (l migrateLogger) Verbose() bool { return true }
+
+func runDatabaseMigrations(db *sql.DB) error {
+	si, err := bindata.WithInstance(bindata.Resource(
+		AssetNames(),
+		func(name string) ([]byte, error) {
+			return Asset(name)
+		}))
+	if err != nil {
+		return err
+	}
+
+	di, err := sqlite3.WithInstance(db, &sqlite3.Config{
+		MigrationsTable: sqlite3.DefaultMigrationsTable,
+		DatabaseName:    "iprep",
+	})
+	if err != nil {
+		return err
+	}
+
+	m, err := migrate.NewWithInstance("bindata", si, dbDriver, di)
+	if err != nil {
+		return err
+	}
+	if DebugMigrations {
+		log.Printf("running database migrations")
+		m.Log = &migrateLogger{}
+	}
+
+	if err := m.Up(); err != nil && err != migrate.ErrNoChange {
+		return err
+	}
+	return nil
+}
+
+// A StatementMap holds named compiled statements.
+type StatementMap map[string]*sql.Stmt
+
+// NewStatementMap compiles the given named statements and returns a
+// new StatementMap.
+func NewStatementMap(db *sql.DB, statements map[string]string) (StatementMap, error) {
+	stmts := make(map[string]*sql.Stmt)
+	for name, qstr := range statements {
+		stmt, err := db.Prepare(qstr)
+		if err != nil {
+			return nil, fmt.Errorf("error compiling statement '%s': %v", name, err)
+		}
+		stmts[name] = stmt
+	}
+	return StatementMap(stmts), nil
+}
+
+// Close all resources associated with the StatementMap.
+func (m StatementMap) Close() {
+	for _, s := range m {
+		s.Close() // nolint
+	}
+}
+
+// Get a named compiled statement (or nil if not found), and associate
+// it with the given transaction.
+func (m StatementMap) Get(tx *sql.Tx, name string) *sql.Stmt {
+	return tx.Stmt(m[name])
+}
diff --git a/server/server.go b/server/server.go
index 75d5a27..86f0ef9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -17,7 +17,7 @@ import (
 type Server struct {
 	*script.Manager
 	horizon time.Duration
-	db      *db.DB
+	db      db.DB
 	stop    chan struct{}
 }
 
-- 
GitLab