Commit 146c29f4 authored by ale's avatar ale
Browse files

use simple TCP proxy for /_stream

Proxied connections to Icecast are now handled with a simple TCP
proxy that can shutdown each direction independently. Fixes some
connection leaks.
parent faff40e6
autoradio (0.5.1p3) unstable; urgency=medium
* Switch back to simple TCP proxy for /_stream.
-- ale <ale@incal.net> Sun, 12 Jul 2015 19:11:20 +0100
autoradio (0.5.1p2) unstable; urgency=medium
* Fix crash when losing transcoder mastership.
......
......@@ -75,7 +75,7 @@ func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
}
return &ReverseProxy{
Director: director,
FlushInterval: 500 * time.Millisecond,
FlushInterval: 100 * time.Millisecond,
}
}
......@@ -194,92 +194,31 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}
// Run two-way proxying.
p.copyResponse(conn, upstream)
p.handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn))
}
func (p *ReverseProxy) copyResponse(conn io.ReadWriter, upstream io.ReadWriter) {
if p.FlushInterval != 0 {
if wf, ok := conn.(writeFlusher); ok {
cmlw := &maxLatencyWriter{
dst: wf,
latency: p.FlushInterval,
done: make(chan bool),
}
go cmlw.flushLoop()
defer cmlw.stop()
conn = cmlw
}
if wf, ok := upstream.(writeFlusher); ok {
umlw := &maxLatencyWriter{
dst: wf,
latency: p.FlushInterval,
done: make(chan bool),
}
go umlw.flushLoop()
defer umlw.stop()
upstream = umlw
}
}
// Copy data in both directions.
// Simple two-way TCP proxy that copies data in both directions and
// can shutdown each direction of the connection independently.
func (p *ReverseProxy) handleProxy(conn *net.TCPConn, upstream *net.TCPConn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
if _, err := io.Copy(conn, upstream); err != nil {
log.Printf("http: proxy error: %v", err)
log.Printf("http: proxy error: client: %v", err)
}
upstream.CloseRead()
conn.CloseWrite()
wg.Done()
}()
go func() {
if _, err := io.Copy(upstream, conn); err != nil {
log.Printf("http: proxy error: %v", err)
log.Printf("http: proxy error: upstream: %v", err)
}
conn.CloseRead()
upstream.CloseWrite()
wg.Done()
}()
wg.Wait()
}
type writeFlusher interface {
io.ReadWriter
http.Flusher
}
type maxLatencyWriter struct {
dst writeFlusher
latency time.Duration
lk sync.Mutex // protects Write + Flush
done chan bool
}
func (m *maxLatencyWriter) Read(p []byte) (int, error) {
return m.dst.Read(p)
}
func (m *maxLatencyWriter) Write(p []byte) (int, error) {
m.lk.Lock()
defer m.lk.Unlock()
return m.dst.Write(p)
}
func (m *maxLatencyWriter) flushLoop() {
t := time.NewTicker(m.latency)
defer t.Stop()
for {
select {
case <-m.done:
if onExitFlushLoop != nil {
onExitFlushLoop()
}
return
case <-t.C:
m.lk.Lock()
m.dst.Flush()
m.lk.Unlock()
}
}
}
func (m *maxLatencyWriter) stop() { m.done <- true }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment