From 9fbc656c6cd2ad610986a265c6b346bc234bb881 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Mon, 29 Jun 2015 10:07:40 +0100
Subject: [PATCH] improve queue code; golint fixes

The queuing code now performs proper lease accounting, and it will not
return a URL twice if the page load is slow.
---
 analysis/links.go |   2 +
 crawler.go        | 128 ++++++++++++++++++++++++++---------------
 queue.go          | 141 ++++++++++++++++++++++++++++++++++++++++++++++
 scope.go          |  10 +++-
 4 files changed, 233 insertions(+), 48 deletions(-)
 create mode 100644 queue.go

diff --git a/analysis/links.go b/analysis/links.go
index 22bcb80..a51a467 100644
--- a/analysis/links.go
+++ b/analysis/links.go
@@ -27,6 +27,8 @@ var (
 	}
 )
 
+// GetLinks returns all the links found in a document. Currently only
+// parses HTML pages and CSS stylesheets.
 func GetLinks(resp *http.Response) ([]*url.URL, error) {
 	var outlinks []string
 
diff --git a/crawler.go b/crawler.go
index 52317fb..c337d97 100644
--- a/crawler.go
+++ b/crawler.go
@@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er
 	return db.Put(wo, key, b.Bytes())
 }
 
+func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
+	var b bytes.Buffer
+	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
+		return err
+	}
+	wb.Put(key, b.Bytes())
+	return nil
+}
+
 func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
 	data, err := db.Get(ro, key)
 	if err != nil {
@@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
 	return nil
 }
 
-func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator {
+func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
 	i := db.NewIterator(ro)
 	i.Seek(prefix)
-	return &gobIterator{Iterator: i, prefix: prefix}
+	return newGobPrefixIterator(i, prefix)
+}
+
+func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
+	i := db.NewIterator(ro)
+	if startKey != nil {
+		i.Seek(startKey)
+	}
+	return newGobRangeIterator(i, endKey)
 }
 
 type gobIterator struct {
 	*levigo.Iterator
+}
+
+func (i *gobIterator) Value(obj interface{}) error {
+	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+}
+
+type gobPrefixIterator struct {
+	*gobIterator
 	prefix []byte
 }
 
-func (i *gobIterator) Valid() bool {
-	return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
+func (i *gobPrefixIterator) Valid() bool {
+	return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
 }
 
-func (i *gobIterator) Value(obj interface{}) error {
-	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
+	return &gobPrefixIterator{
+		gobIterator: &gobIterator{i},
+		prefix:      prefix,
+	}
 }
 
+type gobRangeIterator struct {
+	*gobIterator
+	endKey []byte
+}
+
+func (i *gobRangeIterator) Valid() bool {
+	return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
+}
+
+func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
+	return &gobRangeIterator{
+		gobIterator: &gobIterator{i},
+		endKey:      endKey,
+	}
+}
+
+// URLInfo stores information about a crawled URL.
 type URLInfo struct {
 	URL        string
 	StatusCode int
@@ -79,11 +124,14 @@ type URLInfo struct {
 
 // A Fetcher retrieves contents from remote URLs.
 type Fetcher interface {
+	// Fetch retrieves a URL and returns the response.
 	Fetch(string) (*http.Response, error)
 }
 
+// FetcherFunc wraps a simple function into the Fetcher interface.
 type FetcherFunc func(string) (*http.Response, error)
 
+// Fetch retrieves a URL and returns the response.
 func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
 	return f(u)
 }
@@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
 // implementations of this interface are considered permanent and will
 // not cause the URL to be fetched again.
 type Handler interface {
+	// Handle the response from a URL.
 	Handle(*Crawler, string, int, *http.Response, error) error
 }
 
+// HandlerFunc wraps a function into the Handler interface.
 type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
 
+// Handle the response from a URL.
 func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
 	return f(db, u, depth, resp, err)
 }
@@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons
 // The Crawler object contains the crawler state.
 type Crawler struct {
 	db      *gobDB
+	queue   *queue
 	seeds   []*url.URL
 	scopes  []Scope
 	fetcher Fetcher
@@ -112,12 +164,6 @@ type Crawler struct {
 	enqueueMx sync.Mutex
 }
 
-type queuePair struct {
-	Key   []byte
-	URL   string
-	Depth int
-}
-
 // Enqueue a (possibly new) URL for processing.
 func (c *Crawler) Enqueue(u *url.URL, depth int) {
 	// Normalize the URL.
@@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
 		return
 	}
 
-	// Create a unique key using the URL and the current timestamp.
-	qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
-
 	// Store the URL in the queue, and store an empty URLInfo to
 	// make sure that subsequent calls to Enqueue with the same
 	// URL will fail.
+	wb := levigo.NewWriteBatch()
+	defer wb.Close()
+	c.queue.Add(wb, urlStr, depth, time.Now())
+	c.db.PutObjBatch(wb, ukey, &info)
 	wo := levigo.NewWriteOptions()
 	defer wo.Close()
-	c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth})
-	c.db.PutObj(wo, ukey, &info)
+	c.db.Write(wo, wb)
 }
 
 // Scan the queue for URLs until there are no more.
 func (c *Crawler) process() <-chan queuePair {
 	ch := make(chan queuePair)
 	go func() {
-		queuePrefix := []byte("queue/")
 		for range time.Tick(2 * time.Second) {
-			n := 0
-
-			// Scan the queue using a snapshot, to ignore
-			// new URLs that might be added after this.
-			s := c.db.NewSnapshot()
-			ro := levigo.NewReadOptions()
-			ro.SetSnapshot(s)
-
-			iter := c.db.NewPrefixIterator(ro, queuePrefix)
-			for ; iter.Valid(); iter.Next() {
-				var p queuePair
-				if err := iter.Value(&p); err != nil {
-					continue
-				}
-				ch <- p
-				n++
-			}
-			iter.Close()
-
-			ro.Close()
-			c.db.ReleaseSnapshot(s)
-
-			if n == 0 {
+			if err := c.queue.Scan(ch); err != nil {
 				break
 			}
 		}
@@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Invoke the handler (even if the fetcher errored out).
 		info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
 
-		wo := levigo.NewWriteOptions()
+		wb := levigo.NewWriteBatch()
 		if httpErr == nil {
 			respBody.Close()
 
 			// Remove the URL from the queue if the fetcher was successful.
-			c.db.Delete(wo, p.Key)
+			c.queue.Release(wb, p)
 		} else {
 			log.Printf("error retrieving %s: %v", p.URL, httpErr)
+			c.queue.Retry(wb, p, 300*time.Second)
 		}
-		c.db.PutObj(wo, urlkey, &info)
+
+		c.db.PutObjBatch(wb, urlkey, &info)
+
+		wo := levigo.NewWriteOptions()
+		c.db.Write(wo, wb)
 		wo.Close()
+		wb.Close()
 	}
 }
 
+// MustParseURLs parses a list of URLs and aborts on failure.
 func MustParseURLs(urls []string) []*url.URL {
 	// Parse the seed URLs.
 	var parsed []*url.URL
@@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand
 	if err != nil {
 		return nil, err
 	}
+
 	c := &Crawler{
 		db:      db,
+		queue:   &queue{db: db},
 		fetcher: f,
 		handler: h,
 		seeds:   seeds,
 		scopes:  scopes,
 	}
+
+	// Recover active tasks.
+	c.queue.Recover()
+
 	return c, nil
 }
 
@@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.
 		} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
 			location := resp.Header.Get("Location")
 			if location != "" {
-				locationUrl, err := resp.Request.URL.Parse(location)
+				locationURL, err := resp.Request.URL.Parse(location)
 				if err != nil {
 					log.Printf("error parsing Location header: %v", err)
 				} else {
-					c.Enqueue(locationUrl, depth+1)
+					c.Enqueue(locationURL, depth+1)
 				}
 			}
 		} else {
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..5bad577
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,141 @@
+package crawl
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"math/rand"
+	"sync/atomic"
+	"time"
+
+	"github.com/jmhodges/levigo"
+)
+
+type queue struct {
+	db        *gobDB
+	numActive int32
+}
+
+var (
+	queuePrefix  = []byte("queue")
+	activePrefix = []byte("queue_active")
+
+	queueKeySep   = []byte{'/'}
+	queueKeySepP1 = []byte{'/' + 1}
+)
+
+type queuePair struct {
+	key []byte
+
+	URL   string
+	Depth int
+}
+
+// Scan the pending queue and send items on 'ch'. Returns an error
+// when the queue is empty (work is done).
+func (q *queue) Scan(ch chan<- queuePair) error {
+	snap := q.db.NewSnapshot()
+	defer q.db.ReleaseSnapshot(snap)
+
+	ro := levigo.NewReadOptions()
+	ro.SetSnapshot(snap)
+	defer ro.Close()
+
+	n := 0
+	startKey, endKey := queueScanRange()
+	iter := q.db.NewRangeIterator(ro, startKey, endKey)
+
+	for ; iter.Valid(); iter.Next() {
+		var p queuePair
+		if err := iter.Value(&p); err != nil {
+			continue
+		}
+		p.key = iter.Key()
+		q.acquire(p)
+		ch <- p
+		n++
+	}
+
+	if n == 0 && q.numActive == 0 {
+		return errors.New("EOF")
+	}
+	return nil
+}
+
+// Add an item to the pending work queue.
+func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
+	t := uint64(when.UnixNano())
+	qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
+	q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+}
+
+func (q *queue) acquire(qp queuePair) {
+	wb := levigo.NewWriteBatch()
+	defer wb.Close()
+
+	q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
+	wb.Delete(qp.key)
+
+	wo := levigo.NewWriteOptions()
+	defer wo.Close()
+	q.db.Write(wo, wb)
+
+	atomic.AddInt32(&q.numActive, 1)
+}
+
+// Release an item from the queue. Processing for this item is done.
+func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
+	wb.Delete(activeQueueKey(qp.key))
+	atomic.AddInt32(&q.numActive, -1)
+}
+
+// Retry processing this item at a later time.
+func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
+	wb.Delete(activeQueueKey(qp.key))
+	q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
+	atomic.AddInt32(&q.numActive, -1)
+}
+
+// Recover moves all active tasks to the pending queue. To be
+// called at startup to recover tasks that were active when the
+// previous run terminated.
+func (q *queue) Recover() {
+	wb := levigo.NewWriteBatch()
+	defer wb.Close()
+
+	ro := levigo.NewReadOptions()
+	defer ro.Close()
+
+	prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
+	iter := q.db.NewPrefixIterator(ro, prefix)
+	for ; iter.Valid(); iter.Next() {
+		var p queuePair
+		if err := iter.Value(&p); err != nil {
+			continue
+		}
+		p.key = iter.Key()[len(activePrefix)+1:]
+		q.db.PutObjBatch(wb, p.key, &p)
+		wb.Delete(iter.Key())
+	}
+
+	wo := levigo.NewWriteOptions()
+	defer wo.Close()
+	q.db.Write(wo, wb)
+}
+
+func encodeUint64(n uint64) []byte {
+	var b bytes.Buffer
+	binary.Write(&b, binary.BigEndian, n)
+	return b.Bytes()
+}
+
+func activeQueueKey(key []byte) []byte {
+	return bytes.Join([][]byte{activePrefix, key}, queueKeySep)
+}
+
+func queueScanRange() ([]byte, []byte) {
+	tlim := uint64(time.Now().UnixNano() + 1)
+	startKey := bytes.Join([][]byte{queuePrefix, []byte{}}, queueKeySep)
+	endKey := bytes.Join([][]byte{queuePrefix, encodeUint64(tlim)}, queueKeySep)
+	return startKey, endKey
+}
diff --git a/scope.go b/scope.go
index ccba5f5..6a63018 100644
--- a/scope.go
+++ b/scope.go
@@ -7,7 +7,9 @@ import (
 	"strings"
 )
 
