package liber

import (
	"bytes"
	cryptorand "crypto/rand"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/keyword_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/simple_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/standard_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/token_filters/edge_ngram_filter"

	"git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
	ldbfilter "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/filter"
	ldbiter "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator"
	ldbopt "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
	ldbutil "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
)

var (
	BookBucket     = []byte("ebook")
	FileBucket     = []byte("file")
	BookFileBucket = []byte("ebook_file")

	keySeparator = byte('/')
)

type BookId uint64

func (id BookId) String() string {
	return strconv.FormatUint(uint64(id), 10)
}

func (id BookId) Key() []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.LittleEndian, id)
	return buf.Bytes()
}
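
// As a quick illustration, the key for BookId(1) is the fixed-width
// little-endian encoding 01 00 00 00 00 00 00 00:
//
//	key := BookId(1).Key()   // len(key) == 8
//	fmt.Printf("% x\n", key) // 01 00 00 00 00 00 00 00
//
// Fixed-width keys keep every id the same length inside composite
// keys such as the ones built by fileBookKey below.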

type Book struct {
	Id        BookId
	CoverPath string
	Metadata  *Metadata
}

func (b *Book) String() string {
	return fmt.Sprintf("%s (%s)", b.Metadata.String(), b.Id.String())
}

type File struct {
	Path     string
	FileType string
	Mtime    time.Time
	Size     int64
	Error    bool
	Id       BookId
}

func (f *File) HasChanged(info os.FileInfo) bool {
	return !info.ModTime().Equal(f.Mtime) || info.Size() != f.Size
}

func init() {
	// Seed the RNG to a random value.
	var seed int64
	binary.Read(cryptorand.Reader, binary.LittleEndian, &seed)
	rand.Seed(seed)
}

func NewID() BookId {
	return BookId(rand.Int63())
}

func ParseID(s string) BookId {
	id, _ := strconv.ParseUint(s, 10, 64)
	return BookId(id)
}

// flatBook is the flattened structure that actually gets indexed.
type flatBook struct {
	Title       string   `json:"title"`
	Author      []string `json:"author"`
	Description string   `json:"description"`
	ISBN        []string `json:"isbn"`
	Unique      []string `json:"_unique"`
	Suggest     []string `json:"_suggest"`
}

func (f *flatBook) Type() string {
	return "ebook"
}

func flatten(book *Book) *flatBook {
	suggest := []string{book.Metadata.Title}
	if len(book.Metadata.Creator) > 0 {
		suggest = append(suggest, book.Metadata.Creator...)
	}
	return &flatBook{
		Title:       book.Metadata.Title,
		Author:      book.Metadata.Creator,
		Description: book.Metadata.Description,
		ISBN:        book.Metadata.ISBN,
		Unique:      book.Metadata.Uniques(),
		Suggest:     suggest,
	}
}
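
// As an illustration (not a stored format), a book titled "Dune" by
// Frank Herbert flattens to an indexed document shaped roughly like
// this, with field names taken from the struct tags above:
//
//	{
//	  "title":    "Dune",
//	  "author":   ["Frank Herbert"],
//	  "_suggest": ["Dune", "Frank Herbert"],
//	  ...
//	}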

var defaultTextAnalyzer = "standard"

func metadataDocumentMapping() *bleve.DocumentMapping {
	md := bleve.NewDocumentMapping()

	textFieldMapping := bleve.NewTextFieldMapping()
	textFieldMapping.Store = false
	textFieldMapping.Analyzer = defaultTextAnalyzer

	authorFieldMapping := bleve.NewTextFieldMapping()
	authorFieldMapping.Store = false
	authorFieldMapping.Analyzer = "simple"

	keywordFieldMapping := bleve.NewTextFieldMapping()
	keywordFieldMapping.Store = false
	keywordFieldMapping.Analyzer = "keyword"
	keywordFieldMapping.IncludeInAll = false

	suggestFieldMapping := bleve.NewTextFieldMapping()
	suggestFieldMapping.Store = false
	suggestFieldMapping.Analyzer = "edgeNgram"
	suggestFieldMapping.IncludeTermVectors = false
	suggestFieldMapping.IncludeInAll = false

	md.AddFieldMappingsAt("title", textFieldMapping)
	md.AddFieldMappingsAt("author", authorFieldMapping)
	md.AddFieldMappingsAt("description", textFieldMapping)
	md.AddFieldMappingsAt("isbn", keywordFieldMapping)
	md.AddFieldMappingsAt("_unique", keywordFieldMapping)
	md.AddFieldMappingsAt("_suggest", suggestFieldMapping)

	return md
}

func defaultIndexMapping() *bleve.IndexMapping {
	i := bleve.NewIndexMapping()

	err := i.AddCustomTokenFilter("edgeNgram325",
		map[string]interface{}{
			"type": "edge_ngram",
			"min":  3.0,
			"max":  25.0,
		})
	if err != nil {
		log.Fatal(err)
	}

	err = i.AddCustomAnalyzer("edgeNgram",
		map[string]interface{}{
			"type":      "custom",
			"tokenizer": "unicode",
			"token_filters": []interface{}{
				"to_lower",
				"stop_en",
				"edgeNgram325",
			},
		})
	if err != nil {
		log.Fatal(err)
	}

	i.AddDocumentMapping("ebook", metadataDocumentMapping())
	i.DefaultAnalyzer = defaultTextAnalyzer
	i.DefaultType = "ebook"
	return i
}
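
// A sketch of what the "edgeNgram" analyzer above does to a _suggest
// value, assuming the usual bleve pipeline (unicode tokenization,
// lowercasing, English stopword removal, then edge ngrams of length
// 3 to 25):
//
//	"The Left Hand" -> tokens "left", "hand"
//	                -> ngrams "lef", "left", "han", "hand"
//
// Any prefix of three or more characters therefore matches at query
// time, which is what the Suggest method below relies on.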

type Database struct {
	path  string
	ldb   *leveldb.DB
	index bleve.Index
}

func NewDb(path string) (*Database, error) {
	// Make sure that path exists.
	if _, err := os.Stat(path); err != nil {
		if err := os.Mkdir(path, 0700); err != nil {
			return nil, err
		}
	}

	// Initialize our database and the index.
	d := &Database{path: path}
	if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil {
		return nil, err
	}
	if err := d.setupIndex(filepath.Join(path, "index")); err != nil {
		return nil, err
	}
	return d, nil
}
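
// A minimal usage sketch (error handling abbreviated; the path and
// metadata are purely illustrative):
//
//	db, err := NewDb("/tmp/liber")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()
//
//	book := &Book{Id: NewID(), Metadata: &Metadata{Title: "Dune"}}
//	_ = db.PutBook(book)
//	results, _ := db.Search("dune", 0, 10)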

func (db *Database) setupLevelDb(path string) error {
	// Use a 512MiB block cache (2 << 28 bytes) and a small Bloom filter.
	opts := &ldbopt.Options{
		Filter:             ldbfilter.NewBloomFilter(10),
		BlockCacheCapacity: 2 << 28,
	}

	ldb, err := leveldb.OpenFile(path, opts)
	if err != nil {
		return err
	}
	db.ldb = ldb
	return nil
}

func (db *Database) setupIndex(path string) error {
	var err error
	if _, serr := os.Stat(path); serr == nil {
		db.index, err = bleve.Open(path)
	} else {
		db.index, err = bleve.New(path, defaultIndexMapping())
	}
	return err
}

func (db *Database) Close() {
	db.index.Close()
	db.ldb.Close()
}

func (db *Database) GetBook(bookid BookId) (*Book, error) {
	var b Book
	if err := db.Get(BookBucket, bookid.Key(), &b); err != nil {
		return nil, err
	}
	return &b, nil
}

func (db *Database) GetFile(path string) (*File, error) {
	var f File
	if err := db.Get(FileBucket, []byte(path), &f); err != nil {
		return nil, err
	}
	return &f, nil
}

func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) {
	start, end := keyRange(bktToKey(BookFileBucket, bookid.Key()))
	it := db.ldb.NewIterator(
		&ldbutil.Range{
			Start: start,
			Limit: end,
		},
		nil,
	)
	defer it.Release()

	var out []*File
	for it.Next() {
		// Values in this bucket are gob-encoded file paths; avoid
		// shadowing the path/filepath package with the variable name.
		var path string
		if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&path) == nil {
			if file, err := db.GetFile(path); err == nil {
				out = append(out, file)
			}
		}
	}
	if err := it.Error(); err != nil {
		return nil, err
	}
	return out, nil
}

func (db *Database) Get(bucket, key []byte, obj interface{}) error {
	data, err := db.ldb.Get(bktToKey(bucket, key), nil)
	if err != nil {
		return err
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
}

func (db *Database) PutBook(b *Book) error {
	if err := db.Put(BookBucket, b.Id.Key(), b); err != nil {
		return err
	}
	return db.index.Index(b.Id.String(), flatten(b))
}

func fileBookKey(path string, bookid BookId) []byte {
	return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator})
}
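
// Once Put prefixes it with BookFileBucket, the composite key laid
// out on disk looks like
//
//	ebook_file / <8-byte book id> / <file path>
//
// so all files belonging to one book share a common prefix and can be
// fetched with the single range scan in GetBookFiles above.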

func (db *Database) PutFile(f *File) error {
	if err := db.Put(FileBucket, []byte(f.Path), f); err != nil {
		return err
	}
	if !f.Error {
		return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path)
	}
	return nil
}

func (db *Database) RawPut(key, value []byte) error {
	return db.ldb.Put(key, value, nil)
}

func (db *Database) Put(bucket, key []byte, obj interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
		return err
	}
	return db.RawPut(bktToKey(bucket, key), buf.Bytes())
}

func (db *Database) DeleteBook(bookid BookId) error {
	db.Delete(BookBucket, bookid.Key())
	return db.index.Delete(bookid.String())
}

func (db *Database) DeleteFile(path string) error {
	f, err := db.GetFile(path)
	if err != nil {
		return nil
	}

	db.Delete(FileBucket, []byte(path))
	db.Delete(BookFileBucket, fileBookKey(path, f.Id))

	// Delete the book if there are no files left.
	if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 {
		db.DeleteBook(f.Id)
	}

	return nil
}

func (db *Database) Delete(bucket, key []byte) error {
	return db.ldb.Delete(bktToKey(bucket, key), nil)
}

type DatabaseIterator struct {
	iter ldbiter.Iterator
}

func (i *DatabaseIterator) Close() error {
	defer i.iter.Release()
	return i.iter.Error()
}

func (i *DatabaseIterator) Next() bool {
	return i.iter.Next()
}

func (i *DatabaseIterator) Id() BookId {
	return keyToId(i.iter.Key())
}

func (i *DatabaseIterator) RawKey() []byte {
	return i.iter.Key()
}

func (i *DatabaseIterator) RawValue() []byte {
	return i.iter.Value()
}

func (i *DatabaseIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj)
}

// Scan an entire bucket.
func (db *Database) Scan(bucket []byte) *DatabaseIterator {
	start, end := keyRange(bucket)
	it := db.ldb.NewIterator(&ldbutil.Range{
		Start: start,
		Limit: end,
	}, &ldbopt.ReadOptions{
		DontFillCache: true,
	})

	return &DatabaseIterator{iter: it}
}
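
// A typical full scan over the book bucket, decoding each record as
// it goes (a sketch of the same pattern Reindex uses below):
//
//	it := db.Scan(BookBucket)
//	for it.Next() {
//		var book Book
//		if err := it.Value(&book); err == nil {
//			fmt.Println(it.Id(), book.Metadata.Title)
//		}
//	}
//	if err := it.Close(); err != nil {
//		log.Fatal(err)
//	}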

// Reindex the entire database. This is an administrative operation,
// to be performed after an incompatible index schema change. It will
// delete the existing index and re-create it from scratch.
func (db *Database) Reindex() error {
	// Close the index, delete it, and re-open it.
	db.index.Close()

	indexPath := filepath.Join(db.path, "index")
	if err := os.RemoveAll(indexPath); err != nil {
		return err
	}
	if err := db.setupIndex(indexPath); err != nil {
		return err
	}

	// Scan the database and re-index everything.
	i := db.Scan(BookBucket)
	for i.Next() {
		var book Book
		if err := i.Value(&book); err != nil {
			continue
		}
		db.index.Index(i.Id().String(), flatten(&book))
	}
	return i.Close()
}

type SearchResult struct {
	Results    []*Book
	NumResults int
}

func (db *Database) doSearch(query bleve.Query, offset, limit int) (*SearchResult, error) {
	req := bleve.NewSearchRequestOptions(query, limit, offset, false)
	result, err := db.index.Search(req)
	if err != nil {
		return nil, err
	}

	sr := SearchResult{NumResults: int(result.Total)}
	for _, r := range result.Hits {
		if book, err := db.GetBook(ParseID(r.ID)); err == nil {
			sr.Results = append(sr.Results, book)
		}
	}
	return &sr, nil
}

// Search the database with a query string.
func (db *Database) Search(queryStr string, offset, limit int) (*SearchResult, error) {
	return db.doSearch(bleve.NewQueryStringQuery(queryStr), offset, limit)
}
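
// For example, to fetch the first page of ten results (the syntax is
// bleve's query string language):
//
//	result, err := db.Search(`author:herbert dune`, 0, 10)
//	if err == nil {
//		fmt.Printf("%d total matches\n", result.NumResults)
//		for _, book := range result.Results {
//			fmt.Println(book)
//		}
//	}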

// Suggest returns autocompletion candidates for a partial term, via a
// term query against the edge-ngram-indexed _suggest field.
func (db *Database) Suggest(term string) (*SearchResult, error) {
	query := bleve.NewTermQuery(term).SetField("_suggest")
	return db.doSearch(query, 0, 20)
}

// Find looks up a book matching any of the given unique identifiers,
// if possible.
func (db *Database) Find(uniqueIds []string) (*Book, error) {
	var queries []bleve.Query
	var query bleve.Query

	for _, u := range uniqueIds {
		queries = append(queries, bleve.NewTermQuery(u).SetField("_unique"))
	}
	if len(queries) == 0 {
		return nil, errors.New("no unique identifiers to search for")
	}
	if len(queries) > 1 {
		query = bleve.NewDisjunctionQuery(queries)
	} else {
		query = queries[0]
	}

	search := bleve.NewSearchRequest(query)
	result, err := db.index.Search(search)
	if err != nil {
		return nil, err
	}
	if len(result.Hits) == 0 {
		return nil, errors.New("no matches found")
	}

	return db.GetBook(ParseID(result.Hits[0].ID))
}

func bktToKey(bucket, key []byte) []byte {
	return bytes.Join([][]byte{bucket, key}, []byte{keySeparator})
}

// Input is a full key (including bucket).
func keyToId(key []byte) BookId {
	n := bytes.Index(key, []byte{keySeparator})
	if n < 0 {
		return 0
	}

	var id uint64
	binary.Read(bytes.NewReader(key[n+1:]), binary.LittleEndian, &id)
	return BookId(id)
}

func keyRange(prefix []byte) ([]byte, []byte) {
	start := make([]byte, len(prefix)+1)
	end := make([]byte, len(prefix)+1)
	copy(start, prefix)
	copy(end, prefix)
	start[len(prefix)] = keySeparator
	end[len(prefix)] = keySeparator + 1
	return start, end
}
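
// For instance, keyRange([]byte("ebook")) returns the bounds
// ("ebook/", "ebook0"): '0' is the byte immediately after '/', so the
// half-open interval [start, end) covers exactly the keys that
// bktToKey builds for that bucket and nothing beyond it.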