package autoradio

import (
	"bytes"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"net"
	"strings"
	"time"

	"git.autistici.org/ale/autoradio/coordination/presence"
)

const (
	// MountPrefix stores the path to the mount configuration in
	// etcd. This should never change between releases; upgrades
	// to the configuration format should be backwards-compatible.
	// The trailing slash is significant: mount names are appended
	// to it with their own leading slash removed.
	MountPrefix = "/icecast/mounts/"

	// Paths for the cluster runtime data. Whenever the format of
	// this data changes, the ABIVersion should be increased. A
	// rolling restart of the cluster will then seamlessly cause a
	// transition to the new consensus (the cluster will be
	// partitioned in the meantime).
	//
	// MasterElectionPath is the key read by Client.GetMasterInfo,
	// and NodePrefix is the directory from which the Client's
	// presence cache reads node status. All three paths embed
	// ABIVersion, so nodes on different versions do not share
	// runtime data.
	ABIVersion                   = "3"
	MasterElectionPath           = "/icecast/" + ABIVersion + "/cluster/master"
	TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
	NodePrefix                   = "/icecast/" + ABIVersion + "/nodes/"

	// IcecastMountPrefix is prepended to a public mount name to
	// build the Icecast mount path (MountNameToIcecastPath), and
	// stripped again by IcecastPathToMountName.
	IcecastMountPrefix = "/_stream"
)

// IcecastPort is the port that the Icecast server will listen
// on. Since we fully manage the system-wide Icecast instance,
// there's not much point in making this configurable. It is
// declared as a variable, but nothing in this file modifies it.
var IcecastPort = 8000

// EncodingParams describes how a stream is re-encoded (transcoded).
type EncodingParams struct {
	// Path to the source mountpoint.
	SourceName string

	// Parameters for the transcoded stream. The value format is
	// anything that liquidsoap will accept in its 'output'
	// directive.
	Format     string
	BitRate    int
	SampleRate int
	Channels   int
	StereoMode string
	Quality    float64
}

// NewEncodingParams creates an EncodingParams object with the right
// default values: a 44100 Hz sample rate, two channels, and a Quality
// of -1, which String treats as "not set".
func NewEncodingParams() *EncodingParams {
	return &EncodingParams{
		SampleRate: 44100,
		Channels:   2,
		Quality:    -1,
	}
}

// String returns a short, comma-separated, human-readable summary of
// the parameters. The format always comes first; the bitrate and the
// sample rate are only included when positive, the quality only when
// greater than -1, and the channel layout only for 1 or 2 channels.
func (p *EncodingParams) String() string {
	out := []string{p.Format}
	if p.BitRate > 0 {
		out = append(out, fmt.Sprintf("%dkBps", p.BitRate))
	}
	if p.Quality > -1 {
		out = append(out, fmt.Sprintf("q=%g", p.Quality))
	}
	switch p.Channels {
	case 1:
		out = append(out, "mono")
	case 2:
		out = append(out, "stereo")
	}
	if p.SampleRate > 0 {
		// SampleRate is stored in Hz, but displayed in kHz.
		out = append(out, fmt.Sprintf("%gkHz", float64(p.SampleRate)/1000))
	}
	return strings.Join(out, ", ")
}

// Valid checks that the EncodingParams seem to make sense. It returns
// nil if they do, or an error describing the first problem found. We
// try to be as close to the liquidsoap capabilities as possible.
func (p *EncodingParams) Valid() error {
	switch p.Format {
	case "mp3", "mp3.cbr", "mp3.abr", "vorbis.cbr", "vorbis.abr":
		// Bitrate-driven formats need an explicit, positive bitrate.
		if p.BitRate == 0 {
			return errors.New("bitrate not specified")
		}
		if p.BitRate < 0 {
			return errors.New("bitrate must be positive")
		}
	case "mp3.vbr":
		if p.Quality < 0 || p.Quality > 9 {
			return errors.New("quality must be in range [0, 9]")
		}
	case "vorbis":
		if p.Quality < -0.2 || p.Quality > 1 {
			return errors.New("quality must be in range [-0.2, 1]")
		}
	case "":
		return errors.New("format not specified")
	default:
		return fmt.Errorf("unknown format \"%s\"", p.Format)
	}
	if p.SampleRate == 0 {
		return errors.New("sample rate not specified")
	}
	if p.SampleRate < 0 {
		return errors.New("sample rate must be positive")
	}
	if p.Channels < 1 || p.Channels > 2 {
		return errors.New("bad number of channels")
	}
	// The stereo mode is only meaningful (and only checked) for
	// multi-channel output.
	if p.Channels > 1 {
		switch p.StereoMode {
		case "", "stereo", "joint_stereo", "default":
		default:
			return fmt.Errorf("unknown stereo mode \"%s\"", p.StereoMode)
		}
	}
	return nil
}

// Mount stores the configuration for a stream (mount, in Icecast lingo).
// Client.SetMount stores it in etcd JSON-encoded; no field tags are
// declared, so the Go field names are the JSON keys.
type Mount struct {
	// Name (path to the mountpoint). Valid requires it to start
	// with a slash.
	Name string

	// Username for source authentication. Valid requires Username
	// and Password to be either both set or both empty.
	Username string

	// Password for source authentication.
	Password string

	// If this field is set, the mount point will be relaying an
	// external stream and no source connections will be accepted.
	// Each node will pull from the external source directly,
	// ignoring our master election protocol. Valid rejects a
	// mount that sets both RelayUrl and Transcoding.
	RelayUrl string

	// Fallback stream name (optional).
	Fallback string

	// If Transcoding is non-nil, this mountpoint represents a
	// transcoded stream.
	Transcoding *EncodingParams
}

// Valid runs a consistency check on the stream configuration. It
// returns nil when the configuration is correct, or an error
// describing the first problem found: a name without a leading slash,
// a username without a password (or vice versa), a relay that is also
// a transcoder, or invalid transcoding parameters.
func (m *Mount) Valid() error {
	switch {
	case !strings.HasPrefix(m.Name, "/"):
		return errors.New("name does not start with a slash")
	case m.Username != "" && m.Password == "":
		return errors.New("username is set but password is empty")
	case m.Username == "" && m.Password != "":
		return errors.New("password is set but username is empty")
	case m.RelayUrl != "" && m.Transcoding != nil:
		return errors.New("RelayUrl and Transcoding can't both be set")
	case m.Transcoding == nil:
		// Nothing else to check for a non-transcoding mount.
		return nil
	}

	if err := m.Transcoding.Valid(); err != nil {
		return fmt.Errorf("invalid encoding parameters: %v", err)
	}
	return nil
}

// Equal reports whether the two mounts have the same configuration:
// identical scalar fields, and Transcoding either unset on both or
// set on both with equal parameter values. The other mount must not
// be nil.
func (m *Mount) Equal(other *Mount) bool {
	if m.Name != other.Name || m.Username != other.Username || m.Password != other.Password {
		return false
	}
	if m.RelayUrl != other.RelayUrl || m.Fallback != other.Fallback {
		return false
	}
	if m.Transcoding == nil || other.Transcoding == nil {
		// At least one side is unset: equal only if both are.
		return m.Transcoding == nil && other.Transcoding == nil
	}
	return *m.Transcoding == *other.Transcoding
}

// IsRelay reports whether the stream is configured as a relay of an
// external source, i.e. whether RelayUrl is non-empty.
func (m *Mount) IsRelay() bool {
	return len(m.RelayUrl) > 0
}

// HasTranscoder reports whether transcoding parameters are set on
// this mount (Transcoding is non-nil).
func (m *Mount) HasTranscoder() bool {
	params := m.Transcoding
	return params != nil
}

// mountEtcdPath returns the path in etcd used to store the
// configuration of the mount with the given public name. A single
// leading slash is dropped from mountName, since MountPrefix already
// ends with one.
func mountEtcdPath(mountName string) string {
	// TrimPrefix (rather than mountName[1:]) keeps an empty name from
	// panicking, and avoids eating the first character of a name
	// that lacks the leading slash.
	return MountPrefix + strings.TrimPrefix(mountName, "/")
}

// MountNameToIcecastPath maps a public mount name to the
// corresponding Icecast mount path by prepending IcecastMountPrefix.
func MountNameToIcecastPath(mountName string) string {
	icecastPath := IcecastMountPrefix + mountName
	return icecastPath
}

// IcecastPathToMountName maps an Icecast mount path back to the
// public mount name by removing the leading IcecastMountPrefix. A
// path that does not start with the prefix is returned unchanged
// (though arguably this should be an error).
func IcecastPathToMountName(path string) string {
	if strings.HasPrefix(path, IcecastMountPrefix) {
		return path[len(IcecastMountPrefix):]
	}
	return path
}

// IcecastMountStatus has information about a mount on an individual
// Icecast server, as provided by Icecast itself.
//
// Listeners is the per-mount listener count that
// NodeStatus.NumListeners adds up. The units and exact meaning of the
// remaining fields are whatever Icecast reports.
// NOTE(review): confirm them against the code that fills this struct.
type IcecastMountStatus struct {
	Name         string
	Listeners    int
	BitRate      int
	Quality      float64
	VideoQuality float64
	FrameSize    string
	FrameRate    float64
}

// NodeStatus stores runtime information about an autoradio node. This
// is used to report load and stream status. The Client decodes these
// objects from JSON when refreshing its presence cache.
type NodeStatus struct {
	// Short name of this node.
	Name string

	// Public IP addresses of this server.
	IP []net.IP

	// Internal IP addresses of this server (i.e. reachable from
	// the other peers of the cluster), if different from IP
	// above. Can be nil, in which case the value of IP is used
	// (see GetInternalIP).
	InternalIP []net.IP

	// Is the Icecast server up?
	IcecastUp bool

	// List of mount points. The field also carries an XML tag
	// naming the element "mount".
	Mounts []IcecastMountStatus `xml:"mount"`

	// Bandwidth utilization.
	BandwidthUsage float64

	// Maximum number of allowed listeners.
	MaxListeners int
}

// GetInternalIP returns the addresses at which the other cluster
// peers should reach this node: InternalIP when it is set, the
// public IP list otherwise.
func (ns *NodeStatus) GetInternalIP() []net.IP {
	if len(ns.InternalIP) == 0 {
		return ns.IP
	}
	return ns.InternalIP
}

// NumListeners adds up the listener counts of every mount reported
// by this node.
func (ns *NodeStatus) NumListeners() int {
	total := 0
	for i := range ns.Mounts {
		total += ns.Mounts[i].Listeners
	}
	return total
}

// Client is the actual API to the streaming cluster's database.
//
// The client field is the etcd client used for every read, write and
// delete. The presenceCache field holds the node presence data read
// from NodePrefix, already decoded into NodeStatus objects; it is set
// up by NewClient and served by GetNodes.
type Client struct {
	client        EtcdClient
	presenceCache *presence.Cache
}

// NewClient builds a Client on top of the given etcd client.
//
// The Client keeps a cache of node presence information, since it is
// likely to be accessed quite often (in the case of redirectord, on
// every request).
func NewClient(client EtcdClient) *Client {
	// decodeNodes turns a list of JSON-encoded NodeStatus objects
	// into a slice of *NodeStatus pointers. Decoding can be a bit
	// expensive if done on every query, so it is handed to the
	// cache and only runs when the data is updated. Entries that
	// fail to decode are dropped.
	decodeNodes := func(data []string) interface{} {
		nodes := make([]*NodeStatus, 0, len(data))
		for _, encoded := range data {
			ns := new(NodeStatus)
			if err := json.NewDecoder(strings.NewReader(encoded)).Decode(ns); err != nil {
				continue
			}
			nodes = append(nodes, ns)
		}
		return nodes
	}

	return &Client{
		client:        client,
		presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, decodeNodes),
	}
}

// GetNodes returns the status of the active cluster nodes, as held by
// the presence cache.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
	cached, err := r.presenceCache.Data()
	if err == nil {
		return cached.([]*NodeStatus), nil
	}
	return nil, err
}

