Skip to content
Snippets Groups Projects
Commit c263800d authored by ale's avatar ale
Browse files

use a low-level tcp proxy to relay streams

parent 7729bbcf
No related branches found
No related tags found
No related merge requests found
......@@ -13,10 +13,8 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"net/http/httputil"
_ "net/http/pprof"
"git.autistici.org/ale/autoradio"
......@@ -161,80 +159,26 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
sourceConnections.Incr()
// Handy function to report a source error (the ones we care
// the most about because they measure failures in the
// coordination infrastructure).
sendErr := func(err error) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
sourceErrors.Incr()
}
// Find the current master node.
masterAddr, err := h.client.GetMasterAddr()
if err != nil {
log.Printf("source: no master: %s", err)
sendErr(err)
return
}
// Create the upstream connection, and write the HTTP request
// to it (with the right URL path).
upstream, err := net.Dial("tcp", icecastAddr(masterAddr))
if err != nil {
log.Printf("source: dial upstream: %v", err)
sendErr(err)
return
}
defer upstream.Close()
r.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
if err := r.Write(upstream); err != nil {
log.Printf("source: write upstream request: %v", err)
sendErr(err)
log.Printf("source error: no master: %v", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
sourceErrors.Incr()
return
}
// Hijack the incoming connection. This is necessary, rather
// than using httputil.ReverseProxy, for two important
// reasons:
//
// 1) So that we can reset the timeout on the underlying
// network connection (which we have no use for once the
// stream has been established).
//
// 2) Because streaming is still mostly a HTTP/1.0 world, the
// HTTP/1.1 features used by Go's net/http package (mostly the
// chunked encoding, I think) will apparently confuse clients
// and servers alike.
//
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Printf("source: hijack failed: %v", err)
sendErr(err)
return
// Create a ReverseProxy on the fly with the right backend
// address.
proxy := &ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = icecastAddr(masterAddr)
req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
},
FlushInterval: 500 * time.Millisecond,
}
defer conn.Close()
if err := conn.SetDeadline(time.Time{}); err != nil {
log.Printf("source: could not reset deadline: %v", err)
}
// Start two copiers, one for the source data, one for the
// replies. Wait until both are done.
var wg sync.WaitGroup
wg.Add(2)
go func() {
if _, err := io.Copy(conn, upstream); err != nil {
log.Printf("upstream -> source: Copy: %v", err)
}
wg.Done()
}()
go func() {
if _, err := io.Copy(upstream, conn); err != nil {
log.Printf("source -> upstream: Copy: %v", err)
}
wg.Done()
}()
wg.Wait()
proxy.ServeHTTP(w, r)
}
// Serve our cluster status page.
......@@ -266,44 +210,51 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
// Create our HTTP handler stack.
mux := http.NewServeMux()
// Serve static content. Responses support gzip encoding.
mux.Handle(
"/static/",
http.StripPrefix(
"/static/",
http.FileServer(http.Dir(staticDir))))
// Optionally enable a reverse proxy to the local Icecast.
if *proxyStreams {
iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort))
mux.Handle(autoradio.IcecastMountPrefix+"/", httputil.NewSingleHostReverseProxy(iceurl))
}
handlers.GZIPHandler(
http.StripPrefix(
"/static/",
http.FileServer(http.Dir(staticDir))),
nil))
// Pass /debug/ to the default ServeMux.
mux.Handle("/debug/", http.DefaultServeMux)
relayHandler := h.withMount(h.serveRelay)
// The / handler should discriminate between a request for a
// stream or the status page.
statush := handlers.GZIPHandler(http.HandlerFunc(h.serveStatusPage), nil)
relayh := h.withMount(h.serveRelay)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == "" || r.URL.Path == "/":
h.serveStatusPage(w, r)
statush.ServeHTTP(w, r)
default:
relayHandler.ServeHTTP(w, r)
relayh.ServeHTTP(w, r)
}
})
// Add some handlers to support gzip-encoded responses and
// request logging.
wraph := handlers.GZIPHandler(mux, nil)
logopts := handlers.NewLogOptions(nil, handlers.Lshort)
wraph = handlers.LogHandler(wraph, logopts)
logopts.Immediate = true
wraph := handlers.LogHandler(mux, logopts)
wraph = statsHandler(wraph)
// Serve SOURCE requests bypassing the logging and gzip
// Serve proxied requests bypassing the logging and gzip
// handlers: since they wrap the ResponseWriter, we would be
// unable to hijack the underlying connection for proxying.
// TODO: look into using WrapWriter.
sourceHandler := h.withMount(h.serveSource)
rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
root := http.NewServeMux()
// Optionally enable a reverse proxy to the local Icecast.
if *proxyStreams {
iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort))
root.Handle(autoradio.IcecastMountPrefix+"/", NewSingleHostReverseProxy(iceurl))
}
root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "SOURCE" {
sourceHandler.ServeHTTP(w, r)
} else {
......@@ -313,7 +264,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
httpServer := &http.Server{
Addr: addr,
Handler: rooth,
Handler: root,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
......
// Variant of the base http/httputil ReverseProxy suitable for
// low-latency, long-term connections.
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// HTTP reverse proxy handler
package fe
import (
"io"
"log"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
// onExitFlushLoop is a callback set by tests to detect the state of the
// flushLoop() goroutine.
var onExitFlushLoop func()
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
type ReverseProxy struct {
// Director must be a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
Director func(*http.Request)
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
FlushInterval time.Duration
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
return &ReverseProxy{
Director: director,
FlushInterval: 500 * time.Millisecond,
}
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
// Hop-by-hop headers. These are removed when sent to the backend.
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
var hopHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te", // canonicalized version of "TE"
"Trailers",
"Transfer-Encoding",
"Upgrade",
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
outreq := new(http.Request)
*outreq = *req // includes shallow copies of maps, but okay
// Make a HTTP/1.0 connection to the backend.
p.Director(outreq)
outreq.Proto = "HTTP/1.0"
outreq.ProtoMajor = 1
outreq.ProtoMinor = 0
outreq.Close = true
// Remove hop-by-hop headers to the backend. Especially
// important is "Connection" because we want a persistent
// connection, regardless of what the client sent to us. This
// is modifying the same underlying map from req (shallow
// copied above) so we only copy it if necessary.
copiedHeaders := false
for _, h := range hopHeaders {
if outreq.Header.Get(h) != "" {
if !copiedHeaders {
outreq.Header = make(http.Header)
copyHeader(outreq.Header, req.Header)
copiedHeaders = true
}
outreq.Header.Del(h)
}
}
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// If we aren't the first proxy retain prior
// X-Forwarded-For information as a comma+space
// separated list and fold multiple headers into one.
if prior, ok := outreq.Header["X-Forwarded-For"]; ok {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
outreq.Header.Set("X-Forwarded-For", clientIP)
}
// Create the upstream connection and write the HTTP request
// to it.
upstream, err := net.Dial("tcp", outreq.URL.Host)
if err != nil {
log.Printf("http: proxy dial error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
defer upstream.Close()
if err := outreq.Write(upstream); err != nil {
log.Printf("http: proxy request write error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
// Hijack the request connection.
conn, _, err := rw.(http.Hijacker).Hijack()
if err != nil {
log.Printf("http: proxy hijack error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
defer conn.Close()
if err := conn.SetDeadline(time.Time{}); err != nil {
log.Printf("http: proxy setdeadline error: %v", err)
}
// Run two-way proxying.
p.copyResponse(conn, upstream)
}
func (p *ReverseProxy) copyResponse(conn io.ReadWriter, upstream io.ReadWriter) {
if p.FlushInterval != 0 {
if wf, ok := conn.(writeFlusher); ok {
cmlw := &maxLatencyWriter{
dst: wf,
latency: p.FlushInterval,
done: make(chan bool),
}
go cmlw.flushLoop()
defer cmlw.stop()
conn = cmlw
}
if wf, ok := upstream.(writeFlusher); ok {
umlw := &maxLatencyWriter{
dst: wf,
latency: p.FlushInterval,
done: make(chan bool),
}
go umlw.flushLoop()
defer umlw.stop()
upstream = umlw
}
}
// Copy data in both directions.
var wg sync.WaitGroup
wg.Add(2)
go func() {
if _, err := io.Copy(conn, upstream); err != nil {
log.Printf("http: proxy error: %v", err)
}
wg.Done()
}()
go func() {
if _, err := io.Copy(upstream, conn); err != nil {
log.Printf("http: proxy error: %v", err)
}
wg.Done()
}()
wg.Wait()
}
type writeFlusher interface {
io.ReadWriter
http.Flusher
}
type maxLatencyWriter struct {
dst writeFlusher
latency time.Duration
lk sync.Mutex // protects Write + Flush
done chan bool
}
func (m *maxLatencyWriter) Read(p []byte) (int, error) {
return m.dst.Read(p)
}
func (m *maxLatencyWriter) Write(p []byte) (int, error) {
m.lk.Lock()
defer m.lk.Unlock()
return m.dst.Write(p)
}
func (m *maxLatencyWriter) flushLoop() {
t := time.NewTicker(m.latency)
defer t.Stop()
for {
select {
case <-m.done:
if onExitFlushLoop != nil {
onExitFlushLoop()
}
return
case <-t.C:
m.lk.Lock()
m.dst.Flush()
m.lk.Unlock()
}
}
}
func (m *maxLatencyWriter) stop() { m.done <- true }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment