api.go 11 KB
Newer Older
ale's avatar
ale committed
1
package autoradio
2 3 4

import (
	"bytes"
5 6
	"crypto/rand"
	"encoding/base64"
7
	"encoding/json"
8
	"errors"
ale's avatar
ale committed
9
	"fmt"
ale's avatar
ale committed
10
	"net"
11
	"strings"
12
	"time"
13 14

	"git.autistici.org/ale/autoradio/coordination/presence"
15 16
)

17
const (
ale's avatar
ale committed
18 19 20
	// MountPrefix stores the path to the mount configuration in
	// etcd. This should never change between releases, upgrades
	// to the configuration format should be backwards-compatible.
21 22
	MountPrefix = "/icecast/mounts/"

ale's avatar
ale committed
23
	// Paths for the cluster runtime data. Whenever the format of
24 25 26 27
	// this data changes, the ABIVersion should be increased. A
	// rolling restart of the cluster will then seamlessly cause a
	// transition to the new consensus (the cluster will be
	// partitioned in the meantime).
ale's avatar
ale committed
28 29 30 31
	ABIVersion                   = "3"
	MasterElectionPath           = "/icecast/" + ABIVersion + "/cluster/master"
	TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
	NodePrefix                   = "/icecast/" + ABIVersion + "/nodes/"
ale's avatar
ale committed
32

33
	IcecastMountPrefix = "/_stream"
34
)
ale's avatar
ale committed
35

36 37 38 39
// IcecastPort is the port that the Icecast server will listen
// on. Since we fully manage the system-wide Icecast instance,
// there's not much point in making this configurable.
var IcecastPort = 8000
40

ale's avatar
ale committed
41
// EncodingParams used to re-encode a stream.
ale's avatar
ale committed
42 43 44 45 46 47 48 49 50 51 52
type EncodingParams struct {
	// Path to the source mountpoint.
	SourceName string

	// Parameters for the transcoded stream. The value format is
	// anything that liquidsoap will accept in its 'output'
	// directive.
	Format     string
	BitRate    int
	SampleRate int
	Channels   int
53
	StereoMode string
ale's avatar
ale committed
54 55 56
	Quality    float64
}

ale's avatar
ale committed
57 58
// NewEncodingParams creates an EncodingParams object with the right
// default values.
59 60 61 62 63 64 65 66
func NewEncodingParams() *EncodingParams {
	return &EncodingParams{
		SampleRate: 44100,
		Channels:   2,
		Quality:    -1,
	}
}

ale's avatar
ale committed
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
func (p *EncodingParams) String() string {
	var out []string
	out = append(out, p.Format)
	if p.BitRate > 0 {
		out = append(out, fmt.Sprintf("%dkBps", p.BitRate))
	}
	if p.Quality > -1 {
		out = append(out, fmt.Sprintf("q=%g", p.Quality))
	}
	switch p.Channels {
	case 1:
		out = append(out, "mono")
	case 2:
		out = append(out, "stereo")
	}
	if p.SampleRate > 0 {
ale's avatar
ale committed
83
		out = append(out, fmt.Sprintf("%gkHz", float64(p.SampleRate)/1000))
ale's avatar
ale committed
84 85 86 87
	}
	return strings.Join(out, ", ")
}

ale's avatar
ale committed
88 89
// Valid returns true if the EncodingParams seem to make sense. We try
// to be as close to the liquidsoap capabilities as possible.
ale's avatar
ale committed
90
func (p *EncodingParams) Valid() error {
91 92 93 94 95 96 97 98 99 100 101 102 103 104
	switch p.Format {
	case "mp3", "mp3.cbr", "mp3.abr", "vorbis.cbr", "vorbis.abr":
		if p.BitRate == 0 {
			return errors.New("bitrate not specified")
		}
	case "mp3.vbr":
		if p.Quality < 0 || p.Quality > 9 {
			return errors.New("quality must be in range [0, 9]")
		}
	case "vorbis":
		if p.Quality < -0.2 || p.Quality > 1 {
			return errors.New("quality must be in range [-0.2, 1]")
		}
	case "":
ale's avatar
ale committed
105
		return errors.New("format not specified")
106 107
	default:
		return fmt.Errorf("unknown format \"%s\"", p.Format)
ale's avatar
ale committed
108 109 110 111
	}
	if p.SampleRate == 0 {
		return errors.New("sample rate not specified")
	}
112
	if p.Channels < 1 || p.Channels > 2 {
ale's avatar
ale committed
113 114
		return errors.New("bad number of channels")
	}
115 116 117 118 119 120 121
	if p.Channels > 1 {
		switch p.StereoMode {
		case "", "stereo", "joint_stereo", "default":
		default:
			return fmt.Errorf("unknown stereo mode \"%s\"", p.StereoMode)
		}
	}
ale's avatar
ale committed
122 123 124
	return nil
}

ale's avatar
ale committed
125
// Mount stores the configuration for a stream (mount, in Icecast lingo).
126 127 128 129 130 131 132 133 134 135
type Mount struct {
	// Name (path to the mountpoint).
	Name string

	// Username for source authentication.
	Username string

	// Password for source authentication.
	Password string

136 137 138 139 140 141
	// Is this field is set, the mount point will be relaying an
	// external stream and no source connections will be accepted.
	// Each node will pull from the external source directly,
	// ignoring our master election protocol.
	RelayUrl string

142 143
	// Fallback stream name (optional).
	Fallback string
ale's avatar
ale committed
144 145 146 147 148 149

	// If Transcoding is non-nil, this mountpoint represents a
	// transcoded stream.
	Transcoding *EncodingParams
}

ale's avatar
ale committed
150 151
// Valid performs a consistency check and returns true if the
// configuration for the stream is correct.
ale's avatar
ale committed
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
func (m *Mount) Valid() error {
	if !strings.HasPrefix(m.Name, "/") {
		return errors.New("name does not start with a slash")
	}
	if m.Username != "" && m.Password == "" {
		return errors.New("username is set but password is empty")
	}
	if m.Username == "" && m.Password != "" {
		return errors.New("password is set but username is empty")
	}
	if m.RelayUrl != "" && m.Transcoding != nil {
		return errors.New("RelayUrl and Transcoding can't both be set")
	}
	if m.Transcoding != nil {
		if err := m.Transcoding.Valid(); err != nil {
			return fmt.Errorf("invalid encoding parameters: %v", err)
		}
	}
	return nil
171 172
}

ale's avatar
ale committed
173
// Equal returns true if the two mounts have the same configuration.
ale's avatar
ale committed
174
func (m *Mount) Equal(other *Mount) bool {
ale's avatar
ale committed
175
	return (m.Name == other.Name) && (m.Username == other.Username) && (m.Password == other.Password) && (m.RelayUrl == other.RelayUrl) && (m.Fallback == other.Fallback) && ((m.Transcoding == nil && other.Transcoding == nil) || (m.Transcoding != nil && other.Transcoding != nil && *m.Transcoding == *other.Transcoding))
ale's avatar
ale committed
176 177
}

ale's avatar
ale committed
178 179
// IsRelay returns true if the stream is configured as a relay of an
// external source.
180 181 182 183
func (m *Mount) IsRelay() bool {
	return m.RelayUrl != ""
}

ale's avatar
ale committed
184
// HasTranscoder returns true if the stream has transcoding sub-streams.
ale's avatar
ale committed
185 186 187 188
func (m *Mount) HasTranscoder() bool {
	return m.Transcoding != nil
}

189 190
// Return the path in etcd used to store mountpoint configuration.
func mountEtcdPath(mountName string) string {
ale's avatar
ale committed
191
	return MountPrefix + mountName[1:]
192 193
}

ale's avatar
ale committed
194 195
// MountNameToIcecastPath returns the Icecast mount path for the given
// public mount name.
196 197 198 199
func MountNameToIcecastPath(mountName string) string {
	return IcecastMountPrefix + mountName
}

ale's avatar
ale committed
200 201 202 203
// IcecastPathToMountName returns the public mount name from an
// Icecast mount path. If 'path' does not start with
// IcecastMountPrefix, it is returned unchanged (though arguably this
// should be an error).
204 205 206 207
func IcecastPathToMountName(path string) string {
	return strings.TrimPrefix(path, IcecastMountPrefix)
}

ale's avatar
ale committed
208 209
// IcecastMountStatus has information about a mount on an individual
// Icecast server, as provided by Icecast itself.
210
type IcecastMountStatus struct {
211 212 213 214 215 216 217
	Name         string
	Listeners    int
	BitRate      int
	Quality      float64
	VideoQuality float64
	FrameSize    string
	FrameRate    float64
218 219
}

ale's avatar
ale committed
220 221
// NodeStatus stores runtime information about an autoradio node. This
// is used to report load and stream status.
222
type NodeStatus struct {
ale's avatar
ale committed
223 224 225 226 227
	// Short name of this node.
	Name string

	// Public IP addresses of this server.
	IP []net.IP
228

ale's avatar
ale committed
229 230 231 232 233
	// Internal IP addresses of this server (i.e. reachable from
	// the other peers of the cluster), if different from IP
	// above. Can be nil, in which case the value of IP is used.
	InternalIP []net.IP

234 235 236
	// Is the Icecast server up?
	IcecastUp bool

237
	// List of mount points.
238
	Mounts []IcecastMountStatus `xml:"mount"`
239 240 241

	// Bandwidth utilization.
	BandwidthUsage float64
242 243 244

	// Maximum number of allowed listeners.
	MaxListeners int
245 246
}

ale's avatar
ale committed
247 248 249 250 251 252 253
func (ns *NodeStatus) GetInternalIP() []net.IP {
	if len(ns.InternalIP) != 0 {
		return ns.InternalIP
	}
	return ns.IP
}

254 255
// NumListeners returns the total number of listeners across all
// mounts on this node.
256 257 258 259 260 261 262 263
func (ns *NodeStatus) NumListeners() int {
	listeners := 0
	for _, m := range ns.Mounts {
		listeners += m.Listeners
	}
	return listeners
}

264 265
// Client is the actual API to the streaming cluster's database.
type Client struct {
266 267
	client        EtcdClient
	presenceCache *presence.Cache
268 269
}

ale's avatar
ale committed
270
// NewClient creates and returns a new Client.
271
func NewClient(client EtcdClient) *Client {
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
	// The Client keeps a cache of node presence information,
	// since it is likely that it will be accessed quite often (in
	// the case of redirectord, on every request).
	return &Client{
		client: client,
		presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, func(data []string) interface{} {
			// Convert a list of JSON-encoded NodeStatus
			// objects into a lisce of *NodeStatus
			// pointers. Since decoding can be a bit
			// expensive if performed on every query, we
			// only perform it when the data is updated.
			tmp := make([]*NodeStatus, 0, len(data))
			for _, nodeData := range data {
				var ns NodeStatus
				if err := json.NewDecoder(strings.NewReader(nodeData)).Decode(&ns); err == nil {
					tmp = append(tmp, &ns)
				}
			}
			return tmp
		}),
	}
}

// GetNodes returns the list of active cluster nodes.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
	data, err := r.presenceCache.Data()
	if err != nil {
		return nil, err
	}
	return data.([]*NodeStatus), nil
302 303 304 305
}

// GetMount returns data on a specific mountpoint (returns nil if not
// found).
306
func (r *Client) GetMount(mountName string) (*Mount, error) {
307
	response, err := r.client.Get(mountEtcdPath(mountName), false, false)
ale's avatar
ale committed
308
	if err != nil || response.Node == nil {
309 310
		return nil, err
	}
ale's avatar
ale committed
311
	if response.Node.Dir {
312
		return nil, errors.New("key is a directory")
313 314 315
	}

	var m Mount
ale's avatar
ale committed
316
	if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
317 318 319 320 321 322
		return nil, err
	}
	return &m, nil
}

// SetMount creates or updates a mountpoint.
323
func (r *Client) SetMount(m *Mount) error {
324 325 326 327 328
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(m); err != nil {
		return err
	}

329
	_, err := r.client.Set(mountEtcdPath(m.Name), buf.String(), 0)
330 331 332 333
	return err
}

// DelMount removes a mountpoint.
334
func (r *Client) DelMount(mountName string) error {
335
	_, err := r.client.Delete(mountEtcdPath(mountName), false)
336 337 338 339
	return err
}

// ListMounts returns a list of all the configured mountpoints.
340
func (r *Client) ListMounts() ([]*Mount, error) {
ale's avatar
ale committed
341 342
	response, err := r.client.Get(MountPrefix, true, false)
	if err != nil || response.Node == nil {
343 344
		return nil, err
	}
ale's avatar
ale committed
345
	if !response.Node.Dir {
346
		return nil, errors.New("key is a file")
ale's avatar
ale committed
347 348
	}

ale's avatar
ale committed
349 350 351
	result := make([]*Mount, 0, len(response.Node.Nodes))
	for _, n := range response.Node.Nodes {
		if n.Dir {
ale's avatar
ale committed
352 353
			continue
		}
354
		var m Mount
ale's avatar
ale committed
355
		if err := json.NewDecoder(strings.NewReader(n.Value)).Decode(&m); err != nil {
356 357 358 359 360 361 362
			continue
		}
		result = append(result, &m)
	}
	return result, nil
}

ale's avatar
ale committed
363 364 365
// MasterNodeInfo stores location data for the master node. Having the
// IP address here saves another round-trip to etcd to retrieve the
// node info in the most common case.
ale's avatar
ale committed
366
type MasterNodeInfo struct {
ale's avatar
ale committed
367
	// Name of the node.
ale's avatar
ale committed
368
	Name string
ale's avatar
ale committed
369 370 371 372 373 374 375 376 377 378 379 380 381

	// Public IPs of the node.
	IP []net.IP

	// Internal IPs of the node (possibly the same as IP).
	InternalIP []net.IP
}

func (m MasterNodeInfo) GetInternalIP() []net.IP {
	if len(m.InternalIP) != 0 {
		return m.InternalIP
	}
	return m.IP
ale's avatar
ale committed
382 383
}

ale's avatar
ale committed
384
// GetMasterInfo returns the address of the current master server.
385
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
ale's avatar
ale committed
386 387
	response, err := r.client.Get(MasterElectionPath, false, false)
	if err != nil || response.Node == nil {
ale's avatar
ale committed
388
		return nil, err
389
	}
ale's avatar
ale committed
390
	if response.Node.Dir {
391
		return nil, errors.New("key is a directory")
392
	}
ale's avatar
ale committed
393 394 395 396 397
	var m MasterNodeInfo
	if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
		return nil, err
	}
	return &m, nil
398 399
}

400 401 402 403 404 405
// GeneratePassword returns a new random password.
func GeneratePassword() string {
	b := make([]byte, 6)
	rand.Read(b)
	return base64.StdEncoding.EncodeToString(b)
}