From c94d91e48db24dc864d1a26cd5294953bc982b67 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Tue, 5 Nov 2013 10:03:19 +0000 Subject: [PATCH] started writing the http redirector code --- api.go | 37 ++++++++++++++++- http.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ node.go | 21 ---------- 3 files changed, 156 insertions(+), 22 deletions(-) create mode 100644 http.go diff --git a/api.go b/api.go index 1a43d779..086ba6a8 100644 --- a/api.go +++ b/api.go @@ -3,10 +3,33 @@ package radioai import ( "bytes" "encoding/json" - "github.com/coreos/go-etcd/etcd" + "errors" "strings" + + "github.com/coreos/go-etcd/etcd" +) + +var ( + masterElectionPath = "/icecast/cluster/master" + mountPrefix = "/icecast/mounts/" + nodePrefix = "/icecast/nodes/" ) +// A mountpoint for a stream. +type Mount struct { + // Name (path to the mountpoint). + Name string + + // Username for source authentication. + Username string + + // Password for source authentication. + Password string + + // Fallback stream name (optional). + Fallback string +} + func mountPath(mountName string) string { return mountPrefix + mountName[1:] } @@ -84,3 +107,15 @@ func (r *RadioAPI) GetNodes() ([]string, error) { } return result, nil } + +// GetMasterAddr returns the address of the current master server. +func (r *RadioAPI) GetMasterAddr() (string, error) { + response, err := r.client.Get(masterElectionPath) + if err != nil { + return "", err + } + if len(response) < 1 { + return "", errors.New("no active master") + } + return response[0].Value, nil +} diff --git a/http.go b/http.go new file mode 100644 index 00000000..036a91b3 --- /dev/null +++ b/http.go @@ -0,0 +1,120 @@ +// HTTP redirector. +// +// All user-facing traffic reaches the redirector first (this is +// where the first-level, high-ttl redirection points to). +// +// The purpose of the HTTP redirector is two-fold: sources will be +// proxied to the master icecast server, while clients will be served +// a .m3u file directly pointing at the relays. +// + +package radioai + +import ( + "fmt" + "io" + "math/rand" + "net/http" + "net/http/httputil" + "strconv" + "strings" + "sync" + "time" +) + +// Cache the list of active nodes +type activeNodesCache struct { + client *RadioAPI + nodes []string + deadline time.Time + lock sync.Mutex +} + +var activeNodesTtl = 500 * time.Millisecond + +func (anc *activeNodesCache) GetNodes() []string { + anc.lock.Lock() + defer anc.lock.Unlock() + + now := time.Now() + if now.After(anc.deadline) { + if nodes, err := anc.client.GetNodes(); err == nil { + anc.nodes = nodes + anc.deadline = now.Add(activeNodesTtl) + } + } + return anc.nodes +} + +type HttpRedirector struct { + client *RadioAPI + nodeCache *activeNodesCache +} + +// Return an active node, chosen randomly. +func (h *HttpRedirector) pickActiveNode() string { + nodes := h.nodeCache.GetNodes() + if nodes != nil && len(nodes) > 0 { + return nodes[rand.Intn(len(nodes))] + } + return "" +} + +// Parse the request and extract the mount path. +func (h *HttpRedirector) getMount(r *http.Request) (*Mount, error) { + path := r.URL.Path + if strings.HasSuffix(path, ".m3u") { + path = path[:len(path)-4] + } + return h.client.GetMount(path) +} + +// Serve a response for a client connection to a relay. +func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { + mount, err := h.getMount(r) + if err != nil { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + + // Find an active node. + relayAddr := h.pickActiveNode() + if relayAddr == "" { + http.Error(w, "No active nodes", http.StatusServiceUnavailable) + return + } + + // Create the m3u response. + m3u := fmt.Sprintf("http://%s%s\n", relayAddr, mount.Name) + w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) + w.Header().Set("Content-Type", "audio/x-mpegurl") + io.WriteString(w, m3u) +} + +func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { + mount, err := h.getMount(r) + if err != nil { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + + // Find the current master node. + masterAddr, err := h.client.GetMasterAddr() + if err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + + // Proxy the resulting connection. + proxy := &httputil.ReverseProxy{ + Director: func(r *http.Request) { + r.URL.Host = masterAddr + }, + FlushInterval: time.Second, + } + proxy.ServeHTTP(w, r) +} + +func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // TODO: pick between serveRelay() and serveSource(). +} diff --git a/node.go b/node.go index d6120e99..d8d391e1 100644 --- a/node.go +++ b/node.go @@ -11,12 +11,6 @@ import ( "github.com/coreos/go-etcd/etcd" ) -var ( - masterElectionPath = "/icecast/cluster/master" - mountPrefix = "/icecast/mounts/" - nodePrefix = "/icecast/nodes/" -) - func trigger(c chan bool) { select { case c <- true: @@ -24,21 +18,6 @@ func trigger(c chan bool) { } } -// A mountpoint for a stream. -type Mount struct { - // Name (path to the mountpoint). - Name string - - // Username for source authentication. - Username string - - // Password for source authentication. - Password string - - // Fallback stream name (optional). - Fallback string -} - // In-memory representation of the overall configuration (basically // just a list of the known mounts). type ClusterConfig struct { -- GitLab