diff --git a/fe/http.go b/fe/http.go index ef0cf79eb6ac138ce4fa150a4fe4cbb2b7a3742f..45ff06992fe5bf02fd72c17cc4c12184de484e33 100644 --- a/fe/http.go +++ b/fe/http.go @@ -89,27 +89,29 @@ func (h *HttpRedirector) pickActiveNode() string { return result.IP } -// Parse the request and extract the mount path. -func (h *HttpRedirector) getMount(r *http.Request) (*autoradio.Mount, error) { - path := r.URL.Path - if strings.HasSuffix(path, ".m3u") { - path = path[:len(path)-4] - } - return h.client.GetMount(path) +func makeIcecastAddr(server string) string { + return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) } -func makeIcecastUrl(server string) string { - return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) +func makeIcecastUrl(server, mountName string) string { + return fmt.Sprintf("http://%s%s", makeIcecastAddr(server), mountName) } -// Serve a response for a client connection to a relay. -func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { - mount, err := h.getMount(r) - if err != nil { - http.Error(w, "Not Found", http.StatusNotFound) - return - } +// Request wrapper that passes a Mount along with the HTTP request. +func (h *HttpRedirector) withMount(f func(*autoradio.Mount, http.ResponseWriter, *http.Request)) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mountPath := strings.TrimSuffix(r.URL.Path, ".m3u") + mount, err := h.client.GetMount(mountPath) + if err != nil { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + f(mount, w, r) + }) +} +// Serve a response for a client connection to a relay. +func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { // Find an active node. relayAddr := h.pickActiveNode() if relayAddr == "" { @@ -119,23 +121,18 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { httpTargetStats.IncrVar(ipToMetric(relayAddr)) // Create the m3u response. - m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name) + m3u := fmt.Sprintf("%s\n", makeIcecastUrl(relayAddr, mount.Name)) w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) w.Header().Set("Content-Type", "audio/x-mpegurl") - w.Header().Set("Expires", "-1") - w.Header().Set("Cache-Control", "private, max-age=0") + addDefaultHeaders(w) io.WriteString(w, m3u) } -func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { - m, err := h.getMount(r) - if err != nil || m.IsRelay() { - if err != nil { - log.Printf("source: error retrieving mount for %+v: %s", r, err) - } else { - log.Printf("source: connection to relay stream %s", m.Name) - } - http.Error(w, "Not Found", http.StatusNotFound) +// Handle SOURCE requests. +func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { + if mount.IsRelay() { + log.Printf("source: connection to relay stream %s", mount.Name) + http.Error(w, "Source Connection To Relay Stream", http.StatusBadRequest) source404Errors.Incr() return } @@ -161,7 +158,7 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { // Create the upstream connection, and write the original // request to it as-is (the URL path on the backend is the // same, and the headers do not need to change). - upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) + upstream, err := net.Dial("tcp", makeIcecastAddr(masterAddr)) if err != nil { log.Printf("source: dial upstream: %v", err) sendErr(err) @@ -217,6 +214,7 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { wg.Wait() } +// Serve our cluster status page. func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { nodes, _ := h.client.GetNodes() mounts, _ := h.client.ListMounts() @@ -233,8 +231,7 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) - w.Header().Set("Expires", "-1") - w.Header().Set("Cache-Control", "private, max-age=0") + addDefaultHeaders(w) w.Write(buf.Bytes()) } @@ -247,6 +244,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { // Create our HTTP handler stack. Passes the /debug/ queries // along to the global ServeMux (where moodules such as pprof // install their handlers). + relayHandler := h.withMount(h.serveRelay) mux := http.NewServeMux() mux.Handle( "/static/", @@ -260,7 +258,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { case strings.HasPrefix(r.URL.Path, "/debug/"): http.DefaultServeMux.ServeHTTP(w, r) default: - h.serveRelay(w, r) + relayHandler.ServeHTTP(w, r) } }) @@ -275,9 +273,10 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { // handlers: since they wrap the ResponseWriter, we would be // unable to hijack the underlying connection for proxying. // TODO: look into using WrapWriter. + sourceHandler := h.withMount(h.serveSource) rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "SOURCE" { - h.serveSource(w, r) + sourceHandler.ServeHTTP(w, r) } else { wraph.ServeHTTP(w, r) } @@ -293,3 +292,8 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) log.Fatal(httpServer.ListenAndServe()) } + +func addDefaultHeaders(w http.ResponseWriter) { + w.Header().Set("Expires", "-1") + w.Header().Set("Cache-Control", "private, max-age=0") +}