Skip to content
Snippets Groups Projects
Commit 04b4f7d7 authored by ale's avatar ale
Browse files

run a simple two-way proxy for SOURCE requests that does not time out

parent 9a760770
No related branches found
No related tags found
No related merge requests found
......@@ -9,13 +9,16 @@ import (
"math/rand"
"net"
"net/http"
"net/http/httputil"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
_ "net/http/pprof"
"git.autistici.org/ale/radioai"
"github.com/PuerkitoBio/ghost/handlers"
)
// HTTP redirector.
......@@ -81,8 +84,9 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
}
func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
mount, err := h.getMount(r)
_, err := h.getMount(r)
if err != nil {
log.Printf("source: error retrieving mount for %+v: %s", r, err)
http.Error(w, "Not Found", http.StatusNotFound)
return
}
......@@ -90,20 +94,57 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
// Find the current master node.
masterAddr, err := h.client.GetMasterAddr()
if err != nil {
log.Printf("source: no master: %s", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// Hijack the incoming connection. This is just so that we can
// reset the timeout on the underlying network connection
// (which we have no use for once the stream has been
// established), but then we get to run the two-way proxy...
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Printf("source: hijack failed: %v", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
defer conn.Close()
if err := conn.SetDeadline(time.Time{}); err != nil {
log.Printf("source: could not reset deadline: %v", err)
}
// Create the upstream connection, and write the original
// request to it as-is (the URL path on the backend is the
// same, and the headers do not need to change).
upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr))
if err != nil {
log.Printf("source: dial upstream: %v", err)
return
}
defer upstream.Close()
if err := r.Write(upstream); err != nil {
log.Printf("source: write upstream request: %v", err)
return
}
// Proxy the resulting connection.
proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = makeIcecastUrl(masterAddr)
req.URL.Path = mount.Name
},
FlushInterval: time.Second,
// Start two copiers, one for the source data, one for the
// replies. Wait until both are done.
var wg sync.WaitGroup
wg.Add(2)
go func() {
if _, err := io.Copy(conn, upstream); err != nil {
log.Printf("upstream -> source: Copy: %v", err)
}
proxy.ServeHTTP(w, r)
wg.Done()
}()
go func() {
if _, err := io.Copy(upstream, conn); err != nil {
log.Printf("source -> upstream: Copy: %v", err)
}
wg.Done()
}()
wg.Wait()
}
func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) {
......@@ -129,7 +170,7 @@ func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "" || r.URL.Path == "/" {
// Serve the status page through a GZIPHandler. Binds
// to h using function closure.
handler := GZIPHandler(
handler := handlers.GZIPHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h.serveStatusPage(w, r)
}), nil)
......@@ -152,13 +193,20 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
mux := http.NewServeMux()
mux.HandleFunc(
"/static/",
GZIPHandler(
handlers.GZIPHandler(
http.StripPrefix(
"/static/",
http.FileServer(http.Dir(staticDir))),
nil))
mux.Handle("/", h)
// It would be nice to add a logging handler on top of
// everything, but we would become unable to hijack the
// connection on SOURCE requests...
//
//logopts := handlers.NewLogOptions(nil, handlers.Lshort)
//logger := handlers.LogHandler(mux, logopts)
httpServer := &http.Server{
Addr: addr,
Handler: mux,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment