package crawl

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"sync"
	"time"

	"github.com/PuerkitoBio/purell"
	"github.com/jmhodges/levigo"
)

type gobDB struct {
	*levigo.DB
}

func newGobDB(path string) (*gobDB, error) {
	opts := levigo.NewOptions()
	opts.SetCreateIfMissing(true)
	opts.SetCache(levigo.NewLRUCache(2 << 20))
	opts.SetFilterPolicy(levigo.NewBloomFilter(10))
	db, err := levigo.Open(path, opts)
	if err != nil {
		return nil, err
	}
	return &gobDB{db}, nil
}

func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
		return err
	}
	return db.Put(wo, key, b.Bytes())
}

func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
		return err
	}
	wb.Put(key, b.Bytes())
	return nil
}

// GetObj reads and gob-decodes the object stored at key. Note that a
// missing key surfaces as a decode error, which callers treat as "not
// found".
func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
	data, err := db.Get(ro, key)
	if err != nil {
		return err
	}
	if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
		return err
	}
	return nil
}

func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
	i := db.NewIterator(ro)
	i.Seek(prefix)
	return newGobPrefixIterator(i, prefix)
}

func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
	i := db.NewIterator(ro)
	if startKey != nil {
		i.Seek(startKey)
	}
	return newGobRangeIterator(i, endKey)
}

type gobIterator struct {
	*levigo.Iterator
}

func (i *gobIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
}

type gobPrefixIterator struct {
	*gobIterator
	prefix []byte
}

func (i *gobPrefixIterator) Valid() bool {
	return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
}

func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
	return &gobPrefixIterator{
		gobIterator: &gobIterator{i},
		prefix:      prefix,
	}
}

type gobRangeIterator struct {
	*gobIterator
	endKey []byte
}

func (i *gobRangeIterator) Valid() bool {
	return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
}

func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
	return &gobRangeIterator{
		gobIterator: &gobIterator{i},
		endKey:      endKey,
	}
}
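
// Illustrative sketch (an addition, not part of the original file): the
// iterators above are meant for scanning ranges of gob-encoded records.
// Assuming URL records are stored under "url/<url>" keys, as Enqueue
// does below, dumping every crawled URL could look like this:
//
//	func dumpCrawledURLs(db *gobDB) {
//		ro := levigo.NewReadOptions()
//		defer ro.Close()
//		iter := db.NewPrefixIterator(ro, []byte("url/"))
//		defer iter.Close()
//		for ; iter.Valid(); iter.Next() {
//			var info URLInfo
//			if err := iter.Value(&info); err == nil {
//				log.Printf("%s (status %d)", info.URL, info.StatusCode)
//			}
//		}
//	}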

// URLInfo stores information about a crawled URL.
type URLInfo struct {
	URL        string
	StatusCode int
	CrawledAt  time.Time
	// Error holds the handler error message, if any. It is stored as
	// a string because gob cannot encode arbitrary error values: their
	// concrete types are usually unexported and unregistered.
	Error string
}

// A Fetcher retrieves contents from remote URLs.
type Fetcher interface {
	// Fetch retrieves a URL and returns the response.
	Fetch(string) (*http.Response, error)
}

// FetcherFunc wraps a simple function into the Fetcher interface.
type FetcherFunc func(string) (*http.Response, error)

// Fetch retrieves a URL and returns the response.
func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
	return f(u)
}
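
// Illustrative sketch (an addition, not part of the original file): any
// function with the right signature can be used as a Fetcher. For
// example, a fetcher with a client-side timeout:
//
//	var timeoutFetcher Fetcher = FetcherFunc(func(u string) (*http.Response, error) {
//		client := &http.Client{Timeout: 30 * time.Second}
//		return client.Get(u)
//	})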

// A Handler processes crawled contents. Any error returned by a
// Handler is considered permanent: it is recorded in the URLInfo
// entry, and the URL will not be fetched again.
type Handler interface {
	// Handle the response from a URL.
	Handle(*Crawler, string, int, *http.Response, error) error
}

// HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(*Crawler, string, int, *http.Response, error) error

// Handle the response from a URL.
func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
	return f(db, u, depth, resp, err)
}
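
// Illustrative sketch (an addition, not part of the original file): a
// HandlerFunc that simply logs every response it is given:
//
//	logHandler := HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error {
//		if err == nil {
//			log.Printf("fetched %s (depth %d): %s", u, depth, resp.Status)
//		}
//		return nil
//	})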

// A Crawler holds the state of a crawl: the underlying database, the
// URL queue, the scope checks and the fetch/handle callbacks.
type Crawler struct {
	db      *gobDB
	queue   *queue
	seeds   []*url.URL
	scopes  []Scope
	fetcher Fetcher
	handler Handler

	enqueueMx sync.Mutex
}

// Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(u *url.URL, depth int) {
	// Normalize the URL.
	urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery)

	// See if it's in scope. Checks are ANDed.
	for _, sc := range c.scopes {
		if !sc.Check(u, depth) {
			return
		}
	}

	// Protect the read-modify-write below with a mutex.
	c.enqueueMx.Lock()
	defer c.enqueueMx.Unlock()

	// Check if we've already seen it.
	var info URLInfo
	ro := levigo.NewReadOptions()
	defer ro.Close()
	ukey := []byte(fmt.Sprintf("url/%s", urlStr))
	if err := c.db.GetObj(ro, ukey, &info); err == nil {
		return
	}

	// Store the URL in the queue, and store an empty URLInfo to
	// make sure that subsequent calls to Enqueue with the same
	// URL will be ignored.
	wb := levigo.NewWriteBatch()
	defer wb.Close()
	c.queue.Add(wb, urlStr, depth, time.Now())
	c.db.PutObjBatch(wb, ukey, &info)
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	c.db.Write(wo, wb)
}
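
// Illustrative sketch (an addition, not part of the original file): a
// Handler would typically call Enqueue for every link it extracts from
// a page, letting the scope checks and the seen-URL record decide
// whether the link is actually added ("href" below stands for a
// hypothetical extracted link):
//
//	if link, err := resp.Request.URL.Parse(href); err == nil {
//		c.Enqueue(link, depth+1)
//	}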

// process periodically scans the queue and delivers pending URLs on the
// returned channel, until there are no more.
func (c *Crawler) process() <-chan queuePair {
	ch := make(chan queuePair)
	go func() {
		// Use an explicit Ticker: time.Tick would leak its
		// ticker once the loop exits.
		t := time.NewTicker(2 * time.Second)
		defer t.Stop()
		for range t.C {
			if err := c.queue.Scan(ch); err != nil {
				break
			}
		}
		close(ch)
	}()
	return ch
}

// urlHandler is the main worker loop: it fetches each queued URL,
// invokes the handler, and records the result in the crawl database.
func (c *Crawler) urlHandler(queue <-chan queuePair) {
	for p := range queue {
		// Retrieve the URLInfo object from the crawl db.
		// Ignore errors; we can work with an empty object.
		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
		var info URLInfo
		ro := levigo.NewReadOptions()
		c.db.GetObj(ro, urlkey, &info)
		ro.Close()
		info.CrawledAt = time.Now()
		info.URL = p.URL

		// Fetch the URL and handle it. Make sure to Close the
		// response body (even if it gets replaced in the
		// Response object).
		fmt.Printf("%s\n", p.URL)
		httpResp, httpErr := c.fetcher.Fetch(p.URL)
		var respBody io.ReadCloser
		if httpErr == nil {
			respBody = httpResp.Body
			info.StatusCode = httpResp.StatusCode
		}

		// Invoke the handler (even if the fetcher errored out),
		// recording any permanent handler error in the URLInfo.
		if herr := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr); herr != nil {
			info.Error = herr.Error()
		}

		wb := levigo.NewWriteBatch()
		if httpErr == nil {
			respBody.Close()

			// Remove the URL from the queue if the fetcher was successful.
			c.queue.Release(wb, p)
		} else {
			log.Printf("error retrieving %s: %v", p.URL, httpErr)
			c.queue.Retry(wb, p, 300*time.Second)
		}

		c.db.PutObjBatch(wb, urlkey, &info)

		wo := levigo.NewWriteOptions()
		c.db.Write(wo, wb)
		wo.Close()
		wb.Close()
	}
}

// MustParseURLs parses a list of URLs and aborts on failure.
func MustParseURLs(urls []string) []*url.URL {
	// Parse the seed URLs.
	var parsed []*url.URL
	for _, s := range urls {
		u, err := url.Parse(s)
		if err != nil {
			log.Fatalf("error parsing URL \"%s\": %v", s, err)
		}
		parsed = append(parsed, u)
	}
	return parsed
}

// NewCrawler creates a new Crawler object with the specified behavior.
func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Handler) (*Crawler, error) {
	// Open the crawl database.
	db, err := newGobDB(path)
	if err != nil {
		return nil, err
	}

	c := &Crawler{
		db:      db,
		queue:   &queue{db: db},
		fetcher: f,
		handler: h,
		seeds:   seeds,
		scopes:  scopes,
	}

	// Recover active tasks.
	c.queue.Recover()

	return c, nil
}

// Run the crawl with the specified number of workers. This function
// does not exit until all work is done (no URLs left in the queue).
func (c *Crawler) Run(concurrency int) {
	// Load initial seeds into the queue.
	for _, u := range c.seeds {
		c.Enqueue(u, 0)
	}

	// Start some runners and wait until they're done.
	var wg sync.WaitGroup
	ch := c.process()
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			c.urlHandler(ch)
			wg.Done()
		}()
	}
	wg.Wait()
}

type redirectHandler struct {
	h Handler
}

func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
	if err == nil {
		if resp.StatusCode == 200 {
			err = wrap.h.Handle(c, u, depth, resp, err)
		} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
			location := resp.Header.Get("Location")
			if location != "" {
				locationURL, err := resp.Request.URL.Parse(location)
				if err != nil {
					log.Printf("error parsing Location header: %v", err)
				} else {
					c.Enqueue(locationURL, depth+1)
				}
			}
		} else {
			err = errors.New(resp.Status)
		}
	}
	return err
}

// NewRedirectHandler returns a Handler that enqueues the targets of
// HTTP redirects, and calls the wrapped handler only on responses with
// HTTP status 200.
func NewRedirectHandler(wrap Handler) Handler {
	return &redirectHandler{wrap}
}
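
// Illustrative sketch (an addition, not part of the original file):
// wiring the pieces together. "handler" stands for any Handler
// implementation, and the scope list would be built with the Scope
// implementations defined elsewhere in this package:
//
//	seeds := MustParseURLs([]string{"http://example.com/"})
//	var scopes []Scope // e.g. limit by depth and by seed host
//	crawler, err := NewCrawler("/tmp/crawldb", seeds, scopes,
//		FetcherFunc(http.Get), NewRedirectHandler(handler))
//	if err != nil {
//		log.Fatal(err)
//	}
//	crawler.Run(3) // three concurrent workers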