diff --git a/backend_leveldb.go b/backend_leveldb.go new file mode 100644 index 0000000000000000000000000000000000000000..f84d557acbb303ce688e9a3af0f4c790a53f4dfd --- /dev/null +++ b/backend_leveldb.go @@ -0,0 +1,250 @@ +package liber + +import ( + "bytes" + "encoding/binary" + + "github.com/syndtr/goleveldb/leveldb" + ldbfilter "github.com/syndtr/goleveldb/leveldb/filter" + ldbiter "github.com/syndtr/goleveldb/leveldb/iterator" + ldbopt "github.com/syndtr/goleveldb/leveldb/opt" + ldbutil "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + ldbBookBucket = []byte("ebook") + ldbFileBucket = []byte("file") + ldbBookFileBucket = []byte("ebook_file") +) + +type leveldbDB struct { + ldb *leveldb.DB +} + +func newLevelDBBackend(path string) (*leveldbDB, error) { + // Use 256MB of cache and a small Bloom filter. + opts := &ldbopt.Options{ + Filter: ldbfilter.NewBloomFilter(10), + BlockCacheCapacity: 1 << 28, // 256MB + } + + ldb, err := leveldb.OpenFile(path, opts) + if err != nil { + return nil, err + } + + l := &leveldbDB{ldb} + if err := l.runMigrations(leveldbMigrations); err != nil { + ldb.Close() + return nil, err + } + + return l, nil +} + +func (db *leveldbDB) Close() { + db.ldb.Close() +} + +var ( + schemaVersionKey = []byte("_liber_schema_version") + leveldbMigrations []databaseMigration +) + +func (db *leveldbDB) getSchemaVersion() uint64 { + data, err := db.ldb.Get(schemaVersionKey, nil) + if err != nil { + return 0 + } + return binary.LittleEndian.Uint64(data) +} + +func (db *leveldbDB) setSchemaVersion(v uint64) { + var b [8]byte + binary.LittleEndian.PutUint64(b[:], v) + db.ldb.Put(schemaVersionKey, b[:], nil) +} + +type databaseMigration struct { + version uint64 + run func(db *leveldb.DB) error +} + +func (db *leveldbDB) runMigrations(migrations []databaseMigration) error { + version := db.getSchemaVersion() + for _, m := range migrations { + if m.version < version { + continue + } + if err := m.run(db.ldb); err != nil { + return err + } + version = 
m.version + db.setSchemaVersion(version) + } + return nil +} + +func (db *leveldbDB) get(bucket, key []byte, obj interface{}) error { + data, err := db.ldb.Get(levelDBBucketToKey(bucket, key), nil) + if err != nil { + return err + } + return gobDec(data, obj) +} + +func (db *leveldbDB) GetBook(bookid BookId) (*Book, error) { + var b Book + if err := db.get(ldbBookBucket, bookid.Key(), &b); err != nil { + return nil, err + } + return &b, nil +} + +func (db *leveldbDB) GetFile(path string) (*File, error) { + var f File + if err := db.get(ldbFileBucket, []byte(path), &f); err != nil { + return nil, err + } + return &f, nil +} + +func (db *leveldbDB) GetBookFiles(bookid BookId) ([]*File, error) { + start, end := keyRange(levelDBBucketToKey(ldbBookFileBucket, bookid.Key())) + it := db.ldb.NewIterator( + &ldbutil.Range{ + Start: start, + Limit: end, + }, + nil, + ) + defer it.Release() + + var out []*File + for it.Next() { + var filepath string + if gobDec(it.Value(), &filepath) == nil { + if file, err := db.GetFile(filepath); err == nil { + out = append(out, file) + } + } + } + if err := it.Error(); err != nil { + return nil, err + } + return out, nil +} + +func (db *leveldbDB) put(bucket, key []byte, obj interface{}) error { + data, err := gobEnc(obj) + if err != nil { + return err + } + if bucket != nil { + key = levelDBBucketToKey(bucket, key) + } + return db.ldb.Put(key, data, nil) +} + +func (db *leveldbDB) PutBook(b *Book) error { + return db.put(ldbBookBucket, b.Id.Key(), b) +} + +func (db *leveldbDB) PutFile(f *File) error { + if err := db.put(ldbFileBucket, []byte(f.Path), f); err != nil { + return err + } + if !f.Error { + return db.put(ldbBookFileBucket, levelDBFileBookKey(f.Path, f.Id), f.Path) + } + return nil +} + +func (db *leveldbDB) delete(bucket, key []byte) error { + return db.ldb.Delete(levelDBBucketToKey(bucket, key), nil) +} + +func (db *leveldbDB) DeleteBook(bookid BookId) error { + return db.delete(ldbBookBucket, bookid.Key()) +} + +func (db 
*leveldbDB) DeleteFile(path string) error { + f, err := db.GetFile(path) + if err != nil { + return nil + } + + db.delete(ldbFileBucket, []byte(path)) + db.delete(ldbBookFileBucket, levelDBFileBookKey(path, f.Id)) + + // Delete the book if there are no files left. + if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 { + db.DeleteBook(f.Id) + } + + return nil +} + +func (db *leveldbDB) scanBucket(bucket []byte) ldbiter.Iterator { + start, end := keyRange(bucket) + return db.ldb.NewIterator(&ldbutil.Range{ + Start: start, + Limit: end, + }, &ldbopt.ReadOptions{ + DontFillCache: true, + }) +} + +func (db *leveldbDB) ScanBooks() <-chan *Book { + ch := make(chan *Book, 100) + go func() { + defer close(ch) + iter := db.scanBucket(ldbBookBucket) + defer iter.Release() + for iter.Next() { + var b Book + if err := gobDec(iter.Value(), &b); err != nil { + continue + } + ch <- &b + } + }() + return ch +} + +func (db *leveldbDB) ScanFiles() <-chan *File { + ch := make(chan *File, 100) + go func() { + defer close(ch) + iter := db.scanBucket(ldbFileBucket) + defer iter.Release() + for iter.Next() { + var f File + if err := gobDec(iter.Value(), &f); err != nil { + continue + } + ch <- &f + } + }() + return ch +} + +var ldbKeySeparator = []byte{'/'} + +func levelDBBucketToKey(bucket, key []byte) []byte { + return bytes.Join([][]byte{bucket, key}, ldbKeySeparator) +} + +func levelDBFileBookKey(path string, bookid BookId) []byte { + return bytes.Join([][]byte{bookid.Key(), []byte(path)}, ldbKeySeparator) +} + +func keyRange(prefix []byte) ([]byte, []byte) { + start := make([]byte, len(prefix)+1) + end := make([]byte, len(prefix)+1) + copy(start, prefix) + copy(end, prefix) + start[len(prefix)] = ldbKeySeparator[0] + end[len(prefix)] = ldbKeySeparator[0] + 1 + return start, end +} diff --git a/database.go b/database.go index f0c0ce8cac3d5c31caf6421a25d029a1f242cdb9..b5fc26570a6c6a41ae82fd0942e3d202dc35cd64 100644 --- a/database.go +++ b/database.go @@ -27,20 +27,6 
@@ import ( blevegoleveldb "github.com/blevesearch/bleve/index/store/goleveldb" blevemapping "github.com/blevesearch/bleve/mapping" blevequery "github.com/blevesearch/bleve/search/query" - - "github.com/syndtr/goleveldb/leveldb" - ldbfilter "github.com/syndtr/goleveldb/leveldb/filter" - ldbiter "github.com/syndtr/goleveldb/leveldb/iterator" - ldbopt "github.com/syndtr/goleveldb/leveldb/opt" - ldbutil "github.com/syndtr/goleveldb/leveldb/util" -) - -var ( - BookBucket = []byte("ebook") - FileBucket = []byte("file") - BookFileBucket = []byte("ebook_file") - - keySeparator = byte('/') ) type BookId uint64 @@ -196,13 +182,29 @@ func defaultIndexMapping() blevemapping.IndexMapping { return i } +type backend interface { + Close() + + GetBook(BookId) (*Book, error) + GetFile(string) (*File, error) + GetBookFiles(BookId) ([]*File, error) + PutBook(*Book) error + PutFile(*File) error + DeleteBook(BookId) error + DeleteFile(string) error + + ScanBooks() <-chan *Book + ScanFiles() <-chan *File +} + type Database struct { + backend + path string - ldb *leveldb.DB index bleve.Index } -func NewDb(path string) (*Database, error) { +func NewDb(backendType, path string) (*Database, error) { // Make sure that path exists. if _, err := os.Stat(path); err != nil { if err := os.Mkdir(path, 0700); err != nil { @@ -210,32 +212,26 @@ func NewDb(path string) (*Database, error) { } } - // Initialize our database and the index. - d := &Database{path: path} - if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil { - return nil, err + var b backend + switch backendType { + case "leveldb": + ldb, err := newLevelDBBackend(path) + if err != nil { + return nil, err + } + b = ldb + default: + return nil, errors.New("unknown backend") } + + d := &Database{backend: b, path: path} + // Initialize the index. 
if err := d.setupIndex(filepath.Join(path, "index")); err != nil { return nil, err } return d, nil } -func (db *Database) setupLevelDb(path string) error { - // Use 128MB of cache and a small Bloom filter. - opts := &ldbopt.Options{ - Filter: ldbfilter.NewBloomFilter(10), - BlockCacheCapacity: 1 << 28, // 256MB - } - - ldb, err := leveldb.OpenFile(path, opts) - if err != nil { - return err - } - db.ldb = ldb - return nil -} - func (db *Database) setupIndex(path string) error { var err error if _, serr := os.Stat(path); serr == nil { @@ -255,198 +251,23 @@ func (db *Database) setupIndex(path string) error { return nil } -var schemaVersionKey = []byte("_liber_schema_version") - -func (db *Database) getSchemaVersion() uint64 { - data, err := db.ldb.Get(schemaVersionKey, nil) - if err != nil { - return 0 - } - return binary.LittleEndian.Uint64(data) -} - -func (db *Database) setSchemaVersion(v uint64) { - var b [8]byte - binary.LittleEndian.PutUint64(b[:], v) - db.ldb.Put(schemaVersionKey, b[:], nil) -} - -type databaseMigration struct { - version uint64 - run func(db *Database) error -} - -func (db *Database) runMigrations(migrations []databaseMigration) error { - version := db.getSchemaVersion() - for _, m := range migrations { - if m.version < version { - continue - } - if err := m.run(db); err != nil { - return err - } - version = m.version - db.setSchemaVersion(version) - } - return nil -} - func (db *Database) Close() { db.index.Close() - db.ldb.Close() -} - -func (db *Database) GetBook(bookid BookId) (*Book, error) { - var b Book - if err := db.Get(BookBucket, bookid.Key(), &b); err != nil { - return nil, err - } - return &b, nil -} - -func (db *Database) GetFile(path string) (*File, error) { - var f File - if err := db.Get(FileBucket, []byte(path), &f); err != nil { - return nil, err - } - return &f, nil -} - -func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) { - start, end := keyRange(bktToKey(BookFileBucket, bookid.Key())) - it := 
db.ldb.NewIterator( - &ldbutil.Range{ - Start: start, - Limit: end, - }, - nil, - ) - defer it.Release() - - var out []*File - for it.Next() { - var filepath string - if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&filepath) == nil { - if file, err := db.GetFile(filepath); err == nil { - out = append(out, file) - } - } - } - if err := it.Error(); err != nil { - return nil, err - } - return out, nil -} - -func (db *Database) Get(bucket, key []byte, obj interface{}) error { - data, err := db.ldb.Get(bktToKey(bucket, key), nil) - if err != nil { - return err - } - return gob.NewDecoder(bytes.NewReader(data)).Decode(obj) + db.backend.Close() } func (db *Database) PutBook(b *Book) error { - if err := db.Put(BookBucket, b.Id.Key(), b); err != nil { + if err := db.backend.PutBook(b); err != nil { return err } return db.index.Index(b.Id.String(), flatten(b)) } -func fileBookKey(path string, bookid BookId) []byte { - return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator}) -} - -func (db *Database) PutFile(f *File) error { - if err := db.Put(FileBucket, []byte(f.Path), f); err != nil { - return err - } - if !f.Error { - return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path) - } - return nil -} - -func (db *Database) RawPut(key, value []byte) error { - return db.ldb.Put(key, value, nil) -} - -func (db *Database) Put(bucket, key []byte, obj interface{}) error { - var buf bytes.Buffer - if err := gob.NewEncoder(&buf).Encode(obj); err != nil { - return err - } - return db.RawPut(bktToKey(bucket, key), buf.Bytes()) -} - func (db *Database) DeleteBook(bookid BookId) error { - db.Delete(BookBucket, bookid.Key()) + db.backend.DeleteBook(bookid) return db.index.Delete(bookid.String()) } -func (db *Database) DeleteFile(path string) error { - f, err := db.GetFile(path) - if err != nil { - return nil - } - - db.Delete(FileBucket, []byte(path)) - db.Delete(BookFileBucket, fileBookKey(path, f.Id)) - - // Delete the book if there are no files left. 
- if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 { - db.DeleteBook(f.Id) - } - - return nil -} - -func (db *Database) Delete(bucket, key []byte) error { - return db.ldb.Delete(bktToKey(bucket, key), nil) -} - -type DatabaseIterator struct { - iter ldbiter.Iterator -} - -func (i *DatabaseIterator) Close() error { - defer i.iter.Release() - return i.iter.Error() -} - -func (i *DatabaseIterator) Next() bool { - return i.iter.Next() -} - -func (i *DatabaseIterator) Id() BookId { - return keyToId(i.iter.Key()) -} - -func (i *DatabaseIterator) RawKey() []byte { - return i.iter.Key() -} - -func (i *DatabaseIterator) RawValue() []byte { - return i.iter.Value() -} - -func (i *DatabaseIterator) Value(obj interface{}) error { - return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj) -} - -// Scan an entire bucket. -func (db *Database) Scan(bucket []byte) *DatabaseIterator { - start, end := keyRange(bucket) - it := db.ldb.NewIterator(&ldbutil.Range{ - Start: start, - Limit: end, - }, &ldbopt.ReadOptions{ - DontFillCache: true, - }) - - return &DatabaseIterator{iter: it} -} - func writeBytes(w io.Writer, b []byte) error { binary.Write(w, binary.LittleEndian, uint32(len(b))) _, err := w.Write(b) @@ -472,14 +293,45 @@ func readBytes(r io.Reader) ([]byte, error) { return b, err } +var ( + dumpTypeBook = []byte("book") + dumpTypeFile = []byte("file") +) + +func gobEnc(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(obj); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func gobDec(data []byte, obj interface{}) error { + return gob.NewDecoder(bytes.NewReader(data)).Decode(obj) +} + +func dumpBook(w io.Writer, b *Book) { + writeBytes(w, dumpTypeBook) + data, _ := gobEnc(b) + writeBytes(w, data) +} + +func dumpFile(w io.Writer, f *File) { + writeBytes(w, dumpTypeFile) + data, _ := gobEnc(f) + writeBytes(w, data) +} + // Dump the contents of the database to a Writer. 
func (db *Database) Dump(w io.Writer) error { - it := db.ldb.NewIterator(nil, &ldbopt.ReadOptions{DontFillCache: true}) - defer it.Release() - count := 0 - for it.Next() { - writeBytes(w, it.Key()) - writeBytes(w, it.Value()) + var count int + for b := range db.backend.ScanBooks() { + dumpBook(w, b) + if files, err := db.backend.GetBookFiles(b.Id); err == nil { + for _, f := range files { + dumpFile(w, f) + } + } count++ } log.Printf("dumped %d entries from the database", count) @@ -488,24 +340,42 @@ func (db *Database) Dump(w io.Writer) error { // Restore a backup to the current database (assuming it is empty). func (db *Database) Restore(r io.Reader) error { - count := 0 + var count int for { - key, err := readBytes(r) + objType, err := readBytes(r) if err == io.EOF { break - } - if err != nil { + } else if err != nil { return err } - value, err := readBytes(r) + + objData, err := readBytes(r) if err == io.EOF { return errors.New("unexpected eof") - } - if err != nil { + } else if err != nil { return err } - db.RawPut(key, value) + switch { + case bytes.Equal(objType, dumpTypeBook): + var b Book + if err := gobDec(objData, &b); err != nil { + return err + } + if err := db.PutBook(&b); err != nil { + return err + } + case bytes.Equal(objType, dumpTypeFile): + var f File + if err := gobDec(objData, &f); err != nil { + return err + } + if err := db.PutFile(&f); err != nil { + return err + } + default: + continue + } count++ if count%1000 == 0 { log.Printf("restored %d entries", count) @@ -560,17 +430,12 @@ func (db *Database) Reindex() error { // Call a function for all books in the database. 
func (db *Database) onAllBooks(f func(*Book) error) error { - it := db.Scan(BookBucket) - for it.Next() { - var book Book - if err := it.Value(&book); err != nil { - continue - } - if err := f(&book); err != nil { + for book := range db.ScanBooks() { + if err := f(book); err != nil { return err } } - return it.Close() + return nil } type SearchResult struct { @@ -633,26 +498,3 @@ func (db *Database) Find(uniqueIds []string) (*Book, error) { return db.GetBook(ParseID(result.Hits[0].ID)) } - -func bktToKey(bucket, key []byte) []byte { - return bytes.Join([][]byte{bucket, key}, []byte{keySeparator}) -} - -// Input is a full key (including bucket). -func keyToId(key []byte) BookId { - n := bytes.Index(key, []byte{keySeparator}) - if n < 0 { - return 0 - } - return ParseBinaryID(key[n+1:]) -} - -func keyRange(prefix []byte) ([]byte, []byte) { - start := make([]byte, len(prefix)+1) - end := make([]byte, len(prefix)+1) - copy(start, prefix) - copy(end, prefix) - start[len(prefix)] = keySeparator - end[len(prefix)] = keySeparator + 1 - return start, end -} diff --git a/database_test.go b/database_test.go index 574392cec0e1f96547c831f7c5dd6e8386ee7bca..ca9ea22f0f00540aa465c14e6a21256be4e0677c 100644 --- a/database_test.go +++ b/database_test.go @@ -24,7 +24,7 @@ func (td *testDatabase) Close() { func newEmptyTestDatabase(t *testing.T) (*testDatabase, *Database) { path, _ := ioutil.TempDir("", "testdb-") - db, err := NewDb(path) + db, err := NewDb("leveldb", path) if err != nil { t.Fatalf("NewDb(): %v", err) } @@ -33,7 +33,7 @@ func newEmptyTestDatabase(t *testing.T) (*testDatabase, *Database) { func newTestDatabase(t *testing.T) (*testDatabase, *Database) { path, _ := ioutil.TempDir("", "testdb-") - db, err := NewDb(path) + db, err := NewDb("leveldb", path) if err != nil { t.Fatalf("NewDb(): %v", err) } diff --git a/sync.go b/sync.go index db4b959f7fada0d51b0b7c42a9fe90879af968c8..b16ced567eec76fbcea559b993923f052b084901 100644 --- a/sync.go +++ b/sync.go @@ -195,13 +195,7 
@@ func (db *Database) findMissing(srv SyncClient) chan string { go func() { n := 0 var req diffRequest - iter := db.Scan(BookBucket) - defer iter.Close() - for iter.Next() { - var book Book - if err := iter.Value(&book); err != nil { - continue - } + for book := range db.ScanBooks() { req.Candidates = append(req.Candidates, uniqueIds{ Id: book.Id.String(), Unique: book.Metadata.Uniques(), diff --git a/update.go b/update.go index 6f7a0a7f727573d6853f5830b46da9e2ef69dee0..fc28c2f3c1be1dbe6324a1c22b0906de5f8e9a4c 100644 --- a/update.go +++ b/update.go @@ -49,13 +49,7 @@ type updateContext struct { } func (uc *updateContext) dbFileScanner(fileCh chan fileData) { - iter := uc.db.Scan(FileBucket) - defer iter.Close() - for iter.Next() { - var f File - if err := iter.Value(&f); err != nil { - continue - } + for f := range uc.db.ScanFiles() { fileCh <- fileData{ source: SourceDB, path: f.Path, diff --git a/update_test.go b/update_test.go index 553e77d89fd9cb611603c4999731d42ed37f9393..6f80c86b56394826d2f7ee5acea37c732c54e76d 100644 --- a/update_test.go +++ b/update_test.go @@ -22,34 +22,18 @@ func createTestFs(fs map[string]string) string { func checkDbPathIntegrity(t *testing.T, db *Database) { // Files should have relative paths. - iter := db.Scan(FileBucket) - for iter.Next() { - var f File - if err := iter.Value(&f); err != nil { - t.Fatal(err) - } + for f := range db.ScanFiles() { if strings.HasPrefix(f.Path, "/") { t.Errorf("file has absolute path: %v", f.Path) } } - if err := iter.Close(); err != nil { - t.Fatalf("Scan(FileBucket) error: %v", err) - } // Book cover images should have relative paths. 
- iter = db.Scan(BookBucket) - for iter.Next() { - var b Book - if err := iter.Value(&b); err != nil { - t.Fatal(err) - } + for b := range db.ScanBooks() { if b.CoverPath != "" && strings.HasPrefix(b.CoverPath, "/") { t.Errorf("file has absolute path: %v", b.CoverPath) } } - if err := iter.Close(); err != nil { - t.Fatalf("Scan(BookBucket) error: %v", err) - } } func TestDatabase_Update(t *testing.T) {