Commit 48fa0495 authored by ale's avatar ale

Merge branch 'distrib' into 'master'

Distrib

Closes #9

See merge request !6
parents b790a71a a428be94
Pipeline #4384 passed with stages
in 4 minutes and 46 seconds
@@ -155,3 +155,79 @@ The configuration is stored in a YAML file, by default
  requires the *time*, *mem* and *threads* parameters (defaults
  to 1/4/4); *scrypt* requires *n*, *r* and *p* (defaults to
  16384/8/1)
* `cache`: cache configuration
    * `enabled`: if set to *true*, enable a cache for User objects.
      Very useful to reduce latency for backends with complex queries,
      such as LDAP (default *false*, i.e. the cache is disabled). See
      the example below.
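
For example, a minimal sketch of the cache section (all other
configuration keys are omitted, and the values are illustrative):

```yaml
# Enable the in-memory User object cache to cut down on backend
# (e.g. LDAP) round-trips for read requests.
cache:
  enabled: true
```
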
## Distributed operation

In a distributed scenario it might make sense to run multiple
instances of the accountserver for reliability. The accountserver,
however, is not a distributed application and does not include a
mechanism for managing consensus; furthermore, it relies on the
characteristics of the underlying storage, which are not under the
accountserver's control (consider, for instance, a SQL or LDAP
database with asynchronous replication).

The accountserver load is heavily skewed towards reads, and the read
and write paths have very different operational characteristics:
writes require a centralized accountserver, due to the many
read-modify-update cycles in our API, and the only way to improve on
that would be some highly-available, serialized storage. Writes,
however, are infrequent and are not critical to the operation of the
accountserver clients. Reads, on the other hand, are very frequent
and require caching for performance and latency reasons. It follows
that prioritizing reads over writes is a reasonable graceful
degradation policy for the service.

If the storage layer offers any form of read-only high availability
(much easier to achieve: most asynchronously-replicated setups
provide it, for instance), the accountserver can exploit it to make
the read path highly available, which is basically a distributed
caching problem. Given the tolerances of the upstream applications,
the only real issue is the link between the write path and the read
path that is required for cache invalidation on writes.

The simplest way to make this work is the following:

* assume that the full configuration is available to each
  accountserver at all times: that is, each accountserver instance
  has a list of all the other accountserver instances;
* one of the accountserver instances is selected as the *leader* by
  some external mechanism (including manually);
* write requests are always forwarded to the leader: this keeps the
  client API simple, requiring no awareness of the accountserver
  topology;
* every accountserver instance maintains its own read cache, and
  reads are always served by the local accountserver, never
  forwarded;
* the leader accountserver, whenever it accepts a write, sends cache
  invalidation requests to every other accountserver instance.

The performance of the above scheme is never worse than that of the
underlying storage, except for the possibility of serving stale data
whenever an invalidation request is lost due to network trouble. This
is generally an acceptable risk for our upstream applications.

### Configuration

To enable distributed operation, set the attributes below the
*replication* configuration variable:

* `replication`
    * `leader_url`: URL of the *leader* accountserver instance. When
      this field is set, write requests to this instance will be
      forwarded (transparently to the caller) to this URL.
    * `peers`: list of peer URLs for the other accountserver
      instances. Do not include the current instance in this list, or
      you will create unexpected feedback loops.
    * `tls`: client TLS configuration used to talk to the leader and
      the peers
        * `cert`: path to the client certificate
        * `key`: path to the client's private key
        * `ca`: path to the CA used to validate the remote servers

Note that setting *peers* is only necessary if the cache is enabled
(see the *Configuration* section above). Due to implementation
details, all instances should share the same setting for
*cache.enabled*. A sample configuration is shown below.
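
For example, here is a minimal sketch of the replication settings for
a non-leader (follower) instance in a hypothetical three-node setup;
hostnames, port and file paths are placeholders:

```yaml
# Example follower instance (say, accounts2): writes are transparently
# proxied to the leader, cache invalidations are sent to the peers.
cache:
  enabled: true

replication:
  leader_url: "https://accounts1.example.com:4040"
  peers:
    # Every *other* accountserver instance, never this one.
    - "https://accounts1.example.com:4040"
    - "https://accounts3.example.com:4040"
  tls:
    cert: "/etc/accountserver/client.pem"
    key: "/etc/accountserver/client.key"
    ca: "/etc/accountserver/ca.pem"
```

On the leader itself (accounts1 here), *leader_url* would be left
unset, while *peers* would list the follower instances so that
write-triggered cache invalidations reach them.
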
@@ -2,12 +2,14 @@ package cachebackend

import (
    "context"
    "net/http"
    "time"

    "github.com/patrickmn/go-cache"
    "golang.org/x/sync/singleflight"

    as "git.autistici.org/ai3/accountserver"
    "git.autistici.org/ai3/go-common/clientutil"
    ct "git.autistici.org/ai3/go-common/ldap/compositetypes"
)

@@ -16,7 +18,18 @@ var (
    cleanupInterval = 180 * time.Second
)

// internalCache is the interface for the actual cache: an in-memory
// LRU cache with optional replicated invalidation.
type internalCache interface {
    Get(string) (interface{}, bool)
    Set(string, interface{}, time.Duration)
    Delete(string)
}

// CacheBackend is both an accountserver Backend and an http.Handler
// that implements the replicated invalidation RPCs.
//
// CacheBackend implements a simple in-memory cache of user objects
// (not resources yet), in order to reduce the database and processing
// load in the presence of a heavily read-oriented workload. The cache
// is very simple, and any update to a user or its resources causes us to
@@ -26,37 +39,62 @@ var (
// cleaned up. Memory usage thus depends on the load and is difficult
// to estimate in advance.
//
type CacheBackend struct {
    as.Backend
    http.Handler

    cache internalCache
}

// Wrap a Backend with a cache.
func Wrap(b as.Backend, peers []string, tls *clientutil.TLSClientConfig) (*CacheBackend, error) {
    var h http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        http.NotFound(w, r)
    })

    var c internalCache = cache.New(defaultExpiration, cleanupInterval)
    if len(peers) > 0 {
        rc, err := newCacheWithReplicatedInvalidation(c, peers, tls)
        if err != nil {
            return nil, err
        }
        c = rc
        h = rc
    }

    return &CacheBackend{
        Backend: b,
        Handler: h,
        cache:   instrument(c),
    }, nil
}

// NewTransaction returns a new accountserver.TX unit-of-work object.
func (b *CacheBackend) NewTransaction() (as.TX, error) {
    innerTX, err := b.Backend.NewTransaction()
    if err != nil {
        return nil, err
    }
    return &cacheTX{
        TX:    innerTX,
        cache: b.cache,
    }, nil
}

// The cacheTX type wraps an accountserver.TX object.
type cacheTX struct {
    as.TX
    cache internalCache
}

func (c *cacheTX) invalidateUser(username string) {
    c.cache.Delete(username)
}

// Updates to the cache are controlled by a singleflight.Group to
// ensure that we only update each user once even with multiple
// callers.
var update singleflight.Group

func (c *cacheTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) {
...

package cachebackend

import (
    "context"
    "log"
    "net/http"
    "sync"
    "time"

    "git.autistici.org/ai3/go-common/clientutil"
    "git.autistici.org/ai3/go-common/serverutil"
)

// InvalidateURLPath is the HTTP path of the cache invalidation RPC.
const InvalidateURLPath = "/api/internal/cache_invalidate"

var rpcTimeout = 5 * time.Second

// InvalidateUserRequest is the payload of the cache invalidation RPC.
type InvalidateUserRequest struct {
    Username string `json:"username"`
}

var emptyResponse = struct{}{}

// A replicatedCache wraps an internalCache and broadcasts Delete()
// calls to all configured peers.
type replicatedCache struct {
    internalCache
    peers map[string]clientutil.Backend
}

func newCacheWithReplicatedInvalidation(c internalCache, peers []string, tls *clientutil.TLSClientConfig) (*replicatedCache, error) {
    peerBackends := make(map[string]clientutil.Backend)
    for _, peerURL := range peers {
        b, err := clientutil.NewBackend(&clientutil.BackendConfig{
            URL:       peerURL,
            TLSConfig: tls,
        })
        if err != nil {
            return nil, err
        }
        peerBackends[peerURL] = b
    }
    return &replicatedCache{
        internalCache: c,
        peers:         peerBackends,
    }, nil
}

func (rc *replicatedCache) Delete(username string) {
    // Delete the user entry from the local cache.
    rc.internalCache.Delete(username)

    ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
    defer cancel()

    // Reach out to all peers in parallel with an InvalidateUserRequest.
    var wg sync.WaitGroup
    for peerURL, peer := range rc.peers {
        wg.Add(1)
        go func(peerURL string, peer clientutil.Backend) {
            defer wg.Done()
            if err := peer.Call(ctx, InvalidateURLPath, "", &InvalidateUserRequest{Username: username}, nil); err != nil {
                log.Printf("error invalidating cache for %s on %s: %v", username, peerURL, err)
            }
        }(peerURL, peer)
    }
    wg.Wait()
}

func (rc *replicatedCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    var req InvalidateUserRequest
    if !serverutil.DecodeJSONRequest(w, r, &req) {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }

    // Remove user data from the local cache.
    rc.internalCache.Delete(req.Username)

    serverutil.EncodeJSONResponse(w, emptyResponse)
}

package cachebackend

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// There's only going to be one cache per server process in production,
// so we can just use globals for instrumentation metrics.
var (
    cacheHits = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "accountserver_cache_hits",
        Help: "Cache hit counter",
    })
    cacheMisses = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "accountserver_cache_misses",
        Help: "Cache miss counter",
    })
    cacheWrites = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "accountserver_cache_writes",
        Help: "Cache write counter",
    })
    cacheDeletes = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "accountserver_cache_deletes",
        Help: "Cache delete counter",
    })
)

func init() {
    prometheus.MustRegister(cacheHits)
    prometheus.MustRegister(cacheMisses)
    prometheus.MustRegister(cacheWrites)
    prometheus.MustRegister(cacheDeletes)
}

// instrumentedCache wraps an internalCache, counting hits, misses,
// writes and deletes.
type instrumentedCache struct {
    internalCache
}

func instrument(c internalCache) *instrumentedCache {
    return &instrumentedCache{c}
}

func (c *instrumentedCache) Get(key string) (interface{}, bool) {
    res, ok := c.internalCache.Get(key)
    if ok {
        cacheHits.Inc()
    } else {
        cacheMisses.Inc()
    }
    return res, ok
}

func (c *instrumentedCache) Set(key string, value interface{}, d time.Duration) {
    c.internalCache.Set(key, value, d)
    cacheWrites.Inc()
}

func (c *instrumentedCache) Delete(username string) {
    c.internalCache.Delete(username)
    cacheDeletes.Inc()
}

@@ -9,6 +9,7 @@ import (
    "strings"

    "git.autistici.org/ai3/accountserver"
    "git.autistici.org/ai3/go-common/clientutil"
    "git.autistici.org/ai3/go-common/pwhash"
    "git.autistici.org/ai3/go-common/serverutil"
    "gopkg.in/yaml.v2"
@@ -40,6 +41,13 @@ type config struct {
        Algo   string         `yaml:"algo"`
        Params map[string]int `yaml:"params"`
    } `yaml:"pwhash"`

    // Replication config.
    Replication struct {
        LeaderURL string                      `yaml:"leader_url"`
        Peers     []string                    `yaml:"peers"`
        TLS       *clientutil.TLSClientConfig `yaml:"tls"`
    } `yaml:"replication"`
}

func (c *config) validate() error {
@@ -163,8 +171,17 @@ func main() {
        log.Fatal(err)
    }

    // If the cache is enabled, create the cache backend wrapper. It
    // also acts as an http.Handler for the replicated cache
    // invalidation RPC, so we're going to have to route its endpoint
    // on the main Server later.
    var cache *cachebackend.CacheBackend
    if config.Cache.Enabled {
        cache, err = cachebackend.Wrap(be, config.Replication.Peers, config.Replication.TLS)
        if err != nil {
            log.Fatal(err)
        }
        be = cache
    }

    service, err := accountserver.NewAccountService(be, &config.AccountServerConfig)
@@ -172,8 +189,15 @@ func main() {
        log.Fatal(err)
    }

    as, err := server.New(service, be, config.Replication.LeaderURL, config.Replication.TLS)
    if err != nil {
        log.Fatal(err)
    }
    if cache != nil {
        as.Handle(cachebackend.InvalidateURLPath, cache)
    }

    // Start the HTTP server.
    if err := serverutil.Serve(as, config.ServerConfig, *addr); err != nil {
        log.Fatal(err)
    }
...

@@ -120,7 +120,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
        t.Fatal("NewLDAPBackend", err)
    }
    if enableCache {
        be, err = cachebackend.Wrap(be, nil, nil)
        if err != nil {
            t.Fatal("cachebackend.Wrap", err)
        }
    }

    ssoStop, signer, ssoPubKeyFile := withSSO(t)
@@ -152,7 +155,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
        t.Fatal("NewAccountService", err)
    }

    as, err := server.New(service, be, "", nil)
    if err != nil {
        t.Fatal("server.New", err)
    }
    srv := httptest.NewServer(as)

    c := &testClient{
...

package server

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "net/http/httputil"
    "net/url"
    "reflect"
    "strings"

    "git.autistici.org/ai3/go-common/clientutil"
    "git.autistici.org/ai3/go-common/serverutil"

    as "git.autistici.org/ai3/accountserver"
)

type actionRegistry struct {
    service  *as.AccountService
    handlers map[string]reflect.Type
}

func newActionRegistry(service *as.AccountService) *actionRegistry {
    return &actionRegistry{
        service:  service,
        handlers: make(map[string]reflect.Type),
    }
}

@@ -34,65 +41,20 @@ func (r *actionRegistry) newRequest(path string) (as.Request, bool) {
    return reflect.New(h).Interface().(as.Request), true
}

func (r *actionRegistry) ServeHTTP(w http.ResponseWriter, httpReq *http.Request) {
    // Create a new empty request object based on the request
    // path, then decode the HTTP request JSON body onto it.
    req, ok := r.newRequest(httpReq.URL.Path)
    if !ok {
        http.NotFound(w, httpReq)
        return
    }
    if !serverutil.DecodeJSONRequest(w, httpReq, req) {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }

    resp, err := r.service.Handle(httpReq.Context(), req)
    if err != nil {
        // Handle structured errors, serve a JSON response.
        status := errToStatus(err)
@@ -113,13 +75,150 @@ func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // Now that all is done, we can log the request/response
    // (sanitization might modify the objects in place).
    reqData := marshalJSONSanitized(req)
    if err != nil {
        log.Printf("request: %s %s -> ERROR: %v", httpReq.URL.Path, reqData, err)
    } else {
        respData := marshalJSONSanitized(resp)
        log.Printf("request: %s %s -> %s", httpReq.URL.Path, reqData, respData)
    }
}

// APIServer is the HTTP API interface to AccountService. It
// implements the http.Handler interface.
type APIServer struct {
    *http.ServeMux
}

type apiEndpoint struct {
    path    string
    handler as.Request
}

var (
    readOnlyEndpoints = []apiEndpoint{
        {"/api/user/get", &as.GetUserRequest{}},
        {"/api/user/search", &as.SearchUserRequest{}},
        {"/api/resource/get", &as.GetResourceRequest{}},
        {"/api/resource/search", &as.SearchResourceRequest{}},
        {"/api/resource/check_availability", &as.CheckResourceAvailabilityRequest{}},
    }

    writeEndpoints = []apiEndpoint{
        {"/api/user/create", &as.CreateUserRequest{}},
        {"/api/user/update", &as.UpdateUserRequest{}},
        {"/api/user/admin_update", &as.AdminUpdateUserRequest{}},
        {"/api/user/change_password", &as.ChangeUserPasswordRequest{}},
        {"/api/user/reset_password", &as.ResetPasswordRequest{}},
        {"/api/user/set_account_recovery_hint", &as.SetAccountRecoveryHintRequest{}},
        {"/api/user/enable_otp", &as.EnableOTPRequest{}},
        {"/api/user/disable_otp", &as.DisableOTPRequest{}},
        {"/api/user/create_app_specific_password", &as.CreateApplicationSpecificPasswordRequest{}},
        {"/api/user/delete_app_specific_password", &as.DeleteApplicationSpecificPasswordRequest{}},
        {"/api/resource/set_status", &as.SetResourceStatusRequest{}},
        {"/api/resource/create", &as.CreateResourcesRequest{}},
        {"/api/resource/move", &as.MoveResourceRequest{}},
        {"/api/resource/reset_password", &as.ResetResourcePasswordRequest{}},
        {"/api/resource/email/add_alias", &as.AddEmailAliasRequest{}},
        {"/api/resource/email/delete_alias", &as.DeleteEmailAliasRequest{}},
        {"/api/recover_account", &as.AccountRecoveryRequest{}},
    }
)

// New creates a new APIServer. If leaderAddr is not the empty string,
// write requests will be forwarded to that address.
func New(service *as.AccountService, backend as.Backend, leaderAddr string, clientTLS *clientutil.TLSClientConfig) (*APIServer, error) {
    registry := newActionRegistry(service)
    mux := http.NewServeMux()

    for _, ep := range readOnlyEndpoints {
        registry.Register(ep.path, ep.handler)
    }

    var fs *forwardServer
    if leaderAddr != "" {
        var err error
        fs, err = newForwardServer(leaderAddr, clientTLS)
        if err != nil {
            return nil, err
        }
    }

    for _, ep := range writeEndpoints {
        if leaderAddr == "" {
            registry.Register(ep.path, ep.handler)
        } else {
            mux.Handle(ep.path, fs)
        }
    }

    mux.Handle("/", registry)

    return &APIServer{ServeMux: mux}, nil
}

var emptyResponse struct{}

type jsonError interface {
    JSON() []byte
}

// A forwardServer is just a fancy httputil.ReverseProxy with loop
// detection (we don't want to receive proxied requests if we are not
// the leader).
type forwardServer struct {
    proxy *httputil.ReverseProxy
}

const (
    loopHdr      = "X-Accountserver-Forwarded"
    loopHdrValue = "true"
)

func newForwardServer(leaderURL string, tlsClientConf *clientutil.TLSClientConfig) (*forwardServer, error) {
    leader, err := url.Parse(strings.TrimRight(leaderURL, "/"))
    if err != nil {
        return nil, err
    }

    var tlsConf *tls.Config
    if tlsClientConf != nil {
        tlsConf, err = tlsClientConf.TLSConfig()
        if err != nil {
            return nil, err
        }
    }

    proxy := &httputil.ReverseProxy{
        Director: func(req *http.Request) {
            req.URL.Scheme = leader.Scheme
            req.URL.Host = leader.Host
            req.URL.Path = leader.Path + req.URL.Path
            // Loop protection.
            req.Header.Set(loopHdr, loopHdrValue)
            // Explicitly disable User-Agent so it's not set to default value.
            if _, ok := req.Header["User-Agent"]; !ok {
                req.Header.Set("User-Agent", "")
            }
        },
        Transport: &http.Transport{
            TLSClientConfig: tlsConf,
},