package fe import ( "bytes" "fmt" "html/template" "io" "log" "net" "net/http" "path/filepath" "strconv" "strings" "sync" "time" _ "net/http/pprof" "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/instrumentation" "github.com/PuerkitoBio/ghost/handlers" ) var ( httpStatusCodes = instrumentation.NewCounter("http.status") httpTargetStats = instrumentation.NewCounter("http.target") sourceConnections = instrumentation.NewCounter("http.source_connections") source404Errors = instrumentation.NewCounter("http.source_404") sourceErrors = instrumentation.NewCounter("http.source_errors") ) type statsResponseWriter struct { http.ResponseWriter code int } func (w *statsResponseWriter) WriteHeader(code int) { w.code = code w.ResponseWriter.WriteHeader(code) } func statsHandler(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { wrapw := &statsResponseWriter{w, 200} h.ServeHTTP(wrapw, r) httpStatusCodes.IncrVar(strconv.Itoa(wrapw.code)) } } // HTTP redirector. type HttpRedirector struct { domain string lb LoadBalancingPolicy client *autoradio.RadioAPI template *template.Template } func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector { return &HttpRedirector{ client: client, domain: domain, lb: getNamedLoadBalancingPolicy(lbpolicy), } } // Return an active node, chosen according to the current load // balancing policy. func (h *HttpRedirector) pickActiveNode() string { nodes, _ := h.client.GetNodes() if nodes == nil { return "" } // Filter nodes where Icecast is reported to be up. okNodes := make([]*autoradio.NodeStatus, 0, len(nodes)) for _, n := range nodes { if n.IcecastUp { okNodes = append(okNodes, n) } } if len(okNodes) == 0 { return "" } result := h.lb.GetNode(okNodes) if result == nil { return "" } return result.IP } // Parse the request and extract the mount path. func (h *HttpRedirector) getMount(r *http.Request) (*autoradio.Mount, error) { path := r.URL.Path if strings.HasSuffix(path, ".m3u") { path = path[:len(path)-4] } return h.client.GetMount(path) } func makeIcecastUrl(server string) string { return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) } // Serve a response for a client connection to a relay. func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { mount, err := h.getMount(r) if err != nil { http.Error(w, "Not Found", http.StatusNotFound) return } // Find an active node. relayAddr := h.pickActiveNode() if relayAddr == "" { http.Error(w, "No active nodes", http.StatusServiceUnavailable) return } httpTargetStats.IncrVar(ipToMetric(relayAddr)) // Create the m3u response. m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name) w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) w.Header().Set("Content-Type", "audio/x-mpegurl") w.Header().Set("Expires", "-1") w.Header().Set("Cache-Control", "private, max-age=0") io.WriteString(w, m3u) } func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { m, err := h.getMount(r) if err != nil || m.IsRelay() { if err != nil { log.Printf("source: error retrieving mount for %+v: %s", r, err) } else { log.Printf("source: connection to relay stream %s", m.Name) } http.Error(w, "Not Found", http.StatusNotFound) source404Errors.Incr() return } sourceConnections.Incr() // Handy function to report a source error (the ones we care // the most about because they measure failures in the // coordination infrastructure). sendErr := func(err error) { http.Error(w, err.Error(), http.StatusServiceUnavailable) sourceErrors.Incr() } // Find the current master node. masterAddr, err := h.client.GetMasterAddr() if err != nil { log.Printf("source: no master: %s", err) sendErr(err) return } // Create the upstream connection, and write the original // request to it as-is (the URL path on the backend is the // same, and the headers do not need to change). upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) if err != nil { log.Printf("source: dial upstream: %v", err) sendErr(err) return } defer upstream.Close() if err := r.Write(upstream); err != nil { log.Printf("source: write upstream request: %v", err) sendErr(err) return } // Hijack the incoming connection. This is necessary, rather // than using httputil.ReverseProxy, for two important // reasons: // // 1) So that we can reset the timeout on the underlying // network connection (which we have no use for once the // stream has been established). // // 2) Because streaming is still mostly a HTTP/1.0 world, the // HTTP/1.1 features used by Go's net/http package (mostly the // chunked encoding, I think) will apparently confuse clients // and servers alike. // conn, _, err := w.(http.Hijacker).Hijack() if err != nil { log.Printf("source: hijack failed: %v", err) sendErr(err) return } defer conn.Close() if err := conn.SetDeadline(time.Time{}); err != nil { log.Printf("source: could not reset deadline: %v", err) } // Start two copiers, one for the source data, one for the // replies. Wait until both are done. var wg sync.WaitGroup wg.Add(2) go func() { if _, err := io.Copy(conn, upstream); err != nil { log.Printf("upstream -> source: Copy: %v", err) } wg.Done() }() go func() { if _, err := io.Copy(upstream, conn); err != nil { log.Printf("source -> upstream: Copy: %v", err) } wg.Done() }() wg.Wait() } func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { nodes, _ := h.client.GetNodes() mounts, _ := h.client.ListMounts() ctx := struct { Domain string Nodes []*autoradio.NodeStatus Mounts []*autoradio.Mount }{h.domain, nodes, mounts} var buf bytes.Buffer if err := h.template.ExecuteTemplate(&buf, "index.html", ctx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) w.Header().Set("Expires", "-1") w.Header().Set("Cache-Control", "private, max-age=0") w.Write(buf.Bytes()) } // Run starts the HTTP server on the given addr. Does not return. func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { h.template = template.Must( template.ParseGlob( filepath.Join(templateDir, "*.html"))) // Create our HTTP handler stack. Passes the /debug/ queries // along to the global ServeMux (where moodules such as pprof // install their handlers). mux := http.NewServeMux() mux.Handle( "/static/", http.StripPrefix( "/static/", http.FileServer(http.Dir(staticDir)))) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { switch { case r.URL.Path == "" || r.URL.Path == "/": h.serveStatusPage(w, r) case strings.HasPrefix(r.URL.Path, "/debug/"): http.DefaultServeMux.ServeHTTP(w, r) default: h.serveRelay(w, r) } }) // Add some handlers to support gzip-encoded responses and // request logging. wraph := handlers.GZIPHandler(mux, nil) logopts := handlers.NewLogOptions(nil, handlers.Lshort) wraph = handlers.LogHandler(wraph, logopts) wraph = statsHandler(wraph) // Serve SOURCE requests bypassing the logging and gzip // handlers: since they wrap the ResponseWriter, we would be // unable to hijack the underlying connection for proxying. // TODO: look into using WrapWriter. rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "SOURCE" { h.serveSource(w, r) } else { wraph.ServeHTTP(w, r) } }) httpServer := &http.Server{ Addr: addr, Handler: rooth, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) log.Fatal(httpServer.ListenAndServe()) }