package liber

import (
	"bytes"
	cryptorand "crypto/rand"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/simple"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/standard"
	"github.com/blevesearch/bleve/v2/analysis/lang/en"
	"github.com/blevesearch/bleve/v2/analysis/token/edgengram"
	"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
	"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
	blevegoleveldb "github.com/blevesearch/bleve/v2/index/upsidedown/store/goleveldb"
	blevemapping "github.com/blevesearch/bleve/v2/mapping"
	blevequery "github.com/blevesearch/bleve/v2/search/query"

	"github.com/syndtr/goleveldb/leveldb"
	ldbfilter "github.com/syndtr/goleveldb/leveldb/filter"
	ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
	ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
	ldbutil "github.com/syndtr/goleveldb/leveldb/util"
)

var (
	// Key prefixes ("buckets") used to namespace the different
	// record types stored in the single LevelDB database.
	BookBucket     = []byte("ebook")
	FileBucket     = []byte("file")
	BookFileBucket = []byte("ebook_file")

	// keySeparator joins the bucket prefix and the record key.
	keySeparator = byte('/')
)

// BookId is the unique numeric identifier of a book record.
type BookId uint64

// String renders the id in base 10; this form is also used as the
// document id in the search index.
func (id BookId) String() string {
	return strconv.FormatUint(uint64(id), 10)
}

// Key returns the 8-byte little-endian database key for the id.
func (id BookId) Key() []byte {
	key := make([]byte, 8)
	binary.LittleEndian.PutUint64(key, uint64(id))
	return key
}

// NewID generates a fresh random book identifier.
func NewID() BookId {
	return BookId(rand.Int63())
}

// ParseID converts a base-10 string back into a BookId. Invalid
// input yields the zero id.
func ParseID(s string) BookId {
	n, _ := strconv.ParseUint(s, 10, 64)
	return BookId(n)
}

// ParseBinaryID decodes an 8-byte little-endian key into a BookId.
func ParseBinaryID(b []byte) BookId {
	return BookId(binary.LittleEndian.Uint64(b))
}
type Book struct {
	Id        BookId
	CoverPath string
	Metadata  *Metadata
}

ale's avatar
ale committed
func (b *Book) String() string {
	return fmt.Sprintf("%s (%s)", b.Metadata.String(), b.Id.String())
}

ale's avatar
ale committed
type File struct {
	Path     string
	FileType string
	Mtime    time.Time
	Size     int64
	Error    bool
	Id       BookId
}

func (f *File) HasChanged(info os.FileInfo) bool {
	return !info.ModTime().Equal(f.Mtime) || info.Size() != f.Size
}

ale's avatar
ale committed
func init() {
	// Seed math/rand with a cryptographically random value so that
	// NewID does not produce the same id sequence on every run.
	var seed int64
	if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
		// The original code ignored this error, silently leaving the
		// RNG seeded with 0 if the system RNG failed. Fall back to a
		// time-based seed instead.
		seed = time.Now().UnixNano()
	}
	rand.Seed(seed)
}
// flatBook is the flattened representation of a book that is actually
// fed to the search index. The _unique and _suggest fields back
// exact-match lookups and autocompletion respectively.
type flatBook struct {
	Title       string   `json:"title"`
	Author      []string `json:"author"`
	Description string   `json:"description"`
	ISBN        []string `json:"isbn"`
	Unique      []string `json:"_unique"`
	Suggest     []string `json:"_suggest"`
}

// Type returns the bleve document type for indexed books.
func (f *flatBook) Type() string {
	return "ebook"
}

func flatten(book *Book) *flatBook {
	suggest := []string{book.Metadata.Title}
	if len(book.Metadata.Creator) > 0 {
		suggest = append(suggest, book.Metadata.Creator...)
	}
ale's avatar
ale committed
	return &flatBook{
		Title:       book.Metadata.Title,
		Author:      book.Metadata.Creator,
		Description: book.Metadata.Description,
		ISBN:        book.Metadata.ISBN,
		Unique:      book.Metadata.Uniques(),
		Suggest:     suggest,
ale's avatar
ale committed
	}
}

var defaultTextAnalyzer = "standard"

func metadataDocumentMapping() *blevemapping.DocumentMapping {
	md := bleve.NewDocumentStaticMapping()
ale's avatar
ale committed

ale's avatar
ale committed
	textFieldMapping := bleve.NewTextFieldMapping()
	textFieldMapping.Store = false
	textFieldMapping.Analyzer = defaultTextAnalyzer
ale's avatar
ale committed

	authorFieldMapping := bleve.NewTextFieldMapping()
	authorFieldMapping.Store = false
ale's avatar
ale committed
	authorFieldMapping.Analyzer = "simple"
ale's avatar
ale committed

ale's avatar
ale committed
	keywordFieldMapping := bleve.NewTextFieldMapping()
	keywordFieldMapping.Store = false
	keywordFieldMapping.Analyzer = "keyword"
	keywordFieldMapping.IncludeInAll = false
ale's avatar
ale committed

	suggestFieldMapping := bleve.NewTextFieldMapping()
	suggestFieldMapping.Store = false
	suggestFieldMapping.Analyzer = "edgeNgram"
	suggestFieldMapping.IncludeTermVectors = false
	suggestFieldMapping.IncludeInAll = false

ale's avatar
ale committed
	md.AddFieldMappingsAt("title", textFieldMapping)
	md.AddFieldMappingsAt("author", authorFieldMapping)
	md.AddFieldMappingsAt("description", textFieldMapping)
	md.AddFieldMappingsAt("isbn", keywordFieldMapping)
	md.AddFieldMappingsAt("_unique", keywordFieldMapping)
	md.AddFieldMappingsAt("_suggest", suggestFieldMapping)
ale's avatar
ale committed

	return md
}

func defaultIndexMapping() blevemapping.IndexMapping {
ale's avatar
ale committed
	i := bleve.NewIndexMapping()

	err := i.AddCustomTokenFilter("edgeNgram325",
		map[string]interface{}{
			"type": edgengram.Name,
			"min":  3.0,
			"max":  25.0,
		})
	if err != nil {
		log.Fatal("creating token filter edgeNgram325:", err)
	}

	err = i.AddCustomAnalyzer("edgeNgram",
		map[string]interface{}{
			"type":      custom.Name,
			"tokenizer": unicode.Name,
			"token_filters": []interface{}{
				en.PossessiveName,
				lowercase.Name,
				en.StopName,
				"edgeNgram325",
			},
		})
	if err != nil {
		log.Fatal("creating custom analyzer edgeNgram:", err)
ale's avatar
ale committed
	i.AddDocumentMapping("ebook", metadataDocumentMapping())
ale's avatar
ale committed
	i.DefaultAnalyzer = defaultTextAnalyzer
	i.DefaultType = "ebook"
ale's avatar
ale committed
	return i
}

// Database bundles the LevelDB record store with the bleve full-text
// index, both living under a common root directory ("db" and "index"
// subdirectories of path).
type Database struct {
	path  string
	ldb   *leveldb.DB
	index bleve.Index
}

func NewDb(path string) (*Database, error) {
	// Make sure that path exists.
	if _, err := os.Stat(path); err != nil {
		if err := os.Mkdir(path, 0700); err != nil {
			return nil, err
		}
	}

	// Initialize our database and the index.
	d := &Database{path: path}
ale's avatar
ale committed
	if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil {
		return nil, err
	}
	if err := d.setupIndex(filepath.Join(path, "index")); err != nil {
		return nil, err
	}
	return d, nil
}

// setupLevelDb opens the LevelDB key/value store at path, creating it
// if necessary.
func (db *Database) setupLevelDb(path string) error {
	// Use a large block cache and a small Bloom filter (10 bits per
	// key) to speed up lookups.
	opts := &ldbopt.Options{
		Filter:             ldbfilter.NewBloomFilter(10),
		BlockCacheCapacity: 1 << 28, // 256MB
	}

	ldb, err := leveldb.OpenFile(path, opts)
	if err != nil {
		return err
	}
	db.ldb = ldb
	return nil
}

func (db *Database) setupIndex(path string) error {
	var err error
	if _, serr := os.Stat(path); serr == nil {
		db.index, err = bleve.Open(path)
	} else {
ale's avatar
ale committed
		// Create a new Bleve index, backed by goleveldb.
		db.index, err = bleve.NewUsing(path, defaultIndexMapping(), bleve.Config.DefaultIndexType, blevegoleveldb.Name, map[string]interface{}{
			"create_if_missing":         true,
			"write_buffer_size":         2 << 25,
			"lru_cache_capacity":        2 << 27,
			"bloom_filter_bits_per_key": 10,
		})
ale's avatar
ale committed
	}
	if err != nil {
		return err
	}
	return nil
}

// schemaVersionKey is the raw LevelDB key holding the schema version.
var schemaVersionKey = []byte("_liber_schema_version")

// getSchemaVersion returns the stored schema version, or 0 when the
// database is new or the stored value is unreadable.
func (db *Database) getSchemaVersion() uint64 {
	data, err := db.ldb.Get(schemaVersionKey, nil)
	if err != nil || len(data) < 8 {
		// Guard against a missing or truncated value: Uint64 panics
		// on input shorter than 8 bytes.
		return 0
	}
	return binary.LittleEndian.Uint64(data)
}

// setSchemaVersion persists the schema version marker. Failures are
// logged but not fatal: the worst outcome is re-running a migration.
func (db *Database) setSchemaVersion(v uint64) {
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], v)
	if err := db.ldb.Put(schemaVersionKey, b[:], nil); err != nil {
		log.Printf("error storing schema version: %v", err)
	}
}

// databaseMigration is a single schema migration step: run is invoked
// when the stored schema version is older than version.
type databaseMigration struct {
	version uint64
	run     func(db *Database) error
}

func (db *Database) runMigrations(migrations []databaseMigration) error {
	version := db.getSchemaVersion()
	for _, m := range migrations {
		if m.version < version {
			continue
		}
		if err := m.run(db); err != nil {
			return err
		}
		version = m.version
		db.setSchemaVersion(version)
	}
	return nil
}

ale's avatar
ale committed
// Close shuts down the search index and the LevelDB store. Errors
// from closing are ignored.
func (db *Database) Close() {
	db.index.Close()
	db.ldb.Close()
}

// GetBook loads the Book record with the given id.
func (db *Database) GetBook(bookid BookId) (*Book, error) {
	book := new(Book)
	if err := db.Get(BookBucket, bookid.Key(), book); err != nil {
		return nil, err
	}
	return book, nil
}

func (db *Database) GetFile(path string) (*File, error) {
	var f File
	if err := db.Get(FileBucket, []byte(path), &f); err != nil {
		return nil, err
	}
	return &f, nil
}

ale's avatar
ale committed
func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) {
	start, end := keyRange(bktToKey(BookFileBucket, bookid.Key()))
	it := db.ldb.NewIterator(
		&ldbutil.Range{
			Start: start,
			Limit: end,
		},
		nil,
	)
	defer it.Release()

ale's avatar
ale committed
	var out []*File
	for it.Next() {
ale's avatar
ale committed
		var filepath string
		if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&filepath) == nil {
ale's avatar
ale committed
			if file, err := db.GetFile(filepath); err == nil {
				out = append(out, file)
			}
		}
	}
	if err := it.Error(); err != nil {
		return nil, err
	}
ale's avatar
ale committed
	return out, nil
}

ale's avatar
ale committed
func (db *Database) Get(bucket, key []byte, obj interface{}) error {
	data, err := db.ldb.Get(bktToKey(bucket, key), nil)
ale's avatar
ale committed
	if err != nil {
		return err
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
ale's avatar
ale committed
}

func (db *Database) PutBook(b *Book) error {
	if err := db.Put(BookBucket, b.Id.Key(), b); err != nil {
		return err
	}
ale's avatar
ale committed
	return db.index.Index(b.Id.String(), flatten(b))
ale's avatar
ale committed
}

ale's avatar
ale committed
func fileBookKey(path string, bookid BookId) []byte {
	return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator})
}

ale's avatar
ale committed
func (db *Database) PutFile(f *File) error {
ale's avatar
ale committed
	if err := db.Put(FileBucket, []byte(f.Path), f); err != nil {
		return err
	}
	if !f.Error {
		return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path)
	}
	return nil
ale's avatar
ale committed
}

func (db *Database) RawPut(key, value []byte) error {
	return db.ldb.Put(key, value, nil)
ale's avatar
ale committed
func (db *Database) Put(bucket, key []byte, obj interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
ale's avatar
ale committed
		return err
	}
	return db.RawPut(bktToKey(bucket, key), buf.Bytes())
ale's avatar
ale committed
}

