"encoding/gob"
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
_ "github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
_ "github.com/blevesearch/bleve/v2/analysis/analyzer/simple"
_ "github.com/blevesearch/bleve/v2/analysis/analyzer/standard"
"github.com/blevesearch/bleve/v2/analysis/lang/en"
"github.com/blevesearch/bleve/v2/analysis/token/edgengram"
"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
blevegoleveldb "github.com/blevesearch/bleve/v2/index/upsidedown/store/goleveldb"
blevemapping "github.com/blevesearch/bleve/v2/mapping"
blevequery "github.com/blevesearch/bleve/v2/search/query"
"github.com/syndtr/goleveldb/leveldb"
ldbfilter "github.com/syndtr/goleveldb/leveldb/filter"
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
ldbutil "github.com/syndtr/goleveldb/leveldb/util"
)

var (
	BookBucket     = []byte("ebook")
	FileBucket     = []byte("file")
	BookFileBucket = []byte("ebook_file")

	keySeparator = byte('/')

	// Analyzer used for plain-text fields. Its definition was missing
	// from this listing; the standard English analyzer is assumed.
	defaultTextAnalyzer = en.AnalyzerName
)
type BookId uint64
func (id BookId) String() string {
return strconv.FormatUint(uint64(id), 10)
}
func (id BookId) Key() []byte {
var b [8]byte
binary.LittleEndian.PutUint64(b[:], uint64(id))
return b[:]
}
func NewID() BookId {
return BookId(rand.Int63())
}
func ParseID(s string) BookId {
id, _ := strconv.ParseUint(s, 10, 64)
return BookId(id)
}
func ParseBinaryID(b []byte) BookId {
	return BookId(binary.LittleEndian.Uint64(b))
}
type Book struct {
Id BookId
CoverPath string
Metadata *Metadata
}
func (b *Book) String() string {
return fmt.Sprintf("%s (%s)", b.Metadata.String(), b.Id.String())
}
type File struct {
Path string
FileType string
Mtime time.Time
Size int64
Error bool
Id BookId
}
func (f *File) HasChanged(info os.FileInfo) bool {
return !info.ModTime().Equal(f.Mtime) || info.Size() != f.Size
}
func init() {
	// Seed the math/rand RNG with a cryptographically random value.
var seed int64
binary.Read(cryptorand.Reader, binary.LittleEndian, &seed)
rand.Seed(seed)
}
// flatBook is the flattened structure that actually gets indexed.
type flatBook struct {
Title string `json:"title"`
Author []string `json:"author"`
Description string `json:"description"`
ISBN []string `json:"isbn"`
Unique []string `json:"_unique"`
Suggest []string `json:"_suggest"`
}
func (f *flatBook) Type() string {
return "ebook"
}
func flatten(book *Book) *flatBook {
suggest := []string{book.Metadata.Title}
if len(book.Metadata.Creator) > 0 {
suggest = append(suggest, book.Metadata.Creator...)
}
return &flatBook{
Title: book.Metadata.Title,
Author: book.Metadata.Creator,
Description: book.Metadata.Description,
ISBN: book.Metadata.ISBN,
		Unique:      book.Metadata.Uniques(),
		Suggest:     suggest,
	}
}
func metadataDocumentMapping() *blevemapping.DocumentMapping {
md := bleve.NewDocumentStaticMapping()
textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.Store = false
textFieldMapping.Analyzer = defaultTextAnalyzer
	authorFieldMapping := bleve.NewTextFieldMapping()
	authorFieldMapping.Store = false
	// Assumption: author names use the "simple" analyzer (registered by
	// the blank import above); this assignment was missing from the listing.
	authorFieldMapping.Analyzer = "simple"
keywordFieldMapping := bleve.NewTextFieldMapping()
keywordFieldMapping.Store = false
keywordFieldMapping.Analyzer = "keyword"
keywordFieldMapping.IncludeInAll = false
suggestFieldMapping := bleve.NewTextFieldMapping()
suggestFieldMapping.Store = false
suggestFieldMapping.Analyzer = "edgeNgram"
suggestFieldMapping.IncludeTermVectors = false
suggestFieldMapping.IncludeInAll = false
md.AddFieldMappingsAt("title", textFieldMapping)
md.AddFieldMappingsAt("author", authorFieldMapping)
md.AddFieldMappingsAt("description", textFieldMapping)
md.AddFieldMappingsAt("isbn", keywordFieldMapping)
md.AddFieldMappingsAt("_unique", keywordFieldMapping)
md.AddFieldMappingsAt("_suggest", suggestFieldMapping)
func defaultIndexMapping() blevemapping.IndexMapping {
	i := bleve.NewIndexMapping()

	err := i.AddCustomTokenFilter("edgeNgram325",
		map[string]interface{}{
			"type": edgengram.Name,
			"min":  3.0,
			"max":  25.0,
		})
if err != nil {
log.Fatal("creating token filter edgeNgram325:", err)
}
err = i.AddCustomAnalyzer("edgeNgram",
map[string]interface{}{
"type": custom.Name,
"tokenizer": unicode.Name,
"token_filters": []interface{}{
en.PossessiveName,
lowercase.Name,
en.StopName,
"edgeNgram325",
},
})
	if err != nil {
		log.Fatal("creating custom analyzer edgeNgram:", err)
	}

	i.AddDocumentMapping("ebook", metadataDocumentMapping())
	i.DefaultAnalyzer = defaultTextAnalyzer
	i.DefaultType = "ebook"
	return i
}

// Database stores book and file metadata in LevelDB, and keeps a Bleve
// full-text index alongside it.
type Database struct {
	path  string
	ldb   *leveldb.DB
	index bleve.Index
}
func NewDb(path string) (*Database, error) {
// Make sure that path exists.
if _, err := os.Stat(path); err != nil {
if err := os.Mkdir(path, 0700); err != nil {
return nil, err
}
}
	d := &Database{path: path}

	// Initialize our database and the index.
	if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil {
return nil, err
}
if err := d.setupIndex(filepath.Join(path, "index")); err != nil {
return nil, err
}
return d, nil
}
func (db *Database) setupLevelDb(path string) error {
// Use 128MB of cache and a small Bloom filter.
opts := &ldbopt.Options{
Filter: ldbfilter.NewBloomFilter(10),
}
ldb, err := leveldb.OpenFile(path, opts)
return nil
}
func (db *Database) setupIndex(path string) error {
var err error
if _, serr := os.Stat(path); serr == nil {
db.index, err = bleve.Open(path)
} else {
// Create a new Bleve index, backed by goleveldb.
db.index, err = bleve.NewUsing(path, defaultIndexMapping(), bleve.Config.DefaultIndexType, blevegoleveldb.Name, map[string]interface{}{
"create_if_missing": true,
"write_buffer_size": 2 << 25,
"lru_cache_capacity": 2 << 27,
"bloom_filter_bits_per_key": 10,
})
}
if err != nil {
return err
}
return nil
}
var schemaVersionKey = []byte("_liber_schema_version")
func (db *Database) getSchemaVersion() uint64 {
data, err := db.ldb.Get(schemaVersionKey, nil)
if err != nil {
return 0
}
return binary.LittleEndian.Uint64(data)
}
func (db *Database) setSchemaVersion(v uint64) {
var b [8]byte
binary.LittleEndian.PutUint64(b[:], v)
db.ldb.Put(schemaVersionKey, b[:], nil)
}
type databaseMigration struct {
version uint64
run func(db *Database) error
}
func (db *Database) runMigrations(migrations []databaseMigration) error {
version := db.getSchemaVersion()
for _, m := range migrations {
if m.version < version {
continue
}
if err := m.run(db); err != nil {
return err
}
version = m.version
db.setSchemaVersion(version)
}
return nil
}
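
// A hypothetical usage sketch (not part of the original code): migrations
// are listed in ascending version order, and runMigrations applies those
// newer than the stored schema version. A full reindex is the typical
// payload after an incompatible index schema change.
var exampleMigrations = []databaseMigration{
	{version: 1, run: func(db *Database) error { return db.Reindex() }},
}

func migrateExample(db *Database) error {
	return db.runMigrations(exampleMigrations)
}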
func (db *Database) GetBook(bookid BookId) (*Book, error) {
var b Book
if err := db.Get(BookBucket, bookid.Key(), &b); err != nil {
return nil, err
}
return &b, nil
}
func (db *Database) GetFile(path string) (*File, error) {
var f File
if err := db.Get(FileBucket, []byte(path), &f); err != nil {
return nil, err
}
return &f, nil
}
func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) {
start, end := keyRange(bktToKey(BookFileBucket, bookid.Key()))
it := db.ldb.NewIterator(
&ldbutil.Range{
Start: start,
Limit: end,
},
nil,
)
	defer it.Release()

	var out []*File
	for it.Next() {
		// Each value holds the gob-encoded path of a file record.
		var filepath string
		if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&filepath) == nil {
if file, err := db.GetFile(filepath); err == nil {
out = append(out, file)
}
}
}
if err := it.Error(); err != nil {
return nil, err
	}
	return out, nil
}
func (db *Database) Get(bucket, key []byte, obj interface{}) error {
	data, err := db.ldb.Get(bktToKey(bucket, key), nil)
	if err != nil {
		return err
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
}
func (db *Database) PutBook(b *Book) error {
if err := db.Put(BookBucket, b.Id.Key(), b); err != nil {
return err
	}
	return db.index.Index(b.Id.String(), flatten(b))
}
func fileBookKey(path string, bookid BookId) []byte {
return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator})
}
func (db *Database) PutFile(f *File) error {
	if err := db.Put(FileBucket, []byte(f.Path), f); err != nil {
return err
}
if !f.Error {
return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path)
}
	return nil
}
func (db *Database) RawPut(key, value []byte) error {
	return db.ldb.Put(key, value, nil)
}

func (db *Database) Put(bucket, key []byte, obj interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
		return err
	}
	return db.RawPut(bktToKey(bucket, key), buf.Bytes())
}
func (db *Database) DeleteBook(bookid BookId) error {
db.Delete(BookBucket, bookid.Key())
return db.index.Delete(bookid.String())
}
func (db *Database) DeleteFile(path string) error {
f, err := db.GetFile(path)
if err != nil {
return nil
}
db.Delete(FileBucket, []byte(path))
db.Delete(BookFileBucket, fileBookKey(path, f.Id))
// Delete the book if there are no files left.
if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 {
db.DeleteBook(f.Id)
}
return nil
}
func (db *Database) Delete(bucket, key []byte) error {
	return db.ldb.Delete(bktToKey(bucket, key), nil)
}
// DatabaseIterator wraps a LevelDB iterator with gob decoding of values.
type DatabaseIterator struct {
	iter ldbiter.Iterator
}

func (i *DatabaseIterator) Close() error {
	defer i.iter.Release()
	return i.iter.Error()
}

func (i *DatabaseIterator) Next() bool {
return i.iter.Next()
}
func (i *DatabaseIterator) Id() BookId {
return keyToId(i.iter.Key())
}
func (i *DatabaseIterator) RawKey() []byte {
return i.iter.Key()
}
func (i *DatabaseIterator) RawValue() []byte {
return i.iter.Value()
}
// Value decodes the current entry into obj.
func (i *DatabaseIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj)
}
// Scan an entire bucket.
func (db *Database) Scan(bucket []byte) *DatabaseIterator {
start, end := keyRange(bucket)
it := db.ldb.NewIterator(&ldbutil.Range{
Start: start,
Limit: end,
}, &ldbopt.ReadOptions{
DontFillCache: true,
})
return &DatabaseIterator{iter: it}
// Write a length-prefixed record: 4-byte little-endian size, then data.
func writeBytes(w io.Writer, b []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint32(len(b))); err != nil {
		return err
	}
	_, err := w.Write(b)
	return err
}
// Read a length-prefixed record. A clean EOF on the size header marks
// the end of the stream.
func readBytes(r io.Reader) ([]byte, error) {
	var szb [4]byte
	_, err := io.ReadFull(r, szb[:])
	if err == io.EOF {
		return nil, io.EOF
	}
	if err != nil {
		return nil, errors.New("short header read")
	}
	sz := binary.LittleEndian.Uint32(szb[:])
	b := make([]byte, sz)
	n, err := io.ReadFull(r, b)
	if n != int(sz) {
		return nil, errors.New("short data read")
	}
return b, err
}
// Dump the contents of the database to a Writer.
func (db *Database) Dump(w io.Writer) error {
it := db.ldb.NewIterator(nil, &ldbopt.ReadOptions{DontFillCache: true})
defer it.Release()
count := 0
	for it.Next() {
		if err := writeBytes(w, it.Key()); err != nil {
			return err
		}
		if err := writeBytes(w, it.Value()); err != nil {
			return err
		}
		count++
	}
	if err := it.Error(); err != nil {
		return err
	}
log.Printf("dumped %d entries from the database", count)
return nil
}
// Restore a backup to the current database (assuming it is empty).
func (db *Database) Restore(r io.Reader) error {
count := 0
for {
key, err := readBytes(r)
if err == io.EOF {
break
}
if err != nil {
return err
}
value, err := readBytes(r)
if err == io.EOF {
return errors.New("unexpected eof")
}
if err != nil {
return err
}
		if err := db.RawPut(key, value); err != nil {
			return err
		}
		count++
		if count%1000 == 0 {
			log.Printf("restored %d entries", count)
		}
	}

	log.Printf("restore complete (%d entries)", count)
	return nil
}
// Reindex the entire database. This is an administrative operation,
// to be performed after an incompatible index schema change. It will
// delete the existing index and re-create it from scratch.
func (db *Database) Reindex() error {
// Close the index, delete it, and re-open it.
db.index.Close()
indexPath := filepath.Join(db.path, "index")
if err := os.RemoveAll(indexPath); err != nil {
return err
}
if err := db.setupIndex(indexPath); err != nil {
return err
}
// Scan the database and re-index everything. Use batch
// indexing to achieve decent performance.
i := 0
start := time.Now()
batch := db.index.NewBatch()
if err := db.onAllBooks(func(book *Book) error {
batch.Index(book.Id.String(), flatten(book))
i++
if i%100 == 0 {
if err := db.index.Batch(batch); err != nil {
return err
}
batch = db.index.NewBatch()
		}
		return nil
	}); err != nil {
return err
}
// Flush the batch indexer if not empty.
if err := db.index.Batch(batch); err != nil {
return err
}
log.Printf("re-indexing complete, %d documents in %g seconds", i, time.Since(start).Seconds())
return nil
}
// Call a function for all books in the database.
func (db *Database) onAllBooks(f func(*Book) error) error {
it := db.Scan(BookBucket)
for it.Next() {
if err := it.Value(&book); err != nil {
if err := f(&book); err != nil {
return err
}
type SearchResult struct {
Results []*Book
NumResults int
}
func (db *Database) doSearch(query blevequery.Query, offset, limit int) (*SearchResult, error) {
req := bleve.NewSearchRequestOptions(query, limit, offset, false)
result, err := db.index.Search(req)
if err != nil {
return nil, err
}
sr := SearchResult{NumResults: int(result.Total)}
for _, r := range result.Hits {
if book, err := db.GetBook(ParseID(r.ID)); err == nil {
sr.Results = append(sr.Results, book)
}
}
return &sr, nil
}
// Search the database with a query string.
func (db *Database) Search(queryStr string, offset, limit int) (*SearchResult, error) {
return db.doSearch(bleve.NewQueryStringQuery(queryStr), offset, limit)
}
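
// A hypothetical usage sketch (not part of the original code): run a
// query-string search and page through the results 20 at a time.
func searchExample(db *Database, queryStr string) error {
	for offset := 0; ; offset += 20 {
		result, err := db.Search(queryStr, offset, 20)
		if err != nil {
			return err
		}
		for _, book := range result.Results {
			log.Printf("found: %s", book.String())
		}
		if offset+20 >= result.NumResults {
			return nil
		}
	}
}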
// Suggest returns autocomplete suggestions for a term, matching the
// edge-ngram "_suggest" field.
func (db *Database) Suggest(term string) (*SearchResult, error) {
query := bleve.NewTermQuery(term)
query.SetField("_suggest")
	return db.doSearch(query, 0, 20)
}
// Find a book matching any of the given unique identifiers.
func (db *Database) Find(uniqueIds []string) (*Book, error) {
	var queries []blevequery.Query
	for _, u := range uniqueIds {
		q := bleve.NewTermQuery(u)
		q.SetField("_unique")
		queries = append(queries, q)
	}
	query := bleve.NewDisjunctionQuery(queries...)
search := bleve.NewSearchRequest(query)
result, err := db.index.Search(search)
if err != nil {
return nil, err
}
if len(result.Hits) == 0 {
return nil, errors.New("no matches found")
	}
	return db.GetBook(ParseID(result.Hits[0].ID))
}
func bktToKey(bucket, key []byte) []byte {
return bytes.Join([][]byte{bucket, key}, []byte{keySeparator})
}
// Input is a full key (including bucket).
func keyToId(key []byte) BookId {
n := bytes.Index(key, []byte{keySeparator})
if n < 0 {
return 0
	}
	return ParseBinaryID(key[n+1:])
}
func keyRange(prefix []byte) ([]byte, []byte) {
start := make([]byte, len(prefix)+1)
end := make([]byte, len(prefix)+1)
copy(start, prefix)
copy(end, prefix)
start[len(prefix)] = keySeparator
end[len(prefix)] = keySeparator + 1
return start, end
}
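
// A hypothetical illustration (not part of the original code) of the key
// layout: every key is "<bucket>/<raw key>", and keyRange brackets one
// bucket with a half-open interval ('0' is the byte right after '/').
func keyLayoutExample() {
	id := NewID()
	key := bktToKey(BookBucket, id.Key()) // "ebook/" + 8 little-endian bytes
	start, end := keyRange(BookBucket)    // ["ebook/", "ebook0")
	log.Printf("key=%x start=%q end=%q id=%s", key, start, end, keyToId(key))
}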