Commit 8426627f authored by putro's avatar putro

Merge remote-tracking branch 'origin/master' into add_newsletters

parents 34f46d7f 6d3d9981
Pipeline #4637 passed with stages
in 5 minutes and 2 seconds
......@@ -528,6 +528,20 @@ Request parameters:
* `sso` - SSO ticket
* `resources` - list of resource objects to create
### `/api/resource/check_availability`
Verify if a resource (identified here by type and name) exists or not,
with the purpose of providing early feedback to applications creating
new services.
Request parameters:
* `type` - resource type
* `name` - resource name
Response attributes:
* `available` - bool
## Type-specific resource endpoints
......
......@@ -155,3 +155,79 @@ The configuration is stored in a YAML file, by default
requires the *time*, *mem* and *threads* parameters (defaults
to 1/4/4); *scrypt* requires *n*, *r* and *p* (defaults
16384/8/1)
* `cache`: cache configuration
* `enabled`: if set to *true*, enable a cache for User objects. Very
useful to reduce latencies for backends with complex queries like
LDAP (default *false*, cache is disabled).
## Distributed operation
In a distributed scenario it might make sense to run multiple
instances of the accountserver for reliability. The accountserver
however is not a distributed application and does not include a
mechanism for managing consensus: furthermore it relies on the
characteristics of the underlying storage, which aren't under the
accountserver's control (consider for instance the case of using a SQL
or LDAP database with asynchronous replication).
The accountserver load is heavily skewed towards reads, and the read
and write paths have very different operational characteristics:
writes require a centralized accountserver, due to the presence of
many read-modify-update cycles in our API. The only way to improve
this is to use some highly-available, serialized storage. Writes,
however, are infrequent, and are not critical to the operation of the
accountserver clients. Reads, instead, are very frequent and require
caching for performance and latency reasons. It follows that
prioritizing reads over writes would be a reasonable graceful
degradation policy for the service.
If the storage layer has any form of read-only high-availability
(much easier to achieve than full high-availability — most
asynchronously-replicated setups provide it), this is exploitable by
the accountserver by providing high-availability for the read path,
which is basically a distributed caching problem. Given the tolerances
of the upstream applications, the only real issue is the necessary
connection between write path and read path which is required for
cache invalidation on writes.
The simplest way to make this work is the following:
* assume that the full configuration is available to each
accountserver at all times: that is, each accountserver instance has
a list of all the other accountserver instances;
* one of the accountserver instances is selected as the *leader* by
some external mechanism (including manually);
* write requests are always forwarded to the leader: this keeps the
client API simple, requiring no awareness of the accountserver
topology;
* every accountserver instance maintains its own read cache, and reads
are always served by the local accountserver, never forwarded;
* the leader accountserver, whenever it accepts a write, sends cache
invalidation requests to every other accountserver instance.
The performance of the above is strictly no worse than that of the
underlying storage, except for the possibility of serving stale data
whenever we lose an invalidation request due to network trouble. This
is generally an acceptable risk for our upstream applications.
### Configuration
To enable distributed operation, set attributes below the
*replication* configuration variable:
* `replication`
* `leader_url`: URL of the *leader* accountserver instance. When
this field is set, write requests to this instance will be
forwarded (transparently to the caller) to this URL.
* `peers`: list of peer URLs for the other accountserver
instances. Do not include the current instance in this list, or
you will create unexpected feedback loops.
* `tls`: client TLS configuration
* `cert`: path to the server certificate
* `key`: path to the server's private key
* `ca`: path to the CA used to validate clients
Note that setting *peers* is only necessary if the cache is enabled
(see the *Configuration* section above). Due to implementation
details, all instances should share the same setting for
*cache.enabled*.
......@@ -89,6 +89,62 @@ func (r *SetResourceStatusRequest) Serve(rctx *RequestContext) (interface{}, err
return nil, setResourceStatus(rctx, r.Status)
}
// CheckResourceAvailabilityRequest is an unauthenticated request that
// can tell if a given resource ID is available or not. It backs the
// /api/resource/check_availability endpoint, giving applications
// early feedback before they attempt to create a resource.
type CheckResourceAvailabilityRequest struct {
	// Type is the resource type (email, domain, website, etc.).
	Type string `json:"type"`
	// Name is the resource name whose availability is checked.
	Name string `json:"name"`
}
// CheckResourceAvailabilityResponse is the response type for
// CheckResourceAvailabilityRequest.
type CheckResourceAvailabilityResponse struct {
	// Available is true when no resource with the requested
	// type/name combination exists yet.
	Available bool `json:"available"`
}
// Authorize the request - this one requires no authentication, since
// availability checks are meant to be usable by applications before a
// resource (or its owner's session) exists.
func (r *CheckResourceAvailabilityRequest) Authorize(rctx *RequestContext) error {
	return nil
}
// PopulateContext is a no-op for this type: the request does not need
// any user- or resource-specific data loaded into the RequestContext.
func (r *CheckResourceAvailabilityRequest) PopulateContext(rctx *RequestContext) error {
	return nil
}
// Validate the request, ensuring that both the resource type and the
// resource name are set. Unknown (non-empty) resource types are still
// only detected later, in Serve.
func (r *CheckResourceAvailabilityRequest) Validate(rctx *RequestContext) error {
	// Previously an empty type fell through to Serve's "unknown
	// resource type" error; reject it here, at validation time, like
	// we already do for the name.
	if r.Type == "" {
		return errors.New("type is unset")
	}
	if r.Name == "" {
		return errors.New("name is unset")
	}
	return nil
}
// Serve the request: select the availability validator matching the
// requested resource type and run it on the name. The name is deemed
// available exactly when the validator reports no error.
func (r *CheckResourceAvailabilityRequest) Serve(rctx *RequestContext) (interface{}, error) {
	vc := rctx.validationCtx

	var check ValidatorFunc
	switch r.Type {
	case ResourceTypeEmail, ResourceTypeMailingList:
		check = vc.isAvailableEmailAddr()
	case ResourceTypeDomain:
		check = vc.isAvailableDomain()
	case ResourceTypeWebsite:
		check = vc.isAvailableWebsite()
	case ResourceTypeDAV:
		check = vc.isAvailableDAV()
	case ResourceTypeDatabase:
		check = vc.isAvailableDatabase()
	default:
		return nil, errors.New("unknown resource type")
	}

	// A validation error means the name is taken (or unusable), so
	// availability is simply the absence of an error.
	err := check(rctx.Context, r.Name)
	return &CheckResourceAvailabilityResponse{
		Available: err == nil,
	}, nil
}
// ResetResourcePasswordRequest will reset the password associated
// with a resource (if the resource type supports it). It will
// generate a random password and return it to the caller.
......
......@@ -2,12 +2,14 @@ package cachebackend
import (
"context"
"net/http"
"time"
"github.com/patrickmn/go-cache"
"golang.org/x/sync/singleflight"
as "git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
ct "git.autistici.org/ai3/go-common/ldap/compositetypes"
)
......@@ -16,7 +18,18 @@ var (
cleanupInterval = 180 * time.Second
)
// cacheBackend implements a simple in-memory cache of user objects
// This is the interface for the actual cache, an in-memory LRU cache
// with optional replicated invalidation.
type internalCache interface {
Get(string) (interface{}, bool)
Set(string, interface{}, time.Duration)
Delete(string)
}
// CacheBackend is both an accountserver Backend and a http.Handler
// that implements the replicated invalidation RPCs.
//
// CacheBackend implements a simple in-memory cache of user objects
// (not resources yet), in order to reduce the database and processing
// load in presence of a heavily read-oriented workload. The cache is
very simple, and any update to a user or its resources causes us to
......@@ -26,37 +39,62 @@ var (
// cleaned up. Memory usage thus depends on the load and is difficult
// to estimate in advance.
//
type cacheBackend struct {
type CacheBackend struct {
as.Backend
cache *cache.Cache
http.Handler
cache internalCache
}
// Wrap a Backend with a cache.
func Wrap(b as.Backend) as.Backend {
c := cache.New(defaultExpiration, cleanupInterval)
return &cacheBackend{
Backend: b,
cache: c,
func Wrap(b as.Backend, peers []string, tls *clientutil.TLSClientConfig) (*CacheBackend, error) {
var h http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
})
var c internalCache = cache.New(defaultExpiration, cleanupInterval)
if len(peers) > 0 {
rc, err := newCacheWithReplicatedInvalidation(c, peers, tls)
if err != nil {
return nil, err
}
c = rc
h = rc
}
return &CacheBackend{
Backend: b,
Handler: h,
cache: instrument(c),
}, nil
}
func (b *cacheBackend) NewTransaction() (as.TX, error) {
// NewTransaction returns a new accountserver.TX unit-of-work object.
func (b *CacheBackend) NewTransaction() (as.TX, error) {
innerTX, err := b.Backend.NewTransaction()
if err != nil {
return nil, err
}
return &cacheTX{TX: innerTX, cache: b.cache}, nil
return &cacheTX{
TX: innerTX,
cache: b.cache,
}, nil
}
// The cacheTX type wraps an accountserver.TX object.
type cacheTX struct {
as.TX
cache *cache.Cache
cache internalCache
}
// invalidateUser drops the cached entry for username, forcing the
// next read for that user to go through the wrapped backend.
func (c *cacheTX) invalidateUser(username string) {
	c.cache.Delete(username)
}
// Updates to the cache are controlled by a singleflight.Group to
// ensure that we only update each user once even with multiple
// concurrent callers asking for the same key.
var update singleflight.Group
func (c *cacheTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) {
......
package cachebackend
import (
"context"
"log"
"net/http"
"sync"
"time"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/serverutil"
)
// InvalidateURLPath is the URL path of the internal cache
// invalidation RPC endpoint.
const InvalidateURLPath = "/api/internal/cache_invalidate"

// rpcTimeout bounds the total time spent broadcasting an invalidation
// to all peers.
var rpcTimeout = 5 * time.Second

// InvalidateUserRequest is the payload of the invalidation RPC,
// naming the user whose cached entry should be dropped.
type InvalidateUserRequest struct {
	Username string `json:"username"`
}

// emptyResponse is the (empty) JSON body returned by the invalidation
// RPC handler.
var emptyResponse = struct{}{}

// replicatedCache wraps an internalCache and broadcasts every Delete
// to a set of peer accountserver instances.
type replicatedCache struct {
	internalCache
	// peers maps each peer URL to its RPC client stub.
	peers map[string]clientutil.Backend
}
// newCacheWithReplicatedInvalidation wraps c so that every Delete is
// also broadcast to the given peer URLs as an invalidation RPC. It
// fails if a client stub can't be built for any of the peers.
func newCacheWithReplicatedInvalidation(c internalCache, peers []string, tls *clientutil.TLSClientConfig) (*replicatedCache, error) {
	rc := &replicatedCache{
		internalCache: c,
		peers:         make(map[string]clientutil.Backend, len(peers)),
	}
	for _, u := range peers {
		be, err := clientutil.NewBackend(&clientutil.BackendConfig{
			URL:       u,
			TLSConfig: tls,
		})
		if err != nil {
			return nil, err
		}
		rc.peers[u] = be
	}
	return rc, nil
}
// Delete removes the user entry from the local cache and then fans
// out an InvalidateUserRequest to every peer in parallel, waiting (up
// to rpcTimeout) for all RPCs to finish. Peer errors are only logged:
// a lost invalidation can at worst cause a peer to serve stale data.
func (rc *replicatedCache) Delete(username string) {
	// Local invalidation first, via the wrapped cache.
	rc.internalCache.Delete(username)

	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()

	// The request payload is identical for all peers; it is only read
	// by the goroutines below, so sharing it is safe.
	req := &InvalidateUserRequest{Username: username}

	var wg sync.WaitGroup
	wg.Add(len(rc.peers))
	for u, p := range rc.peers {
		go func(u string, p clientutil.Backend) {
			defer wg.Done()
			if err := p.Call(ctx, InvalidateURLPath, "", req, nil); err != nil {
				log.Printf("error invalidating cache for %s on %s: %v", username, u, err)
			}
		}(u, p)
	}
	wg.Wait()
}
// ServeHTTP implements the receiving side of the invalidation RPC:
// peers POST an InvalidateUserRequest here and we drop the named user
// from the local cache.
func (rc *replicatedCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	var req InvalidateUserRequest
	if !serverutil.DecodeJSONRequest(w, r, &req) {
		// NOTE(review): DecodeJSONRequest may already write an error
		// response when it returns false, which would make this
		// http.Error a duplicate write — confirm against serverutil.
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	// Remove user data from the local cache. We call the wrapped
	// cache directly so the invalidation is not re-broadcast.
	rc.internalCache.Delete(req.Username)
	serverutil.EncodeJSONResponse(w, emptyResponse)
}
package cachebackend
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
// There's only going to be one cache per server process in production
// so we can just use globals for instrumentation metrics.
var (
	// cacheHits counts Get calls that found the key in the cache.
	cacheHits = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "accountserver_cache_hits",
		Help: "Cache hit counter",
	})
	// cacheMisses counts Get calls that did not find the key.
	cacheMisses = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "accountserver_cache_misses",
		Help: "Cache miss counter",
	})
	// cacheWrites counts Set calls.
	cacheWrites = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "accountserver_cache_writes",
		Help: "Cache write counter",
	})
	// cacheDeletes counts Delete calls.
	cacheDeletes = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "accountserver_cache_deletes",
		Help: "Cache delete counter",
	})
)
// init registers the cache metrics with the default Prometheus
// registry at package load time.
func init() {
	// MustRegister is variadic: one call registers all the counters.
	prometheus.MustRegister(
		cacheHits,
		cacheMisses,
		cacheWrites,
		cacheDeletes,
	)
}
// instrumentedCache decorates an internalCache with the global
// Prometheus counters tracking hits, misses, writes and deletes.
type instrumentedCache struct {
	internalCache
}

// instrument wraps c so that all cache operations update the metrics.
func instrument(c internalCache) *instrumentedCache {
	return &instrumentedCache{internalCache: c}
}
// Get looks up key in the underlying cache, counting the result as
// either a hit or a miss.
func (c *instrumentedCache) Get(key string) (interface{}, bool) {
	value, found := c.internalCache.Get(key)
	if found {
		cacheHits.Inc()
		return value, true
	}
	cacheMisses.Inc()
	return value, false
}
// Set stores value under key in the underlying cache with expiration
// d, and increments the write counter.
func (c *instrumentedCache) Set(key string, value interface{}, d time.Duration) {
	c.internalCache.Set(key, value, d)
	cacheWrites.Inc()
}
// Delete removes key from the underlying cache and increments the
// delete counter. The parameter is renamed from `username` to `key`
// for consistency with Get/Set: the interface is keyed generically.
func (c *instrumentedCache) Delete(key string) {
	c.internalCache.Delete(key)
	cacheDeletes.Inc()
}
......@@ -9,6 +9,7 @@ import (
"strings"
"git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/pwhash"
"git.autistici.org/ai3/go-common/serverutil"
"gopkg.in/yaml.v2"
......@@ -40,6 +41,13 @@ type config struct {
Algo string `yaml:"algo"`
Params map[string]int `yaml:"params"`
} `yaml:"pwhash"`
// Replication config.
Replication struct {
LeaderURL string `yaml:"leader_url"`
Peers []string `yaml:"peers"`
TLS *clientutil.TLSClientConfig `yaml:"tls"`
} `yaml:"replication"`
}
func (c *config) validate() error {
......@@ -163,8 +171,17 @@ func main() {
log.Fatal(err)
}
// If the cache is enabled, create the cache backend wrapper. It
// also acts as a http.Handler for the replicated cache
// invalidation RPC, so we're going to have to route its endpoint
// on the main Server later.
var cache *cachebackend.CacheBackend
if config.Cache.Enabled {
be = cachebackend.Wrap(be)
cache, err = cachebackend.Wrap(be, config.Replication.Peers, config.Replication.TLS)
if err != nil {
log.Fatal(err)
}
be = cache
}
service, err := accountserver.NewAccountService(be, &config.AccountServerConfig)
......@@ -172,8 +189,15 @@ func main() {
log.Fatal(err)
}
as := server.New(service, be)
as, err := server.New(service, be, config.Replication.LeaderURL, config.Replication.TLS)
if err != nil {
log.Fatal(err)
}
if cache != nil {
as.Handle(cachebackend.InvalidateURLPath, cache)
}
// Start the HTTP server.
if err := serverutil.Serve(as, config.ServerConfig, *addr); err != nil {
log.Fatal(err)
}
......
......@@ -120,7 +120,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
t.Fatal("NewLDAPBackend", err)
}
if enableCache {
be = cachebackend.Wrap(be)
be, err = cachebackend.Wrap(be, nil, nil)
if err != nil {
t.Fatal("cachebackend.Wrap", err)
}
}
ssoStop, signer, ssoPubKeyFile := withSSO(t)
......@@ -152,7 +155,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
t.Fatal("NewAccountService", err)
}
as := server.New(service, be)
as, err := server.New(service, be, "", nil)
if err != nil {
t.Fatal("server.New", err)
}
srv := httptest.NewServer(as)
c := &testClient{
......
package server
import (
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"reflect"
"strings"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/serverutil"
as "git.autistici.org/ai3/accountserver"
)
type actionRegistry struct {
service *as.AccountService
handlers map[string]reflect.Type
}
func newActionRegistry() *actionRegistry {
func newActionRegistry(service *as.AccountService) *actionRegistry {
return &actionRegistry{
service: service,
handlers: make(map[string]reflect.Type),
}
}
......@@ -34,64 +41,20 @@ func (r *actionRegistry) newRequest(path string) (as.Request, bool) {
return reflect.New(h).Interface().(as.Request), true
}
// APIServer is the HTTP API interface to AccountService.
type APIServer struct {
*actionRegistry
service *as.AccountService
}
// New creates a new APIServer.
func New(service *as.AccountService, backend as.Backend) *APIServer {
s := &APIServer{
actionRegistry: newActionRegistry(),
service: service,
}
s.Register("/api/user/get", &as.GetUserRequest{})
s.Register("/api/user/search", &as.SearchUserRequest{})
s.Register("/api/user/create", &as.CreateUserRequest{})
s.Register("/api/user/update", &as.UpdateUserRequest{})
s.Register("/api/user/admin_update", &as.AdminUpdateUserRequest{})
s.Register("/api/user/change_password", &as.ChangeUserPasswordRequest{})
s.Register("/api/user/reset_password", &as.ResetPasswordRequest{})
s.Register("/api/user/set_account_recovery_hint", &as.SetAccountRecoveryHintRequest{})
s.Register("/api/user/enable_otp", &as.EnableOTPRequest{})
s.Register("/api/user/disable_otp", &as.DisableOTPRequest{})
s.Register("/api/user/create_app_specific_password", &as.CreateApplicationSpecificPasswordRequest{})
s.Register("/api/user/delete_app_specific_password", &as.DeleteApplicationSpecificPasswordRequest{})
s.Register("/api/resource/get", &as.GetResourceRequest{})
s.Register("/api/resource/search", &as.SearchResourceRequest{})
s.Register("/api/resource/set_status", &as.SetResourceStatusRequest{})
s.Register("/api/resource/create", &as.CreateResourcesRequest{})
s.Register("/api/resource/move", &as.MoveResourceRequest{})
s.Register("/api/resource/reset_password", &as.ResetResourcePasswordRequest{})
s.Register("/api/resource/email/add_alias", &as.AddEmailAliasRequest{})
s.Register("/api/resource/email/delete_alias", &as.DeleteEmailAliasRequest{})
s.Register("/api/recover_account", &as.AccountRecoveryRequest{})
return s
}
var emptyResponse struct{}
type jsonError interface {
JSON() []byte
}
func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (r *actionRegistry) ServeHTTP(w http.ResponseWriter, httpReq *http.Request) {
// Create a new empty request object based on the request
// path, then decode the HTTP request JSON body onto it.
r, ok := s.newRequest(req.URL.Path)
req, ok := r.newRequest(httpReq.URL.Path)
if !ok {
http.NotFound(w, req)
http.NotFound(w, httpReq)
return
}
if !serverutil.DecodeJSONRequest(w, req, r) {
if !serverutil.DecodeJSONRequest(w, httpReq, req) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
resp, err := s.service.Handle(req.Context(), r)
resp, err := r.service.Handle(httpReq.Context(), req)
if err != nil {
// Handle structured errors, serve a JSON response.
status := errToStatus(err)
......@@ -112,13 +75,150 @@ func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Now that all is done, we can log the request/response
// (sanitization might modify the objects in place).
reqData := marshalJSONSanitized(r)
reqData := marshalJSONSanitized(req)
if err != nil {
log.Printf("request: %s %s -> ERROR: %v", req.URL.Path, reqData, err)
log.Printf("request: %s %s -> ERROR: %v", httpReq.URL.Path, reqData, err)
} else {
respData := marshalJSONSanitized(resp)
log.Printf("request: %s %s -> %s", req.URL.Path, reqData, respData)
log.Printf("request: %s %s -> %s", httpReq.URL.Path, reqData, respData)
}
}
// APIServer is the HTTP API interface to AccountService. It
// implements the http.Handler interface by embedding the ServeMux on
// which all the API endpoints are routed.
type APIServer struct {
	*http.ServeMux
}

// apiEndpoint associates a URL path with the request object that is
// decoded from the endpoint's JSON payload.
type apiEndpoint struct {
	path    string
	handler as.Request
}
var (
	// readOnlyEndpoints do not modify backend state and are always
	// served locally, regardless of replication configuration.
	readOnlyEndpoints = []apiEndpoint{
		{"/api/user/get", &as.GetUserRequest{}},
		{"/api/user/search", &as.SearchUserRequest{}},
		{"/api/resource/get", &as.GetResourceRequest{}},
		{"/api/resource/search", &as.SearchResourceRequest{}},
		{"/api/resource/check_availability", &as.CheckResourceAvailabilityRequest{}},
	}

	// writeEndpoints modify backend state; in New they are either
	// served locally or forwarded to the leader instance.
	writeEndpoints = []apiEndpoint{
		{"/api/user/create", &as.CreateUserRequest{}},
		{"/api/user/update", &as.UpdateUserRequest{}},
		{"/api/user/admin_update", &as.AdminUpdateUserRequest{}},
		{"/api/user/change_password", &as.ChangeUserPasswordRequest{}},
		{"/api/user/reset_password", &as.ResetPasswordRequest{}},
		{"/api/user/set_account_recovery_hint", &as.SetAccountRecoveryHintRequest{}},
		{"/api/user/enable_otp", &as.EnableOTPRequest{}},
		{"/api/user/disable_otp", &as.DisableOTPRequest{}},
		{"/api/user/create_app_specific_password", &as.CreateApplicationSpecificPasswordRequest{}},
		{"/api/user/delete_app_specific_password", &as.DeleteApplicationSpecificPasswordRequest{}},
		{"/api/resource/set_status", &as.SetResourceStatusRequest{}},
		{"/api/resource/create", &as.CreateResourcesRequest{}},
		{"/api/resource/move", &as.MoveResourceRequest{}},
		{"/api/resource/reset_password", &as.ResetResourcePasswordRequest{}},
		{"/api/resource/email/add_alias", &as.AddEmailAliasRequest{}},
		{"/api/resource/email/delete_alias", &as.DeleteEmailAliasRequest{}},
		{"/api/recover_account", &as.AccountRecoveryRequest{}},
	}
)
// New creates a new APIServer. If leaderAddr is not the empty string,
// write requests will be forwarded to that address; read-only
// requests are always handled by the local instance.
func New(service *as.AccountService, backend as.Backend, leaderAddr string, clientTLS *clientutil.TLSClientConfig) (*APIServer, error) {
	registry := newActionRegistry(service)
	mux := http.NewServeMux()

	// Read-only endpoints are always served by the local registry.
	for _, ep := range readOnlyEndpoints {
		registry.Register(ep.path, ep.handler)
	}

	if leaderAddr == "" {
		// Standalone / leader mode: serve writes locally too.
		for _, ep := range writeEndpoints {
			registry.Register(ep.path, ep.handler)
		}
	} else {
		// Follower mode: proxy every write endpoint to the leader.
		fs, err := newForwardServer(leaderAddr, clientTLS)
		if err != nil {
			return nil, err
		}
		for _, ep := range writeEndpoints {
			mux.Handle(ep.path, fs)
		}
	}

	// Everything not explicitly routed above falls through to the
	// registry (which 404s unknown paths).
	mux.Handle("/", registry)

	return &APIServer{ServeMux: mux}, nil
}
// emptyResponse is the zero-value body serialized for successful
// requests that return no payload.
var emptyResponse struct{}

// jsonError is implemented by errors that can serialize themselves to
// a structured JSON representation.
type jsonError interface {
	JSON() []byte
}