// GetMount looks up the configuration of a single mountpoint. It
// returns a nil Mount and a nil error when the response carries no
// node, and an error when the lookup fails, the key is a directory,
// or the stored value cannot be decoded.
func (r *Client) GetMount(mountName string) (*Mount, error) {
	resp, err := r.client.Get(mountEtcdPath(mountName), false, false)
	if err != nil {
		return nil, err
	}
	node := resp.Node
	if node == nil {
		return nil, nil
	}
	if node.Dir {
		return nil, errors.New("key is a directory")
	}

	mount := new(Mount)
	if err := json.NewDecoder(strings.NewReader(node.Value)).Decode(mount); err != nil {
		return nil, err
	}
	return mount, nil
}

// SetMount creates or updates a mountpoint, storing its JSON
// encoding under the etcd path derived from the mount name.
func (r *Client) SetMount(m *Mount) error {
	encoded := new(bytes.Buffer)
	if err := json.NewEncoder(encoded).Encode(m); err != nil {
		return err
	}

	if _, err := r.client.Set(mountEtcdPath(m.Name), encoded.String(), 0); err != nil {
		return err
	}
	return nil
}

// DelMount removes the configuration of the named mountpoint from
// etcd.
func (r *Client) DelMount(mountName string) error {
	if _, err := r.client.Delete(mountEtcdPath(mountName), false); err != nil {
		return err
	}
	return nil
}

// ListMounts returns every mountpoint configured under MountPrefix.
// Sub-directories and entries that fail to decode are skipped. The
// result is nil (with a nil error) when the response carries no node.
func (r *Client) ListMounts() ([]*Mount, error) {
	resp, err := r.client.Get(MountPrefix, true, false)
	if err != nil {
		return nil, err
	}
	root := resp.Node
	if root == nil {
		return nil, nil
	}
	if !root.Dir {
		return nil, errors.New("key is a file")
	}

	mounts := make([]*Mount, 0, len(root.Nodes))
	for _, child := range root.Nodes {
		if child.Dir {
			continue
		}
		m := new(Mount)
		if json.NewDecoder(strings.NewReader(child.Value)).Decode(m) == nil {
			mounts = append(mounts, m)
		}
	}
	return mounts, nil
}

// MasterNodeInfo holds the location of the master node. Keeping the
// IP addresses here saves another round-trip to etcd to retrieve the
// node info in the most common case.
type MasterNodeInfo struct {
	// Name of the node.
	Name string

	// Public IPs of the node.
	IP []net.IP

	// Internal IPs of the node (possibly the same as IP).
	InternalIP []net.IP
}

// GetInternalIP returns the addresses at which cluster peers should
// reach the master: InternalIP when it is set, the public IP list
// otherwise.
func (m MasterNodeInfo) GetInternalIP() []net.IP {
	if len(m.InternalIP) == 0 {
		return m.IP
	}
	return m.InternalIP
}

// GetMasterInfo returns the location of the current master server,
// as stored at MasterElectionPath. It returns a nil result and a nil
// error when the response carries no node.
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
	resp, err := r.client.Get(MasterElectionPath, false, false)
	if err != nil {
		return nil, err
	}
	node := resp.Node
	if node == nil {
		return nil, nil
	}
	if node.Dir {
		return nil, errors.New("key is a directory")
	}

	info := new(MasterNodeInfo)
	if err := json.NewDecoder(strings.NewReader(node.Value)).Decode(info); err != nil {
		return nil, err
	}
	return info, nil
}

// GeneratePassword returns a new random password: 6 bytes from the
// system's cryptographic random source, encoded as 8 characters of
// standard base64.
//
// It panics if the random source fails. The signature leaves no way
// to report the error, and carrying on would hand out a predictable
// password built from a zeroed buffer.
func GeneratePassword() string {
	b := make([]byte, 6)
	if _, err := rand.Read(b); err != nil {
		panic("autoradio: could not read random bytes for password: " + err.Error())
	}
	return base64.StdEncoding.EncodeToString(b)
}

// GenerateUsername derives a username from the name of the mount:
// the word "source" followed by the decimal CRC-32 (IEEE) checksum of
// the path. The result is likely, but not guaranteed, to be unique.
func GenerateUsername(path string) string {
	checksum := crc32.ChecksumIEEE([]byte(path))
	return fmt.Sprintf("source%d", checksum)
}