api.go 11.5 KB
Newer Older
ale's avatar
ale committed
1
package autoradio
2 3 4

import (
	"bytes"
5 6
	"crypto/rand"
	"encoding/base64"
7
	"encoding/json"
ale's avatar
ale committed
8
	"errors"
ale's avatar
ale committed
9
	"fmt"
ale's avatar
ale committed
10
	"hash/crc32"
ale's avatar
ale committed
11
	"net"
12
	"strings"
13
	"time"
14 15

	"git.autistici.org/ale/autoradio/coordination/presence"
ale's avatar
ale committed
16 17
)

18
const (
	// MountPrefix stores the path to the mount configuration in
	// etcd. This should never change between releases, upgrades
	// to the configuration format should be backwards-compatible.
	MountPrefix = "/icecast/mounts/"

	// Paths for the cluster runtime data. Whenever the format of
	// this data changes, the ABIVersion should be increased. A
	// rolling restart of the cluster will then seamlessly cause a
	// transition to the new consensus (the cluster will be
	// partitioned in the meantime).

	// ABIVersion is the version tag embedded in every runtime
	// data path below.
	ABIVersion = "3"
	// MasterElectionPath is the etcd key holding the current
	// master's JSON-encoded MasterNodeInfo.
	MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
	// TranscoderMasterElectionBase is the base etcd path for
	// transcoder elections (presumably one sub-key per transcoded
	// mount; it is not used in this file).
	TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
	// NodePrefix is the etcd directory watched by the node
	// presence cache.
	NodePrefix = "/icecast/" + ABIVersion + "/nodes/"

	// IcecastMountPrefix is prepended to a public mount name to
	// obtain the corresponding Icecast mount path.
	IcecastMountPrefix = "/_stream"
)
ale's avatar
ale committed
36

37 38 39 40
// IcecastPort is the port that the Icecast server will listen
// on. Since we fully manage the system-wide Icecast instance,
// there's not much point in making this configurable. Note that it
// is declared as a variable rather than a constant, so it can still
// be reassigned by code that needs a different port.
var IcecastPort = 8000
41

ale's avatar
ale committed
42
// EncodingParams holds the parameters used to re-encode a stream.
type EncodingParams struct {
	// Path to the source mountpoint.
	SourceName string

	// Parameters for the transcoded stream. The value format is
	// anything that liquidsoap will accept in its 'output'
	// directive.
	Format     string
	BitRate    int
	SampleRate int
	Channels   int
	StereoMode string
	Quality    float64
}

// NewEncodingParams creates an EncodingParams object with the right
// default values: 44100 Hz sample rate, two channels, and a Quality
// of -1, which String treats as "not set".
func NewEncodingParams() *EncodingParams {
	return &EncodingParams{
		SampleRate: 44100,
		Channels:   2,
		Quality:    -1,
	}
}

// String returns a short, comma-separated, human-readable summary of
// the parameters. Bit rate, quality and sample rate are only included
// when they are set (positive bit rate / sample rate, quality greater
// than -1); the channel count is only described for mono and stereo.
func (p *EncodingParams) String() string {
	out := make([]string, 0, 5)
	out = append(out, p.Format)
	if p.BitRate > 0 {
		out = append(out, fmt.Sprintf("%dkBps", p.BitRate))
	}
	if p.Quality > -1 {
		out = append(out, fmt.Sprintf("q=%g", p.Quality))
	}
	switch p.Channels {
	case 1:
		out = append(out, "mono")
	case 2:
		out = append(out, "stereo")
	}
	if p.SampleRate > 0 {
		// The sample rate is stored in Hz but displayed in kHz.
		out = append(out, fmt.Sprintf("%gkHz", float64(p.SampleRate)/1000))
	}
	return strings.Join(out, ", ")
}

// Valid returns nil if the EncodingParams seem to make sense, or an
// error describing the first problem found. We try to be as close to
// the liquidsoap capabilities as possible.
//
// Non-positive bit rates and sample rates are treated as "not
// specified", consistently with String.
func (p *EncodingParams) Valid() error {
	switch p.Format {
	case "mp3", "mp3.cbr", "mp3.abr", "vorbis.cbr", "vorbis.abr":
		// Bitrate-driven formats.
		if p.BitRate <= 0 {
			return errors.New("bitrate not specified")
		}
	case "mp3.vbr":
		if p.Quality < 0 || p.Quality > 9 {
			return errors.New("quality must be in range [0, 9]")
		}
	case "vorbis":
		if p.Quality < -0.2 || p.Quality > 1 {
			return errors.New("quality must be in range [-0.2, 1]")
		}
	case "":
		return errors.New("format not specified")
	default:
		return fmt.Errorf("unknown format \"%s\"", p.Format)
	}
	if p.SampleRate <= 0 {
		return errors.New("sample rate not specified")
	}
	if p.Channels < 1 || p.Channels > 2 {
		return errors.New("bad number of channels")
	}
	// The stereo mode is only meaningful (and only checked) for
	// multi-channel output.
	if p.Channels > 1 {
		switch p.StereoMode {
		case "", "stereo", "joint_stereo", "default":
		default:
			return fmt.Errorf("unknown stereo mode \"%s\"", p.StereoMode)
		}
	}
	return nil
}

ale's avatar
ale committed
126
// Mount stores the configuration for a stream (mount, in Icecast lingo).
ale's avatar
ale committed
127 128 129 130 131 132 133 134 135 136
type Mount struct {
	// Name (path to the mountpoint).
	Name string

	// Username for source authentication.
	Username string

	// Password for source authentication.
	Password string

137 138 139 140 141 142
	// Is this field is set, the mount point will be relaying an
	// external stream and no source connections will be accepted.
	// Each node will pull from the external source directly,
	// ignoring our master election protocol.
	RelayUrl string

ale's avatar
ale committed
143 144
	// Fallback stream name (optional).
	Fallback string
ale's avatar
ale committed
145 146 147 148 149 150

	// If Transcoding is non-nil, this mountpoint represents a
	// transcoded stream.
	Transcoding *EncodingParams
}

ale's avatar
ale committed
151 152
// Valid performs a consistency check and returns true if the
// configuration for the stream is correct.
ale's avatar
ale committed
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
func (m *Mount) Valid() error {
	if !strings.HasPrefix(m.Name, "/") {
		return errors.New("name does not start with a slash")
	}
	if m.Username != "" && m.Password == "" {
		return errors.New("username is set but password is empty")
	}
	if m.Username == "" && m.Password != "" {
		return errors.New("password is set but username is empty")
	}
	if m.RelayUrl != "" && m.Transcoding != nil {
		return errors.New("RelayUrl and Transcoding can't both be set")
	}
	if m.Transcoding != nil {
		if err := m.Transcoding.Valid(); err != nil {
			return fmt.Errorf("invalid encoding parameters: %v", err)
		}
	}
	return nil
ale's avatar
ale committed
172 173
}

ale's avatar
ale committed
174
// Equal returns true if the two mounts have the same configuration.
ale's avatar
ale committed
175
func (m *Mount) Equal(other *Mount) bool {
ale's avatar
ale committed
176
	return (m.Name == other.Name) && (m.Username == other.Username) && (m.Password == other.Password) && (m.RelayUrl == other.RelayUrl) && (m.Fallback == other.Fallback) && ((m.Transcoding == nil && other.Transcoding == nil) || (m.Transcoding != nil && other.Transcoding != nil && *m.Transcoding == *other.Transcoding))
ale's avatar
ale committed
177 178
}

ale's avatar
ale committed
179 180
// IsRelay returns true if the stream is configured as a relay of an
// external source.
181 182 183 184
func (m *Mount) IsRelay() bool {
	return m.RelayUrl != ""
}

ale's avatar
ale committed
185
// HasTranscoder returns true if the stream has transcoding sub-streams.
ale's avatar
ale committed
186 187 188 189
func (m *Mount) HasTranscoder() bool {
	return m.Transcoding != nil
}

190 191
// Return the path in etcd used to store mountpoint configuration.
func mountEtcdPath(mountName string) string {
ale's avatar
ale committed
192
	return MountPrefix + mountName[1:]
193 194
}

ale's avatar
ale committed
195 196
// MountNameToIcecastPath returns the Icecast mount path for the given
// public mount name.
197 198 199 200
func MountNameToIcecastPath(mountName string) string {
	// The Icecast-side path is the public name placed under the
	// shared Icecast prefix.
	icecastPath := IcecastMountPrefix + mountName
	return icecastPath
}

ale's avatar
ale committed
201 202 203 204
// IcecastPathToMountName returns the public mount name from an
// Icecast mount path. If 'path' does not start with
// IcecastMountPrefix, it is returned unchanged (though arguably this
// should be an error).
205 206 207 208
func IcecastPathToMountName(path string) string {
	// Paths outside of the Icecast prefix are passed through as-is.
	if !strings.HasPrefix(path, IcecastMountPrefix) {
		return path
	}
	return path[len(IcecastMountPrefix):]
}

ale's avatar
ale committed
209 210
// IcecastMountStatus has information about a mount on an individual
// Icecast server, as provided by Icecast itself.
type IcecastMountStatus struct {
	// Name of the mount.
	Name string

	// Number of listeners currently attached to this mount.
	Listeners int

	// Stream properties as reported by Icecast. Units and value
	// ranges are not established in this file (presumably they
	// mirror the Icecast status output; verify against the code
	// that fills them in).
	BitRate      int
	Quality      float64
	VideoQuality float64
	FrameSize    string
	FrameRate    float64
}

ale's avatar
ale committed
221 222
// NodeStatus stores runtime information about an autoradio node. This
// is used to report load and stream status.
223
type NodeStatus struct {
ale's avatar
ale committed
224 225 226 227 228
	// Short name of this node.
	Name string

	// Public IP addresses of this server.
	IP []net.IP
229

ale's avatar
ale committed
230 231 232 233 234
	// Internal IP addresses of this server (i.e. reachable from
	// the other peers of the cluster), if different from IP
	// above. Can be nil, in which case the value of IP is used.
	InternalIP []net.IP

235 236 237
	// Is the Icecast server up?
	IcecastUp bool

238
	// List of mount points.
239
	Mounts []IcecastMountStatus `xml:"mount"`
240 241 242

	// Bandwidth utilization.
	BandwidthUsage float64
243 244 245

	// Maximum number of allowed listeners.
	MaxListeners int
246 247
}

ale's avatar
ale committed
248 249 250 251 252 253 254
func (ns *NodeStatus) GetInternalIP() []net.IP {
	if len(ns.InternalIP) != 0 {
		return ns.InternalIP
	}
	return ns.IP
}

255 256
// NumListeners returns the total number of listeners across all
// mounts on this node.
257 258 259 260 261 262 263 264
func (ns *NodeStatus) NumListeners() int {
	listeners := 0
	for _, m := range ns.Mounts {
		listeners += m.Listeners
	}
	return listeners
}

265 266
// Client is the actual API to the streaming cluster's database.
type Client struct {
ale's avatar
ale committed
267
	EtcdClient    EtcdClient
268
	presenceCache *presence.Cache
269 270
}

ale's avatar
ale committed
271
// NewClient creates and returns a new Client.
272
func NewClient(client EtcdClient) *Client {
273 274 275 276
	// The Client keeps a cache of node presence information,
	// since it is likely that it will be accessed quite often (in
	// the case of redirectord, on every request).
	return &Client{
ale's avatar
ale committed
277
		EtcdClient: client,
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
		presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, func(data []string) interface{} {
			// Convert a list of JSON-encoded NodeStatus
			// objects into a lisce of *NodeStatus
			// pointers. Since decoding can be a bit
			// expensive if performed on every query, we
			// only perform it when the data is updated.
			tmp := make([]*NodeStatus, 0, len(data))
			for _, nodeData := range data {
				var ns NodeStatus
				if err := json.NewDecoder(strings.NewReader(nodeData)).Decode(&ns); err == nil {
					tmp = append(tmp, &ns)
				}
			}
			return tmp
		}),
	}
}

ale's avatar
ale committed
296 297 298 299 300
// WaitForNodes waits until the node presence cache is initialized,
// by delegating to the cache's WaitForInit method.
func (r *Client) WaitForNodes() {
	r.presenceCache.WaitForInit()
}

301 302 303 304 305 306 307
// GetNodes returns the list of active cluster nodes.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
	data, err := r.presenceCache.Data()
	if err != nil {
		return nil, err
	}
	return data.([]*NodeStatus), nil
308 309 310 311
}

