package autoradio

import (
	"bytes"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"net"
	"strings"
	"sync"
	"time"
)

const (
	// MountPrefix is the path to the mount configuration in etcd.
	// This should never change: upgrades to the configuration
	// format should be backwards-compatible.
	MountPrefix = "/icecast/mounts/"

	// ABIVersion versions the etcd paths that hold the cluster
	// runtime data. Whenever the format of this data changes,
	// ABIVersion should be increased. A rolling restart of the
	// cluster will then seamlessly cause a transition to the new
	// consensus (the cluster will be partitioned in the meantime).
	//
	// MasterElectionPath is the key read by GetMasterInfo, and
	// NodePrefix is the directory listed by doGetNodes.
	ABIVersion         = "2"
	MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
	NodePrefix         = "/icecast/" + ABIVersion + "/nodes/"

	// IcecastPort is presumably the TCP port of the Icecast servers
	// (it is not referenced in this file; confirm against callers).
	// IcecastMountPrefix is the prefix that MountNameToIcecastPath
	// prepends to public mount names.
	IcecastPort        = 8000
	IcecastMountPrefix = "/_stream"
)

var (
	// Errors reported when an etcd key is not of the expected kind.
	// ErrIsDirectory is returned when a value was expected but the
	// key is a directory (GetMount, GetMasterInfo); ErrIsFile is
	// returned when a directory was expected but the key is not one
	// (ListMounts, doGetNodes).
	ErrIsDirectory = errors.New("key is a directory")
	ErrIsFile      = errors.New("key is a file")
)

// Mount is the configuration of a mountpoint for a stream.
type Mount struct {
	// Name (path to the mountpoint).
	Name string

	// Username for source authentication.
	Username string

	// Password for source authentication.
	Password string

	// If this field is set, the mount point will be relaying an
	// external stream and no source connections will be accepted.
	// Each node will pull from the external source directly,
	// ignoring our master election protocol.
	RelayUrl string

	// Fallback stream name (optional).
	Fallback string
}

// Equal reports whether m and other hold the same configuration; all
// fields are compared. Two nil mounts are equal, and a nil mount is
// never equal to a non-nil one.
func (m *Mount) Equal(other *Mount) bool {
	// Dereferencing a nil pointer would panic, and lookups such as
	// GetMount return a nil mount for missing entries, so nil has to
	// be handled before comparing the values.
	if m == nil || other == nil {
		return m == other
	}
	return *m == *other
}

// IsRelay reports whether the mount relays an external stream, that
// is, whether RelayUrl is set.
func (m *Mount) IsRelay() bool {
	return m.RelayUrl != ""
}

// mountEtcdPath returns the etcd key used to store the configuration
// of the mountpoint called mountName. MountPrefix already ends with a
// slash, so the leading slash of the mount name is dropped.
func mountEtcdPath(mountName string) string {
	// TrimPrefix instead of mountName[1:]: slicing panics on an empty
	// name and silently eats the first character of a name that does
	// not start with a slash.
	return MountPrefix + strings.TrimPrefix(mountName, "/")
}

// MountNameToIcecastPath converts a public mount name into the
// corresponding Icecast mount path by prepending IcecastMountPrefix.
func MountNameToIcecastPath(mountName string) string {
	icecastPath := IcecastMountPrefix + mountName
	return icecastPath
}

// IcecastPathToMountName converts an Icecast mount path back into the
// public mount name by removing a leading IcecastMountPrefix. Paths
// that do not start with the prefix are returned unchanged.
func IcecastPathToMountName(path string) string {
	if !strings.HasPrefix(path, IcecastMountPrefix) {
		return path
	}
	return path[len(IcecastMountPrefix):]
}

// IcecastMountStatus is the status of a mount on an individual
// Icecast server. Listeners is the per-mount count that
// NodeStatus.NumListeners adds up. The remaining fields are
// presumably stream properties reported by Icecast; they are
// populated outside this file, so confirm their units there.
type IcecastMountStatus struct {
	Name         string
	Listeners    int
	BitRate      int
	Quality      float64
	VideoQuality float64
	FrameSize    string
	FrameRate    float64
}

// NodeStatus is the status of a node. This is used to report load and
// stream status. doGetNodes decodes one NodeStatus from the JSON
// value of each entry found under NodePrefix.
type NodeStatus struct {
	// Short name of this node.
	Name string

	// Public IP addresses of this server.
	IP []net.IP

	// Is the Icecast server up?
	IcecastUp bool

	// List of mount points. The xml tag only affects XML
	// (un)marshaling; the JSON decoding done in this file ignores it.
	Mounts []IcecastMountStatus `xml:"mount"`

	// Bandwidth utilization.
	// NOTE(review): the unit/scale is not visible here - confirm.
	BandwidthUsage float64
}

// NumListeners returns the total number of listeners on this node,
// i.e. the sum of the Listeners counters of all its mounts.
func (ns *NodeStatus) NumListeners() int {
	var total int
	for i := range ns.Mounts {
		total += ns.Mounts[i].Listeners
	}
	return total
}

// nodesCache caches the list of active nodes. The front-ends need to
// retrieve this information continuously, so successful lookups are
// limited to one per ttl (500ms as set by newNodesCache, i.e. 2qps).
//
// nodes holds the last successfully fetched list, deadline is the
// time after which Get fetches a new one, and lock serializes Get.
type nodesCache struct {
	ttl      time.Duration
	nodes    []*NodeStatus
	deadline time.Time
	lock     sync.Mutex
}

// getNodesFunc fetches a fresh list of nodes. nodesCache.Get calls it
// when the cached list has expired.
type getNodesFunc func() ([]*NodeStatus, error)

// newNodesCache returns an empty cache whose entries stay valid for
// 500 milliseconds. The zero deadline means the first Get always
// fetches.
func newNodesCache() *nodesCache {
	nc := new(nodesCache)
	nc.ttl = 500 * time.Millisecond
	return nc
}

// Get returns the cached list of nodes while it is still valid, and
// otherwise refreshes it by calling fn. When the cache has expired
// and fn fails, the stale list (possibly nil) is returned together
// with the error, so that the caller can pick the right failure
// behavior. The deadline is only pushed forward on success, so a
// failing fn is retried on the next call.
func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
	nc.lock.Lock()
	defer nc.lock.Unlock()

	now := time.Now()
	if !now.After(nc.deadline) {
		// Still fresh: serve from the cache without calling fn.
		return nc.nodes, nil
	}

	fresh, err := fn()
	if err != nil {
		// Keep the previous value and let the caller decide.
		return nc.nodes, err
	}
	nc.nodes = fresh
	nc.deadline = now.Add(nc.ttl)
	return nc.nodes, nil
}

// Client is the actual API to the streaming cluster's database.
// client is the etcd connection used for every request, and
// activeNodesCache backs GetNodes.
type Client struct {
	client           EtcdClient
	activeNodesCache *nodesCache
}

// NewClient returns a Client that talks to etcd through client and
// starts with an empty cache of active nodes.
func NewClient(client EtcdClient) *Client {
	return &Client{
		client:           client,
		activeNodesCache: newNodesCache(),
	}
}

// GetMount returns the configuration of the mountpoint called
// mountName. It returns a nil Mount and a nil error when the etcd
// response carries no node, ErrIsDirectory when the key is a
// directory, and the decoding error when the stored value is not
// valid JSON.
func (r *Client) GetMount(mountName string) (*Mount, error) {
	key := mountEtcdPath(mountName)
	resp, err := r.client.Get(key, false, false)
	if err != nil {
		return nil, err
	}

	node := resp.Node
	switch {
	case node == nil:
		return nil, nil
	case node.Dir:
		return nil, ErrIsDirectory
	}

	mount := new(Mount)
	dec := json.NewDecoder(strings.NewReader(node.Value))
	if err := dec.Decode(mount); err != nil {
		return nil, err
	}
	return mount, nil
}

// SetMount creates or updates a mountpoint: m is serialized to JSON
// and stored under the etcd key derived from m.Name. It returns the
// encoding error or the error reported by etcd, if any.
func (r *Client) SetMount(m *Mount) error {
	encoded := new(bytes.Buffer)
	enc := json.NewEncoder(encoded)
	if err := enc.Encode(m); err != nil {
		return err
	}

	// The last argument is presumably the TTL (0 = none) - confirm
	// against the EtcdClient definition.
	key := mountEtcdPath(m.Name)
	_, err := r.client.Set(key, encoded.String(), 0)
	return err
}

// DelMount removes the mountpoint called mountName by deleting its
// etcd key, and returns the error reported by etcd, if any.
func (r *Client) DelMount(mountName string) error {
	key := mountEtcdPath(mountName)
	_, err := r.client.Delete(key, false)
	return err
}

// ListMounts returns all the configured mountpoints found directly
// under MountPrefix. Sub-directories and entries whose value cannot
// be decoded are skipped. It returns ErrIsFile when MountPrefix is
// not a directory, and a nil list with a nil error when the etcd
// response carries no node.
func (r *Client) ListMounts() ([]*Mount, error) {
	resp, err := r.client.Get(MountPrefix, true, false)
	if err != nil {
		return nil, err
	}

	dir := resp.Node
	if dir == nil {
		return nil, nil
	}
	if !dir.Dir {
		return nil, ErrIsFile
	}

	mounts := make([]*Mount, 0, len(dir.Nodes))
	for _, child := range dir.Nodes {
		if child.Dir {
			continue
		}
		// Best effort: a malformed entry must not hide the others.
		mount := new(Mount)
		if json.NewDecoder(strings.NewReader(child.Value)).Decode(mount) == nil {
			mounts = append(mounts, mount)
		}
	}
	return mounts, nil
}

// MasterNodeInfo is the location data for the master node. Having the
// IP address here saves another round-trip to etcd to retrieve the
// node info in the most common case. GetMasterInfo decodes it from
// the JSON value stored at MasterElectionPath.
type MasterNodeInfo struct {
	Name string
	IP   []net.IP
}

// GetMasterInfo returns the location of the current master server, as
// stored at MasterElectionPath. It returns a nil result and a nil
// error when the etcd response carries no node, ErrIsDirectory when
// the key is a directory, and the decoding error when the stored
// value is not valid JSON.
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
	resp, err := r.client.Get(MasterElectionPath, false, false)
	if err != nil {
		return nil, err
	}

	node := resp.Node
	switch {
	case node == nil:
		return nil, nil
	case node.Dir:
		return nil, ErrIsDirectory
	}

	info := new(MasterNodeInfo)
	dec := json.NewDecoder(strings.NewReader(node.Value))
	if err := dec.Decode(info); err != nil {
		return nil, err
	}
	return info, nil
}

// doGetNodes fetches the list of active cluster nodes from etcd,
// bypassing the cache. Every entry under NodePrefix whose value
// decodes as a NodeStatus is returned; the others are skipped. It
// returns ErrIsFile when NodePrefix is not a directory, and a nil
// list with a nil error when the etcd response carries no node.
func (r *Client) doGetNodes() ([]*NodeStatus, error) {
	resp, err := r.client.Get(NodePrefix, false, false)
	if err != nil {
		return nil, err
	}

	root := resp.Node
	if root == nil {
		return nil, nil
	}
	if !root.Dir {
		return nil, ErrIsFile
	}

	nodes := make([]*NodeStatus, 0, len(root.Nodes))
	for _, entry := range root.Nodes {
		status := new(NodeStatus)
		if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(status); err != nil {
			continue
		}
		nodes = append(nodes, status)
	}
	return nodes, nil
}

// GetNodes returns the list of active cluster nodes, served from the
// client's cache and refreshed through doGetNodes when it has
// expired. On a failed refresh the stale list is returned along with
// the error.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
	cache := r.activeNodesCache
	return cache.Get(r.doGetNodes)
}

// GeneratePassword returns a new random password: 6 bytes read from
// the cryptographic random source, encoded as 8 characters of
// standard base64.
//
// It panics if the random source fails, because carrying on would
// hand out a predictable password built from a zeroed buffer.
func GeneratePassword() string {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic("autoradio: cannot read random bytes: " + err.Error())
	}
	return base64.StdEncoding.EncodeToString(raw[:])
}