Skip to content
Snippets Groups Projects
Commit 8f0de3a9 authored by ale's avatar ale
Browse files

split code into subpackages

parent 15e77d20
No related branches found
No related tags found
No related merge requests found
......@@ -14,9 +14,9 @@ import (
)
var (
masterElectionPath = "/icecast/cluster/master"
mountPrefix = "/icecast/mounts/"
nodePrefix = "/icecast/nodes/"
MasterElectionPath = "/icecast/cluster/master"
MountPrefix = "/icecast/mounts/"
NodePrefix = "/icecast/nodes/"
)
// A mountpoint for a stream.
......@@ -35,7 +35,7 @@ type Mount struct {
}
func mountPath(mountName string) string {
return mountPrefix + mountName[1:]
return MountPrefix + mountName[1:]
}
// Cache the list of active nodes.
......@@ -121,7 +121,7 @@ func (r *RadioAPI) DelMount(mountName string) error {
// ListMounts returns a list of all the configured mountpoints.
func (r *RadioAPI) ListMounts() ([]*Mount, error) {
response, err := r.client.Get(mountPrefix)
response, err := r.client.Get(MountPrefix)
if err != nil {
return nil, err
}
......@@ -138,7 +138,7 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
response, err := r.client.Get(MasterElectionPath)
if err != nil {
return "", err
}
......@@ -150,7 +150,7 @@ func (r *RadioAPI) GetMasterAddr() (string, error) {
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) doGetNodes() ([]string, error) {
response, err := r.client.Get(nodePrefix)
response, err := r.client.Get(NodePrefix)
if err != nil {
return nil, err
}
......
......@@ -8,6 +8,7 @@ import (
"syscall"
"git.autistici.org/ale/radioai"
"git.autistici.org/ale/radioai/node"
)
var (
......@@ -19,16 +20,16 @@ func main() {
flag.Parse()
client := radioai.NewEtcdClient()
node := radioai.NewRadioNode(*publicIp, client)
n := node.NewRadioNode(*publicIp, client)
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
go func() {
<- stopch
log.Printf("terminating...")
node.Stop()
n.Stop()
}()
signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT)
node.Run()
}
\ No newline at end of file
n.Run()
}
......@@ -8,6 +8,7 @@ import (
"time"
"git.autistici.org/ale/radioai"
"git.autistici.org/ale/radioai/fe"
)
var (
......@@ -29,9 +30,9 @@ func main() {
client := radioai.NewEtcdClient()
api := radioai.NewRadioAPI(client)
red := radioai.NewHttpRedirector(api)
red := fe.NewHttpRedirector(api)
dnsRed := radioai.NewDnsRedirector(api, *domain, *publicIp, dnsTtl)
dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl)
dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
httpServer := &http.Server{
......
package radioai
package fe
import (
"fmt"
......@@ -9,6 +9,7 @@ import (
"time"
"github.com/miekg/dns"
"git.autistici.org/ale/radioai"
)
var (
......@@ -26,7 +27,7 @@ var (
)
type DnsRedirector struct {
client *RadioAPI
client *radioai.RadioAPI
origin string
originNumParts int
publicIp string
......@@ -34,7 +35,7 @@ type DnsRedirector struct {
soa dns.RR
}
func NewDnsRedirector(client *RadioAPI, origin, publicIp string, ttl int) *DnsRedirector {
func NewDnsRedirector(client *radioai.RadioAPI, origin, publicIp string, ttl int) *DnsRedirector {
if !strings.HasSuffix(origin, ".") {
origin += "."
}
......
package radioai
package fe
import (
"fmt"
......@@ -9,6 +9,8 @@ import (
"strconv"
"strings"
"time"
"git.autistici.org/ale/radioai"
)
// HTTP redirector.
......@@ -21,10 +23,10 @@ import (
// a .m3u file directly pointing at the relays.
//
type HttpRedirector struct {
client *RadioAPI
client *radioai.RadioAPI
}
func NewHttpRedirector(client *RadioAPI) *HttpRedirector {
func NewHttpRedirector(client *radioai.RadioAPI) *HttpRedirector {
return &HttpRedirector{
client: client,
}
......@@ -40,7 +42,7 @@ func (h *HttpRedirector) pickActiveNode() string {
}
// Parse the request and extract the mount path.
func (h *HttpRedirector) getMount(r *http.Request) (*Mount, error) {
func (h *HttpRedirector) getMount(r *http.Request) (*radioai.Mount, error) {
path := r.URL.Path
if strings.HasSuffix(path, ".m3u") {
path = path[:len(path)-4]
......
package radioai
package node
import (
"os"
......
package radioai
package node
import (
"bytes"
"encoding/xml"
"io"
"os"
"git.autistici.org/ale/radioai"
)
var (
......@@ -96,8 +98,8 @@ type icecastConfig struct {
func defaultDebianConfig(publicIp string) *icecastConfig {
// Pick some random passwords on startup. We don't use them,
// but icecast is happier if they're set.
sourcePw := GeneratePassword()
adminPw := GeneratePassword()
sourcePw := radioai.GeneratePassword()
adminPw := radioai.GeneratePassword()
return &icecastConfig{
XMLName: xml.Name{"", "icecast"},
......@@ -172,7 +174,7 @@ func (c *icecastConfig) EncodeToFile(path string) error {
return err
}
func mountToConfig(m *Mount) iceMountConfig {
func mountToConfig(m *radioai.Mount) iceMountConfig {
mconfig := iceMountConfig{
Name: m.Name,
Username: m.Username,
......@@ -188,7 +190,7 @@ func mountToConfig(m *Mount) iceMountConfig {
return mconfig
}
func mountToRelay(masterAddr string, m *Mount) iceRelayConfig {
func mountToRelay(masterAddr string, m *radioai.Mount) iceRelayConfig {
return iceRelayConfig{
Mount: m.Name,
LocalMount: m.Name,
......
package radioai
package node
import (
"strings"
......
package radioai
package node
import (
"encoding/json"
......@@ -7,6 +7,7 @@ import (
"sync"
"time"
"git.autistici.org/ale/radioai"
"git.autistici.org/ale/radioai/masterelection"
"github.com/coreos/go-etcd/etcd"
)
......@@ -21,28 +22,28 @@ func trigger(c chan bool) {
// In-memory representation of the overall configuration (basically
// just a list of the known mounts).
type ClusterConfig struct {
mounts map[string]*Mount
mounts map[string]*radioai.Mount
lock sync.Mutex
}
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
mounts: make(map[string]*Mount),
mounts: make(map[string]*radioai.Mount),
}
}
// TODO: remove?
func (c *ClusterConfig) GetMount(name string) *Mount {
func (c *ClusterConfig) GetMount(name string) *radioai.Mount {
c.lock.Lock()
defer c.lock.Unlock()
return c.mounts[name]
}
// TODO: remove?
func (c *ClusterConfig) ListMounts() []*Mount {
func (c *ClusterConfig) ListMounts() []*radioai.Mount {
c.lock.Lock()
defer c.lock.Unlock()
result := make([]*Mount, 0, len(c.mounts))
result := make([]*radioai.Mount, 0, len(c.mounts))
for _, m := range c.mounts {
result = append(result, m)
}
......@@ -50,7 +51,7 @@ func (c *ClusterConfig) ListMounts() []*Mount {
}
// Update a mount (in-memory only).
func (c *ClusterConfig) setMount(m *Mount) {
func (c *ClusterConfig) setMount(m *radioai.Mount) {
c.lock.Lock()
defer c.lock.Unlock()
c.mounts[m.Name] = m
......@@ -91,7 +92,7 @@ func (w *ConfigSyncer) syncer() {
case response := <-w.rch:
// Remove mountPrefix from the beginning of
// the path, but keep the leading slash.
mountName := response.Key[len(mountPrefix)-1:]
mountName := response.Key[len(radioai.MountPrefix)-1:]
switch response.Action {
case "DELETE":
......@@ -99,7 +100,7 @@ func (w *ConfigSyncer) syncer() {
w.config.delMount(mountName)
case "SET":
log.Printf("update to mount %s: %+v", mountName, response)
var m Mount
var m radioai.Mount
if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
log.Printf("corrupted data: %s", err)
continue
......@@ -133,7 +134,7 @@ func (w *ConfigSyncer) Run() {
// Run until the first successful Get().
for {
responses, err := w.client.Get(mountPrefix)
responses, err := w.client.Get(radioai.MountPrefix)
if err == nil {
// Inject all the replies into the channel.
for _, r := range responses {
......@@ -160,7 +161,7 @@ func (w *ConfigSyncer) Run() {
for {
curIndex := w.index + 1
log.Printf("starting watcher at index %d", curIndex)
_, err := w.client.Watch(mountPrefix, curIndex, w.rch, w.stop)
_, err := w.client.Watch(radioai.MountPrefix, curIndex, w.rch, w.stop)
if err == etcd.ErrWatchStoppedByUser {
return
} else if err != nil {
......@@ -210,7 +211,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
client: client,
me: masterelection.NewMasterElection(
client,
masterElectionPath,
radioai.MasterElectionPath,
ip,
5,
mech,
......@@ -231,7 +232,7 @@ func (rc *RadioNode) presence() {
for {
select {
case <-ticker.C:
if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil {
if _, err := rc.client.Set(radioai.NodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil {
log.Printf("presence: Set(): %s", err)
}
case <-rc.stop:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment