Commit c9a6cec5 authored by ale's avatar ale

Add DNS resolution support to the memcache client

Meant to integrate well with DNS-based service discovery: the client
will constantly monitor the DNS results for changes, and dynamically
modify the list of memcache backends.
parent 61875346
Pipeline #1387 passed with stages
in 1 minute and 30 seconds
......@@ -397,7 +397,11 @@ func NewServer(config *Config) (*Server, error) {
var cache cacheClient
if len(config.MemcacheServers) > 0 {
cache = newMemcacheReplicatedClient(config.MemcacheServers)
var err error
cache, err = newMemcacheReplicatedClient(config.MemcacheServers)
if err != nil {
return nil, err
}
} else {
cache = newInprocessCache()
}
......
......@@ -5,10 +5,16 @@ import (
"sync"
"time"
"git.autistici.org/ai3/go-common/clientutil"
"github.com/bradfitz/gomemcache/memcache"
cache "github.com/patrickmn/go-cache"
)
var (
memcacheTimeout = 10 * time.Second
memcacheMaxIdleConns = 3
)
// Client for a short-term cache.
//
// Data should only be consistent over a short period of time, and the
......@@ -21,20 +27,60 @@ type cacheClient interface {
readAny(string) ([]byte, bool)
}
// A cacheClient that uses one or more memcached servers.
// A cacheClient that uses one or more memcached servers. DNS names
// will be watched and the backend list will be updated dynamically if
// the addresses change.
type memcacheReplicatedClient struct {
caches []*memcache.Client
watcher *clientutil.MultiDNSWatcher
mx sync.RWMutex
caches []*memcache.Client
initialized bool
initCh chan struct{}
}
func newMemcacheReplicatedClient(servers []string) *memcacheReplicatedClient {
var m memcacheReplicatedClient
for _, s := range servers {
c := memcache.New(s)
c.Timeout = u2fClientTimeout
c.MaxIdleConns = u2fClientMaxIdleConns
m.caches = append(m.caches, c)
func newMemcacheReplicatedClient(addrs []string) (*memcacheReplicatedClient, error) {
w, err := clientutil.NewMultiDNSWatcher(addrs)
if err != nil {
return nil, err
}
// Initialize our client with the DNS watcher, and wait until
// we have at least one address to connect to.
m := &memcacheReplicatedClient{
initCh: make(chan struct{}),
watcher: w,
}
go m.watchForChanges()
<-m.initCh
return m, nil
}
func (m *memcacheReplicatedClient) clients() []*memcache.Client {
m.mx.RLock()
defer m.mx.RUnlock()
return m.caches
}
// Watch for DNS changes and rebuild the list of memcache clients.
func (m *memcacheReplicatedClient) watchForChanges() {
for addrs := range m.watcher.Changes() {
var caches []*memcache.Client
for _, addr := range addrs {
c := memcache.New(addr)
c.Timeout = memcacheTimeout
c.MaxIdleConns = memcacheMaxIdleConns
caches = append(caches, c)
}
m.mx.Lock()
m.caches = caches
if !m.initialized {
m.initialized = true
close(m.initCh)
}
m.mx.Unlock()
}
return &m
}
func (m *memcacheReplicatedClient) writeAll(key string, value []byte, ttl int) error {
......@@ -45,15 +91,16 @@ func (m *memcacheReplicatedClient) writeAll(key string, value []byte, ttl int) e
}
// Write to the memcache servers. At least one write must succeed.
ch := make(chan error, len(m.caches))
caches := m.clients()
ch := make(chan error, len(caches))
defer close(ch)
for _, c := range m.caches {
for _, c := range caches {
go func(c *memcache.Client) {
ch <- c.Set(item)
}(c)
}
var ok bool
for i := 0; i < len(m.caches); i++ {
for i := 0; i < len(caches); i++ {
if err := <-ch; err == nil {
ok = true
}
......@@ -75,7 +122,7 @@ func (m *memcacheReplicatedClient) readAny(key string) ([]byte, bool) {
ch := make(chan []byte, 1)
var wg sync.WaitGroup
for _, c := range m.caches {
for _, c := range m.clients() {
wg.Add(1)
go func(c *memcache.Client) {
defer wg.Done()
......
......@@ -40,6 +40,8 @@ type cacheDatum struct {
deadline time.Time
}
var dnsCacheTTL = 1 * time.Minute
type dnsCache struct {
resolver resolver
sf singleflight.Group
......@@ -72,7 +74,7 @@ func (c *dnsCache) update(host string) []string {
c.mx.Lock()
c.cache[host] = cacheDatum{
addrs: addrs,
deadline: time.Now().Add(60 * time.Second),
deadline: time.Now().Add(dnsCacheTTL),
}
c.mx.Unlock()
return addrs, nil
......
package clientutil
import (
"fmt"
"sync"
"time"
)
var dnsWatcherInterval = 1 * time.Minute
// A DNSWatcher monitors a DNS name for changes, constantly attempting
// to resolve it every minute and notifying a channel when the list of
// returned IP addresses changes. All addresses must be in host:port
// format.
type DNSWatcher struct {
hostport string
resolver resolver
addrs []string
updateCh chan []string
stopCh chan struct{}
}
// NewDNSWatcher creates a new DNSWatcher.
func NewDNSWatcher(hostport string) (*DNSWatcher, error) {
return newDNSWatcherWithResolver(hostport, defaultResolver)
}
func newDNSWatcherWithResolver(hostport string, resolver resolver) (*DNSWatcher, error) {
// Resolve names once before returning. Return a fatal error
// when there are no results, as it may indicate a syntax
// error in hostport.
addrs := resolver.ResolveIP(hostport)
if len(addrs) == 0 {
return nil, fmt.Errorf("can't resolve %s", hostport)
}
w := &DNSWatcher{
hostport: hostport,
resolver: resolver,
addrs: addrs,
updateCh: make(chan []string, 10),
stopCh: make(chan struct{}),
}
w.updateCh <- addrs
go w.loop()
return w, nil
}
// Stop the watcher.
func (w *DNSWatcher) Stop() {
close(w.stopCh)
}
// Changes returns a channel where the resolved addresses are sent
// whenever they change.
func (w *DNSWatcher) Changes() <-chan []string {
return w.updateCh
}
func (w *DNSWatcher) check() {
addrs := w.resolver.ResolveIP(w.hostport)
if len(addrs) > 0 && !addrListEqual(addrs, w.addrs) {
w.addrs = addrs
w.updateCh <- addrs
}
}
func (w *DNSWatcher) loop() {
defer close(w.updateCh)
tick := time.NewTicker(dnsWatcherInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
w.check()
case <-w.stopCh:
return
}
}
}
type multiDNSUpdate struct {
hostport string
addrs []string
}
// A MultiDNSWatcher watches multiple addresses for DNS changes. The
// results are merged and returned as a list of addresses.
type MultiDNSWatcher struct {
watchers []*DNSWatcher
addrmap map[string][]string
faninCh chan multiDNSUpdate
updateCh chan []string
}
// NewMultiDNSWatcher creates a new MultiDNSWatcher.
func NewMultiDNSWatcher(hostports []string) (*MultiDNSWatcher, error) {
return newMultiDNSWatcherWithResolver(hostports, defaultResolver)
}
func newMultiDNSWatcherWithResolver(hostports []string, resolver resolver) (*MultiDNSWatcher, error) {
mw := &MultiDNSWatcher{
addrmap: make(map[string][]string),
faninCh: make(chan multiDNSUpdate, 10),
updateCh: make(chan []string, 10),
}
// All the MultiDNSWatcher does is multiplex updates from the
// individual DNSWatchers onto faninCh, then merging those
// updates with all the others and sending the result to
// updateCh.
go func() {
defer close(mw.updateCh)
for up := range mw.faninCh {
mw.addrmap[up.hostport] = up.addrs
mw.updateCh <- mw.allAddrs()
}
}()
var wg sync.WaitGroup
for _, hostport := range hostports {
w, err := newDNSWatcherWithResolver(hostport, resolver)
if err != nil {
return nil, err
}
mw.watchers = append(mw.watchers, w)
wg.Add(1)
go func(hostport string) {
for addrs := range w.Changes() {
mw.faninCh <- multiDNSUpdate{
hostport: hostport,
addrs: addrs,
}
}
wg.Done()
}(hostport)
}
go func() {
wg.Wait()
close(mw.faninCh)
}()
return mw, nil
}
func (mw *MultiDNSWatcher) allAddrs() []string {
var out []string
for _, addrs := range mw.addrmap {
out = append(out, addrs...)
}
return out
}
// Stop the watcher.
func (mw *MultiDNSWatcher) Stop() {
for _, w := range mw.watchers {
w.Stop()
}
}
// Changes returns a channel where the aggregate resolved addresses
// are sent whenever they change.
func (mw *MultiDNSWatcher) Changes() <-chan []string {
return mw.updateCh
}
func addrListEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
tmp := make(map[string]struct{})
for _, aa := range a {
tmp[aa] = struct{}{}
}
for _, bb := range b {
if _, ok := tmp[bb]; !ok {
return false
}
delete(tmp, bb)
}
return len(tmp) == 0
}
......@@ -5,32 +5,32 @@
{
"checksumSHA1": "pLvPnUablirQucyALgrso9hLG4E=",
"path": "git.autistici.org/ai3/go-common",
"revision": "232cb4db4b1a9c57075dcdab7f2d8dfdf7590ce5",
"revisionTime": "2018-08-28T06:59:35Z"
"revision": "717f4ef5c5a16347d8875a6b300967d6f4c682f0",
"revisionTime": "2018-10-26T09:01:12Z"
},
{
"checksumSHA1": "WxcDAOyeiMJa5QyJAhsl6swy8ks=",
"checksumSHA1": "Xd4ClmFykFMOg8b2ZFXimSS3Uj0=",
"path": "git.autistici.org/ai3/go-common/clientutil",
"revision": "232cb4db4b1a9c57075dcdab7f2d8dfdf7590ce5",
"revisionTime": "2018-08-28T06:59:35Z"
"revision": "717f4ef5c5a16347d8875a6b300967d6f4c682f0",
"revisionTime": "2018-10-26T09:01:12Z"
},
{
"checksumSHA1": "kQbBWZqrXc95wodlrOKEshQVaBo=",
"path": "git.autistici.org/ai3/go-common/ldap",
"revision": "232cb4db4b1a9c57075dcdab7f2d8dfdf7590ce5",
"revisionTime": "2018-08-28T06:59:35Z"
"revision": "717f4ef5c5a16347d8875a6b300967d6f4c682f0",
"revisionTime": "2018-10-26T09:01:12Z"
},
{
"checksumSHA1": "mfFIqmwojDqQdJvjLI3y7YCQ+2c=",
"path": "git.autistici.org/ai3/go-common/pwhash",
"revision": "232cb4db4b1a9c57075dcdab7f2d8dfdf7590ce5",
"revisionTime": "2018-08-28T06:59:35Z"
"revision": "717f4ef5c5a16347d8875a6b300967d6f4c682f0",
"revisionTime": "2018-10-26T09:01:12Z"
},
{
"checksumSHA1": "T2vf4xzKRqoIjfXlofMgudKA8rA=",
"path": "git.autistici.org/ai3/go-common/unix",
"revision": "232cb4db4b1a9c57075dcdab7f2d8dfdf7590ce5",
"revisionTime": "2018-08-28T06:59:35Z"
"revision": "717f4ef5c5a16347d8875a6b300967d6f4c682f0",
"revisionTime": "2018-10-26T09:01:12Z"
},
{
"checksumSHA1": "7Kbb9vTjqcQhhxtSGpmp9rk6PUk=",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment