package fe

import (
	"bytes"
	"fmt"
	"html/template"
	"io"
	"log"
	"net"
	"net/http"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "net/http/pprof"

	"git.autistici.org/ale/autoradio"
	"git.autistici.org/ale/autoradio/instrumentation"
	"github.com/PuerkitoBio/ghost/handlers"
)

// Instrumentation counters updated by the HTTP frontend handlers.
var (
	// Incremented by statsHandler once per wrapped request, keyed by
	// the numeric HTTP status code of the response.
	httpStatusCodes   = instrumentation.NewCounter("http.status")
	// Incremented by serveRelay for every redirect, keyed by the
	// metric name derived from the chosen node address.
	httpTargetStats   = instrumentation.NewCounter("http.target")
	// Incremented by serveSource for every SOURCE request whose mount
	// lookup succeeded and is not a relay.
	sourceConnections = instrumentation.NewCounter("http.source_connections")
	// Incremented by serveSource when a SOURCE request is answered
	// with 404 (unknown mount, or mount is a relay).
	source404Errors   = instrumentation.NewCounter("http.source_404")
	// Incremented by serveSource whenever proxying a SOURCE request
	// fails after the mount has been validated.
	sourceErrors      = instrumentation.NewCounter("http.source_errors")
)

// statsResponseWriter decorates an http.ResponseWriter and remembers
// the status code passed to WriteHeader, so that it can be inspected
// after the wrapped handler has returned.
type statsResponseWriter struct {
	http.ResponseWriter

	// code holds the most recent status given to WriteHeader. It
	// keeps whatever value it was constructed with if WriteHeader is
	// never called.
	code int
}

// WriteHeader records the status code and then forwards the call to
// the embedded ResponseWriter.
func (sw *statsResponseWriter) WriteHeader(status int) {
	sw.code = status
	sw.ResponseWriter.WriteHeader(status)
}

// statsHandler wraps h so that, after every request has been served,
// the httpStatusCodes counter is incremented for the status code of
// the response. A handler that never calls WriteHeader is accounted
// as 200.
func statsHandler(h http.Handler) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		recorder := &statsResponseWriter{
			ResponseWriter: w,
			code:           http.StatusOK,
		}
		h.ServeHTTP(recorder, r)

		status := strconv.Itoa(recorder.code)
		httpStatusCodes.IncrVar(status)
	})
}

// HttpRedirector is the HTTP frontend: it serves the status page,
// answers listener requests with a playlist pointing at an active
// node, and proxies SOURCE connections to the master node.
type HttpRedirector struct {
	// domain is exposed to the status page template.
	domain   string
	// lb selects one node among those with Icecast up.
	lb       LoadBalancingPolicy
	// client is used to look up nodes, mounts and the master address.
	client   *autoradio.RadioAPI
	// template holds the parsed HTML templates. It is set by Run and
	// is nil until then.
	template *template.Template
}

// NewHttpRedirector builds an HttpRedirector that talks to the given
// API client, reports domain on the status page and balances listeners
// with the policy named by lbpolicy. Templates are not loaded here;
// that happens in Run.
func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector {
	redirector := new(HttpRedirector)
	redirector.client = client
	redirector.domain = domain
	redirector.lb = getNamedLoadBalancingPolicy(lbpolicy)
	return redirector
}

// pickActiveNode returns the IP of one node selected by the load
// balancing policy among the nodes that report Icecast as up. It
// returns the empty string when the node list is unavailable, when no
// node has Icecast up, or when the policy selects nothing.
func (h *HttpRedirector) pickActiveNode() string {
	// The second return value of GetNodes is deliberately discarded:
	// a nil node list is handled the same way whatever the cause.
	all, _ := h.client.GetNodes()
	if all == nil {
		return ""
	}

	// Keep only the nodes where Icecast is reported to be running.
	candidates := make([]*autoradio.NodeStatus, 0, len(all))
	for _, node := range all {
		if !node.IcecastUp {
			continue
		}
		candidates = append(candidates, node)
	}
	if len(candidates) == 0 {
		return ""
	}

	if chosen := h.lb.GetNode(candidates); chosen != nil {
		return chosen.IP
	}
	return ""
}

// getMount looks up the mount addressed by the request. The mount path
// is the URL path with a single trailing ".m3u" removed, if present.
func (h *HttpRedirector) getMount(r *http.Request) (*autoradio.Mount, error) {
	mountPath := strings.TrimSuffix(r.URL.Path, ".m3u")
	return h.client.GetMount(mountPath)
}

// makeIcecastUrl returns the "host:port" address of the Icecast daemon
// running on server. Despite the name, the result carries no scheme.
func makeIcecastUrl(server string) string {
	port := strconv.Itoa(autoradio.IcecastPort)
	return net.JoinHostPort(server, port)
}

// serveRelay answers a listener request with a one-line M3U playlist
// that points at the Icecast server of a node picked by the load
// balancing policy.
//
// It replies with 404 when the mount lookup fails and with 503 when no
// node can be selected.
func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
	mount, err := h.getMount(r)
	if err != nil {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}

	// Choose where to send this listener.
	target := h.pickActiveNode()
	if target == "" {
		http.Error(w, "No active nodes", http.StatusServiceUnavailable)
		return
	}
	httpTargetStats.IncrVar(ipToMetric(target))

	// Build the playlist and send it with caching disabled, since
	// the chosen node may differ on every request.
	playlist := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(target), mount.Name)
	hdr := w.Header()
	hdr.Set("Content-Type", "audio/x-mpegurl")
	hdr.Set("Content-Length", strconv.Itoa(len(playlist)))
	hdr.Set("Cache-Control", "private, max-age=0")
	hdr.Set("Expires", "-1")
	io.WriteString(w, playlist)
}

// serveSource handles a SOURCE request by proxying the raw client
// connection to the Icecast daemon on the current master node.
//
// The request is answered with 404 if the mount cannot be retrieved or
// is a relay, and with 503 if the master cannot be determined, the
// upstream connection cannot be set up, or the client connection
// cannot be hijacked. Once both connections are established the
// function blocks, copying data in both directions, until both copies
// have terminated.
func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
	m, err := h.getMount(r)
	if err != nil || m.IsRelay() {
		if err != nil {
			// Log the path only: formatting the whole request
			// would write all of its headers, including any
			// Authorization credentials, to the log.
			log.Printf("source: error retrieving mount for %q: %s", r.URL.Path, err)
		} else {
			log.Printf("source: connection to relay stream %s", m.Name)
		}
		http.Error(w, "Not Found", http.StatusNotFound)
		source404Errors.Incr()
		return
	}

	sourceConnections.Incr()

	// Handy function to report a source error (the ones we care
	// the most about because they measure failures in the
	// coordination infrastructure).
	sendErr := func(err error) {
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		sourceErrors.Incr()
	}

	// Find the current master node.
	masterAddr, err := h.client.GetMasterAddr()
	if err != nil {
		log.Printf("source: no master: %s", err)
		sendErr(err)
		return
	}

	// Create the upstream connection, and write the original
	// request to it as-is (the URL path on the backend is the
	// same, and the headers do not need to change).
	upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr))
	if err != nil {
		log.Printf("source: dial upstream: %v", err)
		sendErr(err)
		return
	}
	defer upstream.Close()
	if err := r.Write(upstream); err != nil {
		log.Printf("source: write upstream request: %v", err)
		sendErr(err)
		return
	}

	// Hijack the incoming connection. This is necessary, rather
	// than using httputil.ReverseProxy, for two important
	// reasons:
	//
	// 1) So that we can reset the timeout on the underlying
	// network connection (which we have no use for once the
	// stream has been established).
	//
	// 2) Because streaming is still mostly a HTTP/1.0 world, the
	// HTTP/1.1 features used by Go's net/http package (mostly the
	// chunked encoding, I think) will apparently confuse clients
	// and servers alike.
	//
	// The type assertion is checked so that a ResponseWriter that
	// cannot be hijacked results in an error response rather than
	// a panic.
	hj, ok := w.(http.Hijacker)
	if !ok {
		err = fmt.Errorf("response writer %T does not support hijacking", w)
		log.Printf("source: hijack failed: %v", err)
		sendErr(err)
		return
	}
	conn, bufrw, err := hj.Hijack()
	if err != nil {
		log.Printf("source: hijack failed: %v", err)
		sendErr(err)
		return
	}
	defer conn.Close()
	if err := conn.SetDeadline(time.Time{}); err != nil {
		log.Printf("source: could not reset deadline: %v", err)
	}

	// Start two copiers, one for the source data, one for the
	// replies. Wait until both are done.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		if _, err := io.Copy(conn, upstream); err != nil {
			log.Printf("upstream -> source: Copy: %v", err)
		}
	}()
	go func() {
		defer wg.Done()
		// Read through the buffered reader returned by Hijack, not
		// the bare connection: the server may already have read
		// part of the stream that followed the request headers, and
		// those bytes would otherwise never reach upstream.
		if _, err := io.Copy(upstream, bufrw.Reader); err != nil {
			log.Printf("source -> upstream: Copy: %v", err)
		}
	}()
	wg.Wait()
}

