From 146c29f4a513b9027329572a99e9fcce07f42506 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sun, 12 Jul 2015 19:14:15 +0100 Subject: [PATCH] use simple TCP proxy for /_stream Proxied connections to Icecast are now handled with a simple TCP proxy that can shutdown each direction independently. Fixes some connection leaks. --- debian/changelog | 6 ++++ fe/proxy.go | 83 +++++++----------------------------------------- 2 files changed, 17 insertions(+), 72 deletions(-) diff --git a/debian/changelog b/debian/changelog index f4eb8659..40fac3c0 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +autoradio (0.5.1p3) unstable; urgency=medium + + * Switch back to simple TCP proxy for /_stream. + + -- ale <ale@incal.net> Sun, 12 Jul 2015 19:11:20 +0100 + autoradio (0.5.1p2) unstable; urgency=medium * Fix crash when losing transcoder mastership. diff --git a/fe/proxy.go b/fe/proxy.go index 40f8e844..b4922dea 100644 --- a/fe/proxy.go +++ b/fe/proxy.go @@ -75,7 +75,7 @@ func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { } return &ReverseProxy{ Director: director, - FlushInterval: 500 * time.Millisecond, + FlushInterval: 100 * time.Millisecond, } } @@ -194,92 +194,31 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } // Run two-way proxying. - p.copyResponse(conn, upstream) + p.handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn)) } -func (p *ReverseProxy) copyResponse(conn io.ReadWriter, upstream io.ReadWriter) { - if p.FlushInterval != 0 { - if wf, ok := conn.(writeFlusher); ok { - cmlw := &maxLatencyWriter{ - dst: wf, - latency: p.FlushInterval, - done: make(chan bool), - } - go cmlw.flushLoop() - defer cmlw.stop() - conn = cmlw - } - if wf, ok := upstream.(writeFlusher); ok { - umlw := &maxLatencyWriter{ - dst: wf, - latency: p.FlushInterval, - done: make(chan bool), - } - go umlw.flushLoop() - defer umlw.stop() - upstream = umlw - } - } - - // Copy data in both directions. +// Simple two-way TCP proxy that copies data in both directions and +// can shutdown each direction of the connection independently. +func (p *ReverseProxy) handleProxy(conn *net.TCPConn, upstream *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) go func() { if _, err := io.Copy(conn, upstream); err != nil { - log.Printf("http: proxy error: %v", err) + log.Printf("http: proxy error: client: %v", err) } + upstream.CloseRead() + conn.CloseWrite() wg.Done() }() go func() { if _, err := io.Copy(upstream, conn); err != nil { - log.Printf("http: proxy error: %v", err) + log.Printf("http: proxy error: upstream: %v", err) } + conn.CloseRead() + upstream.CloseWrite() wg.Done() }() wg.Wait() } - -type writeFlusher interface { - io.ReadWriter - http.Flusher -} - -type maxLatencyWriter struct { - dst writeFlusher - latency time.Duration - - lk sync.Mutex // protects Write + Flush - done chan bool -} - -func (m *maxLatencyWriter) Read(p []byte) (int, error) { - return m.dst.Read(p) -} - -func (m *maxLatencyWriter) Write(p []byte) (int, error) { - m.lk.Lock() - defer m.lk.Unlock() - return m.dst.Write(p) -} - -func (m *maxLatencyWriter) flushLoop() { - t := time.NewTicker(m.latency) - defer t.Stop() - for { - select { - case <-m.done: - if onExitFlushLoop != nil { - onExitFlushLoop() - } - return - case <-t.C: - m.lk.Lock() - m.dst.Flush() - m.lk.Unlock() - } - } -} - -func (m *maxLatencyWriter) stop() { m.done <- true } -- GitLab