Commit 7edf17df authored by ale's avatar ale

Add distributed cache invalidation (optimistic)

parent 9cbc4225
Pipeline #4239 passed with stages
in 4 minutes and 49 seconds
......@@ -155,6 +155,10 @@ The configuration is stored in a YAML file, by default
requires the *time*, *mem* and *threads* parameters (defaults
to 1/4/4); *scrypt* requires *n*, *r* and *p* (defaults
16384/8/1)
* `cache`: cache configuration
* `enabled`: if set to *true*, enable a cache for User objects. Very
useful to reduce latencies for backends with complex queries like
LDAP (default *false*, cache is disabled).
## Distributed operation
......@@ -205,3 +209,25 @@ The performance of the above is strictly not worse than that of the
underlying storage, except for the possibility of serving stale data
whenever we lose an invalidation request due to network trouble. This
is generally an acceptable risk for our upstream applications.
### Configuration
To enable distributed operations set attributes below the
*replication* configuration variable:
* `replication`
* `leader_url`: URL of the *leader* accountserver instance. When
this field is set, write requests to this instance will be
forwarded (transparently to the caller) to this URL.
* `peers`: list of peer URLs for the other accountserver
instances. Do not include the current instance in this list, or
you will create unexpected feedback loops.
* `tls`: client TLS configuration
* `cert`: path to the server certificate
* `key`: path to the server's private key
* `ca`: path to the CA used to validate clients
Note that setting *peers* is only necessary if the cache is enabled
(see the *Configuration* section above). Due to implementation
details, all instances should share the same setting for
*cache.enabled*.
......@@ -2,12 +2,14 @@ package cachebackend
import (
"context"
"net/http"
"time"
"github.com/patrickmn/go-cache"
"golang.org/x/sync/singleflight"
as "git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
ct "git.autistici.org/ai3/go-common/ldap/compositetypes"
)
......@@ -28,16 +30,31 @@ var (
//
type cacheBackend struct {
as.Backend
*replicatedCache
cache *cache.Cache
}
// Cache is both an accountserver Backend and a http.Handler that
// implements the replicated invalidation RPCs.
type Cache interface {
as.Backend
http.Handler
}
// Wrap a Backend with a cache.
func Wrap(b as.Backend) as.Backend {
func Wrap(b as.Backend, peers []string, tls *clientutil.TLSClientConfig) (Cache, error) {
c := cache.New(defaultExpiration, cleanupInterval)
return &cacheBackend{
Backend: b,
cache: c,
repl, err := newReplicatedCache(c, peers, tls)
if err != nil {
return nil, err
}
return &cacheBackend{
Backend: b,
replicatedCache: repl,
cache: c,
}, nil
}
func (b *cacheBackend) NewTransaction() (as.TX, error) {
......@@ -45,16 +62,18 @@ func (b *cacheBackend) NewTransaction() (as.TX, error) {
if err != nil {
return nil, err
}
return &cacheTX{TX: innerTX, cache: b.cache}, nil
return &cacheTX{
TX: innerTX,
cache: b.cache,
invalidateUser: b.invalidateUser,
}, nil
}
// The cacheTX type wraps an accountserver.TX object.
type cacheTX struct {
as.TX
cache *cache.Cache
}
func (c *cacheTX) invalidateUser(username string) {
c.cache.Delete(username)
cache *cache.Cache
invalidateUser func(string)
}
var update singleflight.Group
......
package cachebackend
import (
"context"
"log"
"net/http"
"sync"
"time"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/serverutil"
"github.com/patrickmn/go-cache"
)
const InvalidateURLPath = "/api/internal/cache_invalidate"
var rpcTimeout = 5 * time.Second
type InvalidateUserRequest struct {
Username string `json:"username"`
}
var emptyResponse = struct{}{}
type replicatedCache struct {
peers map[string]clientutil.Backend
cache *cache.Cache
}
func newReplicatedCache(cache *cache.Cache, peers []string, tls *clientutil.TLSClientConfig) (*replicatedCache, error) {
peerBackends := make(map[string]clientutil.Backend)
for _, peerURL := range peers {
b, err := clientutil.NewBackend(&clientutil.BackendConfig{
URL: peerURL,
TLSConfig: tls,
})
if err != nil {
return nil, err
}
peerBackends[peerURL] = b
}
return &replicatedCache{
cache: cache,
peers: peerBackends,
}, nil
}
func (rc *replicatedCache) invalidateUser(username string) {
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()
// Reach out to all peers in parallel and send an InvalidateUserRequest.
var wg sync.WaitGroup
for peerURL, peer := range rc.peers {
wg.Add(1)
go func(peerURL string, peer clientutil.Backend) {
defer wg.Done()
if err := peer.Call(ctx, InvalidateURLPath, "", &InvalidateUserRequest{Username: username}, nil); err != nil {
log.Printf("error invalidating cache for %s on %s: %v", username, peerURL, err)
}
}(peerURL, peer)
}
wg.Wait()
}
func (rc *replicatedCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req InvalidateUserRequest
if !serverutil.DecodeJSONRequest(w, r, &req) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
// Remove user data from the local cache.
rc.cache.Delete(req.Username)
serverutil.EncodeJSONResponse(w, emptyResponse)
}
......@@ -171,8 +171,17 @@ func main() {
log.Fatal(err)
}
// If the cache is enabled, create the cache backend wrapper. It
// also acts as a http.Handler for the replicated cache
// invalidation RPC, so we're going to have to route its endpoint
// on the main Server later.
var cache cachebackend.Cache
if config.Cache.Enabled {
be = cachebackend.Wrap(be)
cache, err = cachebackend.Wrap(be, config.Replication.Peers, config.Replication.TLS)
if err != nil {
log.Fatal(err)
}
be = cache
}
service, err := accountserver.NewAccountService(be, &config.AccountServerConfig)
......@@ -184,7 +193,11 @@ func main() {
if err != nil {
log.Fatal(err)
}
if cache != nil {
as.Handle(cachebackend.InvalidateURLPath, cache)
}
// Start the HTTP server.
if err := serverutil.Serve(as, config.ServerConfig, *addr); err != nil {
log.Fatal(err)
}
......
......@@ -120,7 +120,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
t.Fatal("NewLDAPBackend", err)
}
if enableCache {
be = cachebackend.Wrap(be)
be, err = cachebackend.Wrap(be, nil, nil)
if err != nil {
t.Fatal("cachebackend.Wrap", err)
}
}
ssoStop, signer, ssoPubKeyFile := withSSO(t)
......
......@@ -88,7 +88,6 @@ func (r *actionRegistry) ServeHTTP(w http.ResponseWriter, httpReq *http.Request)
// implements the http.Handler interface.
type APIServer struct {
*http.ServeMux
//*actionRegistry
}
type apiEndpoint struct {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment