Commit 6c9f8d31 authored by ale

update to the etcd v2 API

parent 93e5231a
@@ -18,7 +18,10 @@ var (
MountPrefix = "/icecast/mounts/"
NodePrefix = "/icecast/nodes/"
IcecastPort = 8000
ErrIsDirectory = errors.New("key is a directory")
ErrIsFile = errors.New("key is a file")
)
// A mountpoint for a stream.
@@ -40,7 +43,8 @@ func mountPath(mountName string) string {
return MountPrefix + mountName[1:]
}
// Cache the list of active nodes.
// Cache the list of active nodes (the front-ends need to retrieve
// this information continuously, so we limit them to 2 qps).
type nodesCache struct {
ttl time.Duration
nodes []string
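
The nodesCache struct is cut off by the hunk above, so here is a rough, standalone sketch of the TTL-cache pattern the comment describes: serve the cached node list while it is still fresh, and hit etcd at most once per TTL otherwise. The names cachedNodes, Get and refresh are hypothetical and not taken from this repository.

// Hypothetical sketch of a TTL cache for the node list; not the actual
// nodesCache implementation, which is truncated in the hunk above.
package sketch

import (
	"sync"
	"time"
)

type cachedNodes struct {
	mu      sync.Mutex
	ttl     time.Duration // e.g. 500 * time.Millisecond for roughly 2 qps
	expires time.Time
	nodes   []string
}

// Get returns the cached list while it is still fresh; otherwise it
// calls refresh (something like doGetNodes) and caches the result.
func (c *cachedNodes) Get(refresh func() ([]string, error)) ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.nodes != nil && time.Now().Before(c.expires) {
		return c.nodes, nil
	}
	nodes, err := refresh()
	if err != nil {
		return nil, err
	}
	c.nodes = nodes
	c.expires = time.Now().Add(c.ttl)
	return nodes, nil
}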
@@ -89,16 +93,16 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI {
// GetMount returns data on a specific mountpoint (returns nil if not
// found).
func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
response, err := r.client.Get(mountPath(mountName))
response, err := r.client.Get(mountPath(mountName), false)
if err != nil {
return nil, err
}
if len(response) != 1 {
return nil, nil
if response.Dir {
return nil, ErrIsDirectory
}
var m Mount
if err := json.NewDecoder(strings.NewReader(response[0].Value)).Decode(&m); err != nil {
if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
return nil, err
}
return &m, nil
@@ -123,14 +127,21 @@ func (r *RadioAPI) DelMount(mountName string) error {
// ListMounts returns a list of all the configured mountpoints.
func (r *RadioAPI) ListMounts() ([]*Mount, error) {
response, err := r.client.Get(MountPrefix)
response, err := r.client.Get(MountPrefix, false)
if err != nil {
return nil, err
}
result := make([]*Mount, 0, len(response))
for _, entry := range response {
if !response.Dir {
return nil, ErrIsFile
}
result := make([]*Mount, 0, len(response.Kvs))
for _, kv := range response.Kvs {
if kv.Dir {
continue
}
var m Mount
if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&m); err != nil {
if err := json.NewDecoder(strings.NewReader(kv.Value)).Decode(&m); err != nil {
continue
}
result = append(result, &m)
@@ -140,24 +151,27 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(MasterElectionPath)
response, err := r.client.Get(MasterElectionPath, false)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
if response.Dir {
return "", ErrIsDirectory
}
return response[0].Value, nil
return response.Value, nil
}
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) doGetNodes() ([]string, error) {
response, err := r.client.Get(NodePrefix)
response, err := r.client.Get(NodePrefix, false)
if err != nil {
return nil, err
}
result := make([]string, 0, len(response))
for _, entry := range response {
if !response.Dir {
return nil, ErrIsFile
}
result := make([]string, 0, len(response.Kvs))
for _, entry := range response.Kvs {
result = append(result, entry.Value)
}
return result, nil
@@ -104,9 +104,10 @@ func generateUsername(path string) string {
func createMount(args []string) {
path := args[0]
if !strings.HasPrefix(path, "/") {
log.Fatal("Mount points should specify a full path")
if strings.Contains(path, "/") {
log.Fatal("Mount points should not contain a slash ('/').")
}
path = "/" + path
// Check if the mount already exists.
client := getClient()
@@ -59,7 +59,7 @@ func NewEtcdClient() *etcd.Client {
c := etcd.NewClient(machines)
if proto == "https" {
c.SetScheme(etcd.HTTPS)
if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
if err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
log.Fatal("Error setting up SSL for etcd client: %s", err)
}
}
@@ -23,8 +23,8 @@ func stateToString(state int) string {
}
type MasterElection struct {
client *etcd.Client
stop chan bool
client *etcd.Client
stop chan bool
stopped bool
Addr string
@@ -55,11 +55,11 @@ func (m *MasterElection) IsMaster() bool {
}
func (m *MasterElection) GetMasterAddr() string {
responses, err := m.client.Get(m.Path)
if err != nil || len(responses) != 1 {
response, err := m.client.Get(m.Path, false)
if err != nil {
return ""
}
return responses[0].Value
return response.Value
}
func (m *MasterElection) setState(state int) {
@@ -87,42 +87,113 @@ func (m *MasterElection) stopper() {
// Remove the lock file if we are the master.
if m.State == STATE_MASTER {
log.Printf("releasing masterelection lock")
m.client.Delete(m.Path)
}
}
func boolTimer(delay time.Duration) chan bool {
ch := make(chan bool)
go func() {
time.Sleep(delay)
close(ch)
}()
return ch
}
func (m *MasterElection) loop() {
}
func (m *MasterElection) runMaster(index uint64) {
m.setState(STATE_MASTER)
// If we renew the lease every TTL / N, we allow up to N failed
// renewals before we stop believing we are the master.
ttl := time.Second * time.Duration(m.TTL)
tick := time.NewTicker(ttl / 3)
defer tick.Stop()
lastUpdate := time.Now()
for {
select {
case t := <-tick.C:
// To verify that we actually are still the
// master (and not just that we believe we are),
// try yet another compare-and-swap to check that
// the stored master address is still our own
// and no one stole our lock. If it is, the TTL
// will be updated (and the lock renewed).
response, err := m.client.CompareAndSwap(m.Path, m.Addr, m.TTL, m.Addr, index)
if err != nil {
log.Printf("error updating lock: %s", err)
// If we can't renew the lock for a
// full TTL, we must assume we lost it.
if t.Sub(lastUpdate) > ttl {
log.Printf("too many errors, lost lock")
return
}
// Skip the index update on a failed renewal.
continue
}
index = response.ModifiedIndex
lastUpdate = t
case <-m.stop:
return
}
}
}
func (m *MasterElection) runSlave(index uint64) {
m.setState(STATE_SLAVE)
for {
// Start a watch on the lock, waiting for its removal.
response, err := m.client.Watch(m.Path, index+1, nil, m.stop)
if err != nil {
log.Printf("slave Watch() error: %+v", err)
return
}
if response.Action == "delete" || response.Action == "expire" {
return
}
index = response.ModifiedIndex
}
}
func (m *MasterElection) Run() {
go m.stopper()
// Start as a slave.
m.setState(STATE_SLAVE)
halfttl := time.Second * time.Duration(m.TTL / 2)
var watchIndex uint64
for !m.stopped {
// Since a failed Create does not return the
// RAFT index, let's optimistically query the lock
// before starting just to set a baseline for the
// index.
if iresponse, err := m.client.Get(m.Path, false); err == nil {
log.Printf("lock already exists: %+v", iresponse)
watchIndex = iresponse.ModifiedIndex
}
// Try to acquire the lock. If we are currently the
// master, the previous value should be our own
// address, otherwise it should be unset.
prevValue := ""
if m.State == STATE_MASTER {
prevValue = m.Addr
}
if _, ok, _ := m.client.TestAndSet(m.Path, prevValue, m.Addr, m.TTL); ok {
response, err := m.client.Create(m.Path, m.Addr, m.TTL)
if err == nil {
// Howdy, we're the master now. Wait a while
// and renew our TTL.
m.setState(STATE_MASTER)
time.Sleep(halfttl)
m.runMaster(response.ModifiedIndex)
} else {
// We're not the master. We could wait for a
// DELETE but I'm not sure if that's what you
// get on expiry, so we just wait for the
// first event which will be another SET from
// the current master. Oh well.
m.setState(STATE_SLAVE)
_, err := m.client.Watch(m.Path, 0, nil, nil)
if err != nil {
log.Printf("%s: watch error: %s", m.Path, err)
}
// We're not the master. Wait until the lock
// is deleted or expires.
m.runSlave(watchIndex)
}
}
}
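
To make the acquire-or-watch cycle in Run() easier to follow, here is a schematic version of the same pattern against a hypothetical key-value interface. lockStore, Create, Get, WaitDeleted and electLoop are placeholders, not the go-etcd API: try to create the lock key with a TTL; on success act as master and keep renewing; on failure watch the key from the last known index until it is deleted or expires, then try again.

// Schematic sketch of the election loop, assuming a hypothetical
// lockStore interface rather than the real etcd client.
package sketch

type lockStore interface {
	// Create sets key=value with a TTL only if the key does not
	// already exist, returning the index of the write.
	Create(key, value string, ttl uint64) (index uint64, err error)
	// Get returns the current value and index of the key, if any.
	Get(key string) (value string, index uint64, ok bool)
	// WaitDeleted blocks, starting at index, until the key is
	// deleted or expires.
	WaitDeleted(key string, index uint64)
}

func electLoop(s lockStore, key, addr string, ttl uint64, renew func(index uint64), stopped func() bool) {
	for !stopped() {
		// Baseline index for the watch, in case Create fails
		// without telling us where to start watching from.
		_, watchIndex, _ := s.Get(key)

		if index, err := s.Create(key, addr, ttl); err == nil {
			// We hold the lock: act as master and renew it
			// until we lose it (renew returns at that point).
			renew(index)
		} else {
			// Someone else holds the lock: wait for it to go away.
			s.WaitDeleted(key, watchIndex)
		}
	}
}

The real code above additionally sleeps for half the TTL before starting renewals and reports each transition through setState().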
@@ -86,7 +86,7 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool,
return &ConfigSyncer{
client: client,
config: config,
rch: make(chan *etcd.Response, 10),
rch: make(chan *etcd.Response),
upch: upch,
stop: stop,
}
@@ -97,15 +97,13 @@ func (w *ConfigSyncer) syncer() {
for {
select {
case response := <-w.rch:
switch response.Action {
case "DELETE":
if response.Action == "delete" {
mountName := keyToMount(response.Key)
log.Printf("deleted mount %s", mountName)
w.config.delMount(mountName)
case "SET":
w.updateConfigWithResponse(response)
default:
} else if response.Action == "set" || response.Action == "create" || response.Action == "update" {
w.updateConfigWithResponse(response.Key, response.Value)
} else {
continue
}
@@ -113,7 +111,7 @@ func (w *ConfigSyncer) syncer() {
// the Watcher dies, it knows where to start
// from and we do not have to download the
// full configuration again.
w.index = response.Index
w.index = response.ModifiedIndex
// Trigger an update.
trigger(w.upch)
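
Storing the last seen ModifiedIndex is what lets the watcher resume without re-downloading the full configuration: a restarted watch can simply begin at index+1. A rough sketch of that resume loop, with a hypothetical watchOnce standing in for a single blocking watch call:

// Sketch of resuming a watch from the last processed index; watchOnce
// and event are placeholders, not the etcd client types.
package sketch

import "log"

type event struct {
	Key, Value, Action string
	ModifiedIndex      uint64
}

func watchLoop(watchOnce func(fromIndex uint64) (*event, error), handle func(*event), lastIndex uint64) {
	for {
		ev, err := watchOnce(lastIndex + 1)
		if err != nil {
			log.Printf("watch error: %v, restarting from index %d", err, lastIndex)
			continue
		}
		handle(ev)
		// Remember how far we got, so a restart picks up from
		// here instead of re-reading everything.
		lastIndex = ev.ModifiedIndex
	}
}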
@@ -124,12 +122,12 @@
}
}
func (w *ConfigSyncer) updateConfigWithResponse(response *etcd.Response) {
mountName := keyToMount(response.Key)
log.Printf("updating mount %s: %+v", mountName, response)
func (w *ConfigSyncer) updateConfigWithResponse(key, value string) {
mountName := keyToMount(key)
log.Printf("updating mount %s: %s", mountName, value)
var m radioai.Mount
if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
log.Printf("corrupted data: %s: %s", response.Value, err)
if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
log.Printf("corrupted data: %s: %s", value, err)
} else {
w.config.setMount(&m)
}
@@ -142,14 +140,13 @@ func (w *ConfigSyncer) Run() {
// Run until the first successful Get().
log.Printf("attempting to retrieve initial config...")
for {
responses, err := w.client.Get(radioai.MountPrefix)
log.Printf("Get(): %+v", responses)
if err == nil {
response, err := w.client.Get(radioai.MountPrefix, false)
if err == nil && response.Dir {
// Directly update the configuration.
for _, r := range responses {
w.updateConfigWithResponse(r)
w.index = r.Index
for _, r := range response.Kvs {
w.updateConfigWithResponse(r.Key, r.Value)
}
w.index = response.ModifiedIndex
break
}
log.Printf("Get error: %s", err)
@@ -207,8 +204,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
// MasterElection changes trigger an update.
mech := make(chan int)
go func() {
for state := range mech {
log.Printf("master election status changed: %d", state)
for _ = range mech {
trigger(upch)
}
}()
@@ -286,4 +282,7 @@ func (rc *RadioNode) Run() {
// Stop everything.
func (rc *RadioNode) Stop() {
close(rc.stop)
// We should use a sync.WaitGroup here; for now, just wait 2 seconds.
time.Sleep(2 * time.Second)
}
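
The comment above acknowledges the 2-second sleep is a stopgap. For reference, a minimal sketch of the sync.WaitGroup approach it alludes to; node, runWorker and the field names are hypothetical, not the RadioNode implementation.

// Hypothetical sketch of Stop() based on sync.WaitGroup instead of a
// fixed sleep: each background goroutine registers with the group and
// Stop() blocks until all of them have returned.
package sketch

import "sync"

type node struct {
	stop chan bool
	wg   sync.WaitGroup
}

func newNode() *node {
	return &node{stop: make(chan bool)}
}

func (n *node) runWorker(worker func(stop chan bool)) {
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		worker(n.stop)
	}()
}

func (n *node) Stop() {
	close(n.stop)
	// Wait for every worker to observe the closed channel and exit,
	// instead of sleeping for an arbitrary two seconds.
	n.wg.Wait()
}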