Commit 3af8c9a2 authored by ale

move URLInfo logic into the Crawler itself

parent 7c1b1f70
@@ -135,7 +135,7 @@ func main() {
 	saver := NewSaveHandler(w)
-	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), saver)
+	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(saver))
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -44,7 +44,7 @@ func main() {
 		crawl.NewSeedScope(seeds),
 	}
-	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks))
+	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(crawl.HandlerFunc(extractLinks)))
 	if err != nil {
 		log.Fatal(err)
 	}
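Note that NewCrawler no longer wraps the handler on the caller's behalf (see the NewCrawler hunk further down): redirect handling is now opt-in via crawl.NewRedirectHandler. Below is a minimal sketch of the new wiring; the import path, the seed URL and the logging handler are placeholders, not part of this commit.

package main

import (
	"log"
	"net/http"

	"git.autistici.org/ale/crawl" // assumed import path for this package
)

func main() {
	seeds := crawl.MustParseURLs([]string{"http://example.com/"})
	scope := []crawl.Scope{crawl.NewSeedScope(seeds)}

	// Placeholder handler: real callers would save or parse the response.
	handler := crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
		log.Printf("fetched %s (depth %d)", u, depth)
		return nil
	})

	// The redirect-following wrapper must now be applied explicitly.
	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(handler))
	if err != nil {
		log.Fatal(err)
	}
	crawler.Run(3)
}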
@@ -5,6 +5,7 @@ import (
 	"encoding/gob"
 	"errors"
 	"fmt"
+	"io"
 	"log"
 	"net/http"
 	"net/url"
@@ -76,6 +77,7 @@ type URLInfo struct {
 	Error      error
 }
 
+// A Fetcher retrieves contents from remote URLs.
 type Fetcher interface {
 	Fetch(string) (*http.Response, error)
 }
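Anything with http.Get's signature satisfies Fetcher, and FetcherFunc is the adapter used throughout these examples. As a hedged illustration (not part of this commit, assuming imports of net/http, time and the crawl package as above), a fetcher with an explicit timeout could look like this; the helper name and the timeout value are arbitrary:

// newTimeoutFetcher is a hypothetical helper returning a Fetcher that
// behaves like http.Get but gives up after the given timeout.
func newTimeoutFetcher(timeout time.Duration) crawl.Fetcher {
	client := &http.Client{Timeout: timeout}
	return crawl.FetcherFunc(func(u string) (*http.Response, error) {
		return client.Get(u)
	})
}

It can be passed to NewCrawler wherever crawl.FetcherFunc(http.Get) is used above.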
@@ -86,6 +88,9 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
 	return f(u)
 }
 
+// A Handler processes crawled contents. Any errors returned by public
+// implementations of this interface are considered permanent and will
+// not cause the URL to be fetched again.
 type Handler interface {
 	Handle(*Crawler, string, int, *http.Response, error) error
 }
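The new doc comment pins down an important contract: an error returned from Handle is recorded as permanent and does not put the URL back on the queue (as the urlHandler hunk below shows, the queue entry is removed whenever the fetch itself succeeded). A sketch of a handler built with the HandlerFunc adapter, assuming imports of errors, io, net/http and the crawl package; the size limit is an arbitrary example, not something the package enforces:

// Sketch: reject oversized documents. Returning a non-nil error marks the
// URL as permanently failed; it will not be fetched again.
var errTooLarge = errors.New("response body too large")

var sizeLimitHandler = crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
	if err != nil {
		// Fetch errors are logged by the crawler itself; nothing to do here.
		return nil
	}
	if resp.ContentLength > 10<<20 {
		return errTooLarge
	}
	_, copyErr := io.Copy(io.Discard, resp.Body) // drain the body; the crawler closes it
	return copyErr
})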
@@ -96,7 +101,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
 	return f(db, u, depth, resp, err)
 }
 
-// UrlDb is the database of crawled URLs (either pending or done).
+// The Crawler object contains the crawler state.
 type Crawler struct {
 	db      *gobDB
 	seeds   []*url.URL
@@ -107,19 +112,12 @@ type Crawler struct {
 	enqueueMx sync.Mutex
 }
 
-type QueuePair struct {
+type queuePair struct {
 	Key   []byte
 	URL   string
 	Depth int
 }
 
-// Update this URLInfo entry in the crawl database.
-func (c *Crawler) UpdateURL(info *URLInfo) error {
-	wo := levigo.NewWriteOptions()
-	defer wo.Close()
-	return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
-}
-
 // Enqueue a (possibly new) URL for processing.
 func (c *Crawler) Enqueue(u *url.URL, depth int) {
 	// Normalize the URL.
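With UpdateURL gone, the way handlers feed work back into the crawl is Enqueue, which (per its comment) normalizes the URL and accepts URLs that may already be known. A rough sketch of an extractLinks-style handler follows, assuming imports of io, net/http, net/url, regexp and the crawl package; the regexp-based extraction is deliberately naive and just for illustration, and enqueueing at depth+1 is an assumption about how depth is meant to be used:

var hrefRe = regexp.MustCompile(`href="([^"]+)"`)

func extractLinksSketch(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
	if err != nil {
		return nil
	}
	base, perr := url.Parse(u)
	if perr != nil {
		return perr
	}
	body, rerr := io.ReadAll(resp.Body)
	if rerr != nil {
		return rerr
	}
	for _, m := range hrefRe.FindAllStringSubmatch(string(body), -1) {
		// Resolve relative links against the page URL before enqueueing.
		if link, lerr := base.Parse(m[1]); lerr == nil {
			c.Enqueue(link, depth+1)
		}
	}
	return nil
}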
@@ -153,13 +151,13 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
 	// URL will fail.
 	wo := levigo.NewWriteOptions()
 	defer wo.Close()
-	c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
+	c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth})
 	c.db.PutObj(wo, ukey, &info)
 }
 
 // Scan the queue for URLs until there are no more.
-func (c *Crawler) process() <-chan QueuePair {
-	ch := make(chan QueuePair)
+func (c *Crawler) process() <-chan queuePair {
+	ch := make(chan queuePair)
 	go func() {
 		queuePrefix := []byte("queue/")
 		for range time.Tick(2 * time.Second) {
@@ -173,7 +171,7 @@ func (c *Crawler) process() <-chan QueuePair {
 			iter := c.db.NewPrefixIterator(ro, queuePrefix)
 			for ; iter.Valid(); iter.Next() {
-				var p QueuePair
+				var p queuePair
 				if err := iter.Value(&p); err != nil {
 					continue
 				}
@@ -195,27 +193,42 @@ func (c *Crawler) process() <-chan QueuePair {
 }
 
 // Main worker loop.
-func (c *Crawler) urlHandler(queue <-chan QueuePair) {
+func (c *Crawler) urlHandler(queue <-chan queuePair) {
 	for p := range queue {
+		// Retrieve the URLInfo object from the crawl db.
+		// Ignore errors, we can work with an empty object.
+		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
+		var info URLInfo
+		ro := levigo.NewReadOptions()
+		c.db.GetObj(ro, urlkey, &info)
+		info.CrawledAt = time.Now()
+		info.URL = p.URL
+
 		// Fetch the URL and handle it. Make sure to Close the
 		// response body (even if it gets replaced in the
 		// Response object).
 		fmt.Printf("%s\n", p.URL)
 		httpResp, httpErr := c.fetcher.Fetch(p.URL)
-		respBody := httpResp.Body
-		err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+		var respBody io.ReadCloser
 		if httpErr == nil {
-			respBody.Close()
+			respBody = httpResp.Body
+			info.StatusCode = httpResp.StatusCode
 		}
 
-		// Remove the URL from the queue if the handler was successful.
-		if err == nil {
-			wo := levigo.NewWriteOptions()
+		// Invoke the handler (even if the fetcher errored out).
+		info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+
+		wo := levigo.NewWriteOptions()
+		if httpErr == nil {
+			respBody.Close()
+
+			// Remove the URL from the queue if the fetcher was successful.
 			c.db.Delete(wo, p.Key)
-			wo.Close()
 		} else {
-			log.Printf("error handling %s: %v", p.URL, err)
+			log.Printf("error retrieving %s: %v", p.URL, httpErr)
 		}
+
+		c.db.PutObj(wo, urlkey, &info)
+		wo.Close()
 	}
 }
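For orientation, the fields that urlHandler now reads and writes imply a URLInfo shaped roughly as follows; this is a reconstruction from the uses above, not the actual declaration, which lies outside the changed lines:

// Sketch only: field names and types inferred from urlHandler and from the
// "Error error" context line in the URLInfo hunk above.
type URLInfo struct {
	URL        string
	StatusCode int
	CrawledAt  time.Time
	Error      error
}

The net effect of this hunk: every attempted fetch now gets its URLInfo record written under the "url/<URL>" key, while the queue entry is deleted only when the HTTP fetch itself succeeded. Transport failures therefore stay queued and are retried on a later scan, whereas handler errors are recorded as permanent.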
@@ -225,7 +238,7 @@ func MustParseURLs(urls []string) []*url.URL {
 	for _, s := range urls {
 		u, err := url.Parse(s)
 		if err != nil {
-			log.Fatalf("error parsing seed \"%s\": %v", s, err)
+			log.Fatalf("error parsing URL \"%s\": %v", s, err)
 		}
 		parsed = append(parsed, u)
 	}
@@ -242,7 +255,7 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Handler) (*Crawler, error) {
 	c := &Crawler{
 		db:      db,
 		fetcher: f,
-		handler: &standardPageHandler{h},
+		handler: h,
 		seeds:   seeds,
 		scopes:  scopes,
 	}
@@ -270,16 +283,12 @@ func (c *Crawler) Run(concurrency int) {
 	wg.Wait()
 }
 
-// Standard page handler, follows redirects and invokes a child
-// handler when status == 200.
-type standardPageHandler struct {
+type redirectHandler struct {
 	h Handler
 }
 
-func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
-	info := &URLInfo{URL: u, CrawledAt: time.Now()}
+func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
 	if err == nil {
-		info.StatusCode = resp.StatusCode
 		if resp.StatusCode == 200 {
 			err = wrap.h.Handle(c, u, depth, resp, err)
 		} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
@@ -296,8 +305,11 @@ func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
 			err = errors.New(resp.Status)
 		}
 	}
-	info.Error = err
-
-	c.UpdateURL(info)
-	return nil
+	return err
+}
+
+// NewRedirectHandler returns a Handler that follows HTTP redirects,
+// and will call the wrapped handler on every request with HTTP status 200.
+func NewRedirectHandler(wrap Handler) Handler {
+	return &redirectHandler{wrap}
 }