+// Scope defines the crawling scope.
 type Scope interface {
+	// Check a URL to see if it's in scope for crawling.
 	Check(*url.URL, int) bool
 }
 
@@ -48,14 +50,16 @@ func NewSchemeScope(schemes []string) Scope {
 // eventual "www." prefix.
 type URLPrefixMap map[string]struct{}
 
-func normalizeUrlPrefix(uri *url.URL) string {
+func normalizeURLPrefix(uri *url.URL) string {
 	return strings.TrimPrefix(uri.Host, "www.") + strings.TrimSuffix(uri.Path, "/")
 }
 
+// Add an URL to the prefix map.
 func (m URLPrefixMap) Add(uri *url.URL) {
-	m[normalizeUrlPrefix(uri)] = struct{}{}
+	m[normalizeURLPrefix(uri)] = struct{}{}
 }
 
+// Contains returns true if the given URL matches the prefix map.
 func (m URLPrefixMap) Contains(uri *url.URL) bool {
 	s := strings.TrimPrefix(uri.Host, "www.")
 	if _, ok := m[s]; ok {
@@ -111,6 +115,8 @@ func (s *regexpIgnoreScope) Check(uri *url.URL, depth int) bool {
 	return true
 }
 
+// NewRegexpIgnoreScope returns a Scope that filters out URLs
+// according to a list of regular expressions.
 func NewRegexpIgnoreScope(ignores []string) Scope {
 	if ignores == nil {
 		ignores = defaultIgnorePatterns
-- 
GitLab