Skip to content
Snippets Groups Projects
Commit ef5560dc authored by ale's avatar ale
Browse files

unify the request namespace; support stream proxying

This allows running the service on a single port: with the
--enable-icecast-proxy option, all requests can go through
redirectord.
parent e90c5740
Branches
No related tags found
No related merge requests found
...@@ -18,7 +18,8 @@ var ( ...@@ -18,7 +18,8 @@ var (
MountPrefix = "/icecast/mounts/" MountPrefix = "/icecast/mounts/"
NodePrefix = "/icecast/nodes/" NodePrefix = "/icecast/nodes/"
IcecastPort = 8000 IcecastPort = 8000
IcecastMountPrefix = "/_stream"
ErrIsDirectory = errors.New("key is a directory") ErrIsDirectory = errors.New("key is a directory")
ErrIsFile = errors.New("key is a file") ErrIsFile = errors.New("key is a file")
...@@ -49,10 +50,21 @@ func (m *Mount) IsRelay() bool { ...@@ -49,10 +50,21 @@ func (m *Mount) IsRelay() bool {
return m.RelayUrl != "" return m.RelayUrl != ""
} }
func mountPath(mountName string) string { // Return the path in etcd used to store mountpoint configuration.
func mountEtcdPath(mountName string) string {
return MountPrefix + mountName[1:] return MountPrefix + mountName[1:]
} }
// Return the Icecast mount path for the given public mount name.
func MountNameToIcecastPath(mountName string) string {
return IcecastMountPrefix + mountName
}
// Return the public mount name from an Icecast mount path.
func IcecastPathToMountName(path string) string {
return strings.TrimPrefix(path, IcecastMountPrefix)
}
// Status of a mount on an individual Icecast server. // Status of a mount on an individual Icecast server.
type IcecastMountStatus struct { type IcecastMountStatus struct {
Name string Name string
...@@ -137,7 +149,7 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI { ...@@ -137,7 +149,7 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI {
// GetMount returns data on a specific mountpoint (returns nil if not // GetMount returns data on a specific mountpoint (returns nil if not
// found). // found).
func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
response, err := r.client.Get(mountPath(mountName), false, false) response, err := r.client.Get(mountEtcdPath(mountName), false, false)
if err != nil || response.Node == nil { if err != nil || response.Node == nil {
return nil, err return nil, err
} }
...@@ -159,13 +171,13 @@ func (r *RadioAPI) SetMount(m *Mount) error { ...@@ -159,13 +171,13 @@ func (r *RadioAPI) SetMount(m *Mount) error {
return err return err
} }
_, err := r.client.Set(mountPath(m.Name), buf.String(), 0) _, err := r.client.Set(mountEtcdPath(m.Name), buf.String(), 0)
return err return err
} }
// DelMount removes a mountpoint. // DelMount removes a mountpoint.
func (r *RadioAPI) DelMount(mountName string) error { func (r *RadioAPI) DelMount(mountName string) error {
_, err := r.client.Delete(mountPath(mountName), false) _, err := r.client.Delete(mountEtcdPath(mountName), false)
return err return err
} }
......
...@@ -2,18 +2,21 @@ package fe ...@@ -2,18 +2,21 @@ package fe
import ( import (
"bytes" "bytes"
"flag"
"fmt" "fmt"
"html/template" "html/template"
"io" "io"
"log" "log"
"net" "net"
"net/http" "net/http"
"net/url"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"net/http/httputil"
_ "net/http/pprof" _ "net/http/pprof"
"git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio"
...@@ -22,6 +25,8 @@ import ( ...@@ -22,6 +25,8 @@ import (
) )
var ( var (
proxyStreams = flag.Bool("enable-icecast-proxy", false, "Proxy the local icecast")
httpStatusCodes = instrumentation.NewCounter("http.status") httpStatusCodes = instrumentation.NewCounter("http.status")
httpTargetStats = instrumentation.NewCounter("http.target") httpTargetStats = instrumentation.NewCounter("http.target")
sourceConnections = instrumentation.NewCounter("http.source_connections") sourceConnections = instrumentation.NewCounter("http.source_connections")
...@@ -89,12 +94,18 @@ func (h *HttpRedirector) pickActiveNode() string { ...@@ -89,12 +94,18 @@ func (h *HttpRedirector) pickActiveNode() string {
return result.IP return result.IP
} }
func makeIcecastAddr(server string) string { func icecastAddr(server string) string {
return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort))
} }
func makeIcecastUrl(server, mountName string) string { func streamUrl(server, mountName string) string {
return fmt.Sprintf("http://%s%s", makeIcecastAddr(server), mountName) var serverAddr string
if *proxyStreams {
serverAddr = server
} else {
serverAddr = icecastAddr(server)
}
return fmt.Sprintf("http://%s%s", serverAddr, autoradio.MountNameToIcecastPath(mountName))
} }
// Request wrapper that passes a Mount along with the HTTP request. // Request wrapper that passes a Mount along with the HTTP request.
...@@ -110,6 +121,16 @@ func (h *HttpRedirector) withMount(f func(*autoradio.Mount, http.ResponseWriter, ...@@ -110,6 +121,16 @@ func (h *HttpRedirector) withMount(f func(*autoradio.Mount, http.ResponseWriter,
}) })
} }
// Serve a M3U response. This simply points back at the stream
// redirect handler.
func (h *HttpRedirector) serveM3U(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) {
m3u := strings.TrimSuffix(r.URL.String(), ".m3u") + "\n"
w.Header().Set("Content-Length", strconv.Itoa(len(m3u)))
w.Header().Set("Content-Type", "audio/x-mpegurl")
addDefaultHeaders(w)
io.WriteString(w, m3u)
}
// Serve a response for a client connection to a relay. // Serve a response for a client connection to a relay.
func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) {
// Find an active node. // Find an active node.
...@@ -120,16 +141,11 @@ func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWrite ...@@ -120,16 +141,11 @@ func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWrite
} }
httpTargetStats.IncrVar(ipToMetric(relayAddr)) httpTargetStats.IncrVar(ipToMetric(relayAddr))
targetURL := makeIcecastUrl(relayAddr, mount.Name)
// See if we need to serve a M3U response or a redirect. // See if we need to serve a M3U response or a redirect.
if strings.HasSuffix(r.URL.Path, ".m3u") { if strings.HasSuffix(r.URL.Path, ".m3u") {
m3u := targetURL + "\n" h.serveM3U(mount, w, r)
w.Header().Set("Content-Length", strconv.Itoa(len(m3u)))
w.Header().Set("Content-Type", "audio/x-mpegurl")
addDefaultHeaders(w)
io.WriteString(w, m3u)
} else { } else {
targetURL := streamUrl(relayAddr, mount.Name)
http.Redirect(w, r, targetURL, 302) http.Redirect(w, r, targetURL, 302)
} }
} }
...@@ -161,16 +177,17 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit ...@@ -161,16 +177,17 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
return return
} }
// Create the upstream connection, and write the original // Create the upstream connection, and write the HTTP request
// request to it as-is (the URL path on the backend is the // to it (with the right URL path).
// same, and the headers do not need to change). upstream, err := net.Dial("tcp", icecastAddr(masterAddr))
upstream, err := net.Dial("tcp", makeIcecastAddr(masterAddr))
if err != nil { if err != nil {
log.Printf("source: dial upstream: %v", err) log.Printf("source: dial upstream: %v", err)
sendErr(err) sendErr(err)
return return
} }
defer upstream.Close() defer upstream.Close()
r.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
if err := r.Write(upstream); err != nil { if err := r.Write(upstream); err != nil {
log.Printf("source: write upstream request: %v", err) log.Printf("source: write upstream request: %v", err)
sendErr(err) sendErr(err)
...@@ -247,22 +264,28 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { ...@@ -247,22 +264,28 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
template.ParseGlob( template.ParseGlob(
filepath.Join(templateDir, "*.html"))) filepath.Join(templateDir, "*.html")))
// Create our HTTP handler stack. Passes the /debug/ queries // Create our HTTP handler stack.
// along to the global ServeMux (where moodules such as pprof
// install their handlers).
relayHandler := h.withMount(h.serveRelay)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle( mux.Handle(
"/static/", "/static/",
http.StripPrefix( http.StripPrefix(
"/static/", "/static/",
http.FileServer(http.Dir(staticDir)))) http.FileServer(http.Dir(staticDir))))
// Optionally enable a reverse proxy to the local Icecast.
if *proxyStreams {
iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort))
mux.Handle(autoradio.IcecastMountPrefix+"/", httputil.NewSingleHostReverseProxy(iceurl))
}
// Pass /debug/ to the default ServeMux.
mux.Handle("/debug/", http.DefaultServeMux)
relayHandler := h.withMount(h.serveRelay)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch { switch {
case r.URL.Path == "" || r.URL.Path == "/": case r.URL.Path == "" || r.URL.Path == "/":
h.serveStatusPage(w, r) h.serveStatusPage(w, r)
case strings.HasPrefix(r.URL.Path, "/debug/"):
http.DefaultServeMux.ServeHTTP(w, r)
default: default:
relayHandler.ServeHTTP(w, r) relayHandler.ServeHTTP(w, r)
} }
......
...@@ -153,7 +153,7 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e ...@@ -153,7 +153,7 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e
} }
for _, um := range ustatus.Mounts { for _, um := range ustatus.Mounts {
m := autoradio.IcecastMountStatus{ m := autoradio.IcecastMountStatus{
Name: um.Name, Name: autoradio.IcecastPathToMountName(um.Name),
Listeners: toi(um.Listeners), Listeners: toi(um.Listeners),
BitRate: toi(um.BitRate), BitRate: toi(um.BitRate),
Quality: tof(um.Quality), Quality: tof(um.Quality),
......
...@@ -19,15 +19,15 @@ var ( ...@@ -19,15 +19,15 @@ var (
) )
type iceLimitsConfig struct { type iceLimitsConfig struct {
Clients int `xml:"clients"` Clients int `xml:"clients"`
Sources int `xml:"sources"` Sources int `xml:"sources"`
// Threadpool int `xml:"threadpool"` // Threadpool int `xml:"threadpool"`
QueueSize int `xml:"queue-size"` QueueSize int `xml:"queue-size"`
ClientTimeout int `xml:"client-timeout"` ClientTimeout int `xml:"client-timeout"`
HeaderTimeout int `xml:"header-timeout"` HeaderTimeout int `xml:"header-timeout"`
SourceTimeout int `xml:"source-timeout"` SourceTimeout int `xml:"source-timeout"`
// BurstOnConnect int `xml:"burst-on-connect"` // BurstOnConnect int `xml:"burst-on-connect"`
BurstSize int `xml:"burst-size"` BurstSize int `xml:"burst-size"`
} }
type iceAuthenticationConfig struct { type iceAuthenticationConfig struct {
...@@ -80,8 +80,8 @@ type iceMountConfig struct { ...@@ -80,8 +80,8 @@ type iceMountConfig struct {
FallbackOverride int `xml:"fallback-override,omitempty"` FallbackOverride int `xml:"fallback-override,omitempty"`
Hidden int `xml:"hidden"` Hidden int `xml:"hidden"`
// NoYp int `xml:"no-yp"` // NoYp int `xml:"no-yp"`
OnConnect string `xml:"on-connect,omitempty"` OnConnect string `xml:"on-connect,omitempty"`
OnDisconnect string `xml:"on-disconnect,omitempty"` OnDisconnect string `xml:"on-disconnect,omitempty"`
} }
// Configuration of the local Icecast daemon (meant for serialization // Configuration of the local Icecast daemon (meant for serialization
...@@ -123,15 +123,15 @@ func defaultDebianConfig(publicIp string) *icecastConfig { ...@@ -123,15 +123,15 @@ func defaultDebianConfig(publicIp string) *icecastConfig {
return &icecastConfig{ return &icecastConfig{
XMLName: xml.Name{"", "icecast"}, XMLName: xml.Name{"", "icecast"},
Limits: iceLimitsConfig{ Limits: iceLimitsConfig{
Clients: maxClients, Clients: maxClients,
Sources: maxClients / 2, Sources: maxClients / 2,
// Threadpool: 16, // Threadpool: 16,
QueueSize: 1 << 20, QueueSize: 1 << 20,
ClientTimeout: 30, ClientTimeout: 30,
HeaderTimeout: 15, HeaderTimeout: 15,
SourceTimeout: 60, SourceTimeout: 60,
// BurstOnConnect: 1, // BurstOnConnect: 1,
BurstSize: 65535, BurstSize: 65535,
}, },
Auth: iceAuthenticationConfig{ Auth: iceAuthenticationConfig{
SourcePassword: sourcePw, SourcePassword: sourcePw,
...@@ -197,7 +197,7 @@ func (c *icecastConfig) EncodeToFile(path string) error { ...@@ -197,7 +197,7 @@ func (c *icecastConfig) EncodeToFile(path string) error {
func mountToConfig(m *autoradio.Mount) iceMountConfig { func mountToConfig(m *autoradio.Mount) iceMountConfig {
mconfig := iceMountConfig{ mconfig := iceMountConfig{
Name: m.Name, Name: autoradio.MountNameToIcecastPath(m.Name),
Username: m.Username, Username: m.Username,
Password: m.Password, Password: m.Password,
Hidden: 0, Hidden: 0,
...@@ -229,7 +229,7 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { ...@@ -229,7 +229,7 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
rc := iceRelayConfig{ rc := iceRelayConfig{
Mount: u.Path, Mount: u.Path,
LocalMount: m.Name, LocalMount: autoradio.MountNameToIcecastPath(m.Name),
Server: server, Server: server,
Port: iport, Port: iport,
OnDemand: 1, OnDemand: 1,
...@@ -246,8 +246,8 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { ...@@ -246,8 +246,8 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig { func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig {
return iceRelayConfig{ return iceRelayConfig{
Mount: m.Name, Mount: autoradio.MountNameToIcecastPath(m.Name),
LocalMount: m.Name, LocalMount: autoradio.MountNameToIcecastPath(m.Name),
Server: masterAddr, Server: masterAddr,
Port: autoradio.IcecastPort, Port: autoradio.IcecastPort,
Username: m.Username, Username: m.Username,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment