package autoradio import ( "bytes" "crypto/rand" "encoding/base64" "encoding/json" "errors" "fmt" "hash/crc32" "net" "strings" "time" "git.autistici.org/ale/autoradio/coordination/presence" ) const ( // MountPrefix stores the path to the mount configuration in // etcd. This should never change between releases, upgrades // to the configuration format should be backwards-compatible. MountPrefix = "/icecast/mounts/" // Paths for the cluster runtime data. Whenever the format of // this data changes, the ABIVersion should be increased. A // rolling restart of the cluster will then seamlessly cause a // transition to the new consensus (the cluster will be // partitioned in the meantime). ABIVersion = "3" MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master" TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode" NodePrefix = "/icecast/" + ABIVersion + "/nodes/" IcecastMountPrefix = "/_stream" ) // IcecastPort is the port that the Icecast server will listen // on. Since we fully manage the system-wide Icecast instance, // there's not much point in making this configurable. var IcecastPort = 8000 // EncodingParams used to re-encode a stream. type EncodingParams struct { // Path to the source mountpoint. SourceName string // Parameters for the transcoded stream. The value format is // anything that liquidsoap will accept in its 'output' // directive. Format string BitRate int SampleRate int Channels int StereoMode string Quality float64 } // NewEncodingParams creates an EncodingParams object with the right // default values. func NewEncodingParams() *EncodingParams { return &EncodingParams{ SampleRate: 44100, Channels: 2, Quality: -1, } } func (p *EncodingParams) String() string { var out []string out = append(out, p.Format) if p.BitRate > 0 { out = append(out, fmt.Sprintf("%dkBps", p.BitRate)) } if p.Quality > -1 { out = append(out, fmt.Sprintf("q=%g", p.Quality)) } switch p.Channels { case 1: out = append(out, "mono") case 2: out = append(out, "stereo") } if p.SampleRate > 0 { out = append(out, fmt.Sprintf("%gkHz", float64(p.SampleRate)/1000)) } return strings.Join(out, ", ") } // Valid returns true if the EncodingParams seem to make sense. We try // to be as close to the liquidsoap capabilities as possible. func (p *EncodingParams) Valid() error { switch p.Format { case "mp3", "mp3.cbr", "mp3.abr", "vorbis.cbr", "vorbis.abr": if p.BitRate == 0 { return errors.New("bitrate not specified") } case "mp3.vbr": if p.Quality < 0 || p.Quality > 9 { return errors.New("quality must be in range [0, 9]") } case "vorbis": if p.Quality < -0.2 || p.Quality > 1 { return errors.New("quality must be in range [-0.2, 1]") } case "": return errors.New("format not specified") default: return fmt.Errorf("unknown format \"%s\"", p.Format) } if p.SampleRate == 0 { return errors.New("sample rate not specified") } if p.Channels < 1 || p.Channels > 2 { return errors.New("bad number of channels") } if p.Channels > 1 { switch p.StereoMode { case "", "stereo", "joint_stereo", "default": default: return fmt.Errorf("unknown stereo mode \"%s\"", p.StereoMode) } } return nil } // Mount stores the configuration for a stream (mount, in Icecast lingo). type Mount struct { // Name (path to the mountpoint). Name string // Username for source authentication. Username string // Password for source authentication. Password string // Is this field is set, the mount point will be relaying an // external stream and no source connections will be accepted. // Each node will pull from the external source directly, // ignoring our master election protocol. RelayUrl string // Fallback stream name (optional). Fallback string // If Transcoding is non-nil, this mountpoint represents a // transcoded stream. Transcoding *EncodingParams } // Valid performs a consistency check and returns true if the // configuration for the stream is correct. func (m *Mount) Valid() error { if !strings.HasPrefix(m.Name, "/") { return errors.New("name does not start with a slash") } if m.Username != "" && m.Password == "" { return errors.New("username is set but password is empty") } if m.Username == "" && m.Password != "" { return errors.New("password is set but username is empty") } if m.RelayUrl != "" && m.Transcoding != nil { return errors.New("RelayUrl and Transcoding can't both be set") } if m.Transcoding != nil { if err := m.Transcoding.Valid(); err != nil { return fmt.Errorf("invalid encoding parameters: %v", err) } } return nil } // Equal returns true if the two mounts have the same configuration. func (m *Mount) Equal(other *Mount) bool { return (m.Name == other.Name) && (m.Username == other.Username) && (m.Password == other.Password) && (m.RelayUrl == other.RelayUrl) && (m.Fallback == other.Fallback) && ((m.Transcoding == nil && other.Transcoding == nil) || (m.Transcoding != nil && other.Transcoding != nil && *m.Transcoding == *other.Transcoding)) } // IsRelay returns true if the stream is configured as a relay of an // external source. func (m *Mount) IsRelay() bool { return m.RelayUrl != "" } // HasTranscoder returns true if the stream has transcoding sub-streams. func (m *Mount) HasTranscoder() bool { return m.Transcoding != nil } // Return the path in etcd used to store mountpoint configuration. func mountEtcdPath(mountName string) string { return MountPrefix + mountName[1:] } // MountNameToIcecastPath returns the Icecast mount path for the given // public mount name. func MountNameToIcecastPath(mountName string) string { return IcecastMountPrefix + mountName } // IcecastPathToMountName returns the public mount name from an // Icecast mount path. If 'path' does not start with // IcecastMountPrefix, it is returned unchanged (though arguably this // should be an error). func IcecastPathToMountName(path string) string { return strings.TrimPrefix(path, IcecastMountPrefix) } // IcecastMountStatus has information about a mount on an individual // Icecast server, as provided by Icecast itself. type IcecastMountStatus struct { Name string Listeners int BitRate int Quality float64 VideoQuality float64 FrameSize string FrameRate float64 } // NodeStatus stores runtime information about an autoradio node. This // is used to report load and stream status. type NodeStatus struct { // Short name of this node. Name string // Public IP addresses of this server. IP []net.IP // Internal IP addresses of this server (i.e. reachable from // the other peers of the cluster), if different from IP // above. Can be nil, in which case the value of IP is used. InternalIP []net.IP // Is the Icecast server up? IcecastUp bool // List of mount points. Mounts []IcecastMountStatus `xml:"mount"` // Bandwidth utilization. BandwidthUsage float64 // Maximum number of allowed listeners. MaxListeners int } func (ns *NodeStatus) GetInternalIP() []net.IP { if len(ns.InternalIP) != 0 { return ns.InternalIP } return ns.IP } // NumListeners returns the total number of listeners across all // mounts on this node. func (ns *NodeStatus) NumListeners() int { listeners := 0 for _, m := range ns.Mounts { listeners += m.Listeners } return listeners } // Client is the actual API to the streaming cluster's database. type Client struct { client EtcdClient presenceCache *presence.Cache } // NewClient creates and returns a new Client. func NewClient(client EtcdClient) *Client { // The Client keeps a cache of node presence information, // since it is likely that it will be accessed quite often (in // the case of redirectord, on every request). return &Client{ client: client, presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, func(data []string) interface{} { // Convert a list of JSON-encoded NodeStatus // objects into a lisce of *NodeStatus // pointers. Since decoding can be a bit // expensive if performed on every query, we // only perform it when the data is updated. tmp := make([]*NodeStatus, 0, len(data)) for _, nodeData := range data { var ns NodeStatus if err := json.NewDecoder(strings.NewReader(nodeData)).Decode(&ns); err == nil { tmp = append(tmp, &ns) } } return tmp }), } } // GetNodes returns the list of active cluster nodes. func (r *Client) GetNodes() ([]*NodeStatus, error) { data, err := r.presenceCache.Data() if err != nil { return nil, err } return data.([]*NodeStatus), nil } // GetMount returns data on a specific mountpoint (returns nil if not // found). func (r *Client) GetMount(mountName string) (*Mount, error) { response, err := r.client.Get(mountEtcdPath(mountName), false, false) if err != nil || response.Node == nil { return nil, err } if response.Node.Dir { return nil, errors.New("key is a directory") } var m Mount if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { return nil, err } return &m, nil } // SetMount creates or updates a mountpoint. func (r *Client) SetMount(m *Mount) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(m); err != nil { return err } _, err := r.client.Set(mountEtcdPath(m.Name), buf.String(), 0) return err } // DelMount removes a mountpoint. func (r *Client) DelMount(mountName string) error { _, err := r.client.Delete(mountEtcdPath(mountName), false) return err } // ListMounts returns a list of all the configured mountpoints. func (r *Client) ListMounts() ([]*Mount, error) { response, err := r.client.Get(MountPrefix, true, false) if err != nil || response.Node == nil { return nil, err } if !response.Node.Dir { return nil, errors.New("key is a file") } result := make([]*Mount, 0, len(response.Node.Nodes)) for _, n := range response.Node.Nodes { if n.Dir { continue } var m Mount if err := json.NewDecoder(strings.NewReader(n.Value)).Decode(&m); err != nil { continue } result = append(result, &m) } return result, nil } // MasterNodeInfo stores location data for the master node. Having the // IP address here saves another round-trip to etcd to retrieve the // node info in the most common case. type MasterNodeInfo struct { // Name of the node. Name string // Public IPs of the node. IP []net.IP // Internal IPs of the node (possibly the same as IP). InternalIP []net.IP } func (m MasterNodeInfo) GetInternalIP() []net.IP { if len(m.InternalIP) != 0 { return m.InternalIP } return m.IP } // GetMasterInfo returns the address of the current master server. func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) { response, err := r.client.Get(MasterElectionPath, false, false) if err != nil || response.Node == nil { return nil, err } if response.Node.Dir { return nil, errors.New("key is a directory") } var m MasterNodeInfo if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { return nil, err } return &m, nil } // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) rand.Read(b) return base64.StdEncoding.EncodeToString(b) } // GenerateUsername returns a username somehow related to the name of // the mount, possibly unique (but not actually guaranteed to be so). func GenerateUsername(path string) string { return fmt.Sprintf("source%d", crc32.ChecksumIEEE([]byte(path))) }