Commit 4c9e0b30 authored by ale

add client API; add graceful stop; clean up code

parent 4960d370
TODO (new file, mode 100644)
Not necessarily ordered by priority:

- HTTP redirectors for sources (pointing to the master) and for relays
  (pointing to any node), implementing the low-TTL naming service.
- DNS server to provide the necessary bootstrap infrastructure and the
  higher-TTL, stable naming service.
- Debian packages, init scripts, and everything else necessary for
  proper packaging and easy distribution.
api.go (new file, mode 100644)
package radioai

import (
	"bytes"
	"encoding/json"
	"errors"
	"github.com/coreos/go-etcd/etcd"
	"strings"
)

// mountPath returns the database path for a mount. The mount name is
// expected to start with a slash, which is stripped when appending it
// to mountPrefix.
func mountPath(mountName string) string {
	return mountPrefix + mountName[1:]
}

// RadioAPI is the actual API to the streaming cluster's database.
type RadioAPI struct {
	client *etcd.Client
}

// GetMount returns data on a specific mountpoint (returns an error if
// not found).
func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
	response, err := r.client.Get(mountPath(mountName))
	if err != nil {
		return nil, err
	}
	if len(response) != 1 {
		return nil, errors.New("not found")
	}
	var m Mount
	if err := json.NewDecoder(strings.NewReader(response[0].Value)).Decode(&m); err != nil {
		return nil, err
	}
	return &m, nil
}

// SetMount creates or updates a mountpoint.
func (r *RadioAPI) SetMount(m *Mount) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(m); err != nil {
		return err
	}
	_, err := r.client.Set(mountPath(m.Name), buf.String(), 0)
	return err
}

// DelMount removes a mountpoint.
func (r *RadioAPI) DelMount(mountName string) error {
	_, err := r.client.Delete(mountPath(mountName))
	return err
}

// ListMounts returns a list of all the configured mountpoints.
func (r *RadioAPI) ListMounts() ([]*Mount, error) {
	response, err := r.client.Get(mountPrefix)
	if err != nil {
		return nil, err
	}
	result := make([]*Mount, 0, len(response))
	for _, entry := range response {
		var m Mount
		if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&m); err != nil {
			continue
		}
		result = append(result, &m)
	}
	return result, nil
}

// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) GetNodes() ([]string, error) {
	response, err := r.client.Get(nodePrefix)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(response))
	for _, entry := range response {
		result = append(result, entry.Value)
	}
	return result, nil
}
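
For illustration, driving this client API could look roughly like the sketch below (hypothetical code: it assumes the caller lives inside the radioai package, since RadioAPI has no exported constructor yet, that the old go-etcd etcd.NewClient() takes no arguments and talks to a local etcd, and that Mount has a Name field as used above):

func exampleClient() error {
	// Assumption: a zero-argument NewClient() pointing at a local etcd.
	api := &RadioAPI{client: etcd.NewClient()}

	// Create or update a mountpoint.
	if err := api.SetMount(&Mount{Name: "/radio.ogg"}); err != nil {
		return err
	}

	// Read it back.
	m, err := api.GetMount("/radio.ogg")
	if err != nil {
		return err
	}
	log.Printf("mount: %+v", m)

	// Enumerate all configured mounts and active nodes.
	mounts, _ := api.ListMounts()
	nodes, _ := api.GetNodes()
	log.Printf("%d mounts, %d nodes", len(mounts), len(nodes))
	return nil
}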
@@ -12,7 +12,7 @@ func TestIcecastConfig(t *testing.T) {
 		Password: "pass",
 	}
 	c := NewClusterConfig()
-	c.UpdateMount(mount)
+	c.setMount(mount)
 	// Test a relay config.
 	ice := NewIcecastConfig(c, "1.2.3.4", false, "2.3.4.5")
...
@@ -14,6 +14,8 @@ const (
 type MasterElection struct {
 	client *etcd.Client
+	stop    chan bool
+	stopped bool
 	Addr       string
 	MasterAddr string
@@ -24,7 +26,7 @@ type MasterElection struct {
 	StateChange chan int
 }

-func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int) *MasterElection {
+func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
 	if ttl < 2 {
 		ttl = 2
 	}
@@ -35,6 +37,7 @@ func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch c
 		TTL:         ttl,
 		State:       STATE_SLAVE,
 		StateChange: sch,
+		stop:        stop,
 	}
 }
@@ -52,13 +55,31 @@ func (m *MasterElection) setState(state int) {
 	m.State = state
 }

+func (m *MasterElection) stopper() {
+	<-m.stop
+
+	// Tell the Run() goroutine to exit as soon as it can. (We
+	// could have simply used the 'stop' channel there, but Watch()
+	// ignores the stop channel if invoked without a receiving
+	// channel, and we would have to refactor Run() to use a
+	// temporary channel for each invocation.)
+	m.stopped = true
+
+	// Remove the lock file if we are the master.
+	if m.State == STATE_MASTER {
+		m.client.Delete(m.Path)
+	}
+}
+
 func (m *MasterElection) Run() {
+	go m.stopper()
+
 	// Start as a slave.
 	m.setState(STATE_SLAVE)

 	halfttl := time.Second * time.Duration(m.TTL/2)
-	for {
+	for !m.stopped {
 		// Try to acquire the lock. If we are currently the
 		// master, the previous value should be our own
 		// address, otherwise it should be unset.
...
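
To make the stop semantics concrete, here is a minimal wiring sketch (hypothetical: the election path, the address and the zero-argument etcd.NewClient() are assumptions, not part of this commit):

stop := make(chan bool)
sch := make(chan int)
go func() {
	// Drain state changes so setState() never blocks.
	for state := range sch {
		log.Printf("election state: %d", state)
	}
}()

me := NewMasterElection(etcd.NewClient(), "/icecast/master", "1.2.3.4", 5, sch, stop)
go me.Run()

// Later: closing 'stop' wakes stopper(), which sets m.stopped so that
// Run() exits at the top of its next cycle, and deletes the lock file
// if this node is the master, allowing a new election before the TTL
// would have expired.
close(stop)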
@@ -3,7 +3,6 @@ package radioai
 import (
 	"encoding/json"
 	"log"
-	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -18,6 +17,13 @@ var (
 	nodePrefix = "/icecast/nodes/"
 )

+func trigger(c chan bool) {
+	select {
+	case c <- true:
+	default:
+	}
+}
+
 // A mountpoint for a stream.
 type Mount struct {
 	// Name (path to the mountpoint).
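
The new trigger() helper is a non-blocking send: on a channel of capacity 1 it coalesces any burst of updates into a single pending event. For example:

upch := make(chan bool, 1)
trigger(upch) // queues the event
trigger(upch) // channel already full: dropped instead of blocking
<-upch        // one receive observes the whole burst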
@@ -46,74 +52,91 @@ func NewClusterConfig() *ClusterConfig {
 	}
 }

+// TODO: remove?
 func (c *ClusterConfig) GetMount(name string) *Mount {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	return c.mounts[name]
 }

-func (c *ClusterConfig) UpdateMount(m *Mount) {
+// TODO: remove?
+func (c *ClusterConfig) ListMounts() []*Mount {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	c.mounts[m.Name] = m
+	result := make([]*Mount, 0, len(c.mounts))
+	for _, m := range c.mounts {
+		result = append(result, m)
+	}
+	return result
 }

-func (c *ClusterConfig) DelMount(name string) {
+// Update a mount (in-memory only).
+func (c *ClusterConfig) setMount(m *Mount) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	delete(c.mounts, name)
+	c.mounts[m.Name] = m
 }

-func (c *ClusterConfig) ListMounts() []*Mount {
+// Delete a mount (in-memory only).
+func (c *ClusterConfig) delMount(name string) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	result := make([]*Mount, 0, len(c.mounts))
-	for _, m := range c.mounts {
-		result = append(result, m)
-	}
-	return result
+	delete(c.mounts, name)
 }

-type ConfigWatcher struct {
+// Keeps the in-memory service configuration in sync with the
+// distributed database. An update channel is triggered whenever the
+// data changes.
+type ConfigSyncer struct {
 	client *etcd.Client
 	config *ClusterConfig
 	rch    chan *etcd.Response
 	upch   chan bool
+	stop   chan bool
 }

-func NewConfigWatcher(client *etcd.Client, config *ClusterConfig, upch chan bool) *ConfigWatcher {
-	return &ConfigWatcher{
+func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer {
+	return &ConfigSyncer{
 		client: client,
 		config: config,
 		rch:    make(chan *etcd.Response, 100),
 		upch:   upch,
+		stop:   stop,
 	}
 }

-func (w *ConfigWatcher) syncer() {
-	for response := range w.rch {
-		mountName := filepath.Base(response.Key)
-		if response.Action == "DELETE" {
-			log.Printf("deleted mount '%s'", mountName)
-			w.config.DelMount(mountName)
-		} else {
-			log.Printf("update to mount '%s'", mountName)
-			var m Mount
-			if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
-				log.Printf("corrupted data: %s", err)
-			} else {
-				w.config.UpdateMount(&m)
-			}
-		}
-		// Only flip the update signal once.
-		select {
-		case w.upch <- true:
-		default:
-		}
-	}
-}
+func (w *ConfigSyncer) syncer() {
+	for {
+		select {
+		case response := <-w.rch:
+			// Remove mountPrefix from the beginning of
+			// the path, but keep the leading slash.
+			mountName := response.Key[len(mountPrefix)-1:]
+			if response.Action == "DELETE" {
+				log.Printf("deleted mount '%s'", mountName)
+				w.config.delMount(mountName)
+			} else {
+				log.Printf("update to mount '%s'", mountName)
+				var m Mount
+				if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
+					log.Printf("corrupted data: %s", err)
+				} else {
+					w.config.setMount(&m)
+				}
+			}
+			trigger(w.upch)
+		case <-w.stop:
+			return
+		}
+	}
+}

-func (w *ConfigWatcher) Run() {
+// Run the ConfigSyncer in the background. It will wait for
+// initialization to complete, so that when this function returns, the
+// in-memory configuration has already been fully synchronized.
+func (w *ConfigSyncer) Run() {
 	go w.syncer()

 	// Run until the first successful Get().
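
A note on the key-to-name conversion above: slicing at len(mountPrefix)-1 drops the prefix but keeps its trailing slash as the mount's leading slash. Assuming mountPrefix is "/icecast/mounts/" (only nodePrefix is visible in this diff, so the exact value is an assumption):

key := "/icecast/mounts/radio.ogg"
name := key[len(mountPrefix)-1:] // "/radio.ogg"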
@@ -139,34 +162,42 @@ func (w *ConfigWatcher) Run() {
 	}
 }

-type RadioCluster struct {
+// An active streaming node, managing the local icecast server.
+type RadioNode struct {
+	Config *ClusterConfig
+
 	ip      string
 	client  *etcd.Client
 	me      *masterelection.MasterElection
-	config  *ClusterConfig
-	watcher *ConfigWatcher
+	watcher *ConfigSyncer
 	icecast *IcecastController
-	upch    chan bool
 	livenessTtl uint64
+	upch chan bool
+	stop chan bool
 }
-func NewRadioCluster(ip string, client *etcd.Client) *RadioCluster {
+func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
 	config := NewClusterConfig()

-	// Network updates trigger icecast reconfiguration.
+	// Network updates trigger icecast reconfiguration. This
+	// channel is used as an 'event': no more than one entry will
+	// be queued.
 	upch := make(chan bool, 1)

 	// MasterElection changes trigger an update.
 	mech := make(chan int)
 	go func() {
-		for _ = range mech {
-			select {
-			case upch <- true:
-			}
+		for state := range mech {
+			log.Printf("master election status changed: %d", state)
+			trigger(upch)
 		}
 	}()

-	return &RadioCluster{
+	// Global 'stop' channel.
+	stopch := make(chan bool)
+
+	return &RadioNode{
+		Config: config,
 		ip:     ip,
 		client: client,
 		me: masterelection.NewMasterElection(
@@ -174,40 +205,35 @@ func NewRadioCluster(ip string, client *etcd.Client) *RadioCluster {
 			masterElectionPath,
 			ip,
 			5,
-			mech),
-		config:  config,
-		watcher: NewConfigWatcher(client, config, upch),
+			mech,
+			stopch),
+		watcher: NewConfigSyncer(client, config, upch, stopch),
 		icecast: NewIcecastController(ip),
-		upch:    upch,
 		livenessTtl: 2,
+		upch: upch,
+		stop: stopch,
 	}
 }
-func (rc *RadioCluster) presence() {
+// The presence goroutine continuously updates our entry in the list
+// of nodes.
+func (rc *RadioNode) presence() {
+	ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second)
 	for {
-		if _, err := rc.client.Set(nodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil {
-			log.Printf("Set() error: %s", err)
-			time.Sleep(100 * time.Millisecond)
-		} else {
-			time.Sleep(time.Duration(rc.livenessTtl/2) * time.Second)
+		select {
+		case <-ticker.C:
+			if _, err := rc.client.Set(nodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil {
+				log.Printf("Set() error: %s", err)
+			}
+		case <-rc.stop:
+			return
 		}
 	}
 }
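
With the default livenessTtl of 2 set in NewRadioNode, the ticker fires every second and each Set() re-arms a two-second TTL, so the node entry survives a single failed update but disappears within two seconds of the process stopping. Roughly:

// t=0s   Set("/icecast/nodes/1.2.3.4", ..., ttl=2s) -> expires at t=2s
// t=1s   Set(...) refreshes the entry               -> expires at t=3s
// t=1.5s process stops (or Stop() is called)
// t=3s   the key expires; the node drops out of GetNodes()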
-// Return the list of currently active nodes.
-func (rc *RadioCluster) GetNodes() ([]string, error) {
-	response, err := rc.client.Get(nodePrefix)
-	if err != nil {
-		return nil, err
-	}
-	result := make([]string, 0, len(response))
-	for _, r := range response {
-		result = append(result, r.Value)
-	}
-	return result, nil
-}
-func (rc *RadioCluster) Run() {
+// Run the node. This method does not return until someone calls Stop().
+func (rc *RadioNode) Run() {
 	// Bootstrap the config watcher. This ensures that we have a
 	// full configuration (thanks to the Get() call) before we
 	// start managing the icecast server.
@@ -216,9 +242,24 @@ func (rc *RadioCluster) Run() {
 	// Start the presence heartbeat.
 	go rc.presence()

-	for _ = range rc.upch {
-		if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
-			log.Printf("Update() failed: %s", err)
+	for {
+		select {
+		case <-rc.upch:
+			if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
+				log.Printf("Update() failed: %s", err)
+			}
+
+			// Do not reload icecast more often than once
+			// every two seconds.
+			time.Sleep(2 * time.Second)
+		case <-rc.stop:
+			return
		}
 	}
 }
+
+// Stop everything.
+func (rc *RadioNode) Stop() {
+	close(rc.stop)
+}
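
Putting it together, a node's lifecycle might be driven as in this hypothetical main() (imports os, os/signal and syscall; the signal handling and the zero-argument etcd.NewClient() are assumptions, not part of this commit):

func main() {
	node := NewRadioNode("1.2.3.4", etcd.NewClient())

	// Translate SIGINT/SIGTERM into a graceful stop.
	go func() {
		ch := make(chan os.Signal, 1)
		signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
		<-ch
		node.Stop()
	}()

	// Blocks until Stop() closes the shared stop channel, which also
	// terminates the presence heartbeat, the config syncer and the
	// master election loop.
	node.Run()
}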