Commit 9fbc656c authored by ale's avatar ale

improve queue code; golint fixes

The queuing code now performs proper lease accounting, and it will not
return a URL twice if the page load is slow.
parent 63bd51e0
...@@ -27,6 +27,8 @@ var ( ...@@ -27,6 +27,8 @@ var (
} }
) )
// GetLinks returns all the links found in a document. Currently only
// parses HTML pages and CSS stylesheets.
func GetLinks(resp *http.Response) ([]*url.URL, error) { func GetLinks(resp *http.Response) ([]*url.URL, error) {
var outlinks []string var outlinks []string
......
...@@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er ...@@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er
return db.Put(wo, key, b.Bytes()) return db.Put(wo, key, b.Bytes())
} }
func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(obj); err != nil {
return err
}
wb.Put(key, b.Bytes())
return nil
}
func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error { func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
data, err := db.Get(ro, key) data, err := db.Get(ro, key)
if err != nil { if err != nil {
...@@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err ...@@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
return nil return nil
} }
func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator { func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
i := db.NewIterator(ro) i := db.NewIterator(ro)
i.Seek(prefix) i.Seek(prefix)
return &gobIterator{Iterator: i, prefix: prefix} return newGobPrefixIterator(i, prefix)
}
func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
i := db.NewIterator(ro)
if startKey != nil {
i.Seek(startKey)
}
return newGobRangeIterator(i, endKey)
} }
type gobIterator struct { type gobIterator struct {
*levigo.Iterator *levigo.Iterator
}
func (i *gobIterator) Value(obj interface{}) error {
return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
}
type gobPrefixIterator struct {
*gobIterator
prefix []byte prefix []byte
} }
func (i *gobIterator) Valid() bool { func (i *gobPrefixIterator) Valid() bool {
return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
} }
func (i *gobIterator) Value(obj interface{}) error { func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) return &gobPrefixIterator{
gobIterator: &gobIterator{i},
prefix: prefix,
}
} }
type gobRangeIterator struct {
*gobIterator
endKey []byte
}
func (i *gobRangeIterator) Valid() bool {
return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
}
func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
return &gobRangeIterator{
gobIterator: &gobIterator{i},
endKey: endKey,
}
}
// URLInfo stores information about a crawled URL.
type URLInfo struct { type URLInfo struct {
URL string URL string
StatusCode int StatusCode int
...@@ -79,11 +124,14 @@ type URLInfo struct { ...@@ -79,11 +124,14 @@ type URLInfo struct {
// A Fetcher retrieves contents from remote URLs. // A Fetcher retrieves contents from remote URLs.
type Fetcher interface { type Fetcher interface {
// Fetch retrieves a URL and returns the response.
Fetch(string) (*http.Response, error) Fetch(string) (*http.Response, error)
} }
// FetcherFunc wraps a simple function into the Fetcher interface.
type FetcherFunc func(string) (*http.Response, error) type FetcherFunc func(string) (*http.Response, error)
// Fetch retrieves a URL and returns the response.
func (f FetcherFunc) Fetch(u string) (*http.Response, error) { func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
return f(u) return f(u)
} }
...@@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { ...@@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
// implementations of this interface are considered permanent and will // implementations of this interface are considered permanent and will
// not cause the URL to be fetched again. // not cause the URL to be fetched again.
type Handler interface { type Handler interface {
// Handle the response from a URL.
Handle(*Crawler, string, int, *http.Response, error) error Handle(*Crawler, string, int, *http.Response, error) error
} }
// HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(*Crawler, string, int, *http.Response, error) error type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
// Handle the response from a URL.
func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error { func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
return f(db, u, depth, resp, err) return f(db, u, depth, resp, err)
} }
...@@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons ...@@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons
// The Crawler object contains the crawler state. // The Crawler object contains the crawler state.
type Crawler struct { type Crawler struct {
db *gobDB db *gobDB
queue *queue
seeds []*url.URL seeds []*url.URL
scopes []Scope scopes []Scope
fetcher Fetcher fetcher Fetcher
...@@ -112,12 +164,6 @@ type Crawler struct { ...@@ -112,12 +164,6 @@ type Crawler struct {
enqueueMx sync.Mutex enqueueMx sync.Mutex
} }
type queuePair struct {
Key []byte
URL string
Depth int
}
// Enqueue a (possibly new) URL for processing. // Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(u *url.URL, depth int) { func (c *Crawler) Enqueue(u *url.URL, depth int) {
// Normalize the URL. // Normalize the URL.
...@@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { ...@@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
return return
} }
// Create a unique key using the URL and the current timestamp.
qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
// Store the URL in the queue, and store an empty URLInfo to // Store the URL in the queue, and store an empty URLInfo to
// make sure that subsequent calls to Enqueue with the same // make sure that subsequent calls to Enqueue with the same
// URL will fail. // URL will fail.
wb := levigo.NewWriteBatch()
defer wb.Close()
c.queue.Add(wb, urlStr, depth, time.Now())
c.db.PutObjBatch(wb, ukey, &info)
wo := levigo.NewWriteOptions() wo := levigo.NewWriteOptions()
defer wo.Close() defer wo.Close()
c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth}) c.db.Write(wo, wb)
c.db.PutObj(wo, ukey, &info)
} }
// Scan the queue for URLs until there are no more. // Scan the queue for URLs until there are no more.
func (c *Crawler) process() <-chan queuePair { func (c *Crawler) process() <-chan queuePair {
ch := make(chan queuePair) ch := make(chan queuePair)
go func() { go func() {
queuePrefix := []byte("queue/")
for range time.Tick(2 * time.Second) { for range time.Tick(2 * time.Second) {
n := 0 if err := c.queue.Scan(ch); err != nil {
// Scan the queue using a snapshot, to ignore
// new URLs that might be added after this.
s := c.db.NewSnapshot()
ro := levigo.NewReadOptions()
ro.SetSnapshot(s)
iter := c.db.NewPrefixIterator(ro, queuePrefix)
for ; iter.Valid(); iter.Next() {
var p queuePair
if err := iter.Value(&p); err != nil {
continue
}
ch <- p
n++
}
iter.Close()
ro.Close()
c.db.ReleaseSnapshot(s)
if n == 0 {
break break
} }
} }
...@@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { ...@@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Invoke the handler (even if the fetcher errored out). // Invoke the handler (even if the fetcher errored out).
info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
wo := levigo.NewWriteOptions() wb := levigo.NewWriteBatch()
if httpErr == nil { if httpErr == nil {
respBody.Close() respBody.Close()
// Remove the URL from the queue if the fetcher was successful. // Remove the URL from the queue if the fetcher was successful.
c.db.Delete(wo, p.Key) c.queue.Release(wb, p)
} else { } else {
log.Printf("error retrieving %s: %v", p.URL, httpErr) log.Printf("error retrieving %s: %v", p.URL, httpErr)
c.queue.Retry(wb, p, 300*time.Second)
} }
c.db.PutObj(wo, urlkey, &info)
c.db.PutObjBatch(wb, urlkey, &info)
wo := levigo.NewWriteOptions()
c.db.Write(wo, wb)
wo.Close() wo.Close()
wb.Close()
} }
} }
// MustParseURLs parses a list of URLs and aborts on failure.
func MustParseURLs(urls []string) []*url.URL { func MustParseURLs(urls []string) []*url.URL {
// Parse the seed URLs. // Parse the seed URLs.
var parsed []*url.URL var parsed []*url.URL
...@@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand ...@@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand
if err != nil { if err != nil {
return nil, err return nil, err
} }
c := &Crawler{ c := &Crawler{
db: db, db: db,
queue: &queue{db: db},
fetcher: f, fetcher: f,
handler: h, handler: h,
seeds: seeds, seeds: seeds,
scopes: scopes, scopes: scopes,
} }
// Recover active tasks.
c.queue.Recover()
return c, nil return c, nil
} }
...@@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http. ...@@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.
} else if resp.StatusCode > 300 && resp.StatusCode < 400 { } else if resp.StatusCode > 300 && resp.StatusCode < 400 {
location := resp.Header.Get("Location") location := resp.Header.Get("Location")
if location != "" { if location != "" {
locationUrl, err := resp.Request.URL.Parse(location) locationURL, err := resp.Request.URL.Parse(location)
if err != nil { if err != nil {
log.Printf("error parsing Location header: %v", err) log.Printf("error parsing Location header: %v", err)
} else { } else {
c.Enqueue(locationUrl, depth+1) c.Enqueue(locationURL, depth+1)
} }
} }
} else { } else {
......
package crawl
import (
"bytes"
"encoding/binary"
"errors"
"math/rand"
"sync/atomic"
"time"
"github.com/jmhodges/levigo"
)
type queue struct {
db *gobDB
numActive int32
}
var (
queuePrefix = []byte("queue")
activePrefix = []byte("queue_active")
queueKeySep = []byte{'/'}
queueKeySepP1 = []byte{'/' + 1}
)
type queuePair struct {
key []byte
URL string
Depth int
}
// Scan the pending queue and send items on 'ch'. Returns an error
// when the queue is empty (work is done).
func (q *queue) Scan(ch chan<- queuePair) error {
snap := q.db.NewSnapshot()
defer q.db.ReleaseSnapshot(snap)
ro := levigo.NewReadOptions()
ro.SetSnapshot(snap)
defer ro.Close()
n := 0
startKey, endKey := queueScanRange()
iter := q.db.NewRangeIterator(ro, startKey, endKey)
for ; iter.Valid(); iter.Next() {
var p queuePair
if err := iter.Value(&p); err != nil {
continue
}
p.key = iter.Key()
q.acquire(p)
ch <- p
n++
}
if n == 0 && q.numActive == 0 {
return errors.New("EOF")
}
return nil
}
// Add an item to the pending work queue.
func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
}
func (q *queue) acquire(qp queuePair) {
wb := levigo.NewWriteBatch()
defer wb.Close()
q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
wb.Delete(qp.key)
wo := levigo.NewWriteOptions()
defer wo.Close()
q.db.Write(wo, wb)
atomic.AddInt32(&q.numActive, 1)
}
// Release an item from the queue. Processing for this item is done.
func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
wb.Delete(activeQueueKey(qp.key))
atomic.AddInt32(&q.numActive, -1)
}
// Retry processing this item at a later time.
func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
wb.Delete(activeQueueKey(qp.key))
q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
atomic.AddInt32(&q.numActive, -1)
}
// Recover moves all active tasks to the pending queue. To be
// called at startup to recover tasks that were active when the
// previous run terminated.
func (q *queue) Recover() {
wb := levigo.NewWriteBatch()
defer wb.Close()
ro := levigo.NewReadOptions()
defer ro.Close()
prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
iter := q.db.NewPrefixIterator(ro, prefix)
for ; iter.Valid(); iter.Next() {
var p queuePair
if err := iter.Value(&p); err != nil {
continue
}
p.key = iter.Key()[len(activePrefix)+1:]
q.db.PutObjBatch(wb, p.key, &p)
wb.Delete(iter.Key())
}
wo := levigo.NewWriteOptions()
defer wo.Close()
q.db.Write(wo, wb)
}
func encodeUint64(n uint64) []byte {
var b bytes.Buffer
binary.Write(&b, binary.BigEndian, n)
return b.Bytes()
}
func activeQueueKey(key []byte) []byte {
return bytes.Join([][]byte{activePrefix, key}, queueKeySep)
}
func queueScanRange() ([]byte, []byte) {
tlim := uint64(time.Now().UnixNano() + 1)
startKey := bytes.Join([][]byte{queuePrefix, []byte{}}, queueKeySep)
endKey := bytes.Join([][]byte{queuePrefix, encodeUint64(tlim)}, queueKeySep)
return startKey, endKey
}
...@@ -7,7 +7,9 @@ import ( ...@@ -7,7 +7,9 @@ import (
"strings" "strings"
) )
// Scope defines the crawling scope.
type Scope interface { type Scope interface {
// Check a URL to see if it's in scope for crawling.
Check(*url.URL, int) bool Check(*url.URL, int) bool
} }
...@@ -48,14 +50,16 @@ func NewSchemeScope(schemes []string) Scope { ...@@ -48,14 +50,16 @@ func NewSchemeScope(schemes []string) Scope {
// eventual "www." prefix. // eventual "www." prefix.
type URLPrefixMap map[string]struct{} type URLPrefixMap map[string]struct{}
func normalizeUrlPrefix(uri *url.URL) string { func normalizeURLPrefix(uri *url.URL) string {
return strings.TrimPrefix(uri.Host, "www.") + strings.TrimSuffix(uri.Path, "/") return strings.TrimPrefix(uri.Host, "www.") + strings.TrimSuffix(uri.Path, "/")
} }
// Add an URL to the prefix map.
func (m URLPrefixMap) Add(uri *url.URL) { func (m URLPrefixMap) Add(uri *url.URL) {
m[normalizeUrlPrefix(uri)] = struct{}{} m[normalizeURLPrefix(uri)] = struct{}{}
} }
// Contains returns true if the given URL matches the prefix map.
func (m URLPrefixMap) Contains(uri *url.URL) bool { func (m URLPrefixMap) Contains(uri *url.URL) bool {
s := strings.TrimPrefix(uri.Host, "www.") s := strings.TrimPrefix(uri.Host, "www.")
if _, ok := m[s]; ok { if _, ok := m[s]; ok {
...@@ -111,6 +115,8 @@ func (s *regexpIgnoreScope) Check(uri *url.URL, depth int) bool { ...@@ -111,6 +115,8 @@ func (s *regexpIgnoreScope) Check(uri *url.URL, depth int) bool {
return true return true
} }
// NewRegexpIgnoreScope returns a Scope that filters out URLs
// according to a list of regular expressions.
func NewRegexpIgnoreScope(ignores []string) Scope { func NewRegexpIgnoreScope(ignores []string) Scope {
if ignores == nil { if ignores == nil {
ignores = defaultIgnorePatterns ignores = defaultIgnorePatterns
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment