package liber

import (
	"bytes"
	cryptorand "crypto/rand"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/keyword_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/simple_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/analyzers/standard_analyzer"
	_ "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/analysis/token_filters/edge_ngram_filter"
	blevegoleveldb "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/blevesearch/bleve/index/store/goleveldb"

	"git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
	ldbfilter "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/filter"
	ldbiter "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator"
	ldbopt "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
	ldbutil "git.autistici.org/ale/liber/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
)

var (
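	// Key prefixes ("buckets") used to namespace the different record
	// types within the single LevelDB keyspace.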
	BookBucket     = []byte("ebook")
	FileBucket     = []byte("file")
	BookFileBucket = []byte("ebook_file")

	keySeparator = byte('/')
)

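// BookId is the unique identifier of a book, a random 63-bit integer.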
type BookId uint64

func (id BookId) String() string {
	return strconv.FormatUint(uint64(id), 10)
}

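// Key returns the little-endian binary encoding of the id, used as a
// LevelDB key.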
func (id BookId) Key() []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.LittleEndian, id)
	return buf.Bytes()
}

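// Book is a book record: its identifier, cover image path and metadata.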
type Book struct {
	Id        BookId
	CoverPath string
	Metadata  *Metadata
}

func (b *Book) String() string {
	return fmt.Sprintf("%s (%s)", b.Metadata.String(), b.Id.String())
}

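// File describes a file on disk and its association with a Book.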
type File struct {
	Path     string
	FileType string
	Mtime    time.Time
	Size     int64
	Error    bool
	Id       BookId
}

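// HasChanged reports whether the on-disk file differs (by modification
// time or size) from the stored record.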
func (f *File) HasChanged(info os.FileInfo) bool {
	return !info.ModTime().Equal(f.Mtime) || info.Size() != f.Size
}

func init() {
	// Seed math/rand with a cryptographically random value, falling
	// back to the current time if crypto/rand is unavailable.
	var seed int64
	if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
		seed = time.Now().UnixNano()
	}
	rand.Seed(seed)
}

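// NewID returns a new random BookId.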
func NewID() BookId {
	return BookId(rand.Int63())
}

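// ParseID parses the decimal string form of a BookId (as produced by
// BookId.String); invalid input yields the zero id.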
func ParseID(s string) BookId {
	id, _ := strconv.ParseUint(s, 10, 64)
	return BookId(id)
}

// flatBook is the flattened form of a Book that actually gets indexed.
type flatBook struct {
	Title       string   `json:"title"`
	Author      []string `json:"author"`
	Description string   `json:"description"`
	ISBN        []string `json:"isbn"`
	Unique      []string `json:"_unique"`
	Suggest     []string `json:"_suggest"`
}

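// Type returns the document type, used by bleve to select the document
// mapping.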
func (f *flatBook) Type() string {
	return "ebook"
}

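// flatten converts a Book to its indexable flatBook form.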
func flatten(book *Book) *flatBook {
	suggest := []string{book.Metadata.Title}
	if len(book.Metadata.Creator) > 0 {
		suggest = append(suggest, book.Metadata.Creator...)
	}
	return &flatBook{
		Title:       book.Metadata.Title,
		Author:      book.Metadata.Creator,
		Description: book.Metadata.Description,
		ISBN:        book.Metadata.ISBN,
		Unique:      book.Metadata.Uniques(),
		Suggest:     suggest,
	}
}

var defaultTextAnalyzer = "standard"

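// metadataDocumentMapping builds the bleve document mapping for ebook
// metadata: analyzed text fields for title and description,
// simple-analyzed authors, exact-match keyword fields for ISBNs and
// unique ids, and an edge n-gram _suggest field for autocomplete.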
func metadataDocumentMapping() *bleve.DocumentMapping {
	md := bleve.NewDocumentMapping()

	textFieldMapping := bleve.NewTextFieldMapping()
	textFieldMapping.Store = false
	textFieldMapping.Analyzer = defaultTextAnalyzer

	authorFieldMapping := bleve.NewTextFieldMapping()
	authorFieldMapping.Store = false
	authorFieldMapping.Analyzer = "simple"

	keywordFieldMapping := bleve.NewTextFieldMapping()
	keywordFieldMapping.Store = false
	keywordFieldMapping.Analyzer = "keyword"
	keywordFieldMapping.IncludeInAll = false

	suggestFieldMapping := bleve.NewTextFieldMapping()
	suggestFieldMapping.Store = false
	suggestFieldMapping.Analyzer = "edgeNgram"
	suggestFieldMapping.IncludeTermVectors = false
	suggestFieldMapping.IncludeInAll = false

	md.AddFieldMappingsAt("title", textFieldMapping)
	md.AddFieldMappingsAt("author", authorFieldMapping)
	md.AddFieldMappingsAt("description", textFieldMapping)
	md.AddFieldMappingsAt("isbn", keywordFieldMapping)
	md.AddFieldMappingsAt("_unique", keywordFieldMapping)
	md.AddFieldMappingsAt("_suggest", suggestFieldMapping)

	return md
}

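// defaultIndexMapping builds the index mapping, registering the custom
// "edgeNgram" analyzer (3- to 25-character edge n-grams over lowercased,
// stopword-filtered tokens) used by the _suggest field.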
func defaultIndexMapping() *bleve.IndexMapping {
	i := bleve.NewIndexMapping()

	err := i.AddCustomTokenFilter("edgeNgram325",
		map[string]interface{}{
			"type": "edge_ngram",
			"min":  3.0,
			"max":  25.0,
		})
	if err != nil {
		log.Fatal(err)
	}

	err = i.AddCustomAnalyzer("edgeNgram",
		map[string]interface{}{
			"type":      "custom",
			"tokenizer": "unicode",
			"token_filters": []interface{}{
				"to_lower",
				"stop_en",
				"edgeNgram325",
			},
		})
	if err != nil {
		log.Fatal(err)
	}

	i.AddDocumentMapping("ebook", metadataDocumentMapping())

	i.DefaultAnalyzer = defaultTextAnalyzer
	i.DefaultType = "ebook"
	return i
}

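// Database couples a LevelDB key/value store (for book and file
// records) with a bleve full-text index, both kept under a single
// directory.
//
// A minimal usage sketch (illustrative only: the path is hypothetical,
// and md is assumed to be a *Metadata built elsewhere):
//
//	db, err := NewDb("/var/lib/liber")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()
//
//	book := &Book{Id: NewID(), Metadata: md}
//	if err := db.PutBook(book); err != nil {
//		log.Fatal(err)
//	}
//
//	res, _ := db.Search("dune", 0, 10)
//	for _, b := range res.Results {
//		fmt.Println(b)
//	}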
type Database struct {
	path  string
	ldb   *leveldb.DB
	index bleve.Index
}

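// NewDb opens the database rooted at path, creating it if necessary.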
func NewDb(path string) (*Database, error) {
	// Make sure that path exists.
	if _, err := os.Stat(path); err != nil {
		if err := os.Mkdir(path, 0700); err != nil {
			return nil, err
		}
	}

	// Initialize our database and the index.
	d := &Database{path: path}
	if err := d.setupLevelDb(filepath.Join(path, "db")); err != nil {
		return nil, err
	}
	if err := d.setupIndex(filepath.Join(path, "index")); err != nil {
		return nil, err
	}
	return d, nil
}

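// setupLevelDb opens the LevelDB store at path.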
func (db *Database) setupLevelDb(path string) error {
	// Use a 256MB block cache (2 << 27 bytes) and a small Bloom filter.
	opts := &ldbopt.Options{
		Filter:             ldbfilter.NewBloomFilter(10),
		BlockCacheCapacity: 2 << 27,
	}

	ldb, err := leveldb.OpenFile(path, opts)
	if err != nil {
		return err
	}
	db.ldb = ldb
	return nil
}

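// setupIndex opens the bleve index at path, creating it (backed by
// goleveldb) if it does not exist yet.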
func (db *Database) setupIndex(path string) error {
	var err error
	if _, serr := os.Stat(path); serr == nil {
		db.index, err = bleve.Open(path)
	} else {
		// Create a new Bleve index, backed by goleveldb.
		db.index, err = bleve.NewUsing(path, defaultIndexMapping(), bleve.Config.DefaultIndexType, blevegoleveldb.Name, map[string]interface{}{
			"create_if_missing":         true,
			"write_buffer_size":         2 << 25,
			"lru_cache_capacity":        2 << 27,
			"bloom_filter_bits_per_key": 10,
		})
	}
	return err
}

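// Close releases the search index and the underlying database.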
func (db *Database) Close() {
	db.index.Close()
	db.ldb.Close()
}

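// GetBook retrieves a book record by id.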
func (db *Database) GetBook(bookid BookId) (*Book, error) {
	var b Book
	if err := db.Get(BookBucket, bookid.Key(), &b); err != nil {
		return nil, err
	}
	return &b, nil
}

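// GetFile retrieves the file record for the given path.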
func (db *Database) GetFile(path string) (*File, error) {
	var f File
	if err := db.Get(FileBucket, []byte(path), &f); err != nil {
		return nil, err
	}
	return &f, nil
}

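// GetBookFiles returns all file records associated with a book.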
func (db *Database) GetBookFiles(bookid BookId) ([]*File, error) {
	start, end := keyRange(bktToKey(BookFileBucket, bookid.Key()))
	it := db.ldb.NewIterator(
		&ldbutil.Range{
			Start: start,
			Limit: end,
		},
		nil,
	)
	defer it.Release()

	var out []*File
	for it.Next() {
		var fpath string
		if gob.NewDecoder(bytes.NewReader(it.Value())).Decode(&fpath) == nil {
			if file, err := db.GetFile(fpath); err == nil {
				out = append(out, file)
			}
		}
	}
	if err := it.Error(); err != nil {
		return nil, err
	}
	return out, nil
}

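// Get loads the gob-encoded object stored under bucket/key into obj.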
func (db *Database) Get(bucket, key []byte, obj interface{}) error {
	data, err := db.ldb.Get(bktToKey(bucket, key), nil)
	if err != nil {
		return err
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
}

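// PutBook stores a book record and (re-)indexes its metadata.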
func (db *Database) PutBook(b *Book) error {
	if err := db.Put(BookBucket, b.Id.Key(), b); err != nil {
		return err
	}
	return db.index.Index(b.Id.String(), flatten(b))
}

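// fileBookKey builds the BookFileBucket key that links a book id to a
// file path.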
func fileBookKey(path string, bookid BookId) []byte {
	return bytes.Join([][]byte{bookid.Key(), []byte(path)}, []byte{keySeparator})
}

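// PutFile stores a file record and, unless the file is marked as
// broken, links it to its book.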
func (db *Database) PutFile(f *File) error {
	if err := db.Put(FileBucket, []byte(f.Path), f); err != nil {
		return err
	}
	if !f.Error {
		return db.Put(BookFileBucket, fileBookKey(f.Path, f.Id), f.Path)
	}
	return nil
}

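// RawPut writes a raw key/value pair straight to LevelDB.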
func (db *Database) RawPut(key, value []byte) error {
	return db.ldb.Put(key, value, nil)
}

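// Put gob-encodes obj and stores it under bucket/key.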
func (db *Database) Put(bucket, key []byte, obj interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
		return err
	}
	return db.RawPut(bktToKey(bucket, key), buf.Bytes())
}

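// DeleteBook removes a book from both the database and the search index.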
func (db *Database) DeleteBook(bookid BookId) error {
	db.Delete(BookBucket, bookid.Key())
	return db.index.Delete(bookid.String())
}

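// DeleteFile removes a file record and its book association; once a
// book has no files left, the book itself is deleted. Deleting an
// unknown path is not an error.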
func (db *Database) DeleteFile(path string) error {
	f, err := db.GetFile(path)
	if err != nil {
		return nil
	}

	db.Delete(FileBucket, []byte(path))
	db.Delete(BookFileBucket, fileBookKey(path, f.Id))

	// Delete the book if there are no files left.
	if files, err := db.GetBookFiles(f.Id); err == nil && len(files) == 0 {
		db.DeleteBook(f.Id)
	}

	return nil
}

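// Delete removes the value stored under bucket/key.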
func (db *Database) Delete(bucket, key []byte) error {
	return db.ldb.Delete(bktToKey(bucket, key), nil)
}

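// DatabaseIterator iterates over a range of gob-encoded database
// records.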
type DatabaseIterator struct {
	iter ldbiter.Iterator
}

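// Close releases the iterator and returns any error it encountered.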
func (i *DatabaseIterator) Close() error {
	defer i.iter.Release()
	return i.iter.Error()
}

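// Next advances the iterator, returning false when the range is
// exhausted.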
func (i *DatabaseIterator) Next() bool {
	return i.iter.Next()
}

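// Id decodes the BookId embedded in the current key.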
func (i *DatabaseIterator) Id() BookId {
	return keyToId(i.iter.Key())
}

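// RawKey returns the current key, including the bucket prefix.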
func (i *DatabaseIterator) RawKey() []byte {
	return i.iter.Key()
}

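// RawValue returns the raw (gob-encoded) value at the current position.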
func (i *DatabaseIterator) RawValue() []byte {
	return i.iter.Value()
}

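// Value gob-decodes the current value into obj.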
func (i *DatabaseIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(i.iter.Value())).Decode(obj)
}

// Scan returns an iterator over an entire bucket.
func (db *Database) Scan(bucket []byte) *DatabaseIterator {
	start, end := keyRange(bucket)
	it := db.ldb.NewIterator(&ldbutil.Range{
		Start: start,
		Limit: end,
	}, &ldbopt.ReadOptions{
		DontFillCache: true,
	})

	return &DatabaseIterator{iter: it}
}

// writeBytes writes a length-prefixed byte slice to w.
func writeBytes(w io.Writer, b []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint32(len(b))); err != nil {
		return err
	}
	_, err := w.Write(b)
	return err
}

// readBytes reads a length-prefixed byte slice from r.
func readBytes(r io.Reader) ([]byte, error) {
	var sz uint32
	if err := binary.Read(r, binary.LittleEndian, &sz); err != nil {
		return nil, err
	}
	b := make([]byte, sz)
	// io.ReadFull guards against short reads from the underlying Reader.
	_, err := io.ReadFull(r, b)
	return b, err
}

// Dump writes the contents of the database to w as a stream of
// length-prefixed key/value pairs (see writeBytes).
func (db *Database) Dump(w io.Writer) error {
	it := db.ldb.NewIterator(nil, &ldbopt.ReadOptions{DontFillCache: true})
	defer it.Release()
	count := 0
	for it.Next() {
		if err := writeBytes(w, it.Key()); err != nil {
			return err
		}
		if err := writeBytes(w, it.Value()); err != nil {
			return err
		}
		count++
	}
	log.Printf("dumped %d entries from the database", count)
	return it.Error()
}

// Restore reads a backup created by Dump into the current database
// (which is assumed to be empty), then rebuilds the search index.
func (db *Database) Restore(r io.Reader) error {
	count := 0
	for {
		key, err := readBytes(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		value, err := readBytes(r)
		if err == io.EOF {
			return errors.New("unexpected eof")
		}
		if err != nil {
			return err
		}
		if err := db.RawPut(key, value); err != nil {
			return err
		}
		count++
	}
	log.Printf("restored %d entries to the database", count)
	return db.Reindex()
}

// Reindex the entire database. This is an administrative operation,
// to be performed after an incompatible index schema change. It will
// delete the existing index and re-create it from scratch.
func (db *Database) Reindex() error {
	// Close the index, delete it, and re-open it.
	db.index.Close()

	indexPath := filepath.Join(db.path, "index")
	if err := os.RemoveAll(indexPath); err != nil {
		return err
	}
	if err := db.setupIndex(indexPath); err != nil {
		return err
	}

	// Scan the database and re-index everything.
	i := db.Scan(BookBucket)
	for i.Next() {
		var book Book
		if err := i.Value(&book); err != nil {
			continue
		}
		db.index.Index(i.Id().String(), flatten(&book))
	}
	return i.Close()
}

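// SearchResult holds one page of results along with the total number
// of matches.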
type SearchResult struct {
	Results    []*Book
	NumResults int
}

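// doSearch runs a bleve query and resolves the resulting hits to Book
// records.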
func (db *Database) doSearch(query bleve.Query, offset, limit int) (*SearchResult, error) {
	req := bleve.NewSearchRequestOptions(query, limit, offset, false)
	result, err := db.index.Search(req)
	if err != nil {
		return nil, err
	}

	sr := SearchResult{NumResults: int(result.Total)}
	for _, r := range result.Hits {
		if book, err := db.GetBook(ParseID(r.ID)); err == nil {
			sr.Results = append(sr.Results, book)
		}
	}
	return &sr, nil
}

// Search the database with a query string.
func (db *Database) Search(queryStr string, offset, limit int) (*SearchResult, error) {
	return db.doSearch(bleve.NewQueryStringQuery(queryStr), offset, limit)
}

// Suggest returns autocomplete suggestions for a term, matched against
// the edge n-gram indexed _suggest field.
func (db *Database) Suggest(term string) (*SearchResult, error) {
	query := bleve.NewTermQuery(term).SetField("_suggest")
	return db.doSearch(query, 0, 20)
}

// Find looks up a book matching any of the given unique identifiers
// (as produced by Metadata.Uniques), if possible.
func (db *Database) Find(uniqueIds []string) (*Book, error) {
	if len(uniqueIds) == 0 {
		return nil, errors.New("no unique identifiers given")
	}

	var queries []bleve.Query
	for _, u := range uniqueIds {
		queries = append(queries, bleve.NewTermQuery(u).SetField("_unique"))
	}

	var query bleve.Query
	if len(queries) > 1 {
		query = bleve.NewDisjunctionQuery(queries)
	} else {
		query = queries[0]
	}

	search := bleve.NewSearchRequest(query)
	result, err := db.index.Search(search)
	if err != nil {
		return nil, err
	}
	if len(result.Hits) == 0 {
		return nil, errors.New("no matches found")
	}

	return db.GetBook(ParseID(result.Hits[0].ID))
}

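// bktToKey prefixes key with its bucket name and the key separator.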
func bktToKey(bucket, key []byte) []byte {
	return bytes.Join([][]byte{bucket, key}, []byte{keySeparator})
}

// keyToId extracts the BookId from a full key (including the bucket
// prefix).
func keyToId(key []byte) BookId {
	n := bytes.Index(key, []byte{keySeparator})
	if n < 0 {
		return 0
	}

	var id uint64
	binary.Read(bytes.NewReader(key[n+1:]), binary.LittleEndian, &id)
	return BookId(id)
}

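// keyRange returns the start (inclusive) and end (exclusive) key
// bounds that cover every key in the given bucket.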
func keyRange(prefix []byte) ([]byte, []byte) {
	start := make([]byte, len(prefix)+1)
	end := make([]byte, len(prefix)+1)
	copy(start, prefix)
	copy(end, prefix)
	start[len(prefix)] = keySeparator
	end[len(prefix)] = keySeparator + 1
	return start, end
}