diff --git a/fe/http.go b/fe/http.go index 49a24368a548c5dc8b2e1c10e6c0682449dfc037..ff0333bbfc98b6ab9e9a0432266351d9b32f2743 100644 --- a/fe/http.go +++ b/fe/http.go @@ -9,13 +9,16 @@ import ( "math/rand" "net" "net/http" - "net/http/httputil" "path/filepath" "strconv" "strings" + "sync" "time" + _ "net/http/pprof" + "git.autistici.org/ale/radioai" + "github.com/PuerkitoBio/ghost/handlers" ) // HTTP redirector. @@ -81,8 +84,9 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { } func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { - mount, err := h.getMount(r) + _, err := h.getMount(r) if err != nil { + log.Printf("source: error retrieving mount for %+v: %s", r, err) http.Error(w, "Not Found", http.StatusNotFound) return } @@ -90,20 +94,57 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { // Find the current master node. masterAddr, err := h.client.GetMasterAddr() if err != nil { + log.Printf("source: no master: %s", err) + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + + // Hijack the incoming connection. This is just so that we can + // reset the timeout on the underlying network connection + // (which we have no use for once the stream has been + // established), but then we get to run the two-way proxy... + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + log.Printf("source: hijack failed: %v", err) http.Error(w, err.Error(), http.StatusServiceUnavailable) return } + defer conn.Close() + if err := conn.SetDeadline(time.Time{}); err != nil { + log.Printf("source: could not reset deadline: %v", err) + } - // Proxy the resulting connection. - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL.Scheme = "http" - req.URL.Host = makeIcecastUrl(masterAddr) - req.URL.Path = mount.Name - }, - FlushInterval: time.Second, + // Create the upstream connection, and write the original + // request to it as-is (the URL path on the backend is the + // same, and the headers do not need to change). + upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) + if err != nil { + log.Printf("source: dial upstream: %v", err) + return } - proxy.ServeHTTP(w, r) + defer upstream.Close() + if err := r.Write(upstream); err != nil { + log.Printf("source: write upstream request: %v", err) + return + } + + // Start two copiers, one for the source data, one for the + // replies. Wait until both are done. + var wg sync.WaitGroup + wg.Add(2) + go func() { + if _, err := io.Copy(conn, upstream); err != nil { + log.Printf("upstream -> source: Copy: %v", err) + } + wg.Done() + }() + go func() { + if _, err := io.Copy(upstream, conn); err != nil { + log.Printf("source -> upstream: Copy: %v", err) + } + wg.Done() + }() + wg.Wait() } func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { @@ -129,7 +170,7 @@ func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "" || r.URL.Path == "/" { // Serve the status page through a GZIPHandler. Binds // to h using function closure. - handler := GZIPHandler( + handler := handlers.GZIPHandler( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { h.serveStatusPage(w, r) }), nil) @@ -152,13 +193,20 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { mux := http.NewServeMux() mux.HandleFunc( "/static/", - GZIPHandler( + handlers.GZIPHandler( http.StripPrefix( "/static/", http.FileServer(http.Dir(staticDir))), nil)) mux.Handle("/", h) + // It would be nice to add a logging handler on top of + // everything, but we would become unable to hijack the + // connection on SOURCE requests... + // + //logopts := handlers.NewLogOptions(nil, handlers.Lshort) + //logger := handlers.LogHandler(mux, logopts) + httpServer := &http.Server{ Addr: addr, Handler: mux,