package fe import ( "bytes" "flag" "fmt" "html/template" "io" "log" "net" "net/http" "net/url" "path" "path/filepath" "strconv" "strings" "time" _ "net/http/pprof" "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/instrumentation" "github.com/PuerkitoBio/ghost/handlers" ) var ( proxyStreams = flag.Bool("enable-icecast-proxy", false, "Proxy the local icecast") httpStatusCodes = instrumentation.NewCounter("http.status") httpTargetStats = instrumentation.NewCounter("http.target") sourceConnections = instrumentation.NewCounter("http.source_connections") source404Errors = instrumentation.NewCounter("http.source_404") sourceErrors = instrumentation.NewCounter("http.source_errors") ) // ResponseWriter wrapper that logs an entry for every incoming HTTP // request, as soon as the response headers are sent. type logResponseWriter struct { http.ResponseWriter logged bool // Parameters of the original request. remoteAddr string method string path string proto string } func (w *logResponseWriter) writeLog(code int) { log.Printf("%s %s %s %s %d", w.remoteAddr, w.method, w.path, w.proto, code) httpStatusCodes.IncrVar(strconv.Itoa(code)) w.logged = true } func (w *logResponseWriter) WriteHeader(code int) { w.writeLog(code) w.ResponseWriter.WriteHeader(code) } func (w *logResponseWriter) Write(data []byte) (int, error) { // Catch the case when WriteHeader wasn't called. if !w.logged { w.writeLog(http.StatusOK) } return w.ResponseWriter.Write(data) } func (w *logResponseWriter) WrappedWriter() http.ResponseWriter { return w.ResponseWriter } func logHandler(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ip, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { ip = r.RemoteAddr } wrapw := &logResponseWriter{ ResponseWriter: w, remoteAddr: ip, method: r.Method, path: r.URL.Path, proto: r.Proto, } h.ServeHTTP(wrapw, r) } } // HTTP redirector. type HttpRedirector struct { domain string staticDir string lb *autoradioLoadBalancer client *autoradio.Client template *template.Template } func NewHttpRedirector(client *autoradio.Client, domain, lbspec, staticDir, templateDir string) (*HttpRedirector, error) { lb, err := parseLoadBalancerSpec(lbspec) if err != nil { return nil, err } tmpl := template.Must( template.ParseGlob( filepath.Join(templateDir, "*.html"))) return &HttpRedirector{ client: client, domain: domain, lb: lb, template: tmpl, staticDir: staticDir, }, nil } // Pick a random IP with a protocol appropriate to the request (based // on the remote address). func randomIpForRequest(ips []net.IP, r *http.Request) net.IP { remoteAddr := net.ParseIP(r.RemoteAddr) isV6 := (remoteAddr != nil && (remoteAddr.To4() == nil)) return randomIpByProto(ips, isV6) } type httpRequestContext struct { req *http.Request } func (r *httpRequestContext) RemoteAddr() net.IP { return net.ParseIP(r.req.RemoteAddr) } // Return an active node, chosen according to the current load // balancing policy. func (h *HttpRedirector) pickActiveNode(r *http.Request) net.IP { result := h.lb.Choose(&httpRequestContext{r}) if result == nil { return nil } return randomIpForRequest(result.IP, r) } func (h *HttpRedirector) lbUpdater() { for range time.NewTicker(2 * time.Second).C { nodes, err := h.client.GetNodes() if err != nil { continue } h.lb.Update(nodes) } } func icecastAddr(server net.IP) string { return net.JoinHostPort(server.String(), strconv.Itoa(autoradio.IcecastPort)) } func streamUrl(server net.IP, mountName string) string { var serverAddr string if *proxyStreams { serverAddr = server.String() } else { serverAddr = icecastAddr(server) } return fmt.Sprintf("http://%s%s", serverAddr, autoradio.MountNameToIcecastPath(mountName)) } // Request wrapper that passes a Mount along with the HTTP request. func (h *HttpRedirector) withMount(f func(*autoradio.Mount, http.ResponseWriter, *http.Request)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mountPath := strings.TrimSuffix(r.URL.Path, ".m3u") mount, err := h.client.GetMount(mountPath) if err != nil { http.Error(w, "Not Found", http.StatusNotFound) return } f(mount, w, r) }) } // Serve a M3U response. This simply points back at the stream // redirect handler. func (h *HttpRedirector) serveM3U(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { m3u := strings.TrimSuffix(r.URL.String(), ".m3u") + "\n" w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) w.Header().Set("Content-Type", "audio/x-mpegurl") addDefaultHeaders(w) io.WriteString(w, m3u) } // redirect replies to the request with a redirect to url, adding some // cache-busting headers. Code is mostly verbatim from net/http. // Serve a 307 for HTTP/1.1 clients, a 302 otherwise. func redirect(w http.ResponseWriter, r *http.Request, urlStr string) { if u, err := url.Parse(urlStr); err == nil { oldpath := r.URL.Path if oldpath == "" { oldpath = "/" } if u.Scheme == "" { if urlStr == "" || urlStr[0] != '/' { olddir, _ := path.Split(oldpath) urlStr = olddir + urlStr } var query string if i := strings.Index(urlStr, "?"); i != -1 { urlStr, query = urlStr[:i], urlStr[i:] } trailing := strings.HasSuffix(urlStr, "/") urlStr = path.Clean(urlStr) if trailing && !strings.HasSuffix(urlStr, "/") { urlStr += "/" } urlStr += query } } w.Header().Set("Location", urlStr) w.Header().Set("Cache-Control", "max-age=0,no-cache,no-store") w.Header().Set("Pragma", "no-cache") w.Header().Set("Expires", "-1") code := 302 if r.ProtoMinor == 1 { code = 307 } w.WriteHeader(code) } // Serve a response for a client connection to a relay. func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { // Find an active node. relayAddr := h.pickActiveNode(r) if relayAddr == nil { http.Error(w, "No active nodes", http.StatusServiceUnavailable) return } httpTargetStats.IncrVar(ipToMetric(relayAddr)) // See if we need to serve a M3U response or a redirect. if strings.HasSuffix(r.URL.Path, ".m3u") { h.serveM3U(mount, w, r) } else { targetURL := streamUrl(relayAddr, mount.Name) redirect(w, r, targetURL) } } // Handle SOURCE requests. func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { if mount.IsRelay() { log.Printf("source: connection to relay stream %s", mount.Name) http.Error(w, "Stream is relayed", http.StatusBadRequest) source404Errors.Incr() return } sourceConnections.Incr() // Find the current master node. masterInfo, err := h.client.GetMasterInfo() if err != nil { log.Printf("source error: no master: %v", err) http.Error(w, err.Error(), http.StatusServiceUnavailable) sourceErrors.Incr() return } // Create a ReverseProxy on the fly with the right backend // address. Blindly assume that the master has at least one // public IP, and that we can reach it. target := icecastAddr(masterInfo.IP[0]) tracker.setConnState(r, StateSource, target) proxy := &ReverseProxy{ Director: func(req *http.Request) { req.URL.Scheme = "http" req.URL.Host = target req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name) }, FlushInterval: 500 * time.Millisecond, } proxy.ServeHTTP(w, r) } // Serve our cluster status page. func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { nodes, _ := h.client.GetNodes() mounts, _ := h.client.ListMounts() ctx := struct { Domain string Nodes []*autoradio.NodeStatus Mounts []*autoradio.Mount }{h.domain, nodes, mounts} var buf bytes.Buffer if err := h.template.ExecuteTemplate(&buf, "index.html", ctx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) addDefaultHeaders(w) w.Write(buf.Bytes()) } func (h *HttpRedirector) createHandler() http.Handler { // Create our HTTP handler stack. mux := http.NewServeMux() // Serve static content. mux.Handle( "/static/", handlers.GZIPHandler( http.StripPrefix( "/static/", http.FileServer(http.Dir(h.staticDir))), nil)) // Pass /debug/ to the default ServeMux, all the default debug // handlers are installed there. mux.Handle( "/debug/", handlers.GZIPHandler(http.DefaultServeMux, nil)) // Optionally enable a reverse proxy to the local Icecast for // the direct stream URLs (below IcecastMountPrefix). if *proxyStreams { iceProxy := NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", autoradio.IcecastPort)}) mux.HandleFunc(autoradio.IcecastMountPrefix+"/", func(w http.ResponseWriter, r *http.Request) { tracker.setConnState(r, StateListener, "localhost") iceProxy.ServeHTTP(w, r) }) } // The root handler should discriminate between requests for // the status page, source and listener connections. relayHandler := h.withMount(h.serveRelay) sourceHandler := h.withMount(h.serveSource) statusPageHandler := handlers.GZIPHandler( http.HandlerFunc(h.serveStatusPage), nil) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == "SOURCE": sourceHandler.ServeHTTP(w, r) case r.URL.Path == "" || r.URL.Path == "/": statusPageHandler.ServeHTTP(w, r) default: relayHandler.ServeHTTP(w, r) } }) // Instrument the resulting HTTP handler. return logHandler(trackRequestsHandler(mux)) } // Run starts the HTTP server on the given addr. Does not return. func (h *HttpRedirector) Run(addr string) { // Start the background goroutine that updates the // LoadBalancer asynchronously. go h.lbUpdater() httpServer := &http.Server{ Addr: addr, Handler: h.createHandler(), ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) log.Fatal(httpServer.ListenAndServe()) } func addDefaultHeaders(w http.ResponseWriter) { w.Header().Set("Expires", "-1") w.Header().Set("Cache-Control", "private, max-age=0") }