Skip to content
Snippets Groups Projects
Commit 292e4e71 authored by ale's avatar ale
Browse files

simplify the HTTP server structure

Refactor the HTTP handler hierarchy to make it simpler.

Replace the ghost logging handler with a simpler one, that fires as soon
as the HTTP request headers are sent (this is a better solution for
long-lived connections).

Add a debug page to track long-lived HTTP request.

Add tests for the HTTP server.
parent 2f3a1b45
No related branches found
No related tags found
No related merge requests found
......@@ -29,11 +29,12 @@ const (
TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
NodePrefix = "/icecast/" + ABIVersion + "/nodes/"
IcecastPort = 8000
IcecastMountPrefix = "/_stream"
)
var (
IcecastPort = 8000
ErrIsDirectory = errors.New("key is a directory")
ErrIsFile = errors.New("key is a file")
)
......
......@@ -39,9 +39,9 @@ func main() {
dnsRed := fe.NewDnsRedirector(client, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl)
dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
red, err := fe.NewHttpRedirector(client, *domain, *lbPolicy)
red, err := fe.NewHttpRedirector(client, *domain, *lbPolicy, *staticDir, *templateDir)
if err != nil {
log.Fatal(err)
}
red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir)
red.Run(fmt.Sprintf(":%d", *httpPort))
}
......@@ -33,41 +33,83 @@ var (
sourceErrors = instrumentation.NewCounter("http.source_errors")
)
type statsResponseWriter struct {
// ResponseWriter wrapper that logs an entry for every incoming HTTP
// request, as soon as the response headers are sent.
type logResponseWriter struct {
http.ResponseWriter
code int
logged bool
// Parameters of the original request.
remoteAddr string
method string
path string
proto string
}
func (w *logResponseWriter) writeLog(code int) {
log.Printf("%s %s %s %s %d", w.remoteAddr, w.method, w.path, w.proto, code)
httpStatusCodes.IncrVar(strconv.Itoa(code))
w.logged = true
}
func (w *statsResponseWriter) WriteHeader(code int) {
w.code = code
func (w *logResponseWriter) WriteHeader(code int) {
w.writeLog(code)
w.ResponseWriter.WriteHeader(code)
}
func statsHandler(h http.Handler) http.HandlerFunc {
func (w *logResponseWriter) Write(data []byte) (int, error) {
// Catch the case when WriteHeader wasn't called.
if !w.logged {
w.writeLog(http.StatusOK)
}
return w.ResponseWriter.Write(data)
}
func (w *logResponseWriter) WrappedWriter() http.ResponseWriter {
return w.ResponseWriter
}
func logHandler(h http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
wrapw := &statsResponseWriter{w, 200}
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
ip = r.RemoteAddr
}
wrapw := &logResponseWriter{
ResponseWriter: w,
remoteAddr: ip,
method: r.Method,
path: r.URL.Path,
proto: r.Proto,
}
h.ServeHTTP(wrapw, r)
httpStatusCodes.IncrVar(strconv.Itoa(wrapw.code))
}
}
// HTTP redirector.
type HttpRedirector struct {
domain string
lb *autoradioLoadBalancer
client *autoradio.Client
template *template.Template
domain string
staticDir string
lb *autoradioLoadBalancer
client *autoradio.Client
template *template.Template
}
func NewHttpRedirector(client *autoradio.Client, domain string, lbspec string) (*HttpRedirector, error) {
func NewHttpRedirector(client *autoradio.Client, domain, lbspec, staticDir, templateDir string) (*HttpRedirector, error) {
lb, err := parseLoadBalancerSpec(lbspec)
if err != nil {
return nil, err
}
tmpl := template.Must(
template.ParseGlob(
filepath.Join(templateDir, "*.html")))
return &HttpRedirector{
client: client,
domain: domain,
lb: lb,
client: client,
domain: domain,
lb: lb,
template: tmpl,
staticDir: staticDir,
}, nil
}
......@@ -224,11 +266,14 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
}
// Create a ReverseProxy on the fly with the right backend
// address.
// address. Blindly assume that the master has at least one
// public IP, and that we can reach it.
target := icecastAddr(masterInfo.IP[0])
tracker.setConnState(r, StateSource, target)
proxy := &ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = icecastAddr(masterInfo.IP[0])
req.URL.Host = target
req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
},
FlushInterval: 500 * time.Millisecond,
......@@ -257,73 +302,66 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request)
w.Write(buf.Bytes())
}
// Run starts the HTTP server on the given addr. Does not return.
func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
// Start the background goroutine that updates the
// LoadBalancer asynchronously.
go h.lbUpdater()
h.template = template.Must(
template.ParseGlob(
filepath.Join(templateDir, "*.html")))
func (h *HttpRedirector) createHandler() http.Handler {
// Create our HTTP handler stack.
mux := http.NewServeMux()
// Serve static content. Responses support gzip encoding.
// Serve static content.
mux.Handle(
"/static/",
handlers.GZIPHandler(
http.StripPrefix(
"/static/",
http.FileServer(http.Dir(staticDir))),
http.FileServer(http.Dir(h.staticDir))),
nil))
// Pass /debug/ to the default ServeMux.
mux.Handle("/debug/", http.DefaultServeMux)
// Pass /debug/ to the default ServeMux, all the default debug
// handlers are installed there.
mux.Handle(
"/debug/",
handlers.GZIPHandler(http.DefaultServeMux, nil))
// Optionally enable a reverse proxy to the local Icecast for
// the direct stream URLs (below IcecastMountPrefix).
if *proxyStreams {
iceProxy := NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", autoradio.IcecastPort)})
mux.HandleFunc(autoradio.IcecastMountPrefix+"/", func(w http.ResponseWriter, r *http.Request) {
tracker.setConnState(r, StateListener, "localhost")
iceProxy.ServeHTTP(w, r)
})
}
// The root handler should discriminate between requests for
// the status page, source and listener connections.
relayHandler := h.withMount(h.serveRelay)
sourceHandler := h.withMount(h.serveSource)
statusPageHandler := handlers.GZIPHandler(
http.HandlerFunc(h.serveStatusPage), nil)
// The / handler should discriminate between a request for a
// stream or the status page.
statush := handlers.GZIPHandler(http.HandlerFunc(h.serveStatusPage), nil)
relayh := h.withMount(h.serveRelay)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == "SOURCE":
sourceHandler.ServeHTTP(w, r)
case r.URL.Path == "" || r.URL.Path == "/":
statush.ServeHTTP(w, r)
statusPageHandler.ServeHTTP(w, r)
default:
relayh.ServeHTTP(w, r)
relayHandler.ServeHTTP(w, r)
}
})
// Add some handlers to support gzip-encoded responses and
// request logging.
logopts := handlers.NewLogOptions(nil, handlers.Lshort)
logopts.Immediate = true
wraph := handlers.LogHandler(mux, logopts)
wraph = statsHandler(wraph)
// Serve proxied requests bypassing the logging and gzip
// handlers: since they wrap the ResponseWriter, we would be
// unable to hijack the underlying connection for proxying.
sourceHandler := h.withMount(h.serveSource)
root := http.NewServeMux()
// Instrument the resulting HTTP handler.
return logHandler(trackRequestsHandler(mux))
}
// Optionally enable a reverse proxy to the local Icecast.
if *proxyStreams {
iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort))
root.Handle(autoradio.IcecastMountPrefix+"/", NewSingleHostReverseProxy(iceurl))
}
root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "SOURCE" {
sourceHandler.ServeHTTP(w, r)
} else {
wraph.ServeHTTP(w, r)
}
})
// Run starts the HTTP server on the given addr. Does not return.
func (h *HttpRedirector) Run(addr string) {
// Start the background goroutine that updates the
// LoadBalancer asynchronously.
go h.lbUpdater()
httpServer := &http.Server{
Addr: addr,
Handler: root,
Handler: h.createHandler(),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
......
package fe
import (
"fmt"
"net/http"
"sort"
"sync"
"text/template"
"time"
)
const debugText = `<html>
<body>
<title>Requests</title>
<table>
<tr><th>IP</th><th>URL</th><th>Target</th></tr>
{{range .}}
<tr>
<td align=left>{{.RemoteAddr}}</td>
<td align=left font=fixed>{{.URL}}</td>
<td align=left>{{.State}}</td>
<td align=left>{{.Target}}</td>
</tr>
{{end}}
</table>
</body>
</html>`
var (
debugTmpl = template.Must(template.New("http debug").Parse(debugText))
tracker *requestTracker
)
const (
StateConnected = iota
StateSource
StateListener
)
type connState int
func (s connState) String() string {
switch s {
case StateConnected:
return "CONNECTED"
case StateSource:
return "SOURCE"
case StateListener:
return "LISTENER"
}
return ""
}
type connInfo struct {
State connState
URL string
Stamp time.Time
RemoteAddr string
Target string
}
// Request tracker (for long-running requests).
type requestTracker struct {
lock sync.Mutex
conns map[*http.Request]*connInfo
}
func (t *requestTracker) addConn(r *http.Request) {
t.lock.Lock()
defer t.lock.Unlock()
t.conns[r] = &connInfo{
URL: r.URL.String(),
State: StateConnected,
Stamp: time.Now(),
RemoteAddr: r.RemoteAddr,
}
}
func (t *requestTracker) delConn(r *http.Request) {
t.lock.Lock()
defer t.lock.Unlock()
delete(t.conns, r)
}
func (t *requestTracker) setConnState(r *http.Request, state connState, target string) {
t.lock.Lock()
defer t.lock.Unlock()
if cs, ok := t.conns[r]; ok {
cs.State = state
cs.Target = target
}
}
type connList []connInfo
func (l connList) Len() int { return len(l) }
func (l connList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
func (l connList) Less(i, j int) bool {
if l[i].URL == l[j].URL {
return l[i].RemoteAddr < l[j].RemoteAddr
}
return l[i].URL < l[j].URL
}
func (t *requestTracker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t.lock.Lock()
var conns []connInfo
for _, c := range t.conns {
conns = append(conns, *c)
}
t.lock.Unlock()
sort.Sort(connList(conns))
err := debugTmpl.Execute(w, conns)
if err != nil {
fmt.Fprintln(w, "debug: error executing template: ", err.Error())
}
}
func trackRequestsHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tracker.addConn(r)
h.ServeHTTP(w, r)
tracker.delConn(r)
})
}
func init() {
tracker = &requestTracker{
conns: make(map[*http.Request]*connInfo),
}
http.Handle("/debug/http", tracker)
}
package fe
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/util"
)
// Create a test target, replacing the local icecast daemon. Returns
// the port used, so the caller can then override the IcecastPort
// global variable.
func createTestTargetServer(t *testing.T) (*httptest.Server, int) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", r.URL.Path)
}))
port, _ := strconv.Atoi(strings.Split(strings.TrimPrefix(srv.URL, "http://"), ":")[1])
return srv, port
}
func createTestHttpRedirector(t *testing.T) *HttpRedirector {
nodes := []*autoradio.NodeStatus{
&autoradio.NodeStatus{
Name: "node1",
IcecastUp: true,
IP: []net.IP{
net.ParseIP("127.0.0.1"),
},
},
}
etcd := util.NewTestEtcdClient()
etcd.Set(autoradio.MountPrefix+"test.ogg",
`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
86400)
etcd.Set(autoradio.MasterElectionPath,
`{"Name": "node1", "IP": ["127.0.0.1"]}`,
86400)
client := autoradio.NewClient(etcd)
h, err := NewHttpRedirector(client, "example.com", "best", "./static", "./templates")
if err != nil {
t.Fatal(err)
}
// Bootstrap the load balancer.
h.lb.Update(nodes)
return h
}
func createTestHttpServer(t *testing.T) (*HttpRedirector, *httptest.Server) {
h := createTestHttpRedirector(t)
srv := httptest.NewServer(h.createHandler())
return h, srv
}
func doHttpRequest(t *testing.T, method, url string, expectedStatus int) string {
req, err := http.NewRequest(method, url, nil)
if err != nil {
t.Fatalf("Error building request: %v", err)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error retrieving page: %v", err)
}
if resp.StatusCode != expectedStatus {
t.Fatalf("HTTP status %d (expected %d)", resp.StatusCode, expectedStatus)
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading data: %v", err)
}
return string(data)
}
func TestHttpRedirector_StatusPage(t *testing.T) {
_, srv := createTestHttpServer(t)
defer srv.Close()
// Retrieve the status page.
data := doHttpRequest(t, "GET", srv.URL, 200)
if !strings.Contains(data, "<div class=\"container\">") {
t.Errorf("Bad response:\n%s", data)
}
}
func TestHttpRedirector_Static(t *testing.T) {
_, srv := createTestHttpServer(t)
defer srv.Close()
data := doHttpRequest(t, "GET", srv.URL+"/static/style.css", 200)
if !strings.Contains(data, "body {") {
t.Errorf("Bad response:\n%s", data)
}
}
type httpTestContext struct {
srv *httptest.Server
targetSrv *httptest.Server
origIcecastPort int
}
func (c *httpTestContext) Close() {
c.srv.Close()
c.targetSrv.Close()
autoradio.IcecastPort = c.origIcecastPort
}
func createTestHttpContext(t *testing.T) *httpTestContext {
target, targetPort := createTestTargetServer(t)
autoradio.IcecastPort = targetPort
_, srv := createTestHttpServer(t)
c := &httpTestContext{
srv: srv,
targetSrv: target,
origIcecastPort: autoradio.IcecastPort,
}
return c
}
func TestHttpRedirector_Source(t *testing.T) {
ctx := createTestHttpContext(t)
defer ctx.Close()
data := doHttpRequest(t, "SOURCE", ctx.srv.URL+"/test.ogg", 200)
if data != "/_stream/test.ogg" {
t.Errorf("Bad response: %s", data)
}
doHttpRequest(t, "SOURCE", ctx.srv.URL+"/nonexist.ogg", 404)
}
func TestHttpRedirector_Relay(t *testing.T) {
ctx := createTestHttpContext(t)
defer ctx.Close()
data := doHttpRequest(t, "GET", ctx.srv.URL+"/test.ogg", 200)
if data != "/_stream/test.ogg" {
t.Errorf("Bad response: %s", data)
}
}
func TestHttpRedirector_IcecastProxy(t *testing.T) {
*proxyStreams = true
defer func() {
*proxyStreams = false
}()
ctx := createTestHttpContext(t)
defer ctx.Close()
data := doHttpRequest(t, "GET", ctx.srv.URL+"/_stream/test.ogg", 200)
if data != "/_stream/test.ogg" {
t.Errorf("Bad response: %s", data)
}
}
......@@ -100,6 +100,10 @@ var hopHeaders = []string{
"Upgrade",
}
type wrappedWriter interface {
WrappedWriter() http.ResponseWriter
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
if transport == nil {
......@@ -158,10 +162,29 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
return
}
// Hijack the request connection.
conn, _, err := rw.(http.Hijacker).Hijack()
if err != nil {
log.Printf("http: proxy hijack error: %v", err)
// Hijack the request connection. We might need to unroll the
// layers of nested WrappedWriters, until we find a
// ResponseWriter that also implements the http.Hijacker
// interface.
var conn net.Conn
for {
if h, ok := rw.(http.Hijacker); ok {
var err error
conn, _, err = h.Hijack()
if err != nil {
log.Printf("http: proxy hijack error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
break
} else if w, ok := rw.(wrappedWriter); ok {
rw = w.WrappedWriter()
} else {
break
}
}
if conn == nil {
log.Printf("http: proxy error: could not find hijackable connection")
rw.WriteHeader(http.StatusInternalServerError)
return
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment