diff --git a/coordination/etcdtest/fake_etcd.go b/coordination/etcdtest/fake_etcd.go index 6cf02c287abc22ed1b23e14d11d44544067d9d1a..f163c116f2c33aba783fe1dddce2dd9ef5f23c97 100644 --- a/coordination/etcdtest/fake_etcd.go +++ b/coordination/etcdtest/fake_etcd.go @@ -59,7 +59,8 @@ func (s *FakeEtcdClient) trigger(action, key string) *etcd.Response { Action: action, EtcdIndex: s.index, Node: &etcd.Node{ - Key: key, + Key: key, + ModifiedIndex: s.index, }, } if action != "delete" { @@ -122,9 +123,9 @@ func (s *FakeEtcdClient) CompareAndDelete(key, oldvalue string, index uint64) (* s.lock.Lock() d, ok := s.data[key] if ok && ((oldvalue == "" || d.value == oldvalue) || - (index == 0 || d.index <= index)) { + (index == 0 || d.index != index)) { delete(s.data, key) - return s.trigger("delete", key), nil + return s.trigger("compareAndDelete", key), nil } s.lock.Unlock() return nil, &etcd.EtcdError{Message: "failed", Index: s.index} @@ -135,13 +136,13 @@ func (s *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, oldvalue s.lock.Lock() d, ok := s.data[key] if ok && ((oldvalue == "" || d.value == oldvalue) || - (index == 0 || d.index <= index)) { + (index == 0 || d.index != index)) { s.data[key] = datum{ value: value, expire: ttlToTime(ttl), index: s.index, } - return s.trigger("update", key), nil + return s.trigger("compareAndSwap", key), nil } s.lock.Unlock() return nil, &etcd.EtcdError{Message: "failed", Index: s.index} @@ -169,8 +170,9 @@ func (s *FakeEtcdClient) Get(key string, recursive, boh bool) (*etcd.Response, e if (path == key || strings.HasPrefix(path, keyDirPfx)) && (datum.expire.IsZero() || datum.expire.After(now)) { nodes = append(nodes, &etcd.Node{ - Key: path, - Value: datum.value, + Key: path, + Value: datum.value, + ModifiedIndex: datum.index, }) } } @@ -226,8 +228,9 @@ func (s *FakeEtcdClient) Watch(key string, index uint64, recursive bool, respch for k, d := range s.data { if strings.HasPrefix(k, key) && d.index >= index { nodes = append(nodes, &etcd.Node{ - Key: k, - Value: d.value, + Key: k, + Value: d.value, + ModifiedIndex: d.index, }) } } diff --git a/coordination/watcher/sync.go b/coordination/watcher/sync.go index f055357fe2ab05711d110cea45e40670e88f7a90..60dc37ee22dd97c4303245c6bff875176f3ab4ef 100644 --- a/coordination/watcher/sync.go +++ b/coordination/watcher/sync.go @@ -42,18 +42,15 @@ func (s *Syncer) Close() { func (s *Syncer) handleResponse(resp *etcd.Response) { nodes := flatten(nil, resp.Node) - //data, _ := json.Marshal(resp) - //log.Printf("Syncer: %s", string(data)) - switch resp.Action { - case "delete": + case "delete", "compareAndDelete": for _, n := range nodes { s.target.Delete(n.Key) } - case "get", "set", "create", "update": + case "get", "set", "create", "update", "compareAndSwap": for _, n := range nodes { - s.target.Set(n.Key, n.Value, resp.EtcdIndex) + s.target.Set(n.Key, n.Value, n.ModifiedIndex) } } } diff --git a/coordination/watcher/sync_test.go b/coordination/watcher/sync_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6e05766a25dcae3ae7d925fcfb5537470922395a --- /dev/null +++ b/coordination/watcher/sync_test.go @@ -0,0 +1,57 @@ +package watcher + +import ( + "fmt" + "testing" + "time" + + "git.autistici.org/ale/autoradio/coordination/etcdtest" +) + +type testSyncable struct { + values map[string]string + indexes map[string]uint64 +} + +func (s *testSyncable) Set(key, value string, index uint64) { + s.values[key] = value + s.indexes[key] = index +} + +func (s *testSyncable) Delete(key string) { + delete(s.values, key) + delete(s.indexes, key) +} + +func newTestSyncable() *testSyncable { + return &testSyncable{ + values: make(map[string]string), + indexes: make(map[string]uint64), + } +} + +func TestSyncer_IndexCorrectness(t *testing.T) { + etcd := etcdtest.NewClient() + target := newTestSyncable() + syn := NewSyncer(etcd, "/test/", target) + defer syn.Close() + + n := 1000 + expectedIndex := make(map[string]uint64) + for i := 0; i < n; i++ { + key := fmt.Sprintf("/test/key%06d", i) + resp, _ := etcd.Set(key, fmt.Sprintf("%08x", i), 0) + expectedIndex[key] = resp.Node.ModifiedIndex + } + + time.Sleep(100 * time.Millisecond) + if len(target.values) != n { + t.Fatalf("Syncable only saw %d keys", len(target.values)) + } + for key, expected := range expectedIndex { + idx := target.indexes[key] + if idx != expected { + t.Errorf("Synced key has bad index: got %d, expected %d", idx, expected) + } + } +}