diff --git a/api.go b/api.go index ec0ff4e90ba38caf33d8f40e53ba261337575a7d..d1b3789c1811f675aea081dd6425838cc14bee68 100644 --- a/api.go +++ b/api.go @@ -26,22 +26,18 @@ const ( // partitioned in the meantime). ABIVersion = "4" + // Prefixes for the etcd-based leader elections. ElectionPrefix = EtcdPrefix + "election/" + ABIVersion + "/" IcecastElectionPrefix = ElectionPrefix + "icecast/" TranscoderElectionPrefix = ElectionPrefix + "transcode/" - //MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master" - //TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode" - //NodePrefix = "/icecast/" + ABIVersion + "/nodes/" + // Prefix for the streams served directly by Icecast. IcecastMountPrefix = "/_stream" // Prefixes for etcd-based service discovery. - EndpointPrefix = EtcdPrefix + "endpoints/" + ABIVersion + "/" - PublicEndpointPrefix = EndpointPrefix + "frontend/public/" - GossipEndpointPrefix = EndpointPrefix + "frontend/gossip/" - StatusEndpointPrefix = EndpointPrefix + "frontend/status/" - IcecastPublicEndpointPrefix = EndpointPrefix + "node/icecast-public/" - IcecastPeerEndpointPrefix = EndpointPrefix + "node/icecast-peer/" + EndpointPrefix = EtcdPrefix + "endpoints/" + ABIVersion + "/" + PublicEndpointPrefix = EndpointPrefix + "frontend/public/" + StatusEndpointPrefix = EndpointPrefix + "frontend/status/" ) // IcecastPort is the port that the Icecast server will listen diff --git a/node/bwmonitor/bwmonitor.go b/node/bwmonitor/bwmonitor.go index 25f915be549ca7eb0e812e862a773fc024623ba9..13757e917c5b1245f1cb9d5dd3285c697614eca0 100644 --- a/node/bwmonitor/bwmonitor.go +++ b/node/bwmonitor/bwmonitor.go @@ -13,10 +13,6 @@ import ( var spacesRx = regexp.MustCompile(`\s+`) -type netDevCounts struct { - bytesSent int -} - func getBytesSentForDevice(dev string) (uint64, error) { file, err := os.Open("/proc/net/dev") if err != nil { diff --git a/node/debug.go b/node/debug.go index 726827b1ce5c89dda484581a1b6f96fe3d9410c7..5020d98e4972df0f5dfffe8b4dd093e8708bc535 100644 --- a/node/debug.go +++ b/node/debug.go @@ -6,31 +6,31 @@ import ( pb "git.autistici.org/ale/autoradio/proto" ) -// MountStatus reports the configuration and status of a mount, +// mountStatus reports the configuration and status of a mount, // including eventual transcoded mounts that source it. -type MountStatus struct { +type mountStatus struct { Mount *pb.Mount Listeners int - TransMounts []*MountStatus + TransMounts []*mountStatus } -func newMountStatus(m *pb.Mount, nodes []*pb.Status) *MountStatus { +func newMountStatus(m *pb.Mount, nodes []*nodeInfo) *mountStatus { var listeners int for _, n := range nodes { - for _, ims := range n.IcecastMounts { + for _, ims := range n.status.IcecastMounts { if ims.Path == m.Path { listeners += int(ims.Listeners) break } } } - return &MountStatus{ + return &mountStatus{ Mount: m, Listeners: listeners, } } -type mountStatusList []*MountStatus +type mountStatusList []*mountStatus func (l mountStatusList) Len() int { return len(l) } func (l mountStatusList) Swap(i, j int) { l[i], l[j] = l[j], l[i] } @@ -38,13 +38,13 @@ func (l mountStatusList) Less(i, j int) bool { return l[i].Mount.Path < l[j].Mount.Path } -// MountsToStatus converts a list of mounts (and eventually the +// mountsToStatus converts a list of mounts (and eventually the // current list of nodes) to a nicely sorted and tree-aggregated list -// of MountStatus objects. The list of nodes can be nil, in which case +// of mountStatus objects. The list of nodes can be nil, in which case // listener statistics will be omitted. -func MountsToStatus(mounts []*pb.Mount, nodes []*pb.Status) []*MountStatus { +func mountsToStatus(mounts []*pb.Mount, nodes []*nodeInfo) []*mountStatus { // Aggregate stats, and create a tree of transcoding mounts. - ms := make(map[string]*MountStatus) + ms := make(map[string]*mountStatus) for _, m := range mounts { if m.HasTranscoder() { continue @@ -61,7 +61,7 @@ func MountsToStatus(mounts []*pb.Mount, nodes []*pb.Status) []*MountStatus { } src.TransMounts = append(src.TransMounts, newMountStatus(m, nodes)) } - msl := make([]*MountStatus, 0, len(ms)) + msl := make([]*mountStatus, 0, len(ms)) for _, m := range ms { msl = append(msl, m) } diff --git a/node/dns.go b/node/dns.go index 473a2ce52e400e9d0a616bfde1b406f340556023..c9bb8bbca540e9a9169d55a3fed618d1e5d12799 100644 --- a/node/dns.go +++ b/node/dns.go @@ -111,17 +111,17 @@ func (d *dnsServer) ServeDNS(w dns.ResponseWriter, req *dns.Msg) { m.SetReply(req) m.MsgHdr.Authoritative = true - w.WriteMsg(m) + w.WriteMsg(m) //nolint return servFail: m.SetRcode(req, dns.RcodeServerFailure) - w.WriteMsg(m) + w.WriteMsg(m) //nolint return nxDomain: m.SetRcode(req, dns.RcodeNameError) - w.WriteMsg(m) + w.WriteMsg(m) //nolint } func (d *dnsServer) getNodeIPs(q dns.Question) []net.IP { diff --git a/node/http.go b/node/http.go index eb240d57733b4812690dbbf348be08a0b78b44cf..95861ffdf2a576ec45901f440ba06b69ac0f9e5b 100644 --- a/node/http.go +++ b/node/http.go @@ -1,6 +1,9 @@ package node +//go:generate go-bindata --nocompress --pkg node static/... templates/... + import ( + "bytes" "fmt" "io" "log" @@ -11,43 +14,68 @@ import ( "git.autistici.org/ale/autoradio" pb "git.autistici.org/ale/autoradio/proto" + "github.com/alecthomas/template" + assetfs "github.com/elazarl/go-bindata-assetfs" ) -func NewHTTP(n *Node, lb *loadBalancer, icecastPort int) http.Handler { +func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Handler { mux := http.NewServeMux() - - proxy := NewIcecastProxy(&url.URL{ - Scheme: "http", - Host: fmt.Sprintf("http://localhost:%d", icecastPort), - Path: "/", - }) - mux.HandleFunc(autoradio.IcecastMountPrefix+"/", func(w http.ResponseWriter, r *http.Request) { - // instrumentation.addListener - // defer instrumentation.deleteListener - proxy.ServeHTTP(w, r) + tpl := mustParseEmbeddedTemplates() + + // Serve /static/ from builtin assets. + mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(&assetfs.AssetFS{ + Asset: Asset, + AssetDir: AssetDir, + AssetInfo: AssetInfo, + Prefix: "static", + }))) + + // Requests for /_stream/ go straight to the local Icecast. + proxyHandler := http.StripPrefix(autoradio.IcecastMountPrefix, + withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) { + doIcecastProxy(w, r, &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("localhost:%d", icecastPort), + Path: autoradio.MountPathToIcecastPath(m.Path), + }, m.Path) + })) + + // redirectHandler serves different kinds of redirects, either + // an M3U or a HTTP redirect, for the public stream URLs (ones + // without the /_stream prefix). + redirectHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) { + serveRedirect(n.lb, m, w, r) }) - relayHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) { - serveRelay(lb, m, w, r) - }) + // sourceHandler deals with SOURCE requests to the public + // stream URL, which are forwarded straight to the master + // Icecast node. sourceHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) { serveSource(n, m, w, r) }) + + streamPrefixSlash := autoradio.IcecastMountPrefix + "/" mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { switch { + case r.Method == "GET" && strings.HasPrefix(r.URL.Path, streamPrefixSlash): + proxyHandler.ServeHTTP(w, r) case r.Method == "SOURCE" || r.Method == "PUT": sourceHandler.ServeHTTP(w, r) - case r.URL.Path == "" || r.URL.Path == "/": - serveStatusPage(n, w, r) + case r.Method == "GET" && (r.URL.Path == "" || r.URL.Path == "/"): + serveStatusPage(n, w, r, tpl, domain) + case r.Method == "GET": + redirectHandler.ServeHTTP(w, r) default: - relayHandler.ServeHTTP(w, r) + http.NotFound(w, r) } }) return mux } -// Request wrapper that passes a Mount along with the HTTP request. +// Request wrapper that passes a Mount along with the HTTP +// request. The mount path is just the URL path without an eventual +// .m3u extension. func withMount(n *Node, f func(*pb.Mount, http.ResponseWriter, *http.Request)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mountPath := strings.TrimSuffix(r.URL.Path, ".m3u") @@ -74,15 +102,14 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques return } - proxy := NewIcecastProxy(&url.URL{ + doIcecastProxy(w, r, &url.URL{ Scheme: "http", Host: targetAddr, Path: autoradio.MountPathToIcecastPath(mount.Path), - }) - proxy.ServeHTTP(w, r) + }, mount.Path) } -func serveRelay(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { +func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { // Redirect the user to a final target, depending on the load // balancing algorithm's decision. This enforces the // 1:1 mapping between Icecasts and frontends. @@ -101,24 +128,24 @@ func serveRelay(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *htt } if strings.HasSuffix(r.URL.Path, ".m3u") { - serveM3U(w, r) + sendM3U(w, r) } else { targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path)) - serveRedirect(w, r, targetURL) + sendRedirect(w, r, targetURL) } } // Serve a M3U response. This simply points back at the stream // redirect handler by dropping the .m3u suffix in the request URL. -func serveM3U(w http.ResponseWriter, r *http.Request) { +func sendM3U(w http.ResponseWriter, r *http.Request) { m3u := strings.TrimSuffix(r.URL.String(), ".m3u") + "\n" w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) w.Header().Set("Content-Type", "audio/x-mpegurl") addDefaultHeaders(w) - io.WriteString(w, m3u) + io.WriteString(w, m3u) //nolint } -func serveRedirect(w http.ResponseWriter, r *http.Request, targetURL string) { +func sendRedirect(w http.ResponseWriter, r *http.Request, targetURL string) { // Firefox apparently caches redirects regardless of // the status code, so we have to add some quite // aggressive cache-busting headers. We serve a status @@ -133,10 +160,47 @@ func serveRedirect(w http.ResponseWriter, r *http.Request, targetURL string) { http.Redirect(w, r, targetURL, code) } -func serveStatusPage(n *Node, w http.ResponseWriter, r *http.Request) { +func serveStatusPage(n *Node, w http.ResponseWriter, r *http.Request, tpl *template.Template, domain string) { + nodes := n.lb.getNodes() + ms := mountsToStatus(n.mounts.GetMounts(), nodes) + ctx := struct { + Domain string + Nodes []*nodeInfo + Mounts []*mountStatus + }{domain, nodes, ms} + + var buf bytes.Buffer + if err := tpl.ExecuteTemplate(&buf, "index.html", ctx); err != nil { + log.Printf("error rendering status page: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) + addDefaultHeaders(w) + w.Write(buf.Bytes()) //nolint } func addDefaultHeaders(w http.ResponseWriter) { w.Header().Set("Expires", "-1") w.Header().Set("Cache-Control", "no-cache,no-store") } + +// Parse the templates that are embedded with the binary (in bindata.go). +func mustParseEmbeddedTemplates() *template.Template { + root := template.New("") + files, err := AssetDir("templates") + if err != nil { + panic(err) + } + for _, f := range files { + b, err := Asset("templates/" + f) + if err != nil { + log.Fatalf("could not read embedded template %s: %v", f, err) + } + if _, err := root.New(f).Parse(string(b)); err != nil { + log.Fatalf("error parsing template %s: %v", f, err) + } + } + return root +} diff --git a/node/instrumentation.go b/node/instrumentation.go new file mode 100644 index 0000000000000000000000000000000000000000..e542e7bae5199a43fd66b139935190f5334a2508 --- /dev/null +++ b/node/instrumentation.go @@ -0,0 +1,35 @@ +package node + +import "github.com/prometheus/client_golang/prometheus" + +var ( + streamSentBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "bytes_sent", + Help: "Bytes proxied to the client, by stream.", + }, + []string{"stream"}, + ) + streamRcvdBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "bytes_received", + Help: "Bytes received from the client, by stream.", + }, + []string{"stream"}, + ) + streamListeners = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "listeners", + Help: "Number of current listeners.", + }, + []string{"stream"}, + ) +) + +func init() { + prometheus.MustRegister( + streamSentBytes, + streamRcvdBytes, + streamListeners, + ) +} diff --git a/node/proxy.go b/node/proxy.go index 26c77515b8a7d7140f9d8c4808e49a33e17088e0..906c33666636759c83a358ec5fb8d73f401c0edb 100644 --- a/node/proxy.go +++ b/node/proxy.go @@ -18,50 +18,9 @@ import ( "strings" "sync" "time" -) - -// ReverseProxy is an HTTP Handler that takes an incoming request and -// sends it to another server, proxying the response back to the -// client. -type ReverseProxy struct { - // Director must be a function which modifies the request into - // a new request to be sent to the upstream. Its response is - // then copied back to the original client unmodified. - Director func(*http.Request) -} - -func singleJoiningSlash(a, b string) string { - aslash := strings.HasSuffix(a, "/") - bslash := strings.HasPrefix(b, "/") - switch { - case aslash && bslash: - return a + b[1:] - case !aslash && !bslash: - return a + "/" + b - } - return a + b -} -// NewIcecastProxy returns a new ReverseProxy that rewrites URLs to -// the scheme, host, and base path provided in target. If the target's -// path is "/base" and the incoming request was for "/dir", the target -// request will be for /base/dir. -func NewIcecastProxy(target *url.URL) *ReverseProxy { - targetQuery := target.RawQuery - director := func(req *http.Request) { - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) - if targetQuery == "" || req.URL.RawQuery == "" { - req.URL.RawQuery = targetQuery + req.URL.RawQuery - } else { - req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery - } - } - return &ReverseProxy{ - Director: director, - } -} + "github.com/prometheus/client_golang/prometheus" +) func copyHeader(dst, src http.Header) { for k, vv := range src { @@ -88,12 +47,22 @@ type wrappedWriter interface { WrappedWriter() http.ResponseWriter } -func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { +// Proxy a request to the desired backend. Due to the way the Icecast +// protocol works, this just dumps the initial (rewritten) HTTP/1.0 +// request, and then switches to a full bi-directional TCP proxy. The +// outbound request is built from the target host, path, and eventual +// query string parameters and headers passed on from the original +// request. The additional streamName parameter is used for +// instrumentation. +func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) { outreq := new(http.Request) *outreq = *req // includes shallow copies of maps, but okay // Make a HTTP/1.0 connection to the backend. - p.Director(outreq) + outreq.URL.Scheme = target.Scheme + outreq.URL.Host = target.Host + //outreq.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) + outreq.URL.Path = target.Path outreq.Proto = "HTTP/1.0" outreq.ProtoMajor = 1 outreq.ProtoMinor = 0 @@ -171,37 +140,45 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } // Run two-way proxying. - p.handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn)) + handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn), streamName) +} + +func copyStream(out, in *net.TCPConn, promCounter prometheus.Counter) { + buf := getBuf() + defer releaseBuf(buf) + defer in.CloseRead() //nolint + defer out.CloseWrite() //nolint + + for { + n, err := io.CopyBuffer(out, in, buf) + promCounter.Add(float64(n)) + if err != nil { + if err != io.EOF { + log.Printf("http: proxy error: %v", err) + } + return + } + } } // Simple two-way TCP proxy that copies data in both directions and // can shutdown each direction of the connection independently. -func (p *ReverseProxy) handleProxy(conn *net.TCPConn, upstream *net.TCPConn) { +func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) { var wg sync.WaitGroup wg.Add(2) + streamListeners.WithLabelValues(streamName).Inc() go func() { - buf := getBuf() - defer releaseBuf(buf) - if _, err := io.CopyBuffer(conn, upstream, buf); err != nil { - log.Printf("http: proxy error: client: %v", err) - } - upstream.CloseRead() - conn.CloseWrite() + copyStream(conn, upstream, streamSentBytes.WithLabelValues(streamName)) wg.Done() }() go func() { - buf := getBuf() - defer releaseBuf(buf) - if _, err := io.CopyBuffer(upstream, conn, buf); err != nil { - log.Printf("http: proxy error: upstream: %v", err) - } - conn.CloseRead() - upstream.CloseWrite() + copyStream(upstream, conn, streamRcvdBytes.WithLabelValues(streamName)) wg.Done() }() wg.Wait() + streamListeners.WithLabelValues(streamName).Dec() } // Implementation of a simple buffer cache, to minimize large diff --git a/fe/static/fullbg.js b/node/static/fullbg.js similarity index 100% rename from fe/static/fullbg.js rename to node/static/fullbg.js diff --git a/fe/static/radio52.png b/node/static/radio52.png similarity index 100% rename from fe/static/radio52.png rename to node/static/radio52.png diff --git a/fe/static/radiomast_bw.jpg b/node/static/radiomast_bw.jpg similarity index 100% rename from fe/static/radiomast_bw.jpg rename to node/static/radiomast_bw.jpg diff --git a/fe/static/style.css b/node/static/style.css similarity index 100% rename from fe/static/style.css rename to node/static/style.css diff --git a/fe/templates/index.html b/node/templates/index.html similarity index 100% rename from fe/templates/index.html rename to node/templates/index.html