// DeleteBook removes a book record and its search index entry. The
// database delete is best-effort; the returned error reflects only
// the index deletion.
func (db *Database) DeleteBook(bookid BookId) error {
	db.Delete(BookBucket, bookid.Key())
	return db.index.Delete(bookid.String())
}
func (db *Database) DeleteFile(path string) error {
	f, err := db.GetFile(path)
	if err != nil {
		return nil
	}

	db.Delete(FileBucket, []byte(path))
	db.Delete(BookFileBucket, fileBookKey(path, f.Id))

	// Delete the book if there are no files left.
	if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 {
		db.DeleteBook(f.Id)
	}

	return nil
}

ale's avatar
ale committed
// Delete removes the record stored under bucket/key.
func (db *Database) Delete(bucket, key []byte) error {
	return db.ldb.Delete(bktToKey(bucket, key), nil)
}

type DatabaseIterator struct {
	iter ldbiter.Iterator
ale's avatar
ale committed
}

func (i *DatabaseIterator) Close() error {
	defer i.iter.Release()
	return i.iter.Error()
ale's avatar
ale committed
}

func (i *DatabaseIterator) Next() bool {
	return i.iter.Next()
ale's avatar
ale committed
}

func (i *DatabaseIterator) Id() BookId {
	return keyToId(i.iter.Key())
}

func (i *DatabaseIterator) RawKey() []byte {
	return i.iter.Key()
}

func (i *DatabaseIterator) RawValue() []byte {
	return i.iter.Value()
}

ale's avatar
ale committed
func (i *DatabaseIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj)
ale's avatar
ale committed
}

// Scan an entire bucket.
func (db *Database) Scan(bucket []byte) *DatabaseIterator {
	start, end := keyRange(bucket)
	it := db.ldb.NewIterator(&ldbutil.Range{
		Start: start,
		Limit: end,
	}, &ldbopt.ReadOptions{
		DontFillCache: true,
	})

	return &DatabaseIterator{iter: it}
ale's avatar
ale committed
}

func writeBytes(w io.Writer, b []byte) error {
	binary.Write(w, binary.LittleEndian, uint32(len(b)))
	_, err := w.Write(b)
	return err
}

func readBytes(r io.Reader) ([]byte, error) {
	var szb [4]byte
	n, err := io.ReadFull(r, szb[:])
	if err != nil {
		return nil, err
	}
	if n != 4 {
		return nil, errors.New("short header read")
	}

	sz := binary.LittleEndian.Uint32(szb[:])
	b := make([]byte, sz)
	n, err = io.ReadFull(r, b)
	if n != int(sz) {
		return nil, errors.New("short data read")
	}
	return b, err
}

// Dump writes the entire contents of the database to w as a sequence
// of length-prefixed key/value records (the format read by Restore).
func (db *Database) Dump(w io.Writer) error {
	it := db.ldb.NewIterator(nil, &ldbopt.ReadOptions{DontFillCache: true})
	defer it.Release()
	count := 0
	for it.Next() {
		// The previous revision ignored write errors, producing a
		// silently truncated backup on a full disk or closed pipe.
		if err := writeBytes(w, it.Key()); err != nil {
			return err
		}
		if err := writeBytes(w, it.Value()); err != nil {
			return err
		}
		count++
	}
	if err := it.Error(); err != nil {
		return err
	}
	log.Printf("dumped %d entries from the database", count)
	return nil
}