// serveStatusPage renders the "index.html" template with the current
// node and mount lists and sends the result as a non-cacheable HTML
// page. A template execution error is reported to the client as a 500.
func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) {
	// The second return values are discarded: the page is rendered
	// with whatever lists the client returned.
	nodes, _ := h.client.GetNodes()
	mounts, _ := h.client.ListMounts()

	data := struct {
		Domain string
		Nodes  []*autoradio.NodeStatus
		Mounts []*autoradio.Mount
	}{
		Domain: h.domain,
		Nodes:  nodes,
		Mounts: mounts,
	}

	// Render into memory first, so that a template failure can still
	// be turned into a clean error response.
	page := new(bytes.Buffer)
	err := h.template.ExecuteTemplate(page, "index.html", data)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/html; charset=utf-8")
	hdr.Set("Content-Length", strconv.Itoa(page.Len()))
	hdr.Set("Cache-Control", "private, max-age=0")
	hdr.Set("Expires", "-1")
	w.Write(page.Bytes())
}

// Run loads every "*.html" template found in templateDir, assembles
// the handler stack and serves HTTP on addr, with files under
// staticDir exposed below /static/. It panics if the templates cannot
// be parsed and does not return: the error from ListenAndServe is
// passed to log.Fatal.
func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
	pattern := filepath.Join(templateDir, "*.html")
	h.template = template.Must(template.ParseGlob(pattern))

	// Application routes. Anything below /debug/ is handed over to
	// the global ServeMux, where modules such as pprof register
	// their handlers.
	mux := http.NewServeMux()
	staticFiles := http.FileServer(http.Dir(staticDir))
	mux.Handle("/static/", http.StripPrefix("/static/", staticFiles))
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		urlPath := r.URL.Path
		if urlPath == "" || urlPath == "/" {
			h.serveStatusPage(w, r)
			return
		}
		if strings.HasPrefix(urlPath, "/debug/") {
			http.DefaultServeMux.ServeHTTP(w, r)
			return
		}
		h.serveRelay(w, r)
	})

	// Layer gzip support, request logging and status accounting on
	// top of the routes, innermost first.
	stack := handlers.GZIPHandler(mux, nil)
	stack = handlers.LogHandler(stack, handlers.NewLogOptions(nil, handlers.Lshort))
	stack = statsHandler(stack)

	// SOURCE requests skip the stack above: those handlers wrap the
	// ResponseWriter, which would prevent hijacking the underlying
	// connection for proxying.
	// TODO: look into using WrapWriter.
	root := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "SOURCE" {
			h.serveSource(w, r)
			return
		}
		stack.ServeHTTP(w, r)
	})

	const ioTimeout = 10 * time.Second
	server := &http.Server{
		Addr:         addr,
		Handler:      root,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}

	log.Printf("starting HTTP server on %s/tcp", server.Addr)
	log.Fatal(server.ListenAndServe())
}