package liber

import (
	"bytes"
	cryptorand "crypto/rand"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/simple"
	_ "github.com/blevesearch/bleve/v2/analysis/analyzer/standard"
	"github.com/blevesearch/bleve/v2/analysis/lang/en"
	"github.com/blevesearch/bleve/v2/analysis/token/edgengram"
	"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
	"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
	blevegoleveldb "github.com/blevesearch/bleve/v2/index/upsidedown/store/goleveldb"
	blevemapping "github.com/blevesearch/bleve/v2/mapping"
	blevequery "github.com/blevesearch/bleve/v2/search/query"
	"github.com/syndtr/goleveldb/leveldb"
	ldbfilter "github.com/syndtr/goleveldb/leveldb/filter"
	ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
	ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
	ldbutil "github.com/syndtr/goleveldb/leveldb/util"
)

// Bucket prefixes used to namespace keys within the single LevelDB keyspace.
var (
	BookBucket     = []byte("ebook")
	FileBucket     = []byte("file")
	BookFileBucket = []byte("ebook_file")

	keySeparator = byte('/')
)

// BookId is the unique identifier of a book in the database.
type BookId uint64

func (id BookId) String() string {
	return strconv.FormatUint(uint64(id), 10)
}

// Key returns the 8-byte little-endian database key for this id.
func (id BookId) Key() []byte {
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], uint64(id))
	return b[:]
}

// NewID generates a new random BookId.
func NewID() BookId {
	return BookId(rand.Int63())
}

// ParseID parses the decimal string representation of a BookId,
// returning 0 on malformed input.
func ParseID(s string) BookId {
	id, _ := strconv.ParseUint(s, 10, 64)
	return BookId(id)
}

// ParseBinaryID decodes an 8-byte little-endian key into a BookId.
func ParseBinaryID(b []byte) BookId {
	return BookId(binary.LittleEndian.Uint64(b))
}

// A Book, with its metadata and an optional path to a cover image.
type Book struct {
	Id        BookId
	CoverPath string
	Metadata  *Metadata
}

func (b *Book) String() string {
	return fmt.Sprintf("%s (%s)", b.Metadata.String(), b.Id.String())
}

// A File on disk, pointing at the Book it belongs to.
type File struct {
	Path     string
	FileType string
	Mtime    time.Time
	Size     int64
	Error    bool
	Id       BookId
}

// HasChanged reports whether the file on disk differs (by mtime or
// size) from the stored record.
func (f *File) HasChanged(info os.FileInfo) bool {
	return !info.ModTime().Equal(f.Mtime) || info.Size() != f.Size
}

func init() {
	// Seed the math/rand RNG with a cryptographically random value.
	var seed int64
	binary.Read(cryptorand.Reader, binary.LittleEndian, &seed)
	rand.Seed(seed)
}

// flatBook is the flattened representation of a Book that actually
// gets indexed.
type flatBook struct {
	Title       string   `json:"title"`
	Author      []string `json:"author"`
	Description string   `json:"description"`
	ISBN        []string `json:"isbn"`
	Unique      []string `json:"_unique"`
	Suggest     []string `json:"_suggest"`
}

func (f *flatBook) Type() string {
	return "ebook"
}

func flatten(book *Book) *flatBook {
	suggest := []string{book.Metadata.Title}
	if len(book.Metadata.Creator) > 0 {
		suggest = append(suggest, book.Metadata.Creator...)
	}
	return &flatBook{
		Title:       book.Metadata.Title,
		Author:      book.Metadata.Creator,
		Description: book.Metadata.Description,
		ISBN:        book.Metadata.ISBN,
		Unique:      book.Metadata.Uniques(),
		Suggest:     suggest,
	}
}
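// What follows is an illustrative sketch, not part of the original API:
// it shows how a Book is flattened before indexing. The Metadata fields
// used here (Title, Creator) are assumed from their use in flatten above;
// Metadata itself is defined elsewhere in this package.
func exampleFlatten() {
	book := &Book{
		Id: NewID(),
		Metadata: &Metadata{
			Title:   "An Example Title",
			Creator: []string{"Some Author"},
		},
	}
	flat := flatten(book)
	// The _suggest field aggregates the title and the authors.
	fmt.Println(flat.Title, flat.Suggest)
}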
var defaultTextAnalyzer = "standard"

func metadataDocumentMapping() *blevemapping.DocumentMapping {
	md := bleve.NewDocumentStaticMapping()

	textFieldMapping := bleve.NewTextFieldMapping()
	textFieldMapping.Store = false
	textFieldMapping.Analyzer = defaultTextAnalyzer

	authorFieldMapping := bleve.NewTextFieldMapping()
	authorFieldMapping.Store = false
	authorFieldMapping.Analyzer = "simple"

	keywordFieldMapping := bleve.NewTextFieldMapping()
	keywordFieldMapping.Store = false
	keywordFieldMapping.Analyzer = "keyword"
	keywordFieldMapping.IncludeInAll = false

	suggestFieldMapping := bleve.NewTextFieldMapping()
	suggestFieldMapping.Store = false
	suggestFieldMapping.Analyzer = "edgeNgram"
	suggestFieldMapping.IncludeTermVectors = false
	suggestFieldMapping.IncludeInAll = false

	md.AddFieldMappingsAt("title", textFieldMapping)
	md.AddFieldMappingsAt("author", authorFieldMapping)
	md.AddFieldMappingsAt("description", textFieldMapping)
	md.AddFieldMappingsAt("isbn", keywordFieldMapping)
	md.AddFieldMappingsAt("_unique", keywordFieldMapping)
	md.AddFieldMappingsAt("_suggest", suggestFieldMapping)

	return md
}

func defaultIndexMapping() blevemapping.IndexMapping {
	i := bleve.NewIndexMapping()

	// Tokenize terms into edge n-grams between 3 and 25 characters
	// long, for prefix-based autocompletion.
	err := i.AddCustomTokenFilter("edgeNgram325", map[string]interface{}{
		"type": edgengram.Name,
		"min":  3.0,
		"max":  25.0,
	})
	if err != nil {
		log.Fatal("creating token filter edgeNgram325:", err)
	}

	err = i.AddCustomAnalyzer("edgeNgram", map[string]interface{}{
		"type":      custom.Name,
		"tokenizer": unicode.Name,
		"token_filters": []interface{}{
			en.PossessiveName,
			lowercase.Name,
			en.StopName,
			"edgeNgram325",
		},
	})
	if err != nil {
		log.Fatal("creating custom analyzer edgeNgram:", err)
	}

	i.AddDocumentMapping("ebook", metadataDocumentMapping())
	i.DefaultAnalyzer = defaultTextAnalyzer
	i.DefaultType = "ebook"
	return i
}

// Database is the liber storage layer: a LevelDB key/value store for
// the data itself, plus a Bleve full-text index.
type Database struct {
	path  string
	ldb   *leveldb.DB
	index bleve.Index
}

// NewDb opens (or creates) a Database rooted at the given path.
func NewDb(path string) (*Database, error) {
	// Make sure that path exists.
	if _, err := os.Stat(path); err != nil {
		if err := os.Mkdir(path, 0700); err != nil {
			return nil, err
		}
	}

	// Initialize our database and the index.
	d := &Database{path: path}
	if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil {
		return nil, err
	}
	if err := d.setupIndex(filepath.Join(path, "index")); err != nil {
		return nil, err
	}
	return d, nil
}

func (db *Database) setupLevelDb(path string) error {
	// Use 256MB of block cache and a small Bloom filter.
	opts := &ldbopt.Options{
		Filter:             ldbfilter.NewBloomFilter(10),
		BlockCacheCapacity: 1 << 28, // 256MB
	}

	ldb, err := leveldb.OpenFile(path, opts)
	if err != nil {
		return err
	}
	db.ldb = ldb
	return nil
}
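// Usage sketch (hypothetical, for illustration): open a database under a
// temporary directory, store a book, and read it back. Error handling is
// abbreviated; real callers should check every error. The Metadata literal
// assumes the Title field used by flatten above.
func exampleNewDb() {
	db, err := NewDb(filepath.Join(os.TempDir(), "liber-example"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	book := &Book{Id: NewID(), Metadata: &Metadata{Title: "An Example Title"}}
	if err := db.PutBook(book); err != nil {
		log.Fatal(err)
	}
	if got, err := db.GetBook(book.Id); err == nil {
		fmt.Println(got)
	}
}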
func (db *Database) setupIndex(path string) error {
	var err error
	if _, serr := os.Stat(path); serr == nil {
		db.index, err = bleve.Open(path)
	} else {
		// Create a new Bleve index, backed by goleveldb.
		db.index, err = bleve.NewUsing(
			path,
			defaultIndexMapping(),
			bleve.Config.DefaultIndexType,
			blevegoleveldb.Name,
			map[string]interface{}{
				"create_if_missing":         true,
				"write_buffer_size":         2 << 25, // 64MB
				"lru_cache_capacity":        2 << 27, // 256MB
				"bloom_filter_bits_per_key": 10,
			})
	}
	return err
}

var schemaVersionKey = []byte("_liber_schema_version")

func (db *Database) getSchemaVersion() uint64 {
	data, err := db.ldb.Get(schemaVersionKey, nil)
	if err != nil {
		return 0
	}
	return binary.LittleEndian.Uint64(data)
}

func (db *Database) setSchemaVersion(v uint64) {
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], v)
	db.ldb.Put(schemaVersionKey, b[:], nil)
}

type databaseMigration struct {
	version uint64
	run     func(db *Database) error
}

// runMigrations applies, in order, all migrations with a version
// greater than the schema version currently stored in the database.
func (db *Database) runMigrations(migrations []databaseMigration) error {
	version := db.getSchemaVersion()
	for _, m := range migrations {
		// Skip migrations that have already been applied.
		if m.version <= version {
			continue
		}
		if err := m.run(db); err != nil {
			return err
		}
		version = m.version
		db.setSchemaVersion(version)
	}
	return nil
}

func (db *Database) Close() {
	db.index.Close()
	db.ldb.Close()
}

func (db *Database) GetBook(bookid BookId) (*Book, error) {
	var b Book
	if err := db.Get(BookBucket, bookid.Key(), &b); err != nil {
		return nil, err
	}
	return &b, nil
}

func (db *Database) GetFile(path string) (*File, error) {
	var f File
	if err := db.Get(FileBucket, []byte(path), &f); err != nil {
		return nil, err
	}
	return &f, nil
}

// GetBookFiles returns all the Files associated with a book.
func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) {
	start, end := keyRange(bktToKey(BookFileBucket, bookid.Key()))
	it := db.ldb.NewIterator(
		&ldbutil.Range{
			Start: start,
			Limit: end,
		},
		nil,
	)
	defer it.Release()

	var out []*File
	for it.Next() {
		var path string
		if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&path) == nil {
			if file, err := db.GetFile(path); err == nil {
				out = append(out, file)
			}
		}
	}
	if err := it.Error(); err != nil {
		return nil, err
	}
	return out, nil
}

func (db *Database) Get(bucket, key []byte, obj interface{}) error {
	data, err := db.ldb.Get(bktToKey(bucket, key), nil)
	if err != nil {
		return err
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
}

// PutBook stores a book and indexes it.
func (db *Database) PutBook(b *Book) error {
	if err := db.Put(BookBucket, b.Id.Key(), b); err != nil {
		return err
	}
	return db.index.Index(b.Id.String(), flatten(b))
}

func fileBookKey(path string, bookid BookId) []byte {
	return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator})
}

// PutFile stores a file record and, unless the file is marked as
// broken, links it to its book.
func (db *Database) PutFile(f *File) error {
	if err := db.Put(FileBucket, []byte(f.Path), f); err != nil {
		return err
	}
	if !f.Error {
		return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path)
	}
	return nil
}

func (db *Database) RawPut(key, value []byte) error {
	return db.ldb.Put(key, value, nil)
}

func (db *Database) Put(bucket, key []byte, obj interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
		return err
	}
	return db.RawPut(bktToKey(bucket, key), buf.Bytes())
}

// DeleteBook removes a book from the database and the index.
func (db *Database) DeleteBook(bookid BookId) error {
	db.Delete(BookBucket, bookid.Key())
	return db.index.Delete(bookid.String())
}
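// Migration sketch (hypothetical): how a caller might wire up
// runMigrations, defined above. The single migration shown here is
// invented for illustration; this file does not define any migrations
// of its own.
func exampleMigrations(db *Database) error {
	return db.runMigrations([]databaseMigration{
		{
			version: 1,
			run: func(db *Database) error {
				// A typical migration: rebuild the full-text
				// index after an incompatible schema change.
				return db.Reindex()
			},
		},
	})
}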
// DeleteFile removes a file record, and the book it belongs to if
// that was its only file.
func (db *Database) DeleteFile(path string) error {
	f, err := db.GetFile(path)
	if err != nil {
		// Unknown file, nothing to delete.
		return nil
	}

	db.Delete(FileBucket, []byte(path))
	db.Delete(BookFileBucket, fileBookKey(path, f.Id))

	// Delete the book if there are no files left.
	if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 {
		db.DeleteBook(f.Id)
	}
	return nil
}

func (db *Database) Delete(bucket, key []byte) error {
	return db.ldb.Delete(bktToKey(bucket, key), nil)
}

// DatabaseIterator wraps a LevelDB iterator over a single bucket.
type DatabaseIterator struct {
	iter ldbiter.Iterator
}

func (i *DatabaseIterator) Close() error {
	defer i.iter.Release()
	return i.iter.Error()
}

func (i *DatabaseIterator) Next() bool {
	return i.iter.Next()
}

func (i *DatabaseIterator) Id() BookId {
	return keyToId(i.iter.Key())
}

func (i *DatabaseIterator) RawKey() []byte {
	return i.iter.Key()
}

func (i *DatabaseIterator) RawValue() []byte {
	return i.iter.Value()
}

func (i *DatabaseIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj)
}

// Scan an entire bucket.
func (db *Database) Scan(bucket []byte) *DatabaseIterator {
	start, end := keyRange(bucket)
	it := db.ldb.NewIterator(&ldbutil.Range{
		Start: start,
		Limit: end,
	}, &ldbopt.ReadOptions{
		DontFillCache: true,
	})
	return &DatabaseIterator{iter: it}
}

// writeBytes writes a length-prefixed blob to w.
func writeBytes(w io.Writer, b []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint32(len(b))); err != nil {
		return err
	}
	_, err := w.Write(b)
	return err
}

// readBytes reads a length-prefixed blob from r. It returns io.EOF
// only on a clean end-of-stream, before the length header.
func readBytes(r io.Reader) ([]byte, error) {
	var szb [4]byte
	if _, err := io.ReadFull(r, szb[:]); err != nil {
		return nil, err
	}
	sz := binary.LittleEndian.Uint32(szb[:])
	b := make([]byte, sz)
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, err
	}
	return b, nil
}

// Dump the contents of the database to a Writer, as a stream of
// length-prefixed key/value pairs.
func (db *Database) Dump(w io.Writer) error {
	it := db.ldb.NewIterator(nil, &ldbopt.ReadOptions{DontFillCache: true})
	defer it.Release()
	count := 0
	for it.Next() {
		if err := writeBytes(w, it.Key()); err != nil {
			return err
		}
		if err := writeBytes(w, it.Value()); err != nil {
			return err
		}
		count++
	}
	log.Printf("dumped %d entries from the database", count)
	return it.Error()
}

// Restore a backup to the current database (assuming it is empty).
func (db *Database) Restore(r io.Reader) error {
	count := 0
	for {
		key, err := readBytes(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		value, err := readBytes(r)
		if err == io.EOF {
			return errors.New("unexpected eof")
		}
		if err != nil {
			return err
		}
		if err := db.RawPut(key, value); err != nil {
			return err
		}
		count++
		if count%1000 == 0 {
			log.Printf("restored %d entries", count)
		}
	}
	log.Printf("restore complete (%d entries)", count)

	return db.Reindex()
}

// Reindex the entire database. This is an administrative operation,
// to be performed after an incompatible index schema change. It will
// delete the existing index and re-create it from scratch.
func (db *Database) Reindex() error {
	// Close the index, delete it, and re-open it.
	db.index.Close()
	indexPath := filepath.Join(db.path, "index")
	if err := os.RemoveAll(indexPath); err != nil {
		return err
	}
	if err := db.setupIndex(indexPath); err != nil {
		return err
	}

	// Scan the database and re-index everything. Use batch
	// indexing to achieve decent performance.
	i := 0
	start := time.Now()
	batch := db.index.NewBatch()
	if err := db.onAllBooks(func(book *Book) error {
		if err := batch.Index(book.Id.String(), flatten(book)); err != nil {
			return err
		}
		i++
		if i%100 == 0 {
			if err := db.index.Batch(batch); err != nil {
				return err
			}
			batch = db.index.NewBatch()
		}
		return nil
	}); err != nil {
		return err
	}

	// Flush the last batch, which may be partially filled.
	if err := db.index.Batch(batch); err != nil {
		return err
	}

	log.Printf("re-indexing complete, %d documents in %g seconds", i, time.Since(start).Seconds())
	return nil
}
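// Backup sketch (hypothetical usage of Dump and Restore above): write a
// dump of one database to a file, then restore it into a second, empty
// database. The dump file name is invented for illustration.
func exampleBackup(src, dst *Database) error {
	dumpPath := filepath.Join(os.TempDir(), "liber.dump")

	f, err := os.Create(dumpPath)
	if err != nil {
		return err
	}
	if err := src.Dump(f); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	r, err := os.Open(dumpPath)
	if err != nil {
		return err
	}
	defer r.Close()
	return dst.Restore(r)
}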
// onAllBooks calls a function for every book in the database,
// stopping at the first error returned by the callback.
func (db *Database) onAllBooks(f func(*Book) error) error {
	it := db.Scan(BookBucket)
	for it.Next() {
		var book Book
		if err := it.Value(&book); err != nil {
			continue
		}
		if err := f(&book); err != nil {
			return err
		}
	}
	return it.Close()
}

type SearchResult struct {
	Results    []*Book
	NumResults int
}

func (db *Database) doSearch(query blevequery.Query, offset, limit int) (*SearchResult, error) {
	req := bleve.NewSearchRequestOptions(query, limit, offset, false)
	result, err := db.index.Search(req)
	if err != nil {
		return nil, err
	}

	sr := SearchResult{NumResults: int(result.Total)}
	for _, r := range result.Hits {
		if book, err := db.GetBook(ParseID(r.ID)); err == nil {
			sr.Results = append(sr.Results, book)
		}
	}
	return &sr, nil
}

// Search the database with a query string.
func (db *Database) Search(queryStr string, offset, limit int) (*SearchResult, error) {
	return db.doSearch(bleve.NewQueryStringQuery(queryStr), offset, limit)
}

// Suggest returns autocompletion candidates for a term, matching it
// against the edge n-gram-analyzed _suggest field.
func (db *Database) Suggest(term string) (*SearchResult, error) {
	query := bleve.NewTermQuery(term)
	query.SetField("_suggest")
	return db.doSearch(query, 0, 20)
}

// Find a book matching any of the given unique identifiers, if possible.
func (db *Database) Find(uniqueIds []string) (*Book, error) {
	if len(uniqueIds) == 0 {
		return nil, errors.New("no unique identifiers")
	}

	var queries []blevequery.Query
	for _, u := range uniqueIds {
		q := bleve.NewTermQuery(u)
		q.SetField("_unique")
		queries = append(queries, q)
	}
	query := bleve.NewDisjunctionQuery(queries...)

	search := bleve.NewSearchRequest(query)
	result, err := db.index.Search(search)
	if err != nil {
		return nil, err
	}
	if len(result.Hits) == 0 {
		return nil, errors.New("no matches found")
	}

	return db.GetBook(ParseID(result.Hits[0].ID))
}

func bktToKey(bucket, key []byte) []byte {
	return bytes.Join([][]byte{bucket, key}, []byte{keySeparator})
}

// keyToId extracts the BookId from a full key (including the bucket
// prefix).
func keyToId(key []byte) BookId {
	n := bytes.Index(key, []byte{keySeparator})
	if n < 0 {
		return 0
	}
	return ParseBinaryID(key[n+1:])
}

// keyRange returns the [start, end) key range spanning every key in
// the given bucket.
func keyRange(prefix []byte) ([]byte, []byte) {
	start := make([]byte, len(prefix)+1)
	end := make([]byte, len(prefix)+1)
	copy(start, prefix)
	copy(end, prefix)
	start[len(prefix)] = keySeparator
	end[len(prefix)] = keySeparator + 1
	return start, end
}
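// Search sketch (hypothetical usage): a query-string search followed by an
// autocomplete lookup. Query contents are invented for illustration; the
// query string syntax is bleve's.
func exampleSearch(db *Database) {
	if res, err := db.Search("title:example", 0, 10); err == nil {
		log.Printf("%d results", res.NumResults)
		for _, b := range res.Results {
			fmt.Println(b)
		}
	}
	// Suggest needs a prefix of at least 3 characters, per the
	// edgeNgram325 token filter configured above.
	if sugg, err := db.Suggest("exa"); err == nil {
		log.Printf("%d suggestions", sugg.NumResults)
	}
}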