// Restore a backup to the current database (assuming it is empty).
func (db *Database) Restore(r io.Reader) error {
	count := 0
	for {
		key, err := readBytes(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		value, err := readBytes(r)
		if err == io.EOF {
			return errors.New("unexpected eof")
		}
		if err != nil {
			return err
		}
		db.RawPut(key, value)
		count++
		if count%1000 == 0 {
			log.Printf("restored %d entries", count)
		}
	log.Printf("restore complete (%d entries)", count)
	return db.Reindex()
}

// Reindex rebuilds the entire search index. This is an administrative
// operation, to be performed after an incompatible index schema
// change: it deletes the existing index and re-creates it from the
// book records, using batched indexing for decent performance.
//
// NOTE: the `return nil` at the end of the indexing closure was
// missing in the previous revision, which did not compile; restored
// here.
func (db *Database) Reindex() error {
	// Close the index, delete it, and re-open it empty.
	db.index.Close()

	indexPath := filepath.Join(db.path, "index")
	if err := os.RemoveAll(indexPath); err != nil {
		return err
	}
	if err := db.setupIndex(indexPath); err != nil {
		return err
	}

	// Scan the database and re-index everything, flushing the batch
	// every 100 documents.
	i := 0
	start := time.Now()
	batch := db.index.NewBatch()
	if err := db.onAllBooks(func(book *Book) error {
		if err := batch.Index(book.Id.String(), flatten(book)); err != nil {
			return err
		}
		i++
		if i%100 == 0 {
			if err := db.index.Batch(batch); err != nil {
				return err
			}
			batch = db.index.NewBatch()
		}
		return nil
	}); err != nil {
		return err
	}

	// Flush any remaining documents in the final batch.
	if err := db.index.Batch(batch); err != nil {
		return err
	}

	log.Printf("re-indexing complete, %d documents in %g seconds", i, time.Since(start).Seconds())
	return nil
}

// Call a function for all books in the database.
func (db *Database) onAllBooks(f func(*Book) error) error {
	it := db.Scan(BookBucket)
	for it.Next() {
		var book Book
		if err := it.Value(&book); err != nil {
		if err := f(&book); err != nil {
			return err
		}
	return it.Close()
ale's avatar
ale committed
// SearchResult holds one page of books matching a query, plus the
// total number of matches in the index (which may exceed len(Results)).
type SearchResult struct {
	Results    []*Book
	NumResults int
}

func (db *Database) doSearch(query blevequery.Query, offset, limit int) (*SearchResult, error) {
ale's avatar
ale committed
	req := bleve.NewSearchRequestOptions(query, limit, offset, false)
	result, err := db.index.Search(req)
	if err != nil {
		return nil, err
	}

	sr := SearchResult{NumResults: int(result.Total)}
	for _, r := range result.Hits {
		if book, err := db.GetBook(ParseID(r.ID)); err == nil {
			sr.Results = append(sr.Results, book)
		}
	}
	return &sr, nil
}

// Search runs a bleve query-string search over the database.
func (db *Database) Search(queryStr string, offset, limit int) (*SearchResult, error) {
	q := bleve.NewQueryStringQuery(queryStr)
	return db.doSearch(q, offset, limit)
}

// Autocomplete runs a fuzzy search for a term.
func (db *Database) Suggest(term string) (*SearchResult, error) {
	query := bleve.NewTermQuery(term)
	query.SetField("_suggest")
	return db.doSearch(query, 0, 20)
ale's avatar
ale committed
}

// Find a book matching the given metadata, if possible.
ale's avatar
ale committed
func (db *Database) Find(uniqueIds []string) (*Book, error) {
	var queries []blevequery.Query
	var query blevequery.Query
ale's avatar
ale committed

ale's avatar
ale committed
	for _, u := range uniqueIds {
		q := bleve.NewTermQuery(u)
		q.SetField("_unique")
		queries = append(queries, q)
ale's avatar
ale committed
	}
	if len(queries) > 0 {
		query = bleve.NewDisjunctionQuery(queries...)
ale's avatar
ale committed
	} else {
ale's avatar
ale committed
		query = queries[0]
ale's avatar
ale committed
	}

	search := bleve.NewSearchRequest(query)
	result, err := db.index.Search(search)
	if err != nil {
		return nil, err
	}
ale's avatar
ale committed
	if len(result.Hits) == 0 {
		return nil, errors.New("no matches found")
ale's avatar
ale committed
	}
ale's avatar
ale committed

	return db.GetBook(ParseID(result.Hits[0].ID))
ale's avatar
ale committed
}

func bktToKey(bucket, key []byte) []byte {
	return bytes.Join([][]byte{bucket, key}, []byte{keySeparator})
}

// Input is a full key (including bucket).
func keyToId(key []byte) BookId {
	n := bytes.Index(key, []byte{keySeparator})
	if n < 0 {
		return 0
	}
	return ParseBinaryID(key[n+1:])
ale's avatar
ale committed
}

// keyRange returns the [start, end) key range spanning every key
// under the given bucket prefix: "prefix/" up to (but excluding) the
// byte immediately after the separator.
func keyRange(prefix []byte) ([]byte, []byte) {
	start := append(append([]byte{}, prefix...), keySeparator)
	end := append(append([]byte{}, prefix...), keySeparator+1)
	return start, end
}