Skip to content
Snippets Groups Projects
Commit 089bdf32 authored by ale's avatar ale
Browse files

remove some obvious race conditions from mock_etcd

parent 51d682cd
No related branches found
No related tags found
No related merge requests found
......@@ -22,9 +22,10 @@ type datum struct {
}
type testEtcdServer struct {
lock sync.Mutex
latency time.Duration
lock sync.Mutex
data map[string]datum
wlock sync.Mutex
watches map[string][]chan *etcd.Response
index uint64
}
......@@ -63,23 +64,29 @@ func (s *testEtcdServer) trigger(action, key string) *etcd.Response {
if action != "delete" {
resp.Node.Value = s.data[key].value
}
for pfx, w := range s.watches {
if strings.HasPrefix(key, pfx) {
for _, ch := range w {
ch <- resp
s.index++
s.lock.Unlock()
go func() {
s.wlock.Lock()
for pfx, w := range s.watches {
if strings.HasPrefix(key, pfx) {
for _, ch := range w {
ch <- resp
}
}
}
}
s.index++
s.wlock.Unlock()
}()
return resp
}
func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.data[key]; ok {
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: "already there", Index: s.index}
}
......@@ -93,7 +100,6 @@ func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response,
func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
if d, ok := s.data[key]; ok && d.value == oldvalue {
s.data[key] = datum{
value: value,
......@@ -101,13 +107,13 @@ func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue
}
return s.trigger("update", key), nil
}
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
}
func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
delete(s.data, key)
return s.trigger("delete", key), nil
}
......@@ -148,7 +154,6 @@ func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, e
func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
......@@ -159,11 +164,11 @@ func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, err
func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
ch := respch
if ch == nil {
ch = make(chan *etcd.Response, 1)
ch = make(chan *etcd.Response)
}
s.lock.Lock()
s.wlock.Lock()
s.watches[key] = append(s.watches[key], ch)
s.lock.Unlock()
s.wlock.Unlock()
var resp *etcd.Response
if respch != nil {
......@@ -176,7 +181,7 @@ func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch
}
// Delete the watch.
s.lock.Lock()
s.wlock.Lock()
var watches []chan *etcd.Response
for _, w := range s.watches[key] {
if w != ch {
......@@ -184,7 +189,7 @@ func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch
}
}
s.watches[key] = watches
s.lock.Unlock()
s.wlock.Unlock()
close(ch)
if resp == nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment