Commit 05ad3165 authored by ale's avatar ale

Simplify the internal cache API

parent 7edf17df
Pipeline #4382 passed with stages
in 4 minutes and 47 seconds
...@@ -18,7 +18,18 @@ var ( ...@@ -18,7 +18,18 @@ var (
cleanupInterval = 180 * time.Second cleanupInterval = 180 * time.Second
) )
// cacheBackend implements a simple in-memory cache of user objects // This is the interface for the actual cache, an in-memory LRU cache
// with optional replicated invalidation.
type internalCache interface {
Get(string) (interface{}, bool)
Set(string, interface{}, time.Duration)
Delete(string)
}
// CacheBackend is both an accountserver Backend and a http.Handler
// that implements the replicated invalidation RPCs.
//
// CacheBackend implements a simple in-memory cache of user objects
// (not resources yet), in order to reduce the database and processing // (not resources yet), in order to reduce the database and processing
// load in presence of a heavily read-oriented workload. The cache is // load in presence of a heavily read-oriented workload. The cache is
// very simple, and any update to a user or its resources cause us to // very simple, and any update to a user or its resources cause us to
...@@ -28,54 +39,62 @@ var ( ...@@ -28,54 +39,62 @@ var (
// cleaned up. Memory usage thus depends on the load and is difficult // cleaned up. Memory usage thus depends on the load and is difficult
// to estimate in advance. // to estimate in advance.
// //
type cacheBackend struct { type CacheBackend struct {
as.Backend
*replicatedCache
cache *cache.Cache
}
// Cache is both an accountserver Backend and a http.Handler that
// implements the replicated invalidation RPCs.
type Cache interface {
as.Backend as.Backend
http.Handler http.Handler
cache internalCache
} }
// Wrap a Backend with a cache. // Wrap a Backend with a cache.
func Wrap(b as.Backend, peers []string, tls *clientutil.TLSClientConfig) (Cache, error) { func Wrap(b as.Backend, peers []string, tls *clientutil.TLSClientConfig) (*CacheBackend, error) {
c := cache.New(defaultExpiration, cleanupInterval) var h http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
})
repl, err := newReplicatedCache(c, peers, tls) var c internalCache = cache.New(defaultExpiration, cleanupInterval)
if err != nil {
return nil, err if len(peers) > 0 {
rc, err := newCacheWithReplicatedInvalidation(c, peers, tls)
if err != nil {
return nil, err
}
c = rc
h = rc
} }
return &cacheBackend{ return &CacheBackend{
Backend: b, Backend: b,
replicatedCache: repl, Handler: h,
cache: c, cache: c,
}, nil }, nil
} }
func (b *cacheBackend) NewTransaction() (as.TX, error) { // NewTransaction returns a new accountserver.TX unit-of-work object.
func (b *CacheBackend) NewTransaction() (as.TX, error) {
innerTX, err := b.Backend.NewTransaction() innerTX, err := b.Backend.NewTransaction()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &cacheTX{ return &cacheTX{
TX: innerTX, TX: innerTX,
cache: b.cache, cache: b.cache,
invalidateUser: b.invalidateUser,
}, nil }, nil
} }
// The cacheTX type wraps an accountserver.TX object. // The cacheTX type wraps an accountserver.TX object.
type cacheTX struct { type cacheTX struct {
as.TX as.TX
cache *cache.Cache cache internalCache
invalidateUser func(string) }
func (c *cacheTX) invalidateUser(username string) {
c.cache.Delete(username)
} }
// Updates to the cache are controlled by a singleflight.Group to
// ensure that we only update each user once even with multiple
// callers.
var update singleflight.Group var update singleflight.Group
func (c *cacheTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) { func (c *cacheTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) {
......
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"git.autistici.org/ai3/go-common/clientutil" "git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/serverutil" "git.autistici.org/ai3/go-common/serverutil"
"github.com/patrickmn/go-cache"
) )
const InvalidateURLPath = "/api/internal/cache_invalidate" const InvalidateURLPath = "/api/internal/cache_invalidate"
...@@ -23,11 +22,12 @@ type InvalidateUserRequest struct { ...@@ -23,11 +22,12 @@ type InvalidateUserRequest struct {
var emptyResponse = struct{}{} var emptyResponse = struct{}{}
type replicatedCache struct { type replicatedCache struct {
internalCache
peers map[string]clientutil.Backend peers map[string]clientutil.Backend
cache *cache.Cache
} }
func newReplicatedCache(cache *cache.Cache, peers []string, tls *clientutil.TLSClientConfig) (*replicatedCache, error) { func newCacheWithReplicatedInvalidation(c internalCache, peers []string, tls *clientutil.TLSClientConfig) (*replicatedCache, error) {
peerBackends := make(map[string]clientutil.Backend) peerBackends := make(map[string]clientutil.Backend)
for _, peerURL := range peers { for _, peerURL := range peers {
b, err := clientutil.NewBackend(&clientutil.BackendConfig{ b, err := clientutil.NewBackend(&clientutil.BackendConfig{
...@@ -40,16 +40,19 @@ func newReplicatedCache(cache *cache.Cache, peers []string, tls *clientutil.TLSC ...@@ -40,16 +40,19 @@ func newReplicatedCache(cache *cache.Cache, peers []string, tls *clientutil.TLSC
peerBackends[peerURL] = b peerBackends[peerURL] = b
} }
return &replicatedCache{ return &replicatedCache{
cache: cache, internalCache: c,
peers: peerBackends, peers: peerBackends,
}, nil }, nil
} }
func (rc *replicatedCache) invalidateUser(username string) { func (rc *replicatedCache) Delete(username string) {
// Delete the user entry from the local cache.
rc.internalCache.Delete(username)
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel() defer cancel()
// Reach out to all peers in parallel and send an InvalidateUserRequest. // Reach out to all peers in parallel with an InvalidateUserRequest.
var wg sync.WaitGroup var wg sync.WaitGroup
for peerURL, peer := range rc.peers { for peerURL, peer := range rc.peers {
wg.Add(1) wg.Add(1)
...@@ -71,7 +74,7 @@ func (rc *replicatedCache) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -71,7 +74,7 @@ func (rc *replicatedCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
// Remove user data from the local cache. // Remove user data from the local cache.
rc.cache.Delete(req.Username) rc.internalCache.Delete(req.Username)
serverutil.EncodeJSONResponse(w, emptyResponse) serverutil.EncodeJSONResponse(w, emptyResponse)
} }
...@@ -175,7 +175,7 @@ func main() { ...@@ -175,7 +175,7 @@ func main() {
// also acts as a http.Handler for the replicated cache // also acts as a http.Handler for the replicated cache
// invalidation RPC, so we're going to have to route its endpoint // invalidation RPC, so we're going to have to route its endpoint
// on the main Server later. // on the main Server later.
var cache cachebackend.Cache var cache *cachebackend.CacheBackend
if config.Cache.Enabled { if config.Cache.Enabled {
cache, err = cachebackend.Wrap(be, config.Replication.Peers, config.Replication.TLS) cache, err = cachebackend.Wrap(be, config.Replication.Peers, config.Replication.TLS)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment