diff --git a/util/mock_etcd.go b/util/mock_etcd.go index 300dc683091435b58e50bdf7be89e246539fdb46..32cf1de508b0f2a18efb45e344b4030e82f19f5a 100644 --- a/util/mock_etcd.go +++ b/util/mock_etcd.go @@ -22,9 +22,10 @@ type datum struct { } type testEtcdServer struct { - lock sync.Mutex latency time.Duration + lock sync.Mutex data map[string]datum + wlock sync.Mutex watches map[string][]chan *etcd.Response index uint64 } @@ -63,23 +64,29 @@ func (s *testEtcdServer) trigger(action, key string) *etcd.Response { if action != "delete" { resp.Node.Value = s.data[key].value } - for pfx, w := range s.watches { - if strings.HasPrefix(key, pfx) { - for _, ch := range w { - ch <- resp + s.index++ + s.lock.Unlock() + + go func() { + s.wlock.Lock() + for pfx, w := range s.watches { + if strings.HasPrefix(key, pfx) { + for _, ch := range w { + ch <- resp + } } } - } - s.index++ + s.wlock.Unlock() + }() + return resp } func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) { s.delay() s.lock.Lock() - defer s.lock.Unlock() - if _, ok := s.data[key]; ok { + s.lock.Unlock() return nil, &etcd.EtcdError{Message: "already there", Index: s.index} } @@ -93,7 +100,6 @@ func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) { s.delay() s.lock.Lock() - defer s.lock.Unlock() if d, ok := s.data[key]; ok && d.value == oldvalue { s.data[key] = datum{ value: value, @@ -101,13 +107,13 @@ func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue } return s.trigger("update", key), nil } + s.lock.Unlock() return nil, &etcd.EtcdError{Message: "failed", Index: s.index} } func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) { s.delay() s.lock.Lock() - defer s.lock.Unlock() delete(s.data, key) return s.trigger("delete", key), nil } @@ -148,7 +154,6 @@ func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, e func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) { s.delay() s.lock.Lock() - defer s.lock.Unlock() s.data[key] = datum{ value: value, expire: time.Now().Add(time.Duration(ttl) * time.Second), @@ -159,11 +164,11 @@ func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, err func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) { ch := respch if ch == nil { - ch = make(chan *etcd.Response, 1) + ch = make(chan *etcd.Response) } - s.lock.Lock() + s.wlock.Lock() s.watches[key] = append(s.watches[key], ch) - s.lock.Unlock() + s.wlock.Unlock() var resp *etcd.Response if respch != nil { @@ -176,7 +181,7 @@ func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch } // Delete the watch. - s.lock.Lock() + s.wlock.Lock() var watches []chan *etcd.Response for _, w := range s.watches[key] { if w != ch { @@ -184,7 +189,7 @@ func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch } } s.watches[key] = watches - s.lock.Unlock() + s.wlock.Unlock() close(ch) if resp == nil {