package autoradio

import (
	"bytes"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"net"
	"strings"
	"sync"
	"time"
)

const (
	// MountPrefix is the path to the mount configuration in etcd.
	// This should never change, upgrades to the configuration
	// format should be backwards-compatible.
	MountPrefix = "/icecast/mounts/"

	// ABIVersion is embedded in the paths used for the cluster
	// runtime data. Whenever the format of this data changes, the
	// ABIVersion should be increased. A rolling restart of the
	// cluster will then seamlessly cause a transition to the new
	// consensus (the cluster will be partitioned in the meantime).
	ABIVersion = "2"

	MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
	NodePrefix         = "/icecast/" + ABIVersion + "/nodes/"

	IcecastPort = 8000

	// IcecastMountPrefix is prepended to a public mount name to
	// obtain the corresponding Icecast mount path.
	IcecastMountPrefix = "/_stream"
)

var (
	ErrIsDirectory = errors.New("key is a directory")
	ErrIsFile      = errors.New("key is a file")
)

// Mount is a mountpoint for a stream.
type Mount struct {
	// Name (path to the mountpoint).
	Name string

	// Username for source authentication.
	Username string

	// Password for source authentication.
	Password string

	// If this field is set, the mount point will be relaying an
	// external stream and no source connections will be accepted.
	// Each node will pull from the external source directly,
	// ignoring our master election protocol.
	RelayUrl string

	// Fallback stream name (optional).
	Fallback string
}

// Equal reports whether m and other hold the same value in every
// field. Both pointers must be non-nil.
func (m *Mount) Equal(other *Mount) bool {
	return *m == *other
}

// IsRelay reports whether the mount has a RelayUrl set.
func (m *Mount) IsRelay() bool {
	return m.RelayUrl != ""
}

// mountEtcdPath returns the path in etcd used to store the
// configuration of the given mountpoint.
//
// MountPrefix already ends with a slash, so the leading slash of the
// mount name is dropped. Only a leading slash is removed: an empty
// name maps to MountPrefix itself instead of panicking, and a name
// without a leading slash is used unchanged rather than losing its
// first character.
func mountEtcdPath(mountName string) string {
	return MountPrefix + strings.TrimPrefix(mountName, "/")
}

// MountNameToIcecastPath returns the Icecast mount path for the given
// public mount name.
func MountNameToIcecastPath(mountName string) string {
	return IcecastMountPrefix + mountName
}

// IcecastPathToMountName returns the public mount name for an Icecast
// mount path.
func IcecastPathToMountName(path string) string {
	mountName := strings.TrimPrefix(path, IcecastMountPrefix)
	return mountName
}

// IcecastMountStatus is the status of a mount on an individual
// Icecast server.
type IcecastMountStatus struct {
	Name         string
	Listeners    int
	BitRate      int
	Quality      float64
	VideoQuality float64
	FrameSize    string
	FrameRate    float64
}

// NodeStatus is the status of a node. This is used to report load
// and stream status.
type NodeStatus struct {
	// Short name of this node.
	Name string

	// Public IP addresses of this server.
	IP []net.IP

	// Is the Icecast server up?
	IcecastUp bool

	// List of mount points.
	Mounts []IcecastMountStatus `xml:"mount"`

	// Bandwidth utilization.
	BandwidthUsage float64
}

// NumListeners returns the sum of the listener counts of all the
// mounts on this node.
func (ns *NodeStatus) NumListeners() int {
	var total int
	for i := range ns.Mounts {
		total += ns.Mounts[i].Listeners
	}
	return total
}

// nodesCache caches the list of active nodes. The front-ends need to
// retrieve this information continuously, so refreshes are limited to
// one per ttl (500ms as set by newNodesCache, i.e. 2qps).
type nodesCache struct {
	ttl      time.Duration
	nodes    []*NodeStatus
	deadline time.Time
	lock     sync.Mutex
}

// getNodesFunc is the function a nodesCache calls to fetch a fresh
// list of nodes.
type getNodesFunc func() ([]*NodeStatus, error)

// newNodesCache returns an empty cache with a ttl of 500ms.
func newNodesCache() *nodesCache {
	nc := new(nodesCache)
	nc.ttl = 500 * time.Millisecond
	return nc
}

// Get returns the cached value of 'fn', if valid. If the value is
// expired and we get an error from 'fn', we will attempt to return
// the previously cached value anyway, along with the error: the
// caller can then pick the right failure behavior.
func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
	nc.lock.Lock()
	defer nc.lock.Unlock()

	now := time.Now()
	if !now.After(nc.deadline) {
		// Still fresh: serve from the cache without calling fn.
		return nc.nodes, nil
	}

	fresh, err := fn()
	if err != nil {
		// Keep the stale value and deadline, so the next call retries.
		return nc.nodes, err
	}

	nc.nodes = fresh
	nc.deadline = now.Add(nc.ttl)
	return nc.nodes, nil
}

// Client is the actual API to the streaming cluster's database.
type Client struct { client EtcdClient activeNodesCache *nodesCache } func NewClient(client EtcdClient) *Client { return &Client{client, newNodesCache()} } // GetMount returns data on a specific mountpoint (returns nil if not // found). func (r *Client) GetMount(mountName string) (*Mount, error) { response, err := r.client.Get(mountEtcdPath(mountName), false, false) if err != nil || response.Node == nil { return nil, err } if response.Node.Dir { return nil, ErrIsDirectory } var m Mount if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { return nil, err } return &m, nil } // SetMount creates or updates a mountpoint. func (r *Client) SetMount(m *Mount) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(m); err != nil { return err } _, err := r.client.Set(mountEtcdPath(m.Name), buf.String(), 0) return err } // DelMount removes a mountpoint. func (r *Client) DelMount(mountName string) error { _, err := r.client.Delete(mountEtcdPath(mountName), false) return err } // ListMounts returns a list of all the configured mountpoints. func (r *Client) ListMounts() ([]*Mount, error) { response, err := r.client.Get(MountPrefix, true, false) if err != nil || response.Node == nil { return nil, err } if !response.Node.Dir { return nil, ErrIsFile } result := make([]*Mount, 0, len(response.Node.Nodes)) for _, n := range response.Node.Nodes { if n.Dir { continue } var m Mount if err := json.NewDecoder(strings.NewReader(n.Value)).Decode(&m); err != nil { continue } result = append(result, &m) } return result, nil } // Location data for the master node. Having the IP address here saves // another round-trip to etcd to retrieve the node info in the most // common case. type MasterNodeInfo struct { Name string IP []net.IP } // GetMasterAddr returns the address of the current master server. 
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) { response, err := r.client.Get(MasterElectionPath, false, false) if err != nil || response.Node == nil { return nil, err } if response.Node.Dir { return nil, ErrIsDirectory } var m MasterNodeInfo if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { return nil, err } return &m, nil } // GetNodes returns the list of active cluster nodes. func (r *Client) doGetNodes() ([]*NodeStatus, error) { response, err := r.client.Get(NodePrefix, false, false) if err != nil || response.Node == nil { return nil, err } if !response.Node.Dir { return nil, ErrIsFile } result := make([]*NodeStatus, 0, len(response.Node.Nodes)) for _, entry := range response.Node.Nodes { var ns NodeStatus if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil { result = append(result, &ns) } } return result, nil } func (r *Client) GetNodes() ([]*NodeStatus, error) { return r.activeNodesCache.Get(r.doGetNodes) } // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) rand.Read(b) return base64.StdEncoding.EncodeToString(b) }