From 394de2d98a9cfde6244620f0b188625b60f68f96 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Mon, 18 Dec 2017 22:34:48 +0000
Subject: [PATCH] Switch to github.com/syndtr/goleveldb

The native Go implementation of LevelDB.
---
 crawler.go | 104 +++++++++++++++--------------------------------------
 queue.go   |  48 +++++++++----------------
 2 files changed, 45 insertions(+), 107 deletions(-)

diff --git a/crawler.go b/crawler.go
index f2a8968..f1edc2d 100644
--- a/crawler.go
+++ b/crawler.go
@@ -13,34 +13,29 @@ import (
 	"time"
 
 	"github.com/PuerkitoBio/purell"
-	"github.com/jmhodges/levigo"
+	"github.com/syndtr/goleveldb/leveldb"
+	lerr "github.com/syndtr/goleveldb/leveldb/errors"
+	"github.com/syndtr/goleveldb/leveldb/iterator"
+	lutil "github.com/syndtr/goleveldb/leveldb/util"
 )
 
 type gobDB struct {
-	*levigo.DB
+	*leveldb.DB
 }
 
 func newGobDB(path string) (*gobDB, error) {
-	opts := levigo.NewOptions()
-	opts.SetCreateIfMissing(true)
-	opts.SetCache(levigo.NewLRUCache(2 << 20))
-	opts.SetFilterPolicy(levigo.NewBloomFilter(10))
-	db, err := levigo.Open(path, opts)
+	db, err := leveldb.OpenFile(path, nil)
+	if lerr.IsCorrupted(err) {
+		log.Printf("corrupted database, recovering...")
+		db, err = leveldb.RecoverFile(path, nil)
+	}
 	if err != nil {
 		return nil, err
 	}
 	return &gobDB{db}, nil
 }
 
-func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
-	var b bytes.Buffer
-	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
-		return err
-	}
-	return db.Put(wo, key, b.Bytes())
-}
-
-func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
+func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error {
 	var b bytes.Buffer
 	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
 		return err
@@ -49,8 +44,8 @@ func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{})
 	return nil
 }
 
-func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
-	data, err := db.Get(ro, key)
+func (db *gobDB) GetObj(key []byte, obj interface{}) error {
+	data, err := db.Get(key, nil)
 	if err != nil {
 		return err
 	}
@@ -60,58 +55,24 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
 	return nil
 }
 
-func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
-	i := db.NewIterator(ro)
-	i.Seek(prefix)
-	return newGobPrefixIterator(i, prefix)
+func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator {
+	return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil))
 }
 
-func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
-	i := db.NewIterator(ro)
-	if startKey != nil {
-		i.Seek(startKey)
-	}
-	return newGobRangeIterator(i, endKey)
+func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator {
+	return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil))
 }
 
 type gobIterator struct {
-	*levigo.Iterator
-}
-
-func (i *gobIterator) Value(obj interface{}) error {
-	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+	iterator.Iterator
 }
 
-type gobPrefixIterator struct {
-	*gobIterator
-	prefix []byte
+func newGobIterator(i iterator.Iterator) *gobIterator {
+	return &gobIterator{i}
 }
 
-func (i *gobPrefixIterator) Valid() bool {
-	return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
-}
-
-func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
-	return &gobPrefixIterator{
-		gobIterator: &gobIterator{i},
-		prefix:      prefix,
-	}
-}
-
-type gobRangeIterator struct {
-	*gobIterator
-	endKey []byte
-}
-
-func (i *gobRangeIterator) Valid() bool {
-	return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
-}
-
-func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
-	return &gobRangeIterator{
-		gobIterator: &gobIterator{i},
-		endKey:      endKey,
-	}
+func (i *gobIterator) Value(obj interface{}) error {
+	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
 }
 
 // URLInfo stores information about a crawled URL.
@@ -182,23 +143,18 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
 
 	// Check if we've already seen it.
 	var info URLInfo
-	ro := levigo.NewReadOptions()
-	defer ro.Close()
 	ukey := []byte(fmt.Sprintf("url/%s", urlStr))
-	if err := c.db.GetObj(ro, ukey, &info); err == nil {
+	if err := c.db.GetObj(ukey, &info); err == nil {
 		return
 	}
 
 	// Store the URL in the queue, and store an empty URLInfo to
 	// make sure that subsequent calls to Enqueue with the same
 	// URL will fail.
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
+	wb := new(leveldb.Batch)
 	c.queue.Add(wb, urlStr, depth, time.Now())
 	c.db.PutObjBatch(wb, ukey, &info)
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	c.db.Write(wo, wb)
+	c.db.Write(wb, nil)
 }
 
 // Scan the queue for URLs until there are no more.
@@ -222,8 +178,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Ignore errors, we can work with an empty object.
 		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
 		var info URLInfo
-		ro := levigo.NewReadOptions()
-		c.db.GetObj(ro, urlkey, &info)
+		c.db.GetObj(urlkey, &info)
 		info.CrawledAt = time.Now()
 		info.URL = p.URL
 
@@ -241,7 +196,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Invoke the handler (even if the fetcher errored out).
 		info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
 
-		wb := levigo.NewWriteBatch()
+		wb := new(leveldb.Batch)
 		if httpErr == nil {
 			respBody.Close()
 
@@ -254,10 +209,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 
 		c.db.PutObjBatch(wb, urlkey, &info)
 
-		wo := levigo.NewWriteOptions()
-		c.db.Write(wo, wb)
-		wo.Close()
-		wb.Close()
+		c.db.Write(wb, nil)
 	}
 }
 
diff --git a/queue.go b/queue.go
index 5bad577..7621226 100644
--- a/queue.go
+++ b/queue.go
@@ -8,7 +8,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/jmhodges/levigo"
+	"github.com/syndtr/goleveldb/leveldb"
 )
 
 type queue struct {
@@ -34,18 +34,12 @@ type queuePair struct {
 // Scan the pending queue and send items on 'ch'. Returns an error
 // when the queue is empty (work is done).
 func (q *queue) Scan(ch chan<- queuePair) error {
-	snap := q.db.NewSnapshot()
-	defer q.db.ReleaseSnapshot(snap)
-
-	ro := levigo.NewReadOptions()
-	ro.SetSnapshot(snap)
-	defer ro.Close()
-
 	n := 0
 	startKey, endKey := queueScanRange()
-	iter := q.db.NewRangeIterator(ro, startKey, endKey)
+	iter := q.db.NewRangeIterator(startKey, endKey)
+	defer iter.Release()
 
-	for ; iter.Valid(); iter.Next() {
+	for iter.Next() {
 		var p queuePair
 		if err := iter.Value(&p); err != nil {
 			continue
@@ -63,34 +57,31 @@ func (q *queue) Scan(ch chan<- queuePair) error {
 }
 
 // Add an item to the pending work queue.
-func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) {
 	t := uint64(when.UnixNano())
 	qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
 	q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
 }
 
 func (q *queue) acquire(qp queuePair) {
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
+	wb := new(leveldb.Batch)
 
 	q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
 	wb.Delete(qp.key)
 
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	q.db.Write(wo, wb)
+	q.db.Write(wb, nil)
 
 	atomic.AddInt32(&q.numActive, 1)
 }
 
 // Release an item from the queue. Processing for this item is done.
-func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
+func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
 	wb.Delete(activeQueueKey(qp.key))
 	atomic.AddInt32(&q.numActive, -1)
 }
 
 // Retry processing this item at a later time.
-func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
+func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) {
 	wb.Delete(activeQueueKey(qp.key))
 	q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
 	atomic.AddInt32(&q.numActive, -1)
@@ -100,15 +91,12 @@ func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration)
 // called at startup to recover tasks that were active when the
 // previous run terminated.
 func (q *queue) Recover() {
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
-
-	ro := levigo.NewReadOptions()
-	defer ro.Close()
+	wb := new(leveldb.Batch)
 
 	prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
-	iter := q.db.NewPrefixIterator(ro, prefix)
-	for ; iter.Valid(); iter.Next() {
+	iter := q.db.NewPrefixIterator(prefix)
+	defer iter.Release()
+	for iter.Next() {
 		var p queuePair
 		if err := iter.Value(&p); err != nil {
 			continue
@@ -118,15 +106,13 @@ func (q *queue) Recover() {
 		wb.Delete(iter.Key())
 	}
 
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	q.db.Write(wo, wb)
+	q.db.Write(wb, nil)
 }
 
 func encodeUint64(n uint64) []byte {
-	var b bytes.Buffer
-	binary.Write(&b, binary.BigEndian, n)
-	return b.Bytes()
+	var b [8]byte
+	binary.BigEndian.PutUint64(b[:], n)
+	return b[:]
 }
 
 func activeQueueKey(key []byte) []byte {
-- 
GitLab