diff --git a/coordination/watcher/watcher.go b/coordination/watcher/watcher.go index 6c058792597a26abafc7d270ebc97a5a45fee890..3edfc3e55281f9133258587324062fe1b6363a69 100644 --- a/coordination/watcher/watcher.go +++ b/coordination/watcher/watcher.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log" - "math/rand" "sync" "time" @@ -111,95 +110,9 @@ func NewNotifyWatchable(ctx context.Context, w Watchable) NotifyWatchable { } } -// A StringSet is a particular type of Watchable where we are only -// interested in the entry values, not their keys. Furthermore, the -// values are plain strings. This is useful, for instance, for a list -// of IP addresses. -// -// The StringSet keeps track of the keys, so if an entry changes value -// we can handle it correctly (the previous value is removed from the -// set). -// -// All methods are goroutine-safe. The nil value represents an empty -// but valid set. -// -type StringSet struct { - mx sync.Mutex - m map[string]string - l []string -} - -// Reset the contents of the set. Implements the Watchable interface. -func (s *StringSet) Reset(m map[string]string) { - s.mx.Lock() - s.m = m - s.l = nil - for _, value := range m { - s.l = append(s.l, value) - } - s.mx.Unlock() -} - -// Add an entry to the set. Implements the Watchable interface. -func (s *StringSet) Add(key, value string) { - s.mx.Lock() - if s.m == nil { - s.m = make(map[string]string) - } - oldValue, ok := s.m[key] - if ok { - s.deleteValue(oldValue) - } - s.m[key] = value - s.l = append(s.l, value) -} - -// Delete an entry from the set. Implements the Watchable interface. -func (s *StringSet) Delete(key string) { - s.mx.Lock() - defer s.mx.Unlock() - - if s.m == nil { - return - } - oldValue, ok := s.m[key] - if !ok { - return - } - s.deleteValue(oldValue) - delete(s.m, key) -} - -func (s *StringSet) deleteValue(oldValue string) { - var l []string - for _, value := range s.l { - if value != oldValue { - l = append(l, value) - } - } - s.l = l -} - -// RandomEntry returns a random entry from those currently active. If -// the list is empty, the empty string is returned. -func (s *StringSet) RandomEntry() string { - s.mx.Lock() - defer s.mx.Unlock() - if len(s.l) == 0 { - return "" - } - return s.l[rand.Intn(len(s.l))] -} - -// Values returns all the currently known values. -func (s *StringSet) Values() []string { - s.mx.Lock() - defer s.mx.Unlock() - // Try to make a copy. - return append([]string{}, s.l...) -} - func watchOnce(ctx context.Context, cli *clientv3.Client, prefix string, target Watchable) error { + plen := len(prefix) + // First populate the target with a prefix-ranged Get and call // Reset because we have all the authoritative data. resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) @@ -208,7 +121,8 @@ func watchOnce(ctx context.Context, cli *clientv3.Client, prefix string, target } m := make(map[string]string) for _, ev := range resp.Kvs { - m[string(ev.Key)] = string(ev.Value) + key := string(ev.Key)[plen:] + m[key] = string(ev.Value) } target.Reset(m) rev := resp.Header.Revision @@ -218,11 +132,12 @@ func watchOnce(ctx context.Context, cli *clientv3.Client, prefix string, target for resp := range rch { for _, ev := range resp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) + key := string(ev.Kv.Key)[plen:] switch ev.Type { case clientv3.EventTypePut: - target.Set(string(ev.Kv.Key), string(ev.Kv.Value)) + target.Set(key, string(ev.Kv.Value)) case clientv3.EventTypeDelete: - target.Delete(string(ev.Kv.Key)) + target.Delete(key) } } }