Commit 394de2d9 authored by ale

Switch to github.com/syndtr/goleveldb

The native Go implementation of LevelDB.
parent 4fc0b1d2
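For context, the new code drives the goleveldb API roughly as in the following minimal sketch: open the store, fall back to RecoverFile if it is corrupted, and pass nil wherever the default read/write options are fine. The path and key below are made up for illustration; this is not code from the repository.

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	lerr "github.com/syndtr/goleveldb/leveldb/errors"
)

func main() {
	// Open the database, recovering it if it is corrupted (as newGobDB now does).
	db, err := leveldb.OpenFile("/tmp/example.leveldb", nil) // nil = default options
	if lerr.IsCorrupted(err) {
		log.Printf("corrupted database, recovering...")
		db, err = leveldb.RecoverFile("/tmp/example.leveldb", nil)
	}
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Reads and writes take per-call options instead of levigo's
	// ReadOptions/WriteOptions objects; nil means defaults.
	if err := db.Put([]byte("url/http://example.com/"), []byte("payload"), nil); err != nil {
		log.Fatal(err)
	}
	val, err := db.Get([]byte("url/http://example.com/"), nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got %q", val)
}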
@@ -13,34 +13,29 @@ import (
 	"time"
 
 	"github.com/PuerkitoBio/purell"
-	"github.com/jmhodges/levigo"
+	"github.com/syndtr/goleveldb/leveldb"
+	lerr "github.com/syndtr/goleveldb/leveldb/errors"
+	"github.com/syndtr/goleveldb/leveldb/iterator"
+	lutil "github.com/syndtr/goleveldb/leveldb/util"
 )
 
 type gobDB struct {
-	*levigo.DB
+	*leveldb.DB
 }
 
 func newGobDB(path string) (*gobDB, error) {
-	opts := levigo.NewOptions()
-	opts.SetCreateIfMissing(true)
-	opts.SetCache(levigo.NewLRUCache(2 << 20))
-	opts.SetFilterPolicy(levigo.NewBloomFilter(10))
-	db, err := levigo.Open(path, opts)
+	db, err := leveldb.OpenFile(path, nil)
+	if lerr.IsCorrupted(err) {
+		log.Printf("corrupted database, recovering...")
+		db, err = leveldb.RecoverFile(path, nil)
+	}
 	if err != nil {
 		return nil, err
 	}
 	return &gobDB{db}, nil
 }
 
-func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
-	var b bytes.Buffer
-	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
-		return err
-	}
-	return db.Put(wo, key, b.Bytes())
-}
-
-func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
+func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error {
 	var b bytes.Buffer
 	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
 		return err
@@ -49,8 +44,8 @@ func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
 	return nil
 }
 
-func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
-	data, err := db.Get(ro, key)
+func (db *gobDB) GetObj(key []byte, obj interface{}) error {
+	data, err := db.Get(key, nil)
 	if err != nil {
 		return err
 	}
@@ -60,58 +55,24 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
 	return nil
 }
 
-func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
-	i := db.NewIterator(ro)
-	i.Seek(prefix)
-	return newGobPrefixIterator(i, prefix)
+func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator {
+	return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil))
 }
 
-func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
-	i := db.NewIterator(ro)
-	if startKey != nil {
-		i.Seek(startKey)
-	}
-	return newGobRangeIterator(i, endKey)
+func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator {
+	return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil))
 }
 
 type gobIterator struct {
-	*levigo.Iterator
+	iterator.Iterator
 }
 
-func (i *gobIterator) Value(obj interface{}) error {
-	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
-}
-
-type gobPrefixIterator struct {
-	*gobIterator
-	prefix []byte
-}
-
-func (i *gobPrefixIterator) Valid() bool {
-	return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
-}
-
-func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
-	return &gobPrefixIterator{
-		gobIterator: &gobIterator{i},
-		prefix:      prefix,
-	}
+func newGobIterator(i iterator.Iterator) *gobIterator {
+	return &gobIterator{i}
 }
 
-type gobRangeIterator struct {
-	*gobIterator
-	endKey []byte
-}
-
-func (i *gobRangeIterator) Valid() bool {
-	return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
-}
-
-func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
-	return &gobRangeIterator{
-		gobIterator: &gobIterator{i},
-		endKey:      endKey,
-	}
-}
+func (i *gobIterator) Value(obj interface{}) error {
+	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+}
 
 // URLInfo stores information about a crawled URL.
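The iterator rework above relies on goleveldb enforcing the key range itself: lutil.BytesPrefix builds the half-open range covering a prefix (replacing the old Seek plus bytes.HasPrefix validity check), a fresh iterator is positioned before the first entry so the loop is driven by Next(), and the iterator must be released when done. A self-contained sketch of that pattern, assuming an in-memory store and made-up keys:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	// An in-memory database keeps the example self-contained.
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	db.Put([]byte("queue/1"), []byte("a"), nil)
	db.Put([]byte("queue/2"), []byte("b"), nil)
	db.Put([]byte("url/x"), []byte("c"), nil)

	// util.BytesPrefix returns the *util.Range covering all keys with the prefix.
	// A bounded scan would use &util.Range{Start: startKey, Limit: endKey} instead.
	iter := db.NewIterator(util.BytesPrefix([]byte("queue/")), nil)
	defer iter.Release() // iterators must always be released

	// The iterator starts before the first entry, so Next() drives the loop;
	// there is no separate Seek()/Valid() dance as with levigo.
	for iter.Next() {
		fmt.Printf("%s = %s\n", iter.Key(), iter.Value())
	}
	if err := iter.Error(); err != nil {
		panic(err)
	}
}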
@@ -182,23 +143,18 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
 	// Check if we've already seen it.
 	var info URLInfo
-	ro := levigo.NewReadOptions()
-	defer ro.Close()
 	ukey := []byte(fmt.Sprintf("url/%s", urlStr))
-	if err := c.db.GetObj(ro, ukey, &info); err == nil {
+	if err := c.db.GetObj(ukey, &info); err == nil {
 		return
 	}
 
 	// Store the URL in the queue, and store an empty URLInfo to
 	// make sure that subsequent calls to Enqueue with the same
 	// URL will fail.
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
+	wb := new(leveldb.Batch)
 	c.queue.Add(wb, urlStr, depth, time.Now())
 	c.db.PutObjBatch(wb, ukey, &info)
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	c.db.Write(wo, wb)
+	c.db.Write(wb, nil)
 }
 
 // Scan the queue for URLs until there are no more.
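Enqueue (and the other write paths below) now stages its writes on a leveldb.Batch, a plain in-memory value with nothing to Close, and applies it atomically with db.Write. A minimal sketch of that pattern, again with an in-memory store and illustrative keys:

package main

import (
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// A Batch is just a value; unlike levigo there are no WriteBatch or
	// WriteOptions objects that need explicit Close calls.
	wb := new(leveldb.Batch)
	wb.Put([]byte("queue/00001"), []byte("gob-encoded queuePair"))
	wb.Put([]byte("url/http://example.com/"), []byte("gob-encoded URLInfo"))
	wb.Delete([]byte("queue_active/00001"))

	// Write applies the whole batch atomically; nil means default write options.
	if err := db.Write(wb, nil); err != nil {
		panic(err)
	}
}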
@@ -222,8 +178,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Ignore errors, we can work with an empty object.
 		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
 		var info URLInfo
-		ro := levigo.NewReadOptions()
-		c.db.GetObj(ro, urlkey, &info)
+		c.db.GetObj(urlkey, &info)
 		info.CrawledAt = time.Now()
 		info.URL = p.URL
@@ -241,7 +196,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Invoke the handler (even if the fetcher errored out).
 		info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
 
-		wb := levigo.NewWriteBatch()
+		wb := new(leveldb.Batch)
 		if httpErr == nil {
 			respBody.Close()
@@ -254,10 +209,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		c.db.PutObjBatch(wb, urlkey, &info)
-		wo := levigo.NewWriteOptions()
-		c.db.Write(wo, wb)
-		wo.Close()
-		wb.Close()
+		c.db.Write(wb, nil)
 	}
 }
......
@@ -8,7 +8,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/jmhodges/levigo"
+	"github.com/syndtr/goleveldb/leveldb"
 )
 
 type queue struct {
@@ -34,18 +34,12 @@ type queuePair struct {
 // Scan the pending queue and send items on 'ch'. Returns an error
 // when the queue is empty (work is done).
 func (q *queue) Scan(ch chan<- queuePair) error {
-	snap := q.db.NewSnapshot()
-	defer q.db.ReleaseSnapshot(snap)
-	ro := levigo.NewReadOptions()
-	ro.SetSnapshot(snap)
-	defer ro.Close()
-
 	n := 0
 	startKey, endKey := queueScanRange()
-	iter := q.db.NewRangeIterator(ro, startKey, endKey)
+	iter := q.db.NewRangeIterator(startKey, endKey)
+	defer iter.Release()
 
-	for ; iter.Valid(); iter.Next() {
+	for iter.Next() {
 		var p queuePair
 		if err := iter.Value(&p); err != nil {
 			continue
@@ -63,34 +57,31 @@ func (q *queue) Scan(ch chan<- queuePair) error {
 }
 
 // Add an item to the pending work queue.
-func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) {
 	t := uint64(when.UnixNano())
 	qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
 	q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
 }
 
 func (q *queue) acquire(qp queuePair) {
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
+	wb := new(leveldb.Batch)
 	q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
 	wb.Delete(qp.key)
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	q.db.Write(wo, wb)
+	q.db.Write(wb, nil)
 	atomic.AddInt32(&q.numActive, 1)
 }
 
 // Release an item from the queue. Processing for this item is done.
-func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
+func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
 	wb.Delete(activeQueueKey(qp.key))
 	atomic.AddInt32(&q.numActive, -1)
 }
 
 // Retry processing this item at a later time.
-func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
+func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) {
 	wb.Delete(activeQueueKey(qp.key))
 	q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
 	atomic.AddInt32(&q.numActive, -1)
@@ -100,15 +91,12 @@ func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration)
 // called at startup to recover tasks that were active when the
 // previous run terminated.
 func (q *queue) Recover() {
-	wb := levigo.NewWriteBatch()
-	defer wb.Close()
-	ro := levigo.NewReadOptions()
-	defer ro.Close()
+	wb := new(leveldb.Batch)
 
 	prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
-	iter := q.db.NewPrefixIterator(ro, prefix)
-	for ; iter.Valid(); iter.Next() {
+	iter := q.db.NewPrefixIterator(prefix)
+	defer iter.Release()
+	for iter.Next() {
 		var p queuePair
 		if err := iter.Value(&p); err != nil {
 			continue
@@ -118,15 +106,13 @@ func (q *queue) Recover() {
 		wb.Delete(iter.Key())
 	}
 
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	q.db.Write(wo, wb)
+	q.db.Write(wb, nil)
 }
 
 func encodeUint64(n uint64) []byte {
-	var b bytes.Buffer
-	binary.Write(&b, binary.BigEndian, n)
-	return b.Bytes()
+	var b [8]byte
+	binary.BigEndian.PutUint64(b[:], n)
+	return b[:]
 }
 
 func activeQueueKey(key []byte) []byte {
......
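The encodeUint64 rewrite drops the bytes.Buffer and binary.Write in favour of a fixed 8-byte array. Both forms produce the same big-endian bytes, which is what keeps the timestamp-prefixed queue keys sorted chronologically; the snippet below is a standalone check of that equivalence, not code from the commit.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeUint64 as rewritten in this commit: no buffer allocation needed.
func encodeUint64(n uint64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], n)
	return b[:]
}

func main() {
	n := uint64(1234567890123456789)

	// The old implementation went through bytes.Buffer and binary.Write.
	var old bytes.Buffer
	binary.Write(&old, binary.BigEndian, n)

	fmt.Println(bytes.Equal(old.Bytes(), encodeUint64(n))) // true
}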