Skip to content
Snippets Groups Projects
Commit ad23ce5a authored by ale's avatar ale
Browse files

Refactor the proxy code and add instrumentation

We only use the proxy for Icecast streaming so there's no need to be
generic.
parent 098c74b3
No related branches found
No related tags found
1 merge request!1v2.0
......@@ -26,22 +26,18 @@ const (
// partitioned in the meantime).
ABIVersion = "4"
// Prefixes for the etcd-based leader elections.
ElectionPrefix = EtcdPrefix + "election/" + ABIVersion + "/"
IcecastElectionPrefix = ElectionPrefix + "icecast/"
TranscoderElectionPrefix = ElectionPrefix + "transcode/"
//MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
//TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
//NodePrefix = "/icecast/" + ABIVersion + "/nodes/"
// Prefix for the streams served directly by Icecast.
IcecastMountPrefix = "/_stream"
// Prefixes for etcd-based service discovery.
EndpointPrefix = EtcdPrefix + "endpoints/" + ABIVersion + "/"
PublicEndpointPrefix = EndpointPrefix + "frontend/public/"
GossipEndpointPrefix = EndpointPrefix + "frontend/gossip/"
StatusEndpointPrefix = EndpointPrefix + "frontend/status/"
IcecastPublicEndpointPrefix = EndpointPrefix + "node/icecast-public/"
IcecastPeerEndpointPrefix = EndpointPrefix + "node/icecast-peer/"
EndpointPrefix = EtcdPrefix + "endpoints/" + ABIVersion + "/"
PublicEndpointPrefix = EndpointPrefix + "frontend/public/"
StatusEndpointPrefix = EndpointPrefix + "frontend/status/"
)
// IcecastPort is the port that the Icecast server will listen
......
......@@ -13,10 +13,6 @@ import (
var spacesRx = regexp.MustCompile(`\s+`)
type netDevCounts struct {
bytesSent int
}
func getBytesSentForDevice(dev string) (uint64, error) {
file, err := os.Open("/proc/net/dev")
if err != nil {
......
......@@ -6,31 +6,31 @@ import (
pb "git.autistici.org/ale/autoradio/proto"
)
// MountStatus reports the configuration and status of a mount,
// mountStatus reports the configuration and status of a mount,
// including eventual transcoded mounts that source it.
type MountStatus struct {
type mountStatus struct {
Mount *pb.Mount
Listeners int
TransMounts []*MountStatus
TransMounts []*mountStatus
}
func newMountStatus(m *pb.Mount, nodes []*pb.Status) *MountStatus {
func newMountStatus(m *pb.Mount, nodes []*nodeInfo) *mountStatus {
var listeners int
for _, n := range nodes {
for _, ims := range n.IcecastMounts {
for _, ims := range n.status.IcecastMounts {
if ims.Path == m.Path {
listeners += int(ims.Listeners)
break
}
}
}
return &MountStatus{
return &mountStatus{
Mount: m,
Listeners: listeners,
}
}
type mountStatusList []*MountStatus
type mountStatusList []*mountStatus
func (l mountStatusList) Len() int { return len(l) }
func (l mountStatusList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
......@@ -38,13 +38,13 @@ func (l mountStatusList) Less(i, j int) bool {
return l[i].Mount.Path < l[j].Mount.Path
}
// MountsToStatus converts a list of mounts (and eventually the
// mountsToStatus converts a list of mounts (and eventually the
// current list of nodes) to a nicely sorted and tree-aggregated list
// of MountStatus objects. The list of nodes can be nil, in which case
// of mountStatus objects. The list of nodes can be nil, in which case
// listener statistics will be omitted.
func MountsToStatus(mounts []*pb.Mount, nodes []*pb.Status) []*MountStatus {
func mountsToStatus(mounts []*pb.Mount, nodes []*nodeInfo) []*mountStatus {
// Aggregate stats, and create a tree of transcoding mounts.
ms := make(map[string]*MountStatus)
ms := make(map[string]*mountStatus)
for _, m := range mounts {
if m.HasTranscoder() {
continue
......@@ -61,7 +61,7 @@ func MountsToStatus(mounts []*pb.Mount, nodes []*pb.Status) []*MountStatus {
}
src.TransMounts = append(src.TransMounts, newMountStatus(m, nodes))
}
msl := make([]*MountStatus, 0, len(ms))
msl := make([]*mountStatus, 0, len(ms))
for _, m := range ms {
msl = append(msl, m)
}
......
......@@ -111,17 +111,17 @@ func (d *dnsServer) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
w.WriteMsg(m)
w.WriteMsg(m) //nolint
return
servFail:
m.SetRcode(req, dns.RcodeServerFailure)
w.WriteMsg(m)
w.WriteMsg(m) //nolint
return
nxDomain:
m.SetRcode(req, dns.RcodeNameError)
w.WriteMsg(m)
w.WriteMsg(m) //nolint
}
func (d *dnsServer) getNodeIPs(q dns.Question) []net.IP {
......
package node
//go:generate go-bindata --nocompress --pkg node static/... templates/...
import (
"bytes"
"fmt"
"io"
"log"
......@@ -11,43 +14,68 @@ import (
"git.autistici.org/ale/autoradio"
pb "git.autistici.org/ale/autoradio/proto"
"github.com/alecthomas/template"
assetfs "github.com/elazarl/go-bindata-assetfs"
)
func NewHTTP(n *Node, lb *loadBalancer, icecastPort int) http.Handler {
func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Handler {
mux := http.NewServeMux()
proxy := NewIcecastProxy(&url.URL{
Scheme: "http",
Host: fmt.Sprintf("http://localhost:%d", icecastPort),
Path: "/",
})
mux.HandleFunc(autoradio.IcecastMountPrefix+"/", func(w http.ResponseWriter, r *http.Request) {
// instrumentation.addListener
// defer instrumentation.deleteListener
proxy.ServeHTTP(w, r)
tpl := mustParseEmbeddedTemplates()
// Serve /static/ from builtin assets.
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(&assetfs.AssetFS{
Asset: Asset,
AssetDir: AssetDir,
AssetInfo: AssetInfo,
Prefix: "static",
})))
// Requests for /_stream/ go straight to the local Icecast.
proxyHandler := http.StripPrefix(autoradio.IcecastMountPrefix,
withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
doIcecastProxy(w, r, &url.URL{
Scheme: "http",
Host: fmt.Sprintf("localhost:%d", icecastPort),
Path: autoradio.MountPathToIcecastPath(m.Path),
}, m.Path)
}))
// redirectHandler serves different kinds of redirects, either
// an M3U or a HTTP redirect, for the public stream URLs (ones
// without the /_stream prefix).
redirectHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
serveRedirect(n.lb, m, w, r)
})
relayHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
serveRelay(lb, m, w, r)
})
// sourceHandler deals with SOURCE requests to the public
// stream URL, which are forwarded straight to the master
// Icecast node.
sourceHandler := withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
serveSource(n, m, w, r)
})
streamPrefixSlash := autoradio.IcecastMountPrefix + "/"
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == "GET" && strings.HasPrefix(r.URL.Path, streamPrefixSlash):
proxyHandler.ServeHTTP(w, r)
case r.Method == "SOURCE" || r.Method == "PUT":
sourceHandler.ServeHTTP(w, r)
case r.URL.Path == "" || r.URL.Path == "/":
serveStatusPage(n, w, r)
case r.Method == "GET" && (r.URL.Path == "" || r.URL.Path == "/"):
serveStatusPage(n, w, r, tpl, domain)
case r.Method == "GET":
redirectHandler.ServeHTTP(w, r)
default:
relayHandler.ServeHTTP(w, r)
http.NotFound(w, r)
}
})
return mux
}
// Request wrapper that passes a Mount along with the HTTP request.
// Request wrapper that passes a Mount along with the HTTP
// request. The mount path is just the URL path without an eventual
// .m3u extension.
func withMount(n *Node, f func(*pb.Mount, http.ResponseWriter, *http.Request)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mountPath := strings.TrimSuffix(r.URL.Path, ".m3u")
......@@ -74,15 +102,14 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques
return
}
proxy := NewIcecastProxy(&url.URL{
doIcecastProxy(w, r, &url.URL{
Scheme: "http",
Host: targetAddr,
Path: autoradio.MountPathToIcecastPath(mount.Path),
})
proxy.ServeHTTP(w, r)
}, mount.Path)
}
func serveRelay(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) {
func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) {
// Redirect the user to a final target, depending on the load
// balancing algorithm's decision. This enforces the
// 1:1 mapping between Icecasts and frontends.
......@@ -101,24 +128,24 @@ func serveRelay(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *htt
}
if strings.HasSuffix(r.URL.Path, ".m3u") {
serveM3U(w, r)
sendM3U(w, r)
} else {
targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path))
serveRedirect(w, r, targetURL)
sendRedirect(w, r, targetURL)
}
}
// Serve a M3U response. This simply points back at the stream
// redirect handler by dropping the .m3u suffix in the request URL.
func serveM3U(w http.ResponseWriter, r *http.Request) {
func sendM3U(w http.ResponseWriter, r *http.Request) {
m3u := strings.TrimSuffix(r.URL.String(), ".m3u") + "\n"
w.Header().Set("Content-Length", strconv.Itoa(len(m3u)))
w.Header().Set("Content-Type", "audio/x-mpegurl")
addDefaultHeaders(w)
io.WriteString(w, m3u)
io.WriteString(w, m3u) //nolint
}
func serveRedirect(w http.ResponseWriter, r *http.Request, targetURL string) {
func sendRedirect(w http.ResponseWriter, r *http.Request, targetURL string) {
// Firefox apparently caches redirects regardless of
// the status code, so we have to add some quite
// aggressive cache-busting headers. We serve a status
......@@ -133,10 +160,47 @@ func serveRedirect(w http.ResponseWriter, r *http.Request, targetURL string) {
http.Redirect(w, r, targetURL, code)
}
func serveStatusPage(n *Node, w http.ResponseWriter, r *http.Request) {
func serveStatusPage(n *Node, w http.ResponseWriter, r *http.Request, tpl *template.Template, domain string) {
nodes := n.lb.getNodes()
ms := mountsToStatus(n.mounts.GetMounts(), nodes)
ctx := struct {
Domain string
Nodes []*nodeInfo
Mounts []*mountStatus
}{domain, nodes, ms}
var buf bytes.Buffer
if err := tpl.ExecuteTemplate(&buf, "index.html", ctx); err != nil {
log.Printf("error rendering status page: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(buf.Len()))
addDefaultHeaders(w)
w.Write(buf.Bytes()) //nolint
}
func addDefaultHeaders(w http.ResponseWriter) {
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "no-cache,no-store")
}
// Parse the templates that are embedded with the binary (in bindata.go).
func mustParseEmbeddedTemplates() *template.Template {
root := template.New("")
files, err := AssetDir("templates")
if err != nil {
panic(err)
}
for _, f := range files {
b, err := Asset("templates/" + f)
if err != nil {
log.Fatalf("could not read embedded template %s: %v", f, err)
}
if _, err := root.New(f).Parse(string(b)); err != nil {
log.Fatalf("error parsing template %s: %v", f, err)
}
}
return root
}
package node
import "github.com/prometheus/client_golang/prometheus"
var (
streamSentBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "bytes_sent",
Help: "Bytes proxied to the client, by stream.",
},
[]string{"stream"},
)
streamRcvdBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "bytes_received",
Help: "Bytes received from the client, by stream.",
},
[]string{"stream"},
)
streamListeners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "listeners",
Help: "Number of current listeners.",
},
[]string{"stream"},
)
)
func init() {
prometheus.MustRegister(
streamSentBytes,
streamRcvdBytes,
streamListeners,
)
}
......@@ -18,50 +18,9 @@ import (
"strings"
"sync"
"time"
)
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
type ReverseProxy struct {
// Director must be a function which modifies the request into
// a new request to be sent to the upstream. Its response is
// then copied back to the original client unmodified.
Director func(*http.Request)
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
// NewIcecastProxy returns a new ReverseProxy that rewrites URLs to
// the scheme, host, and base path provided in target. If the target's
// path is "/base" and the incoming request was for "/dir", the target
// request will be for /base/dir.
func NewIcecastProxy(target *url.URL) *ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
return &ReverseProxy{
Director: director,
}
}
"github.com/prometheus/client_golang/prometheus"
)
func copyHeader(dst, src http.Header) {
for k, vv := range src {
......@@ -88,12 +47,22 @@ type wrappedWriter interface {
WrappedWriter() http.ResponseWriter
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Proxy a request to the desired backend. Due to the way the Icecast
// protocol works, this just dumps the initial (rewritten) HTTP/1.0
// request, and then switches to a full bi-directional TCP proxy. The
// outbound request is built from the target host, path, and eventual
// query string parameters and headers passed on from the original
// request. The additional streamName parameter is used for
// instrumentation.
func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) {
outreq := new(http.Request)
*outreq = *req // includes shallow copies of maps, but okay
// Make a HTTP/1.0 connection to the backend.
p.Director(outreq)
outreq.URL.Scheme = target.Scheme
outreq.URL.Host = target.Host
//outreq.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
outreq.URL.Path = target.Path
outreq.Proto = "HTTP/1.0"
outreq.ProtoMajor = 1
outreq.ProtoMinor = 0
......@@ -171,37 +140,45 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}
// Run two-way proxying.
p.handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn))
handleProxy(conn.(*net.TCPConn), upstream.(*net.TCPConn), streamName)
}
func copyStream(out, in *net.TCPConn, promCounter prometheus.Counter) {
buf := getBuf()
defer releaseBuf(buf)
defer in.CloseRead() //nolint
defer out.CloseWrite() //nolint
for {
n, err := io.CopyBuffer(out, in, buf)
promCounter.Add(float64(n))
if err != nil {
if err != io.EOF {
log.Printf("http: proxy error: %v", err)
}
return
}
}
}
// Simple two-way TCP proxy that copies data in both directions and
// can shutdown each direction of the connection independently.
func (p *ReverseProxy) handleProxy(conn *net.TCPConn, upstream *net.TCPConn) {
func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) {
var wg sync.WaitGroup
wg.Add(2)
streamListeners.WithLabelValues(streamName).Inc()
go func() {
buf := getBuf()
defer releaseBuf(buf)
if _, err := io.CopyBuffer(conn, upstream, buf); err != nil {
log.Printf("http: proxy error: client: %v", err)
}
upstream.CloseRead()
conn.CloseWrite()
copyStream(conn, upstream, streamSentBytes.WithLabelValues(streamName))
wg.Done()
}()
go func() {
buf := getBuf()
defer releaseBuf(buf)
if _, err := io.CopyBuffer(upstream, conn, buf); err != nil {
log.Printf("http: proxy error: upstream: %v", err)
}
conn.CloseRead()
upstream.CloseWrite()
copyStream(upstream, conn, streamRcvdBytes.WithLabelValues(streamName))
wg.Done()
}()
wg.Wait()
streamListeners.WithLabelValues(streamName).Dec()
}
// Implementation of a simple buffer cache, to minimize large
......
File moved
File moved
File moved
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment