From 292e4e7182222902f648e06c66f90585f476ddd3 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 17 Jan 2015 16:48:18 +0000
Subject: [PATCH] simplify the HTTP server structure

Refactor the HTTP handler hierarchy to make it simpler.

Replace the ghost logging handler with a simpler one, that fires as soon
as the HTTP request headers are sent (this is a better solution for
long-lived connections).

Add a debug page to track long-lived HTTP request.

Add tests for the HTTP server.
---
 api.go                         |   3 +-
 cmd/redirectord/redirectord.go |   4 +-
 fe/http.go                     | 162 +++++++++++++++++++-------------
 fe/http_debug.go               | 135 +++++++++++++++++++++++++++
 fe/http_test.go                | 165 +++++++++++++++++++++++++++++++++
 fe/proxy.go                    |  31 ++++++-
 6 files changed, 431 insertions(+), 69 deletions(-)
 create mode 100644 fe/http_debug.go
 create mode 100644 fe/http_test.go

diff --git a/api.go b/api.go
index 63f5c777..9e56c8c4 100644
--- a/api.go
+++ b/api.go
@@ -29,11 +29,12 @@ const (
 	TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
 	NodePrefix                   = "/icecast/" + ABIVersion + "/nodes/"
 
-	IcecastPort        = 8000
 	IcecastMountPrefix = "/_stream"
 )
 
 var (
+	IcecastPort = 8000
+
 	ErrIsDirectory = errors.New("key is a directory")
 	ErrIsFile      = errors.New("key is a file")
 )
diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go
index 92ed0e29..7fdb59d4 100644
--- a/cmd/redirectord/redirectord.go
+++ b/cmd/redirectord/redirectord.go
@@ -39,9 +39,9 @@ func main() {
 	dnsRed := fe.NewDnsRedirector(client, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl)
 	dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
 
-	red, err := fe.NewHttpRedirector(client, *domain, *lbPolicy)
+	red, err := fe.NewHttpRedirector(client, *domain, *lbPolicy, *staticDir, *templateDir)
 	if err != nil {
 		log.Fatal(err)
 	}
-	red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir)
+	red.Run(fmt.Sprintf(":%d", *httpPort))
 }
diff --git a/fe/http.go b/fe/http.go
index 3383ec69..73d8f392 100644
--- a/fe/http.go
+++ b/fe/http.go
@@ -33,41 +33,83 @@ var (
 	sourceErrors      = instrumentation.NewCounter("http.source_errors")
 )
 
-type statsResponseWriter struct {
+// ResponseWriter wrapper that logs an entry for every incoming HTTP
+// request, as soon as the response headers are sent.
+type logResponseWriter struct {
 	http.ResponseWriter
-	code int
+
+	logged bool
+
+	// Parameters of the original request.
+	remoteAddr string
+	method     string
+	path       string
+	proto      string
+}
+
+func (w *logResponseWriter) writeLog(code int) {
+	log.Printf("%s %s %s %s %d", w.remoteAddr, w.method, w.path, w.proto, code)
+	httpStatusCodes.IncrVar(strconv.Itoa(code))
+	w.logged = true
 }
 
-func (w *statsResponseWriter) WriteHeader(code int) {
-	w.code = code
+func (w *logResponseWriter) WriteHeader(code int) {
+	w.writeLog(code)
 	w.ResponseWriter.WriteHeader(code)
 }
 
-func statsHandler(h http.Handler) http.HandlerFunc {
+func (w *logResponseWriter) Write(data []byte) (int, error) {
+	// Catch the case when WriteHeader wasn't called.
+	if !w.logged {
+		w.writeLog(http.StatusOK)
+	}
+	return w.ResponseWriter.Write(data)
+}
+
+func (w *logResponseWriter) WrappedWriter() http.ResponseWriter {
+	return w.ResponseWriter
+}
+
+func logHandler(h http.Handler) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
-		wrapw := &statsResponseWriter{w, 200}
+		ip, _, err := net.SplitHostPort(r.RemoteAddr)
+		if err != nil {
+			ip = r.RemoteAddr
+		}
+		wrapw := &logResponseWriter{
+			ResponseWriter: w,
+			remoteAddr:     ip,
+			method:         r.Method,
+			path:           r.URL.Path,
+			proto:          r.Proto,
+		}
 		h.ServeHTTP(wrapw, r)
-		httpStatusCodes.IncrVar(strconv.Itoa(wrapw.code))
 	}
 }
 
 // HTTP redirector.
 type HttpRedirector struct {
-	domain   string
-	lb       *autoradioLoadBalancer
-	client   *autoradio.Client
-	template *template.Template
+	domain    string
+	staticDir string
+	lb        *autoradioLoadBalancer
+	client    *autoradio.Client
+	template  *template.Template
 }
 
-func NewHttpRedirector(client *autoradio.Client, domain string, lbspec string) (*HttpRedirector, error) {
+func NewHttpRedirector(client *autoradio.Client, domain, lbspec, staticDir, templateDir string) (*HttpRedirector, error) {
 	lb, err := parseLoadBalancerSpec(lbspec)
 	if err != nil {
 		return nil, err
 	}
+	tmpl := template.Must(
+		template.ParseGlob(
+			filepath.Join(templateDir, "*.html")))
 	return &HttpRedirector{
-		client: client,
-		domain: domain,
-		lb:     lb,
+		client:    client,
+		domain:    domain,
+		lb:        lb,
+		template:  tmpl,
+		staticDir: staticDir,
 	}, nil
 }
 
@@ -224,11 +266,14 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
 	}
 
 	// Create a ReverseProxy on the fly with the right backend
-	// address.
+	// address. Blindly assume that the master has at least one
+	// public IP, and that we can reach it.
+	target := icecastAddr(masterInfo.IP[0])
+	tracker.setConnState(r, StateSource, target)
 	proxy := &ReverseProxy{
 		Director: func(req *http.Request) {
 			req.URL.Scheme = "http"
-			req.URL.Host = icecastAddr(masterInfo.IP[0])
+			req.URL.Host = target
 			req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
 		},
 		FlushInterval: 500 * time.Millisecond,
@@ -257,73 +302,66 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request)
 	w.Write(buf.Bytes())
 }
 
-// Run starts the HTTP server on the given addr. Does not return.
-func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
-	// Start the background goroutine that updates the
-	// LoadBalancer asynchronously.
-	go h.lbUpdater()
-
-	h.template = template.Must(
-		template.ParseGlob(
-			filepath.Join(templateDir, "*.html")))
-
+func (h *HttpRedirector) createHandler() http.Handler {
 	// Create our HTTP handler stack.
 	mux := http.NewServeMux()
 
-	// Serve static content. Responses support gzip encoding.
+	// Serve static content.
 	mux.Handle(
 		"/static/",
 		handlers.GZIPHandler(
 			http.StripPrefix(
 				"/static/",
-				http.FileServer(http.Dir(staticDir))),
+				http.FileServer(http.Dir(h.staticDir))),
 			nil))
 
-	// Pass /debug/ to the default ServeMux.
-	mux.Handle("/debug/", http.DefaultServeMux)
+	// Pass /debug/ to the default ServeMux, all the default debug
+	// handlers are installed there.
+	mux.Handle(
+		"/debug/",
+		handlers.GZIPHandler(http.DefaultServeMux, nil))
+
+	// Optionally enable a reverse proxy to the local Icecast for
+	// the direct stream URLs (below IcecastMountPrefix).
+	if *proxyStreams {
+		iceProxy := NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", autoradio.IcecastPort)})
+		mux.HandleFunc(autoradio.IcecastMountPrefix+"/", func(w http.ResponseWriter, r *http.Request) {
+			tracker.setConnState(r, StateListener, "localhost")
+			iceProxy.ServeHTTP(w, r)
+		})
+	}
+
+	// The root handler should discriminate between requests for
+	// the status page, source and listener connections.
+	relayHandler := h.withMount(h.serveRelay)
+	sourceHandler := h.withMount(h.serveSource)
+	statusPageHandler := handlers.GZIPHandler(
+		http.HandlerFunc(h.serveStatusPage), nil)
 
-	// The / handler should discriminate between a request for a
-	// stream or the status page.
-	statush := handlers.GZIPHandler(http.HandlerFunc(h.serveStatusPage), nil)
-	relayh := h.withMount(h.serveRelay)
 	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 		switch {
+		case r.Method == "SOURCE":
+			sourceHandler.ServeHTTP(w, r)
 		case r.URL.Path == "" || r.URL.Path == "/":
-			statush.ServeHTTP(w, r)
+			statusPageHandler.ServeHTTP(w, r)
 		default:
-			relayh.ServeHTTP(w, r)
+			relayHandler.ServeHTTP(w, r)
 		}
 	})
 
-	// Add some handlers to support gzip-encoded responses and
-	// request logging.
-	logopts := handlers.NewLogOptions(nil, handlers.Lshort)
-	logopts.Immediate = true
-	wraph := handlers.LogHandler(mux, logopts)
-	wraph = statsHandler(wraph)
-
-	// Serve proxied requests bypassing the logging and gzip
-	// handlers: since they wrap the ResponseWriter, we would be
-	// unable to hijack the underlying connection for proxying.
-	sourceHandler := h.withMount(h.serveSource)
-	root := http.NewServeMux()
+	// Instrument the resulting HTTP handler.
+	return logHandler(trackRequestsHandler(mux))
+}
 
-	// Optionally enable a reverse proxy to the local Icecast.
-	if *proxyStreams {
-		iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort))
-		root.Handle(autoradio.IcecastMountPrefix+"/", NewSingleHostReverseProxy(iceurl))
-	}
-	root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		if r.Method == "SOURCE" {
-			sourceHandler.ServeHTTP(w, r)
-		} else {
-			wraph.ServeHTTP(w, r)
-		}
-	})
+// Run starts the HTTP server on the given addr. Does not return.
+func (h *HttpRedirector) Run(addr string) {
+	// Start the background goroutine that updates the
+	// LoadBalancer asynchronously.
+	go h.lbUpdater()
 
 	httpServer := &http.Server{
 		Addr:         addr,
-		Handler:      root,
+		Handler:      h.createHandler(),
 		ReadTimeout:  10 * time.Second,
 		WriteTimeout: 10 * time.Second,
 	}
diff --git a/fe/http_debug.go b/fe/http_debug.go
new file mode 100644
index 00000000..4861be2a
--- /dev/null
+++ b/fe/http_debug.go
@@ -0,0 +1,135 @@
+package fe
+
+import (
+	"fmt"
+	"net/http"
+	"sort"
+	"sync"
+	"text/template"
+	"time"
+)
+
+const debugText = `<html>
+    <body>
+    <title>Requests</title>
+    <table>
+      <tr><th>IP</th><th>URL</th><th>Target</th></tr>
+      {{range .}}
+      <tr>
+        <td align=left>{{.RemoteAddr}}</td>
+        <td align=left font=fixed>{{.URL}}</td>
+        <td align=left>{{.State}}</td>
+        <td align=left>{{.Target}}</td>
+      </tr>  
+      {{end}}
+    </table>
+    </body>
+    </html>`
+
+var (
+	debugTmpl = template.Must(template.New("http debug").Parse(debugText))
+
+	tracker *requestTracker
+)
+
+const (
+	StateConnected = iota
+	StateSource
+	StateListener
+)
+
+type connState int
+
+func (s connState) String() string {
+	switch s {
+	case StateConnected:
+		return "CONNECTED"
+	case StateSource:
+		return "SOURCE"
+	case StateListener:
+		return "LISTENER"
+	}
+	return ""
+}
+
+type connInfo struct {
+	State      connState
+	URL        string
+	Stamp      time.Time
+	RemoteAddr string
+	Target     string
+}
+
+// Request tracker (for long-running requests).
+type requestTracker struct {
+	lock  sync.Mutex
+	conns map[*http.Request]*connInfo
+}
+
+func (t *requestTracker) addConn(r *http.Request) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	t.conns[r] = &connInfo{
+		URL:        r.URL.String(),
+		State:      StateConnected,
+		Stamp:      time.Now(),
+		RemoteAddr: r.RemoteAddr,
+	}
+}
+
+func (t *requestTracker) delConn(r *http.Request) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	delete(t.conns, r)
+}
+
+func (t *requestTracker) setConnState(r *http.Request, state connState, target string) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	if cs, ok := t.conns[r]; ok {
+		cs.State = state
+		cs.Target = target
+	}
+}
+
+type connList []connInfo
+
+func (l connList) Len() int      { return len(l) }
+func (l connList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
+func (l connList) Less(i, j int) bool {
+	if l[i].URL == l[j].URL {
+		return l[i].RemoteAddr < l[j].RemoteAddr
+	}
+	return l[i].URL < l[j].URL
+}
+
+func (t *requestTracker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	t.lock.Lock()
+	var conns []connInfo
+	for _, c := range t.conns {
+		conns = append(conns, *c)
+	}
+	t.lock.Unlock()
+
+	sort.Sort(connList(conns))
+
+	err := debugTmpl.Execute(w, conns)
+	if err != nil {
+		fmt.Fprintln(w, "debug: error executing template: ", err.Error())
+	}
+}
+
+func trackRequestsHandler(h http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		tracker.addConn(r)
+		h.ServeHTTP(w, r)
+		tracker.delConn(r)
+	})
+}
+
+func init() {
+	tracker = &requestTracker{
+		conns: make(map[*http.Request]*connInfo),
+	}
+	http.Handle("/debug/http", tracker)
+}
diff --git a/fe/http_test.go b/fe/http_test.go
new file mode 100644
index 00000000..531e5919
--- /dev/null
+++ b/fe/http_test.go
@@ -0,0 +1,165 @@
+package fe
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strconv"
+	"strings"
+	"testing"
+
+	"git.autistici.org/ale/autoradio"
+	"git.autistici.org/ale/autoradio/util"
+)
+
+// Create a test target, replacing the local icecast daemon. Returns
+// the port used, so the caller can then override the IcecastPort
+// global variable.
+func createTestTargetServer(t *testing.T) (*httptest.Server, int) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintf(w, "%s", r.URL.Path)
+	}))
+	port, _ := strconv.Atoi(strings.Split(strings.TrimPrefix(srv.URL, "http://"), ":")[1])
+	return srv, port
+}
+
+func createTestHttpRedirector(t *testing.T) *HttpRedirector {
+	nodes := []*autoradio.NodeStatus{
+		&autoradio.NodeStatus{
+			Name:      "node1",
+			IcecastUp: true,
+			IP: []net.IP{
+				net.ParseIP("127.0.0.1"),
+			},
+		},
+	}
+
+	etcd := util.NewTestEtcdClient()
+	etcd.Set(autoradio.MountPrefix+"test.ogg",
+		`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
+		86400)
+	etcd.Set(autoradio.MasterElectionPath,
+		`{"Name": "node1", "IP": ["127.0.0.1"]}`,
+		86400)
+	client := autoradio.NewClient(etcd)
+	h, err := NewHttpRedirector(client, "example.com", "best", "./static", "./templates")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Bootstrap the load balancer.
+	h.lb.Update(nodes)
+
+	return h
+}
+
+func createTestHttpServer(t *testing.T) (*HttpRedirector, *httptest.Server) {
+	h := createTestHttpRedirector(t)
+	srv := httptest.NewServer(h.createHandler())
+	return h, srv
+}
+
+func doHttpRequest(t *testing.T, method, url string, expectedStatus int) string {
+	req, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		t.Fatalf("Error building request: %v", err)
+	}
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		t.Fatalf("Error retrieving page: %v", err)
+	}
+	if resp.StatusCode != expectedStatus {
+		t.Fatalf("HTTP status %d (expected %d)", resp.StatusCode, expectedStatus)
+	}
+	defer resp.Body.Close()
+	data, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatalf("Error reading data: %v", err)
+	}
+	return string(data)
+}
+
+func TestHttpRedirector_StatusPage(t *testing.T) {
+	_, srv := createTestHttpServer(t)
+	defer srv.Close()
+
+	// Retrieve the status page.
+	data := doHttpRequest(t, "GET", srv.URL, 200)
+	if !strings.Contains(data, "<div class=\"container\">") {
+		t.Errorf("Bad response:\n%s", data)
+	}
+}
+
+func TestHttpRedirector_Static(t *testing.T) {
+	_, srv := createTestHttpServer(t)
+	defer srv.Close()
+
+	data := doHttpRequest(t, "GET", srv.URL+"/static/style.css", 200)
+	if !strings.Contains(data, "body {") {
+		t.Errorf("Bad response:\n%s", data)
+	}
+}
+
+type httpTestContext struct {
+	srv             *httptest.Server
+	targetSrv       *httptest.Server
+	origIcecastPort int
+}
+
+func (c *httpTestContext) Close() {
+	c.srv.Close()
+	c.targetSrv.Close()
+	autoradio.IcecastPort = c.origIcecastPort
+}
+
+func createTestHttpContext(t *testing.T) *httpTestContext {
+	target, targetPort := createTestTargetServer(t)
+	autoradio.IcecastPort = targetPort
+	_, srv := createTestHttpServer(t)
+	c := &httpTestContext{
+		srv:             srv,
+		targetSrv:       target,
+		origIcecastPort: autoradio.IcecastPort,
+	}
+	return c
+}
+
+func TestHttpRedirector_Source(t *testing.T) {
+	ctx := createTestHttpContext(t)
+	defer ctx.Close()
+
+	data := doHttpRequest(t, "SOURCE", ctx.srv.URL+"/test.ogg", 200)
+	if data != "/_stream/test.ogg" {
+		t.Errorf("Bad response: %s", data)
+	}
+
+	doHttpRequest(t, "SOURCE", ctx.srv.URL+"/nonexist.ogg", 404)
+}
+
+func TestHttpRedirector_Relay(t *testing.T) {
+	ctx := createTestHttpContext(t)
+	defer ctx.Close()
+
+	data := doHttpRequest(t, "GET", ctx.srv.URL+"/test.ogg", 200)
+	if data != "/_stream/test.ogg" {
+		t.Errorf("Bad response: %s", data)
+	}
+}
+
+func TestHttpRedirector_IcecastProxy(t *testing.T) {
+	*proxyStreams = true
+	defer func() {
+		*proxyStreams = false
+	}()
+
+	ctx := createTestHttpContext(t)
+	defer ctx.Close()
+
+	data := doHttpRequest(t, "GET", ctx.srv.URL+"/_stream/test.ogg", 200)
+	if data != "/_stream/test.ogg" {
+		t.Errorf("Bad response: %s", data)
+	}
+}
diff --git a/fe/proxy.go b/fe/proxy.go
index ccf4503f..40f8e844 100644
--- a/fe/proxy.go
+++ b/fe/proxy.go
@@ -100,6 +100,10 @@ var hopHeaders = []string{
 	"Upgrade",
 }
 
+type wrappedWriter interface {
+	WrappedWriter() http.ResponseWriter
+}
+
 func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
 	transport := p.Transport
 	if transport == nil {
@@ -158,10 +162,29 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	// Hijack the request connection.
-	conn, _, err := rw.(http.Hijacker).Hijack()
-	if err != nil {
-		log.Printf("http: proxy hijack error: %v", err)
+	// Hijack the request connection. We might need to unroll the
+	// layers of nested WrappedWriters, until we find a
+	// ResponseWriter that also implements the http.Hijacker
+	// interface.
+	var conn net.Conn
+	for {
+		if h, ok := rw.(http.Hijacker); ok {
+			var err error
+			conn, _, err = h.Hijack()
+			if err != nil {
+				log.Printf("http: proxy hijack error: %v", err)
+				rw.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			break
+		} else if w, ok := rw.(wrappedWriter); ok {
+			rw = w.WrappedWriter()
+		} else {
+			break
+		}
+	}
+	if conn == nil {
+		log.Printf("http: proxy error: could not find hijackable connection")
 		rw.WriteHeader(http.StatusInternalServerError)
 		return
 	}
-- 
GitLab