Skip to content
Snippets Groups Projects
Commit f0601d62 authored by ale's avatar ale
Browse files

merge the global-relays branch

parents d4f0c467 c02dd672
Branches
Tags
No related merge requests found
...@@ -35,10 +35,20 @@ type Mount struct { ...@@ -35,10 +35,20 @@ type Mount struct {
// Password for source authentication. // Password for source authentication.
Password string Password string
// Is this field is set, the mount point will be relaying an
// external stream and no source connections will be accepted.
// Each node will pull from the external source directly,
// ignoring our master election protocol.
RelayUrl string
// Fallback stream name (optional). // Fallback stream name (optional).
Fallback string Fallback string
} }
func (m *Mount) IsRelay() bool {
return m.RelayUrl != ""
}
func mountPath(mountName string) string { func mountPath(mountName string) string {
return MountPrefix + mountName[1:] return MountPrefix + mountName[1:]
} }
...@@ -62,8 +72,8 @@ type NodeStatus struct { ...@@ -62,8 +72,8 @@ type NodeStatus struct {
// Is the Icecast server up? // Is the Icecast server up?
IcecastUp bool IcecastUp bool
// List of // List of
Mounts []IcecastMountStatus `xml:"mount"` Mounts []IcecastMountStatus `xml:"mount"`
} }
func (ns *NodeStatus) NumListeners() int { func (ns *NodeStatus) NumListeners() int {
...@@ -221,7 +231,7 @@ func (r *RadioAPI) GetNodeIPs() ([]string, error) { ...@@ -221,7 +231,7 @@ func (r *RadioAPI) GetNodeIPs() ([]string, error) {
return nil, err return nil, err
} }
ips := make([]string, 0, len(nodes)) ips := make([]string, 0, len(nodes))
for _, n := range(nodes) { for _, n := range nodes {
ips = append(ips, n.IP) ips = append(ips, n.IP)
} }
return ips, nil return ips, nil
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"net/url"
"strings" "strings"
"git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio"
...@@ -102,8 +103,7 @@ func generateUsername(path string) string { ...@@ -102,8 +103,7 @@ func generateUsername(path string) string {
return fmt.Sprintf("source%d", crc32.ChecksumIEEE([]byte(path))) return fmt.Sprintf("source%d", crc32.ChecksumIEEE([]byte(path)))
} }
func createMount(args []string) { func doCreateMount(path, relayUrl string) {
path := args[0]
if strings.Contains(path, "/") { if strings.Contains(path, "/") {
log.Fatal("Mount points should not contain a slash ('/').") log.Fatal("Mount points should not contain a slash ('/').")
} }
...@@ -111,18 +111,24 @@ func createMount(args []string) { ...@@ -111,18 +111,24 @@ func createMount(args []string) {
// Check if the mount already exists. // Check if the mount already exists.
client := getClient() client := getClient()
oldm, _ := client.GetMount(path) if oldm, _ := client.GetMount(path); oldm != nil {
if oldm != nil {
log.Fatal("A mount with that name already exists!") log.Fatal("A mount with that name already exists!")
} }
// Create the new mount, randomly generate source authentication. // Create the new mount and set the relevant fields (depending
username := generateUsername(path) // on the relayUrl value).
password := autoradio.GeneratePassword() m := &autoradio.Mount{Name: path}
m := &autoradio.Mount{ if relayUrl == "" {
Name: path, // Randomly generate source credentials.
Username: username, m.Username = generateUsername(path)
Password: password, m.Password = autoradio.GeneratePassword()
} else {
// Validate the given relay URL.
u, err := url.Parse(relayUrl)
if err != nil {
log.Fatal(err)
}
m.RelayUrl = u.String()
} }
if err := client.SetMount(m); err != nil { if err := client.SetMount(m); err != nil {
...@@ -132,8 +138,17 @@ func createMount(args []string) { ...@@ -132,8 +138,17 @@ func createMount(args []string) {
fmt.Printf("%+v\n", m) fmt.Printf("%+v\n", m)
} }
func createMount(args []string) {
doCreateMount(args[0], "")
}
func createRelay(args []string) {
doCreateMount(args[0], args[1])
}
var commands = CommandList{ var commands = CommandList{
{"create-mount", "<path>", "Create a new mountpoint", createMount, 1, 1}, {"create-mount", "<path>", "Create a new mountpoint", createMount, 1, 1},
{"create-relay", "<path> <source_url>", "Create a new relay", createRelay, 2, 2},
{"delete-mount", "<path>", "Delete a mountpoint", deleteMount, 1, 1}, {"delete-mount", "<path>", "Delete a mountpoint", deleteMount, 1, 1},
{"list-mounts", "", "List all known mountpoints", listMounts, 0, 0}, {"list-mounts", "", "List all known mountpoints", listMounts, 0, 0},
{"show-mount", "<path>", "Show configuration of a mount", showMount, 1, 1}, {"show-mount", "<path>", "Show configuration of a mount", showMount, 1, 1},
......
...@@ -130,9 +130,13 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { ...@@ -130,9 +130,13 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
} }
func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
_, err := h.getMount(r) m, err := h.getMount(r)
if err != nil { if err != nil || m.IsRelay() {
log.Printf("source: error retrieving mount for %+v: %s", r, err) if err != nil {
log.Printf("source: error retrieving mount for %+v: %s", r, err)
} else {
log.Printf("source: connection to relay stream %s", m.Name)
}
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
h.stats.Add("source_404", 1) h.stats.Add("source_404", 1)
return return
......
...@@ -4,14 +4,18 @@ import ( ...@@ -4,14 +4,18 @@ import (
"bytes" "bytes"
"encoding/xml" "encoding/xml"
"io" "io"
"log"
"net"
"net/url"
"os" "os"
"strconv"
"git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio"
) )
var ( var (
//shoutHttpPort = 8001 //shoutHttpPort = 8001
maxClients = 10000 maxClients = 10000
) )
type iceLimitsConfig struct { type iceLimitsConfig struct {
...@@ -134,7 +138,7 @@ func defaultDebianConfig(publicIp string) *icecastConfig { ...@@ -134,7 +138,7 @@ func defaultDebianConfig(publicIp string) *icecastConfig {
AdminUser: "admin", AdminUser: "admin",
AdminPassword: adminPw, AdminPassword: adminPw,
}, },
Hostname: publicIp, Hostname: publicIp,
Fileserve: 1, Fileserve: 1,
Paths: icePathsConfig{ Paths: icePathsConfig{
Basedir: "/usr/share/icecast2", Basedir: "/usr/share/icecast2",
...@@ -207,7 +211,40 @@ func mountToConfig(m *autoradio.Mount) iceMountConfig { ...@@ -207,7 +211,40 @@ func mountToConfig(m *autoradio.Mount) iceMountConfig {
return mconfig return mconfig
} }
func mountToRelay(masterAddr string, m *autoradio.Mount) iceRelayConfig { func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
u, err := url.Parse(m.RelayUrl)
if err != nil {
// A failure here is almost invisible and not very
// useful, but at least we can prevent garbling the
// resulting icecast config.
log.Printf("can't parse relay url for %s: %s", m.Name, err)
return iceRelayConfig{}, false
}
server, port, err := net.SplitHostPort(u.Host)
if err != nil {
server = u.Host
port = "80"
}
iport, _ := strconv.Atoi(port)
rc := iceRelayConfig{
Mount: u.Path,
LocalMount: m.Name,
Server: server,
Port: iport,
OnDemand: 1,
RelayShoutcastMetadata: 1,
}
if u.User != nil {
rc.Username = u.User.Username()
if p, ok := u.User.Password(); ok {
rc.Password = p
}
}
return rc, true
}
func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig {
return iceRelayConfig{ return iceRelayConfig{
Mount: m.Name, Mount: m.Name,
LocalMount: m.Name, LocalMount: m.Name,
...@@ -226,17 +263,26 @@ func mountToRelay(masterAddr string, m *autoradio.Mount) iceRelayConfig { ...@@ -226,17 +263,26 @@ func mountToRelay(masterAddr string, m *autoradio.Mount) iceRelayConfig {
func (ic *icecastConfig) Update(config *ClusterConfig, isMaster bool, masterAddr string) { func (ic *icecastConfig) Update(config *ClusterConfig, isMaster bool, masterAddr string) {
ic.Mounts = nil ic.Mounts = nil
ic.Relays = nil ic.Relays = nil
if isMaster { mounts := make([]iceMountConfig, 0)
mounts := make([]iceMountConfig, 0) relays := make([]iceRelayConfig, 0)
for _, m := range config.ListMounts() {
for _, m := range config.ListMounts() {
switch {
case m.IsRelay():
if rc, ok := relayToConfig(m); ok {
relays = append(relays, rc)
}
case isMaster:
mounts = append(mounts, mountToConfig(m)) mounts = append(mounts, mountToConfig(m))
default:
relays = append(relays, mountToRelayConfig(masterAddr, m))
} }
}
if len(mounts) > 0 {
ic.Mounts = mounts ic.Mounts = mounts
} else { }
relays := make([]iceRelayConfig, 0) if len(relays) > 0 {
for _, m := range config.ListMounts() {
relays = append(relays, mountToRelay(masterAddr, m))
}
ic.Relays = relays ic.Relays = relays
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment