diff --git a/fe/http.go b/fe/http.go index 6ed5400fa7a981827f2ee4a99f0abf51571ec6bc..1e1b072ce80c742fcca54b38cb809ffd591f9f99 100644 --- a/fe/http.go +++ b/fe/http.go @@ -13,10 +13,8 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" - "net/http/httputil" _ "net/http/pprof" "git.autistici.org/ale/autoradio" @@ -161,80 +159,26 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit sourceConnections.Incr() - // Handy function to report a source error (the ones we care - // the most about because they measure failures in the - // coordination infrastructure). - sendErr := func(err error) { - http.Error(w, err.Error(), http.StatusServiceUnavailable) - sourceErrors.Incr() - } - // Find the current master node. masterAddr, err := h.client.GetMasterAddr() if err != nil { - log.Printf("source: no master: %s", err) - sendErr(err) - return - } - - // Create the upstream connection, and write the HTTP request - // to it (with the right URL path). - upstream, err := net.Dial("tcp", icecastAddr(masterAddr)) - if err != nil { - log.Printf("source: dial upstream: %v", err) - sendErr(err) - return - } - defer upstream.Close() - - r.URL.Path = autoradio.MountNameToIcecastPath(mount.Name) - if err := r.Write(upstream); err != nil { - log.Printf("source: write upstream request: %v", err) - sendErr(err) + log.Printf("source error: no master: %v", err) + http.Error(w, err.Error(), http.StatusServiceUnavailable) + sourceErrors.Incr() return } - // Hijack the incoming connection. This is necessary, rather - // than using httputil.ReverseProxy, for two important - // reasons: - // - // 1) So that we can reset the timeout on the underlying - // network connection (which we have no use for once the - // stream has been established). - // - // 2) Because streaming is still mostly a HTTP/1.0 world, the - // HTTP/1.1 features used by Go's net/http package (mostly the - // chunked encoding, I think) will apparently confuse clients - // and servers alike. - // - conn, _, err := w.(http.Hijacker).Hijack() - if err != nil { - log.Printf("source: hijack failed: %v", err) - sendErr(err) - return + // Create a ReverseProxy on the fly with the right backend + // address. + proxy := &ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = "http" + req.URL.Host = icecastAddr(masterAddr) + req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name) + }, + FlushInterval: 500 * time.Millisecond, } - defer conn.Close() - if err := conn.SetDeadline(time.Time{}); err != nil { - log.Printf("source: could not reset deadline: %v", err) - } - - // Start two copiers, one for the source data, one for the - // replies. Wait until both are done. - var wg sync.WaitGroup - wg.Add(2) - go func() { - if _, err := io.Copy(conn, upstream); err != nil { - log.Printf("upstream -> source: Copy: %v", err) - } - wg.Done() - }() - go func() { - if _, err := io.Copy(upstream, conn); err != nil { - log.Printf("source -> upstream: Copy: %v", err) - } - wg.Done() - }() - wg.Wait() + proxy.ServeHTTP(w, r) } // Serve our cluster status page. @@ -266,44 +210,51 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { // Create our HTTP handler stack. mux := http.NewServeMux() + + // Serve static content. Responses support gzip encoding. mux.Handle( "/static/", - http.StripPrefix( - "/static/", - http.FileServer(http.Dir(staticDir)))) - - // Optionally enable a reverse proxy to the local Icecast. - if *proxyStreams { - iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort)) - mux.Handle(autoradio.IcecastMountPrefix+"/", httputil.NewSingleHostReverseProxy(iceurl)) - } + handlers.GZIPHandler( + http.StripPrefix( + "/static/", + http.FileServer(http.Dir(staticDir))), + nil)) // Pass /debug/ to the default ServeMux. mux.Handle("/debug/", http.DefaultServeMux) - relayHandler := h.withMount(h.serveRelay) + // The / handler should discriminate between a request for a + // stream or the status page. + statush := handlers.GZIPHandler(http.HandlerFunc(h.serveStatusPage), nil) + relayh := h.withMount(h.serveRelay) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { switch { case r.URL.Path == "" || r.URL.Path == "/": - h.serveStatusPage(w, r) + statush.ServeHTTP(w, r) default: - relayHandler.ServeHTTP(w, r) + relayh.ServeHTTP(w, r) } }) // Add some handlers to support gzip-encoded responses and // request logging. - wraph := handlers.GZIPHandler(mux, nil) logopts := handlers.NewLogOptions(nil, handlers.Lshort) - wraph = handlers.LogHandler(wraph, logopts) + logopts.Immediate = true + wraph := handlers.LogHandler(mux, logopts) wraph = statsHandler(wraph) - // Serve SOURCE requests bypassing the logging and gzip + // Serve proxied requests bypassing the logging and gzip // handlers: since they wrap the ResponseWriter, we would be // unable to hijack the underlying connection for proxying. - // TODO: look into using WrapWriter. sourceHandler := h.withMount(h.serveSource) - rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + root := http.NewServeMux() + + // Optionally enable a reverse proxy to the local Icecast. + if *proxyStreams { + iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort)) + root.Handle(autoradio.IcecastMountPrefix+"/", NewSingleHostReverseProxy(iceurl)) + } + root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.Method == "SOURCE" { sourceHandler.ServeHTTP(w, r) } else { @@ -313,7 +264,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { httpServer := &http.Server{ Addr: addr, - Handler: rooth, + Handler: root, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } diff --git a/fe/proxy.go b/fe/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..ccf4503f593a49b92d8e90cdb1d56f9c6712fff6 --- /dev/null +++ b/fe/proxy.go @@ -0,0 +1,262 @@ +// Variant of the base http/httputil ReverseProxy suitable for +// low-latency, long-term connections. + +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// HTTP reverse proxy handler + +package fe + +import ( + "io" + "log" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +// onExitFlushLoop is a callback set by tests to detect the state of the +// flushLoop() goroutine. +var onExitFlushLoop func() + +// ReverseProxy is an HTTP Handler that takes an incoming request and +// sends it to another server, proxying the response back to the +// client. +type ReverseProxy struct { + // Director must be a function which modifies + // the request into a new request to be sent + // using Transport. Its response is then copied + // back to the original client unmodified. + Director func(*http.Request) + + // The transport used to perform proxy requests. + // If nil, http.DefaultTransport is used. + Transport http.RoundTripper + + // FlushInterval specifies the flush interval + // to flush to the client while copying the + // response body. + // If zero, no periodic flushing is done. + FlushInterval time.Duration +} + +func singleJoiningSlash(a, b string) string { + aslash := strings.HasSuffix(a, "/") + bslash := strings.HasPrefix(b, "/") + switch { + case aslash && bslash: + return a + b[1:] + case !aslash && !bslash: + return a + "/" + b + } + return a + b +} + +// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites +// URLs to the scheme, host, and base path provided in target. If the +// target's path is "/base" and the incoming request was for "/dir", +// the target request will be for /base/dir. +func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { + targetQuery := target.RawQuery + director := func(req *http.Request) { + req.URL.Scheme = target.Scheme + req.URL.Host = target.Host + req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) + if targetQuery == "" || req.URL.RawQuery == "" { + req.URL.RawQuery = targetQuery + req.URL.RawQuery + } else { + req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery + } + } + return &ReverseProxy{ + Director: director, + FlushInterval: 500 * time.Millisecond, + } +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +// Hop-by-hop headers. These are removed when sent to the backend. +// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html +var hopHeaders = []string{ + "Connection", + "Keep-Alive", + "Proxy-Authenticate", + "Proxy-Authorization", + "Te", // canonicalized version of "TE" + "Trailers", + "Transfer-Encoding", + "Upgrade", +} + +func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + transport := p.Transport + if transport == nil { + transport = http.DefaultTransport + } + + outreq := new(http.Request) + *outreq = *req // includes shallow copies of maps, but okay + + // Make a HTTP/1.0 connection to the backend. + p.Director(outreq) + outreq.Proto = "HTTP/1.0" + outreq.ProtoMajor = 1 + outreq.ProtoMinor = 0 + outreq.Close = true + + // Remove hop-by-hop headers to the backend. Especially + // important is "Connection" because we want a persistent + // connection, regardless of what the client sent to us. This + // is modifying the same underlying map from req (shallow + // copied above) so we only copy it if necessary. + copiedHeaders := false + for _, h := range hopHeaders { + if outreq.Header.Get(h) != "" { + if !copiedHeaders { + outreq.Header = make(http.Header) + copyHeader(outreq.Header, req.Header) + copiedHeaders = true + } + outreq.Header.Del(h) + } + } + + if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { + // If we aren't the first proxy retain prior + // X-Forwarded-For information as a comma+space + // separated list and fold multiple headers into one. + if prior, ok := outreq.Header["X-Forwarded-For"]; ok { + clientIP = strings.Join(prior, ", ") + ", " + clientIP + } + outreq.Header.Set("X-Forwarded-For", clientIP) + } + + // Create the upstream connection and write the HTTP request + // to it. + upstream, err := net.Dial("tcp", outreq.URL.Host) + if err != nil { + log.Printf("http: proxy dial error: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + defer upstream.Close() + if err := outreq.Write(upstream); err != nil { + log.Printf("http: proxy request write error: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + // Hijack the request connection. + conn, _, err := rw.(http.Hijacker).Hijack() + if err != nil { + log.Printf("http: proxy hijack error: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + defer conn.Close() + if err := conn.SetDeadline(time.Time{}); err != nil { + log.Printf("http: proxy setdeadline error: %v", err) + } + + // Run two-way proxying. + p.copyResponse(conn, upstream) +} + +func (p *ReverseProxy) copyResponse(conn io.ReadWriter, upstream io.ReadWriter) { + if p.FlushInterval != 0 { + if wf, ok := conn.(writeFlusher); ok { + cmlw := &maxLatencyWriter{ + dst: wf, + latency: p.FlushInterval, + done: make(chan bool), + } + go cmlw.flushLoop() + defer cmlw.stop() + conn = cmlw + } + if wf, ok := upstream.(writeFlusher); ok { + umlw := &maxLatencyWriter{ + dst: wf, + latency: p.FlushInterval, + done: make(chan bool), + } + go umlw.flushLoop() + defer umlw.stop() + upstream = umlw + } + } + + // Copy data in both directions. + var wg sync.WaitGroup + wg.Add(2) + + go func() { + if _, err := io.Copy(conn, upstream); err != nil { + log.Printf("http: proxy error: %v", err) + } + wg.Done() + }() + go func() { + if _, err := io.Copy(upstream, conn); err != nil { + log.Printf("http: proxy error: %v", err) + } + wg.Done() + }() + + wg.Wait() +} + +type writeFlusher interface { + io.ReadWriter + http.Flusher +} + +type maxLatencyWriter struct { + dst writeFlusher + latency time.Duration + + lk sync.Mutex // protects Write + Flush + done chan bool +} + +func (m *maxLatencyWriter) Read(p []byte) (int, error) { + return m.dst.Read(p) +} + +func (m *maxLatencyWriter) Write(p []byte) (int, error) { + m.lk.Lock() + defer m.lk.Unlock() + return m.dst.Write(p) +} + +func (m *maxLatencyWriter) flushLoop() { + t := time.NewTicker(m.latency) + defer t.Stop() + for { + select { + case <-m.done: + if onExitFlushLoop != nil { + onExitFlushLoop() + } + return + case <-t.C: + m.lk.Lock() + m.dst.Flush() + m.lk.Unlock() + } + } +} + +func (m *maxLatencyWriter) stop() { m.done <- true }