Commit 4960d370 authored by ale

initial commit: base building blocks and node logic

README 0 → 100644
radio.ai
========
A distributed, fault-tolerant Icecast streaming network.
package radioai

import (
	"flag"
	"io/ioutil"
	"log"
	"net"
	"strings"

	"github.com/coreos/go-etcd/etcd"
)

var (
	etcdMachines = flag.String("etcd-servers", "localhost:7800", "Etcd servers (comma-separated list)")
	etcdCertFile = flag.String("etcd-cert", "", "SSL certificate for etcd client")
	etcdKeyFile  = flag.String("etcd-key", "", "SSL private key for etcd client")
)

// loadFile reads a file into a string, aborting the process on error.
func loadFile(path string) string {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatal(err)
	}
	return string(data)
}

// resolveAll expands every host:port spec in input into one entry
// per resolved address.
func resolveAll(input []string) []string {
	result := make([]string, 0)
	for _, hostport := range input {
		host, port, err := net.SplitHostPort(hostport)
		if err != nil {
			log.Fatalf("Error parsing etcd server spec '%s': %s", hostport, err)
		}
		addrs, err := net.LookupHost(host)
		if err != nil {
			log.Fatalf("Error resolving etcd server spec '%s': %s", hostport, err)
		}
		for _, a := range addrs {
			result = append(result, net.JoinHostPort(a, port))
		}
	}
	return result
}

// NewEtcdClient builds an etcd client from the command-line flags,
// enabling SSL when both certificate and key are provided.
func NewEtcdClient() *etcd.Client {
	machines := resolveAll(strings.Split(*etcdMachines, ","))
	if len(machines) == 0 {
		log.Fatal("No etcd servers specified!")
	}
	c := etcd.NewClient(machines)
	if *etcdCertFile != "" && *etcdKeyFile != "" {
		c.SetScheme(etcd.HTTPS)
		if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
			log.Fatalf("Error setting up SSL for etcd client: %s", err)
		}
	}
	return c
}
package radioai

import (
	"os"
	"os/exec"
)

// IcecastController manages the local icecast2 daemon: it rewrites
// the configuration file and asks the init script to reload it.
type IcecastController struct {
	PublicIp   string
	ConfigFile string
	InitScript string
}

func NewIcecastController(publicIp string) *IcecastController {
	return &IcecastController{
		PublicIp:   publicIp,
		ConfigFile: "/etc/icecast2/icecast.conf",
		InitScript: "/etc/init.d/icecast2",
	}
}

// reload asks icecast to re-read its configuration, starting the
// daemon if it is not running yet.
func (ic *IcecastController) reload() error {
	err := exec.Command(ic.InitScript, "reload").Run()
	if err != nil {
		err = exec.Command(ic.InitScript, "start").Run()
	}
	return err
}

// Update regenerates the icecast configuration and reloads the
// daemon. The new config is written to a temporary file and moved
// into place with an atomic rename.
func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAddr string) error {
	tmpf := ic.ConfigFile + ".tmp"
	defer os.Remove(tmpf)
	iconfig := NewIcecastConfig(conf, ic.PublicIp, isMaster, masterAddr)
	if err := iconfig.EncodeToFile(tmpf); err != nil {
		return err
	}
	if err := os.Rename(tmpf, ic.ConfigFile); err != nil {
		return err
	}
	return ic.reload()
}
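Update gets its crash-safety from the write-to-temp-then-rename pattern: since rename is atomic within a POSIX filesystem, icecast only ever sees either the old config or the complete new one. A standalone sketch of the same pattern (the helper name atomicWrite is hypothetical):

package main

import (
	"io/ioutil"
	"os"
)

// atomicWrite replaces the file at path with data, never leaving a
// partially written file visible at path.
func atomicWrite(path string, data []byte) error {
	tmp := path + ".tmp"
	// Clean up the temporary file on any failure; after a
	// successful rename this is a harmless no-op.
	defer os.Remove(tmp)
	if err := ioutil.WriteFile(tmp, data, 0644); err != nil {
		return err
	}
	// Atomically swap the new file into place.
	return os.Rename(tmp, path)
}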
package radioai

import (
	"bytes"
	"encoding/xml"
	"io"
	"os"
)

var (
	baseHttpPort  = 8000
	shoutHttpPort = 8001
	maxClients    = 10000
)

type iceLimitsConfig struct {
	Clients        int `xml:"clients"`
	Sources        int `xml:"sources"`
	Threadpool     int `xml:"threadpool"`
	QueueSize      int `xml:"queue-size"`
	ClientTimeout  int `xml:"client-timeout"`
	HeaderTimeout  int `xml:"header-timeout"`
	SourceTimeout  int `xml:"source-timeout"`
	BurstOnConnect int `xml:"burst-on-connect"`
	BurstSize      int `xml:"burst-size"`
}

type iceAuthenticationConfig struct {
	SourcePassword string `xml:"source-password"`
	AdminUser      string `xml:"admin-user"`
	AdminPassword  string `xml:"admin-password"`
}

type iceListenConfig struct {
	BindAddress     string `xml:"bind-address"`
	Port            int    `xml:"port"`
	ShoutcastCompat int    `xml:"shoutcast-compat"`
}

type icePathsConfig struct {
	Basedir   string `xml:"basedir"`
	Logdir    string `xml:"logdir"`
	Webroot   string `xml:"webroot"`
	Adminroot string `xml:"adminroot"`
}

type iceLoggingConfig struct {
	Accesslog string `xml:"accesslog"`
	Errorlog  string `xml:"errorlog"`
	Loglevel  int    `xml:"loglevel"`
	Logsize   int    `xml:"logsize"`
}

type iceSecurityConfig struct {
	Chroot int `xml:"chroot"`
}

type iceRelayConfig struct {
	Server                 string `xml:"server"`
	Port                   int    `xml:"port"`
	Mount                  string `xml:"mount"`
	LocalMount             string `xml:"local-mount"`
	Username               string `xml:"username"`
	Password               string `xml:"password"`
	OnDemand               int    `xml:"on-demand"`
	RelayShoutcastMetadata int    `xml:"relay-shoutcast-metadata"`
}

type iceMountConfig struct {
	Name     string `xml:"mount-name"`
	Username string `xml:"username"`
	Password string `xml:"password"`
	// MaxListeners int `xml:"max-listeners"`
	FallbackMount    string `xml:"fallback-mount,omitempty"`
	FallbackOverride int    `xml:"fallback-override,omitempty"`
	Hidden           int    `xml:"hidden"`
	NoYp             int    `xml:"no-yp"`
	OnConnect        string `xml:"on-connect,omitempty"`
	OnDisconnect     string `xml:"on-disconnect,omitempty"`
}

type IcecastConfig struct {
	XMLName   xml.Name
	Limits    iceLimitsConfig         `xml:"limits"`
	Auth      iceAuthenticationConfig `xml:"authentication"`
	Hostname  string                  `xml:"hostname"`
	Fileserve int                     `xml:"fileserve"`
	Paths     icePathsConfig          `xml:"paths"`
	Logging   iceLoggingConfig        `xml:"logging"`
	Security  iceSecurityConfig       `xml:"security"`
	Listen    []iceListenConfig       `xml:"listen-socket"`
	Relays    []iceRelayConfig        `xml:"relay"`
	Mounts    []iceMountConfig        `xml:"mount"`
}
// defaultDebianConfig returns an IcecastConfig pre-filled with
// defaults matching the Debian icecast2 package layout.
func defaultDebianConfig() *IcecastConfig {
	sourcePw := "x"
	adminPw := "password"
	return &IcecastConfig{
		XMLName: xml.Name{Local: "icecast"},
		Limits: iceLimitsConfig{
			Clients:        maxClients,
			Sources:        maxClients / 2,
			Threadpool:     16,
			QueueSize:      2 << 10,
			ClientTimeout:  30,
			HeaderTimeout:  15,
			SourceTimeout:  5,
			BurstOnConnect: 1,
			BurstSize:      65535,
		},
		Auth: iceAuthenticationConfig{
			SourcePassword: sourcePw,
			AdminUser:      "admin",
			AdminPassword:  adminPw,
		},
		Fileserve: 1,
		Paths: icePathsConfig{
			Basedir:   "/usr/share/icecast2",
			Logdir:    "/var/log/icecast2",
			Webroot:   "/usr/share/icecast2/web",
			Adminroot: "/usr/share/icecast2/admin",
		},
		Logging: iceLoggingConfig{
			Accesslog: "access.log",
			Errorlog:  "error.log",
			Loglevel:  3,
			Logsize:   10000,
		},
		Security: iceSecurityConfig{0},
		Listen: []iceListenConfig{
			{"0.0.0.0", baseHttpPort, 0},
			{"0.0.0.0", shoutHttpPort, 1},
		},
		Relays: []iceRelayConfig{},
		Mounts: []iceMountConfig{},
	}
}
// Encode serializes the configuration as XML, prefixed with a
// do-not-edit banner.
func (c *IcecastConfig) Encode() ([]byte, error) {
	var buf bytes.Buffer
	output, err := xml.MarshalIndent(c, "", " ")
	if err != nil {
		return nil, err
	}
	io.WriteString(&buf, "<!-- Automatically generated, do not edit -->\n\n")
	if _, err := buf.Write(output); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// EncodeToFile serializes the configuration to the given file.
func (c *IcecastConfig) EncodeToFile(path string) error {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer file.Close()
	data, err := c.Encode()
	if err != nil {
		return err
	}
	_, err = file.Write(data)
	return err
}
// mountToConfig converts a Mount into a <mount> section for the
// master node.
func mountToConfig(m *Mount) iceMountConfig {
	mconfig := iceMountConfig{
		Name:     m.Name,
		Username: m.Username,
		Password: m.Password,
		// MaxListeners: 1000,
		Hidden: 0,
		NoYp:   1,
	}
	if m.Fallback != "" {
		mconfig.FallbackMount = m.Fallback
		mconfig.FallbackOverride = 1
	}
	return mconfig
}

// mountToRelay converts a Mount into a <relay> section that pulls
// the stream from the master node.
func mountToRelay(masterAddr string, m *Mount) iceRelayConfig {
	return iceRelayConfig{
		Mount:                  m.Name,
		LocalMount:             m.Name,
		Server:                 masterAddr,
		Port:                   baseHttpPort,
		Username:               m.Username,
		Password:               m.Password,
		OnDemand:               1,
		RelayShoutcastMetadata: 1,
	}
}

// NewIcecastConfig builds the full icecast configuration for this
// node: mounts are served directly on the master and relayed from
// it everywhere else.
func NewIcecastConfig(config *ClusterConfig, publicIp string, isMaster bool, masterAddr string) *IcecastConfig {
	iconf := defaultDebianConfig()
	iconf.Hostname = publicIp
	for _, m := range config.ListMounts() {
		if isMaster {
			iconf.Mounts = append(iconf.Mounts, mountToConfig(m))
		} else {
			iconf.Relays = append(iconf.Relays, mountToRelay(masterAddr, m))
		}
	}
	return iconf
}
package radioai

import (
	"strings"
	"testing"
)

func TestIcecastConfig(t *testing.T) {
	mount := &Mount{
		Name:     "/test.ogg",
		Username: "user",
		Password: "pass",
	}
	c := NewClusterConfig()
	c.UpdateMount(mount)

	// Test a relay config.
	ice := NewIcecastConfig(c, "1.2.3.4", false, "2.3.4.5")
	output, err := ice.Encode()
	if err != nil {
		t.Fatal(err)
	}
	outputs := string(output)
	if !strings.Contains(outputs, "<icecast>") {
		t.Fatalf("No <icecast> element:\n%s", output)
	}
	if !strings.Contains(outputs, "<relay>") {
		t.Fatalf("Mount not configured as relay:\n%s", output)
	}

	// Test a master config.
	ice = NewIcecastConfig(c, "1.2.3.4", true, "2.3.4.5")
	output, err = ice.Encode()
	if err != nil {
		t.Fatal(err)
	}
	outputs = string(output)
	if !strings.Contains(outputs, "<mount>") {
		t.Fatalf("Mount not configured as master:\n%s", output)
	}
}
package masterelection

import (
	"log"
	"time"

	"github.com/coreos/go-etcd/etcd"
)

const (
	STATE_SLAVE = iota
	STATE_MASTER
)

// MasterElection implements a simple etcd-based leader election:
// nodes compete for a single key with a TTL, and the winner keeps
// refreshing it while the others watch for its expiry.
type MasterElection struct {
	client      *etcd.Client
	Addr        string
	MasterAddr  string
	Path        string
	TTL         uint64
	State       int
	StateChange chan int
}

func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int) *MasterElection {
	if ttl < 2 {
		ttl = 2
	}
	return &MasterElection{
		client:      client,
		Path:        path,
		Addr:        addr,
		TTL:         ttl,
		State:       STATE_SLAVE,
		StateChange: sch,
	}
}

func (m *MasterElection) IsMaster() bool {
	return m.State == STATE_MASTER
}

// setState updates the node state, notifying the StateChange
// channel (if any) on actual transitions.
func (m *MasterElection) setState(state int) {
	if m.State == state {
		return
	}
	if m.StateChange != nil {
		m.StateChange <- state
	}
	m.State = state
}

// Run drives the election loop forever.
func (m *MasterElection) Run() {
	// Start as a slave.
	m.setState(STATE_SLAVE)
	halfttl := time.Second * time.Duration(m.TTL/2)
	for {
		// Try to acquire the lock. If we are currently the
		// master, the previous value should be our own
		// address, otherwise it should be unset.
		prevValue := ""
		if m.State == STATE_MASTER {
			prevValue = m.Addr
		}
		resp, ok, err := m.client.TestAndSet(m.Path, prevValue, m.Addr, m.TTL)
		if err != nil {
			log.Printf("%s: error from etcd: %s", m.Path, err)
			time.Sleep(20 * time.Millisecond)
			continue
		}
		if ok {
			// Howdy, we're the master now. Wait a while
			// and renew our TTL.
			m.setState(STATE_MASTER)
			m.MasterAddr = m.Addr
			time.Sleep(halfttl)
		} else {
			// We're not the master. Watch for a DELETE
			// (in theory, but we're not actually
			// verifying the action type, just waiting for
			// the first event...)
			m.setState(STATE_SLAVE)
			m.MasterAddr = resp.PrevValue
			_, err := m.client.Watch(m.Path, resp.Index, nil, nil)
			if err != nil {
				log.Printf("%s: watch error: %s", m.Path, err)
			}
		}
	}
}
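A minimal sketch of how MasterElection might be driven on its own; the etcd address, key path, node address, and TTL shown here are illustrative, not from this commit:

package main

import (
	"log"

	"git.autistici.org/ale/radioai/masterelection"
	"github.com/coreos/go-etcd/etcd"
)

func main() {
	client := etcd.NewClient([]string{"localhost:7800"})
	sch := make(chan int)
	me := masterelection.NewMasterElection(client, "/demo/master", "10.0.0.1", 5, sch)
	go me.Run()
	// React to state transitions delivered on the channel.
	for state := range sch {
		if state == masterelection.STATE_MASTER {
			log.Printf("this node is now the master")
		} else {
			log.Printf("now a slave of %s", me.MasterAddr)
		}
	}
}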
node.go 0 → 100644
package radioai

import (
	"encoding/json"
	"log"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.autistici.org/ale/radioai/masterelection"
	"github.com/coreos/go-etcd/etcd"
)

var (
	masterElectionPath = "/icecast/cluster/master"
	mountPrefix        = "/icecast/mounts/"
	nodePrefix         = "/icecast/nodes/"
)

// A mountpoint for a stream.
type Mount struct {
	// Name (path to the mountpoint).
	Name string

	// Username for source authentication.
	Username string

	// Password for source authentication.
	Password string

	// Fallback stream name (optional).
	Fallback string
}

// In-memory representation of the overall configuration (basically
// just a list of the known mounts).
type ClusterConfig struct {
	mounts map[string]*Mount
	lock   sync.Mutex
}

func NewClusterConfig() *ClusterConfig {
	return &ClusterConfig{
		mounts: make(map[string]*Mount),
	}
}

func (c *ClusterConfig) GetMount(name string) *Mount {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.mounts[name]
}

func (c *ClusterConfig) UpdateMount(m *Mount) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.mounts[m.Name] = m
}

func (c *ClusterConfig) DelMount(name string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	delete(c.mounts, name)
}

func (c *ClusterConfig) ListMounts() []*Mount {
	c.lock.Lock()
	defer c.lock.Unlock()
	result := make([]*Mount, 0, len(c.mounts))
	for _, m := range c.mounts {
		result = append(result, m)
	}
	return result
}
// ConfigWatcher tracks the mount configuration stored in etcd and
// keeps a ClusterConfig in sync with it, signaling changes on upch.
type ConfigWatcher struct {
	client *etcd.Client
	config *ClusterConfig
	rch    chan *etcd.Response
	upch   chan bool
}

func NewConfigWatcher(client *etcd.Client, config *ClusterConfig, upch chan bool) *ConfigWatcher {
	return &ConfigWatcher{
		client: client,
		config: config,
		rch:    make(chan *etcd.Response, 100),
		upch:   upch,
	}
}

func (w *ConfigWatcher) syncer() {
	for response := range w.rch {
		mountName := filepath.Base(response.Key)
		if response.Action == "DELETE" {
			log.Printf("deleted mount '%s'", mountName)
			w.config.DelMount(mountName)
		} else {
			log.Printf("update to mount '%s'", mountName)
			var m Mount
			if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
				log.Printf("corrupted data: %s", err)
			} else {
				w.config.UpdateMount(&m)
			}
		}
		// Only flip the update signal once: the channel has a
		// one-slot buffer, so drop the event if a signal is
		// already pending.
		select {
		case w.upch <- true:
		default:
		}
	}
}

func (w *ConfigWatcher) Run() {
	go w.syncer()
	// Run until the first successful Get().
	var index uint64
	for {
		responses, err := w.client.Get(mountPrefix)
		if err == nil {
			// Inject all the replies into the channel.
			for _, r := range responses {
				index = r.Index
				w.rch <- r
			}
			break
		}
		log.Printf("Get error: %s", err)
		time.Sleep(1 * time.Second)
	}
	// Now start the watcher.
	_, err := w.client.Watch(mountPrefix, index, w.rch, nil)
	if err != nil {
		log.Printf("Watch error: %s", err)
	}
}
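The watcher expects each mount to be stored as a JSON-encoded Mount under /icecast/mounts/. A sketch of how a hypothetical admin tool might publish one, using the same go-etcd Set() call as the presence heartbeat below (a TTL of 0 leaves the entry permanent):

package main

import (
	"encoding/json"
	"flag"
	"log"

	"git.autistici.org/ale/radioai"
)

func main() {
	flag.Parse()
	client := radioai.NewEtcdClient()
	mount := &radioai.Mount{
		Name:     "/live.ogg",
		Username: "source",
		Password: "secret",
	}
	data, err := json.Marshal(mount)
	if err != nil {
		log.Fatal(err)
	}
	// The key layout matches mountPrefix in node.go.
	if _, err := client.Set("/icecast/mounts/live.ogg", string(data), 0); err != nil {
		log.Fatal(err)
	}
}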
// RadioCluster ties the components together: master election,
// config watching, node presence, and icecast reconfiguration.
type RadioCluster struct {
	ip          string
	client      *etcd.Client
	me          *masterelection.MasterElection
	config      *ClusterConfig
	watcher     *ConfigWatcher
	icecast     *IcecastController
	upch        chan bool
	livenessTtl uint64
}

func NewRadioCluster(ip string, client *etcd.Client) *RadioCluster {
	config := NewClusterConfig()
	// Network updates trigger icecast reconfiguration.
	upch := make(chan bool, 1)
	// MasterElection changes trigger an update.
	mech := make(chan int)
	go func() {
		for range mech {
			// Non-blocking send: drop the event if an
			// update is already pending.
			select {
			case upch <- true:
			default:
			}
		}
	}()
	return &RadioCluster{
		ip:     ip,
		client: client,
		me: masterelection.NewMasterElection(
			client,
			masterElectionPath,
			ip,
			5,
			mech),
		config:      config,
		watcher:     NewConfigWatcher(client, config, upch),
		icecast:     NewIcecastController(ip),
		upch:        upch,
		livenessTtl: 2,
	}
}

// presence periodically refreshes this node's entry in etcd,
// acting as a liveness heartbeat.
func (rc *RadioCluster) presence() {
	for {
		if _, err := rc.client.Set(nodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil {
			log.Printf("Set() error: %s", err)
			time.Sleep(100 * time.Millisecond)
		} else {
			time.Sleep(time.Duration(rc.livenessTtl/2) * time.Second)
		}
	}
}

// GetNodes returns the list of currently active nodes.
func (rc *RadioCluster) GetNodes() ([]string, error) {
	response, err := rc.client.Get(nodePrefix)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(response))
	for _, r := range response {
		result = append(result, r.Value)
	}
	return result, nil
}

func (rc *RadioCluster) Run() {
	// Bootstrap the config watcher. This ensures that we have a
	// full configuration (thanks to the Get() call) before we
	// start managing the icecast server.
	rc.watcher.Run()
	// Start the presence heartbeat.
	go rc.presence()
	for range rc.upch {
		if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
			log.Printf("Update() failed: %s", err)
		}
	}
}
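Putting it all together, a node daemon could be as small as the following sketch; the main package and the -ip flag are assumptions, not part of this commit:

package main

import (
	"flag"

	"git.autistici.org/ale/radioai"
)

var publicIp = flag.String("ip", "127.0.0.1", "Public IP of this node")

func main() {
	flag.Parse()
	client := radioai.NewEtcdClient()
	node := radioai.NewRadioCluster(*publicIp, client)
	// Run() blocks forever, reconfiguring icecast on every
	// config or master-election change.
	node.Run()
}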