// GetMount returns data on a specific mountpoint (returns nil if not
// found).
312
func (r *Client) GetMount(mountName string) (*Mount, error) {
ale's avatar
ale committed
313
	response, err := r.EtcdClient.Get(mountEtcdPath(mountName), false, false)
ale's avatar
ale committed
314
	if err != nil || response.Node == nil {
315 316
		return nil, err
	}
ale's avatar
ale committed
317
	if response.Node.Dir {
318
		return nil, errors.New("key is a directory")
319 320 321
	}

	var m Mount
ale's avatar
ale committed
322
	if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
323 324 325 326 327 328
		return nil, err
	}
	return &m, nil
}

// SetMount creates or updates a mountpoint.
329
func (r *Client) SetMount(m *Mount) error {
330 331 332 333 334
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(m); err != nil {
		return err
	}

ale's avatar
ale committed
335
	_, err := r.EtcdClient.Set(mountEtcdPath(m.Name), buf.String(), 0)
336 337 338 339
	return err
}

// DelMount removes a mountpoint.
340
func (r *Client) DelMount(mountName string) error {
ale's avatar
ale committed
341
	_, err := r.EtcdClient.Delete(mountEtcdPath(mountName), false)
342 343 344 345
	return err
}

// ListMounts returns a list of all the configured mountpoints.
346
func (r *Client) ListMounts() ([]*Mount, error) {
ale's avatar
ale committed
347
	response, err := r.EtcdClient.Get(MountPrefix, true, false)
ale's avatar
ale committed
348
	if err != nil || response.Node == nil {
349 350
		return nil, err
	}
ale's avatar
ale committed
351
	if !response.Node.Dir {
352
		return nil, errors.New("key is a file")
ale's avatar
ale committed
353 354
	}

ale's avatar
ale committed
355 356 357
	result := make([]*Mount, 0, len(response.Node.Nodes))
	for _, n := range response.Node.Nodes {
		if n.Dir {
ale's avatar
ale committed
358 359
			continue
		}
360
		var m Mount
ale's avatar
ale committed
361
		if err := json.NewDecoder(strings.NewReader(n.Value)).Decode(&m); err != nil {
362 363 364 365 366 367 368
			continue
		}
		result = append(result, &m)
	}
	return result, nil
}

ale's avatar
ale committed
369 370 371
// MasterNodeInfo stores location data for the master node. Having the
// IP address here saves another round-trip to etcd to retrieve the
// node info in the most common case.
type MasterNodeInfo struct {
	// Name of the node.
	Name string

	// Public IPs of the node.
	IP []net.IP

	// Internal IPs of the node (possibly the same as IP).
	InternalIP []net.IP
}

// GetInternalIP returns the internal addresses of the master node:
// InternalIP if it is not empty, the public IP list otherwise.
func (m MasterNodeInfo) GetInternalIP() []net.IP {
	if len(m.InternalIP) != 0 {
		return m.InternalIP
	}
	return m.IP
}

ale's avatar
ale committed
390
// GetMasterInfo returns the address of the current master server.
391
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
ale's avatar
ale committed
392
	response, err := r.EtcdClient.Get(MasterElectionPath, false, false)
ale's avatar
ale committed
393
	if err != nil || response.Node == nil {
ale's avatar
ale committed
394
		return nil, err
395
	}
ale's avatar
ale committed
396
	if response.Node.Dir {
397
		return nil, errors.New("key is a directory")
398
	}
ale's avatar
ale committed
399 400 401 402 403
	var m MasterNodeInfo
	if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
		return nil, err
	}
	return &m, nil
404 405
}

406 407 408 409 410 411
// GeneratePassword returns a new random password: 6 bytes read from
// the system's cryptographic random source, encoded as 8 characters
// of standard base64.
//
// It panics if the random source cannot be read: silently going on
// would return the encoding of an all-zero (or partially zero)
// buffer, i.e. a predictable password.
func GeneratePassword() string {
	b := make([]byte, 6)
	if _, err := rand.Read(b); err != nil {
		panic("autoradio: could not read random data: " + err.Error())
	}
	return base64.StdEncoding.EncodeToString(b)
}
ale's avatar
ale committed
412 413 414 415 416 417

// GenerateUsername derives a source username from the mount path: the
// literal "source" followed by the decimal CRC-32 (IEEE) checksum of
// the path. The result is deterministic and probably, but not
// necessarily, unique across mounts.
func GenerateUsername(path string) string {
	checksum := crc32.ChecksumIEEE([]byte(path))
	return "source" + fmt.Sprint(checksum)
}