package crawl

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"sync"
	"sync/atomic"
	"time"

	"github.com/PuerkitoBio/purell"
	"github.com/syndtr/goleveldb/leveldb"
	lerr "github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	lutil "github.com/syndtr/goleveldb/leveldb/util"
)

var errorRetryDelay = 180 * time.Second

type gobDB struct {
	*leveldb.DB
}

func newGobDB(path string) (*gobDB, error) {
	db, err := leveldb.OpenFile(path, nil)
	if lerr.IsCorrupted(err) {
		log.Printf("corrupted database, recovering...")
		db, err = leveldb.RecoverFile(path, nil)
	}
	if err != nil {
		return nil, err
	}
	return &gobDB{db}, nil
}

func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error {
	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
		return err
	}
	wb.Put(key, b.Bytes())
	return nil
}

func (db *gobDB) GetObj(key []byte, obj interface{}) error {
	data, err := db.Get(key, nil)
	if err != nil {
		return err
	}
	if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
		return err
	}
	return nil
}

func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator {
	return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil))
}

func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator {
	return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil))
}

type gobIterator struct {
	iterator.Iterator
}

func newGobIterator(i iterator.Iterator) *gobIterator {
	return &gobIterator{i}
}

func (i *gobIterator) Value(obj interface{}) error {
	return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
}

// Outlink is a tagged outbound link.
type Outlink struct {
	URL *url.URL
	Tag int
}

const (
	// TagPrimary is a primary reference (another web page).
	TagPrimary = iota

	// TagRelated is a secondary resource, related to a page.
	TagRelated
)

// A Fetcher retrieves contents from remote URLs.
type Fetcher interface {
	// Fetch retrieves a URL and returns the response.
	Fetch(string) (*http.Response, error)
}

// FetcherFunc wraps a simple function into the Fetcher interface.
type FetcherFunc func(string) (*http.Response, error)

// Fetch retrieves a URL and returns the response.
func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
	return f(u)
}

// A Handler processes crawled contents. Any errors returned by public
// implementations of this interface are considered fatal and will
// cause the crawl to abort. The URL will be removed from the queue
// unless the handler returns the special error ErrRetryRequest.
type Handler interface {
	// Handle the response from a URL.
	Handle(Publisher, string, int, int, *http.Response, error) error
}

// HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error

// Handle the response from a URL.
func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
	return f(p, u, tag, depth, resp, err)
}

// ErrRetryRequest is returned by a Handler when the request should be
// retried after some time.
var ErrRetryRequest = errors.New("retry_request")

// Publisher is an interface to something with an Enqueue() method to
// add new potential URLs to crawl.
type Publisher interface {
	Enqueue(Outlink, int) error
}

// The Crawler object contains the crawler state.
type Crawler struct {
	db      *gobDB
	queue   *queue
	seeds   []*url.URL
	scope   Scope
	fetcher Fetcher
	handler Handler

	stopCh   chan bool
	stopping atomic.Value

	enqueueMx sync.Mutex
}

func normalizeURL(u *url.URL) *url.URL {
	urlStr := purell.NormalizeURL(u,
		purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|
			purell.FlagRemoveFragment|purell.FlagSortQuery)
	u2, err := url.Parse(urlStr)
	if err != nil {
		// We *really* do not expect an error here.
		panic(err)
	}
	return u2
}

func seenKey(u *url.URL) []byte {
	return []byte(fmt.Sprintf("_seen/%s", u.String()))
}

func (c *Crawler) hasSeen(u *url.URL) bool {
	_, err := c.db.Get(seenKey(u), nil)
	return err == nil
}

func (c *Crawler) setSeen(wb *leveldb.Batch, u *url.URL) {
	wb.Put(seenKey(u), []byte{})
}

// Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(link Outlink, depth int) error {
	// Normalize the URL. We are going to replace link.URL in-place, to
	// ensure that scope checks are applied to the normalized URL.
	link.URL = normalizeURL(link.URL)

	// See if it's in scope.
	if !c.scope.Check(link, depth) {
		return nil
	}

	// Protect the read-modify-update below with a mutex.
	c.enqueueMx.Lock()
	defer c.enqueueMx.Unlock()

	// Check if we've already seen it.
	if c.hasSeen(link.URL) {
		return nil
	}

	// Store the URL in the queue, and mark it as seen to make
	// sure that subsequent calls to Enqueue with the same URL
	// will fail.
	wb := new(leveldb.Batch)
	if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil {
		return err
	}
	c.setSeen(wb, link.URL)
	return c.db.Write(wb, nil)
}

var scanInterval = 1 * time.Second

// Scan the queue for URLs until there are no more.
func (c *Crawler) process() <-chan queuePair {
	ch := make(chan queuePair, 100)
	go func() {
		t := time.NewTicker(scanInterval)
		defer t.Stop()
		defer close(ch)
		for {
			select {
			case <-t.C:
				if err := c.queue.Scan(ch); err != nil {
					return
				}
			case <-c.stopCh:
				return
			}
		}
	}()
	return ch
}

// Main worker loop.
func (c *Crawler) urlHandler(queue <-chan queuePair) {
	for p := range queue {
		// Stop flag needs to short-circuit the queue (which
		// is buffered), or it's going to take a while before
		// we actually stop.
		if c.stopping.Load().(bool) {
			return
		}

		// Fetch the URL and handle it. Make sure to Close the
		// response body (even if it gets replaced in the
		// Response object).
		fmt.Printf("%s\n", p.URL)
		httpResp, httpErr := c.fetcher.Fetch(p.URL)
		var respBody io.ReadCloser
		if httpErr == nil {
			respBody = httpResp.Body
		}

		// Invoke the handler (even if the fetcher errored
		// out). Errors in handling requests are fatal, crawl
		// will be aborted.
		err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr)
		if httpErr == nil {
			respBody.Close() // nolint
		}

		wb := new(leveldb.Batch)
		switch err {
		case nil:
			c.queue.Release(wb, p)
		case ErrRetryRequest:
			Must(c.queue.Retry(wb, p, errorRetryDelay))
		default:
			log.Panicf("fatal error in handling %s: %v", p.URL, err)
		}

		// Write the result in our database.
		Must(c.db.Write(wb, nil))
	}
}

// MustParseURLs parses a list of URLs and aborts on failure.
func MustParseURLs(urls []string) []*url.URL {
	// Parse the seed URLs.
	var parsed []*url.URL
	for _, s := range urls {
		u, err := url.Parse(s)
		if err != nil {
			log.Fatalf("error parsing URL \"%s\": %v", s, err)
		}
		parsed = append(parsed, u)
	}
	return parsed
}

// NewCrawler creates a new Crawler object with the specified behavior.
func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) {
	// Open the crawl database.
	db, err := newGobDB(path)
	if err != nil {
		return nil, err
	}

	c := &Crawler{
		db:      db,
		queue:   &queue{db: db},
		fetcher: f,
		handler: h,
		seeds:   seeds,
		scope:   scope,
		stopCh:  make(chan bool),
	}
	c.stopping.Store(false)

	// Recover active tasks.
	if err := c.queue.Recover(); err != nil {
		return nil, err
	}

	return c, nil
}

// Run the crawl with the specified number of workers. This function
// does not exit until all work is done (no URLs left in the queue).
func (c *Crawler) Run(concurrency int) {
	// Load initial seeds into the queue.
	for _, u := range c.seeds {
		Must(c.Enqueue(Outlink{URL: u, Tag: TagPrimary}, 0))
	}

	// Start some runners and wait until they're done.
	var wg sync.WaitGroup
	ch := c.process()
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			c.urlHandler(ch)
			wg.Done()
		}()
	}
	wg.Wait()
}

// Stop a running crawl. This will cause a running Run function to return.
func (c *Crawler) Stop() {
	c.stopping.Store(true)
	close(c.stopCh)
}

// Close the database and release resources associated with the crawler state.
func (c *Crawler) Close() {
	c.db.Close() // nolint
}

// FollowRedirects returns a Handler that follows HTTP redirects
// and adds them to the queue for crawling. It will call the wrapped
// handler on all requests regardless.
func FollowRedirects(wrap Handler) Handler {
	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
		if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil {
			return herr
		}

		if err != nil {
			return nil
		}

		location := resp.Header.Get("Location")
		if resp.StatusCode >= 300 && resp.StatusCode < 400 && location != "" {
			locationURL, uerr := resp.Request.URL.Parse(location)
			if uerr != nil {
				log.Printf("error parsing Location header: %v", uerr)
			} else {
				return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1)
			}
		}
		return nil
	})
}

// FilterErrors returns a Handler that forwards only requests with a
// "successful" HTTP status code (anything < 400). When using this
// wrapper, subsequent Handle calls will always have err set to nil.
func FilterErrors(wrap Handler) Handler {
	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
		if err != nil {
			return nil
		}
		if resp.StatusCode >= 400 {
			return nil
		}
		return wrap.Handle(p, u, tag, depth, resp, nil)
	})
}

// HandleRetries returns a Handler that will retry requests on
// temporary errors (all transport-level errors are considered
// temporary, as well as any HTTP status code >= 500).
func HandleRetries(wrap Handler) Handler {
	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
		if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
			return ErrRetryRequest
		}
		return wrap.Handle(p, u, tag, depth, resp, nil)
	})
}

// Must will abort the program with a message when we encounter an
// error that we can't recover from.
func Must(err error) {
	if err != nil {
		log.Panicf("fatal error: %v", err)
	}
}