Commit 67dd50ee authored by ale's avatar ale

upstream fixes to coordination lib

Ensure that the Syncer listens on all interesting updates (added
compareAndSwap and compareAndDelete).
parent fd4b1c84
......@@ -59,7 +59,8 @@ func (s *FakeEtcdClient) trigger(action, key string) *etcd.Response {
Action: action,
EtcdIndex: s.index,
Node: &etcd.Node{
Key: key,
Key: key,
ModifiedIndex: s.index,
},
}
if action != "delete" {
......@@ -122,9 +123,9 @@ func (s *FakeEtcdClient) CompareAndDelete(key, oldvalue string, index uint64) (*
s.lock.Lock()
d, ok := s.data[key]
if ok && ((oldvalue == "" || d.value == oldvalue) ||
(index == 0 || d.index <= index)) {
(index == 0 || d.index != index)) {
delete(s.data, key)
return s.trigger("delete", key), nil
return s.trigger("compareAndDelete", key), nil
}
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
......@@ -135,13 +136,13 @@ func (s *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, oldvalue
s.lock.Lock()
d, ok := s.data[key]
if ok && ((oldvalue == "" || d.value == oldvalue) ||
(index == 0 || d.index <= index)) {
(index == 0 || d.index != index)) {
s.data[key] = datum{
value: value,
expire: ttlToTime(ttl),
index: s.index,
}
return s.trigger("update", key), nil
return s.trigger("compareAndSwap", key), nil
}
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
......@@ -169,8 +170,9 @@ func (s *FakeEtcdClient) Get(key string, recursive, boh bool) (*etcd.Response, e
if (path == key || strings.HasPrefix(path, keyDirPfx)) &&
(datum.expire.IsZero() || datum.expire.After(now)) {
nodes = append(nodes, &etcd.Node{
Key: path,
Value: datum.value,
Key: path,
Value: datum.value,
ModifiedIndex: datum.index,
})
}
}
......@@ -226,8 +228,9 @@ func (s *FakeEtcdClient) Watch(key string, index uint64, recursive bool, respch
for k, d := range s.data {
if strings.HasPrefix(k, key) && d.index >= index {
nodes = append(nodes, &etcd.Node{
Key: k,
Value: d.value,
Key: k,
Value: d.value,
ModifiedIndex: d.index,
})
}
}
......
......@@ -42,18 +42,15 @@ func (s *Syncer) Close() {
func (s *Syncer) handleResponse(resp *etcd.Response) {
nodes := flatten(nil, resp.Node)
//data, _ := json.Marshal(resp)
//log.Printf("Syncer: %s", string(data))
switch resp.Action {
case "delete":
case "delete", "compareAndDelete":
for _, n := range nodes {
s.target.Delete(n.Key)
}
case "get", "set", "create", "update":
case "get", "set", "create", "update", "compareAndSwap":
for _, n := range nodes {
s.target.Set(n.Key, n.Value, resp.EtcdIndex)
s.target.Set(n.Key, n.Value, n.ModifiedIndex)
}
}
}
package watcher
import (
"fmt"
"testing"
"time"
"git.autistici.org/ale/autoradio/coordination/etcdtest"
)
type testSyncable struct {
values map[string]string
indexes map[string]uint64
}
func (s *testSyncable) Set(key, value string, index uint64) {
s.values[key] = value
s.indexes[key] = index
}
func (s *testSyncable) Delete(key string) {
delete(s.values, key)
delete(s.indexes, key)
}
func newTestSyncable() *testSyncable {
return &testSyncable{
values: make(map[string]string),
indexes: make(map[string]uint64),
}
}
func TestSyncer_IndexCorrectness(t *testing.T) {
etcd := etcdtest.NewClient()
target := newTestSyncable()
syn := NewSyncer(etcd, "/test/", target)
defer syn.Close()
n := 1000
expectedIndex := make(map[string]uint64)
for i := 0; i < n; i++ {
key := fmt.Sprintf("/test/key%06d", i)
resp, _ := etcd.Set(key, fmt.Sprintf("%08x", i), 0)
expectedIndex[key] = resp.Node.ModifiedIndex
}
time.Sleep(100 * time.Millisecond)
if len(target.values) != n {
t.Fatalf("Syncable only saw %d keys", len(target.values))
}
for key, expected := range expectedIndex {
idx := target.indexes[key]
if idx != expected {
t.Errorf("Synced key has bad index: got %d, expected %d", idx, expected)
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment