Commit c94d91e4 authored by ale's avatar ale

started writing the http redirector code

parent 5f2cbb6e
......@@ -3,10 +3,33 @@ package radioai
import (
"bytes"
"encoding/json"
"github.com/coreos/go-etcd/etcd"
"errors"
"strings"
"github.com/coreos/go-etcd/etcd"
)
var (
masterElectionPath = "/icecast/cluster/master"
mountPrefix = "/icecast/mounts/"
nodePrefix = "/icecast/nodes/"
)
// A mountpoint for a stream.
type Mount struct {
// Name (path to the mountpoint).
Name string
// Username for source authentication.
Username string
// Password for source authentication.
Password string
// Fallback stream name (optional).
Fallback string
}
func mountPath(mountName string) string {
return mountPrefix + mountName[1:]
}
......@@ -84,3 +107,15 @@ func (r *RadioAPI) GetNodes() ([]string, error) {
}
return result, nil
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
}
return response[0].Value, nil
}
// HTTP redirector.
//
// All user-facing traffic reaches the redirector first (this is
// where the first-level, high-ttl redirection points to).
//
// The purpose of the HTTP redirector is two-fold: sources will be
// proxied to the master icecast server, while clients will be served
// a .m3u file directly pointing at the relays.
//
package radioai
import (
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httputil"
"strconv"
"strings"
"sync"
"time"
)
// Cache the list of active nodes
type activeNodesCache struct {
client *RadioAPI
nodes []string
deadline time.Time
lock sync.Mutex
}
var activeNodesTtl = 500 * time.Millisecond
func (anc *activeNodesCache) GetNodes() []string {
anc.lock.Lock()
defer anc.lock.Unlock()
now := time.Now()
if now.After(anc.deadline) {
if nodes, err := anc.client.GetNodes(); err == nil {
anc.nodes = nodes
anc.deadline = now.Add(activeNodesTtl)
}
}
return anc.nodes
}
type HttpRedirector struct {
client *RadioAPI
nodeCache *activeNodesCache
}
// Return an active node, chosen randomly.
func (h *HttpRedirector) pickActiveNode() string {
nodes := h.nodeCache.GetNodes()
if nodes != nil && len(nodes) > 0 {
return nodes[rand.Intn(len(nodes))]
}
return ""
}
// Parse the request and extract the mount path.
func (h *HttpRedirector) getMount(r *http.Request) (*Mount, error) {
path := r.URL.Path
if strings.HasSuffix(path, ".m3u") {
path = path[:len(path)-4]
}
return h.client.GetMount(path)
}
// Serve a response for a client connection to a relay.
func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
mount, err := h.getMount(r)
if err != nil {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
// Find an active node.
relayAddr := h.pickActiveNode()
if relayAddr == "" {
http.Error(w, "No active nodes", http.StatusServiceUnavailable)
return
}
// Create the m3u response.
m3u := fmt.Sprintf("http://%s%s\n", relayAddr, mount.Name)
w.Header().Set("Content-Length", strconv.Itoa(len(m3u)))
w.Header().Set("Content-Type", "audio/x-mpegurl")
io.WriteString(w, m3u)
}
func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
mount, err := h.getMount(r)
if err != nil {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
// Find the current master node.
masterAddr, err := h.client.GetMasterAddr()
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// Proxy the resulting connection.
proxy := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Host = masterAddr
},
FlushInterval: time.Second,
}
proxy.ServeHTTP(w, r)
}
func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: pick between serveRelay() and serveSource().
}
......@@ -11,12 +11,6 @@ import (
"github.com/coreos/go-etcd/etcd"
)
var (
masterElectionPath = "/icecast/cluster/master"
mountPrefix = "/icecast/mounts/"
nodePrefix = "/icecast/nodes/"
)
func trigger(c chan bool) {
select {
case c <- true:
......@@ -24,21 +18,6 @@ func trigger(c chan bool) {
}
}
// A mountpoint for a stream.
type Mount struct {
// Name (path to the mountpoint).
Name string
// Username for source authentication.
Username string
// Password for source authentication.
Password string
// Fallback stream name (optional).
Fallback string
}
// In-memory representation of the overall configuration (basically
// just a list of the known mounts).
type ClusterConfig struct {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment