diff --git a/api.go b/api.go index 69a3910ce1de6b3c6b3f1000dddc3e8a695ac2c1..8a169251e023ac5f257d561c07825bafddb184e9 100644 --- a/api.go +++ b/api.go @@ -18,7 +18,8 @@ var ( MountPrefix = "/icecast/mounts/" NodePrefix = "/icecast/nodes/" - IcecastPort = 8000 + IcecastPort = 8000 + IcecastMountPrefix = "/_stream" ErrIsDirectory = errors.New("key is a directory") ErrIsFile = errors.New("key is a file") @@ -49,10 +50,21 @@ func (m *Mount) IsRelay() bool { return m.RelayUrl != "" } -func mountPath(mountName string) string { +// Return the path in etcd used to store mountpoint configuration. +func mountEtcdPath(mountName string) string { return MountPrefix + mountName[1:] } +// Return the Icecast mount path for the given public mount name. +func MountNameToIcecastPath(mountName string) string { + return IcecastMountPrefix + mountName +} + +// Return the public mount name from an Icecast mount path. +func IcecastPathToMountName(path string) string { + return strings.TrimPrefix(path, IcecastMountPrefix) +} + // Status of a mount on an individual Icecast server. type IcecastMountStatus struct { Name string @@ -137,7 +149,7 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI { // GetMount returns data on a specific mountpoint (returns nil if not // found). func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { - response, err := r.client.Get(mountPath(mountName), false, false) + response, err := r.client.Get(mountEtcdPath(mountName), false, false) if err != nil || response.Node == nil { return nil, err } @@ -159,13 +171,13 @@ func (r *RadioAPI) SetMount(m *Mount) error { return err } - _, err := r.client.Set(mountPath(m.Name), buf.String(), 0) + _, err := r.client.Set(mountEtcdPath(m.Name), buf.String(), 0) return err } // DelMount removes a mountpoint. func (r *RadioAPI) DelMount(mountName string) error { - _, err := r.client.Delete(mountPath(mountName), false) + _, err := r.client.Delete(mountEtcdPath(mountName), false) return err } diff --git a/fe/http.go b/fe/http.go index 9fbf4fccf822d76e07078f4632db3949b0b9a114..6ed5400fa7a981827f2ee4a99f0abf51571ec6bc 100644 --- a/fe/http.go +++ b/fe/http.go @@ -2,18 +2,21 @@ package fe import ( "bytes" + "flag" "fmt" "html/template" "io" "log" "net" "net/http" + "net/url" "path/filepath" "strconv" "strings" "sync" "time" + "net/http/httputil" _ "net/http/pprof" "git.autistici.org/ale/autoradio" @@ -22,6 +25,8 @@ import ( ) var ( + proxyStreams = flag.Bool("enable-icecast-proxy", false, "Proxy the local icecast") + httpStatusCodes = instrumentation.NewCounter("http.status") httpTargetStats = instrumentation.NewCounter("http.target") sourceConnections = instrumentation.NewCounter("http.source_connections") @@ -89,12 +94,18 @@ func (h *HttpRedirector) pickActiveNode() string { return result.IP } -func makeIcecastAddr(server string) string { +func icecastAddr(server string) string { return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) } -func makeIcecastUrl(server, mountName string) string { - return fmt.Sprintf("http://%s%s", makeIcecastAddr(server), mountName) +func streamUrl(server, mountName string) string { + var serverAddr string + if *proxyStreams { + serverAddr = server + } else { + serverAddr = icecastAddr(server) + } + return fmt.Sprintf("http://%s%s", serverAddr, autoradio.MountNameToIcecastPath(mountName)) } // Request wrapper that passes a Mount along with the HTTP request. @@ -110,6 +121,16 @@ func (h *HttpRedirector) withMount(f func(*autoradio.Mount, http.ResponseWriter, }) } +// Serve a M3U response. This simply points back at the stream +// redirect handler. +func (h *HttpRedirector) serveM3U(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { + m3u := strings.TrimSuffix(r.URL.String(), ".m3u") + "\n" + w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) + w.Header().Set("Content-Type", "audio/x-mpegurl") + addDefaultHeaders(w) + io.WriteString(w, m3u) +} + // Serve a response for a client connection to a relay. func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { // Find an active node. @@ -120,16 +141,11 @@ func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWrite } httpTargetStats.IncrVar(ipToMetric(relayAddr)) - targetURL := makeIcecastUrl(relayAddr, mount.Name) - // See if we need to serve a M3U response or a redirect. if strings.HasSuffix(r.URL.Path, ".m3u") { - m3u := targetURL + "\n" - w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) - w.Header().Set("Content-Type", "audio/x-mpegurl") - addDefaultHeaders(w) - io.WriteString(w, m3u) + h.serveM3U(mount, w, r) } else { + targetURL := streamUrl(relayAddr, mount.Name) http.Redirect(w, r, targetURL, 302) } } @@ -161,16 +177,17 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit return } - // Create the upstream connection, and write the original - // request to it as-is (the URL path on the backend is the - // same, and the headers do not need to change). - upstream, err := net.Dial("tcp", makeIcecastAddr(masterAddr)) + // Create the upstream connection, and write the HTTP request + // to it (with the right URL path). + upstream, err := net.Dial("tcp", icecastAddr(masterAddr)) if err != nil { log.Printf("source: dial upstream: %v", err) sendErr(err) return } defer upstream.Close() + + r.URL.Path = autoradio.MountNameToIcecastPath(mount.Name) if err := r.Write(upstream); err != nil { log.Printf("source: write upstream request: %v", err) sendErr(err) @@ -247,22 +264,28 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { template.ParseGlob( filepath.Join(templateDir, "*.html"))) - // Create our HTTP handler stack. Passes the /debug/ queries - // along to the global ServeMux (where moodules such as pprof - // install their handlers). - relayHandler := h.withMount(h.serveRelay) + // Create our HTTP handler stack. mux := http.NewServeMux() mux.Handle( "/static/", http.StripPrefix( "/static/", http.FileServer(http.Dir(staticDir)))) + + // Optionally enable a reverse proxy to the local Icecast. + if *proxyStreams { + iceurl, _ := url.Parse(fmt.Sprintf("http://localhost:%d", autoradio.IcecastPort)) + mux.Handle(autoradio.IcecastMountPrefix+"/", httputil.NewSingleHostReverseProxy(iceurl)) + } + + // Pass /debug/ to the default ServeMux. + mux.Handle("/debug/", http.DefaultServeMux) + + relayHandler := h.withMount(h.serveRelay) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { switch { case r.URL.Path == "" || r.URL.Path == "/": h.serveStatusPage(w, r) - case strings.HasPrefix(r.URL.Path, "/debug/"): - http.DefaultServeMux.ServeHTTP(w, r) default: relayHandler.ServeHTTP(w, r) } diff --git a/node/icecast.go b/node/icecast.go index fe9770f8bdcf39adc47606a5a9bc22cdc9f96c3e..403a9219f3f8ceb4395bbc224e30451b44afeaf7 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -153,7 +153,7 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e } for _, um := range ustatus.Mounts { m := autoradio.IcecastMountStatus{ - Name: um.Name, + Name: autoradio.IcecastPathToMountName(um.Name), Listeners: toi(um.Listeners), BitRate: toi(um.BitRate), Quality: tof(um.Quality), diff --git a/node/icecast_config.go b/node/icecast_config.go index de2edc0051523cfafc3635c1184eb83be44ad07c..dad85a3a82844a27b2bc2c01c021675d624f3cbf 100644 --- a/node/icecast_config.go +++ b/node/icecast_config.go @@ -19,15 +19,15 @@ var ( ) type iceLimitsConfig struct { - Clients int `xml:"clients"` - Sources int `xml:"sources"` + Clients int `xml:"clients"` + Sources int `xml:"sources"` // Threadpool int `xml:"threadpool"` - QueueSize int `xml:"queue-size"` - ClientTimeout int `xml:"client-timeout"` - HeaderTimeout int `xml:"header-timeout"` - SourceTimeout int `xml:"source-timeout"` + QueueSize int `xml:"queue-size"` + ClientTimeout int `xml:"client-timeout"` + HeaderTimeout int `xml:"header-timeout"` + SourceTimeout int `xml:"source-timeout"` // BurstOnConnect int `xml:"burst-on-connect"` - BurstSize int `xml:"burst-size"` + BurstSize int `xml:"burst-size"` } type iceAuthenticationConfig struct { @@ -80,8 +80,8 @@ type iceMountConfig struct { FallbackOverride int `xml:"fallback-override,omitempty"` Hidden int `xml:"hidden"` // NoYp int `xml:"no-yp"` - OnConnect string `xml:"on-connect,omitempty"` - OnDisconnect string `xml:"on-disconnect,omitempty"` + OnConnect string `xml:"on-connect,omitempty"` + OnDisconnect string `xml:"on-disconnect,omitempty"` } // Configuration of the local Icecast daemon (meant for serialization @@ -123,15 +123,15 @@ func defaultDebianConfig(publicIp string) *icecastConfig { return &icecastConfig{ XMLName: xml.Name{"", "icecast"}, Limits: iceLimitsConfig{ - Clients: maxClients, - Sources: maxClients / 2, + Clients: maxClients, + Sources: maxClients / 2, // Threadpool: 16, - QueueSize: 1 << 20, - ClientTimeout: 30, - HeaderTimeout: 15, - SourceTimeout: 60, + QueueSize: 1 << 20, + ClientTimeout: 30, + HeaderTimeout: 15, + SourceTimeout: 60, // BurstOnConnect: 1, - BurstSize: 65535, + BurstSize: 65535, }, Auth: iceAuthenticationConfig{ SourcePassword: sourcePw, @@ -197,7 +197,7 @@ func (c *icecastConfig) EncodeToFile(path string) error { func mountToConfig(m *autoradio.Mount) iceMountConfig { mconfig := iceMountConfig{ - Name: m.Name, + Name: autoradio.MountNameToIcecastPath(m.Name), Username: m.Username, Password: m.Password, Hidden: 0, @@ -229,7 +229,7 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { rc := iceRelayConfig{ Mount: u.Path, - LocalMount: m.Name, + LocalMount: autoradio.MountNameToIcecastPath(m.Name), Server: server, Port: iport, OnDemand: 1, @@ -246,8 +246,8 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig { return iceRelayConfig{ - Mount: m.Name, - LocalMount: m.Name, + Mount: autoradio.MountNameToIcecastPath(m.Name), + LocalMount: autoradio.MountNameToIcecastPath(m.Name), Server: masterAddr, Port: autoradio.IcecastPort, Username: m.Username,