Commit 9cbc4225 authored by ale's avatar ale

Add leader-forwarding mode for write requests

Write requests can be forwarded to a 'leader' accountserver instance.
parent b790a71a
......@@ -155,3 +155,53 @@ The configuration is stored in a YAML file, by default
requires the *time*, *mem* and *threads* parameters (defaults
to 1/4/4); *scrypt* requires *n*, *r* and *p* (defaults
16384/8/1)
## Distributed operation
In a distributed scenario it might make sense to run multiple
instances of the accountserver for reliability. The accountserver
however is not a distributed application and does not include a
mechanism for managing consensus: furthermore it relies on the
characteristics of the underlying storage, which aren't under the
accountserver's control (consider for instance the case of using a SQL
or LDAP database with asynchronous replication).
The accountserver load is heavily skewed towards reads, and the read
and write paths have very different operational characteristics:
writes require a centralized accountserver, due to the presence of
many read-modify-update cycles in our API. The only way to improve
this is to use some highly-available, serialized storage. Writes,
however, are infrequent, and are not critical to the operation of the
accountserver clients. Reads, instead, are very frequent and require
caching for performance and latency reasons. It follows that
prioritizing reads over writes would be a reasonable graceful
degradation policy for the service.
If the storage layer has any form of read-only high-availability (much
easier to achieve, this would be the case for most
asynchronously-replicated setups for instance), this is exploitable by
the accountserver by providing high-availability for the read path,
which is basically a distributed caching problem. Given the tolerances
of the upstream applications, the only real issue is the necessary
connection between write path and read path which is required for
cache invalidation on writes.
The simplest way to make this work is the following:
* assume that the full configuration is available to each
accountserver at all times: that is, each accountserver instance has
a list of all the other accountserver instances;
* one of the accountserver instances is selected as the *leader* by
some external mechanism (including manually);
* write requests are always forwarded to the leader: this keeps the
client API simple, requiring no awareness of the accountserver
topology;
* every accountserver instance maintains its own read cache, and reads
are always served by the local accountserver, never forwarded;
* the leader accountserver, whenever it accepts a write, sends cache
invalidation requests to every other accountserver instance.
The performance of the above is strictly not worse than that of the
underlying storage, except for the possibility of serving stale data
whenever we lose an invalidation request due to network trouble. This
is generally an acceptable risk for our upstream applications.
......@@ -9,6 +9,7 @@ import (
"strings"
"git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/pwhash"
"git.autistici.org/ai3/go-common/serverutil"
"gopkg.in/yaml.v2"
......@@ -40,6 +41,13 @@ type config struct {
Algo string `yaml:"algo"`
Params map[string]int `yaml:"params"`
} `yaml:"pwhash"`
// Replication config.
Replication struct {
LeaderURL string `yaml:"leader_url"`
Peers []string `yaml:"peers"`
TLS *clientutil.TLSClientConfig `yaml:"tls"`
} `yaml:"replication"`
}
func (c *config) validate() error {
......@@ -172,7 +180,10 @@ func main() {
log.Fatal(err)
}
as := server.New(service, be)
as, err := server.New(service, be, config.Replication.LeaderURL, config.Replication.TLS)
if err != nil {
log.Fatal(err)
}
if err := serverutil.Serve(as, config.ServerConfig, *addr); err != nil {
log.Fatal(err)
......
......@@ -152,7 +152,10 @@ func startServiceWithConfigAndCache(t testing.TB, svcConfig as.Config, enableCac
t.Fatal("NewAccountService", err)
}
as := server.New(service, be)
as, err := server.New(service, be, "", nil)
if err != nil {
t.Fatal("server.New", err)
}
srv := httptest.NewServer(as)
c := &testClient{
......
package server
import (
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"reflect"
"strings"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/ai3/go-common/serverutil"
as "git.autistici.org/ai3/accountserver"
)
type actionRegistry struct {
service *as.AccountService
handlers map[string]reflect.Type
}
func newActionRegistry() *actionRegistry {
func newActionRegistry(service *as.AccountService) *actionRegistry {
return &actionRegistry{
service: service,
handlers: make(map[string]reflect.Type),
}
}
......@@ -34,65 +41,20 @@ func (r *actionRegistry) newRequest(path string) (as.Request, bool) {
return reflect.New(h).Interface().(as.Request), true
}
// APIServer is the HTTP API interface to AccountService.
type APIServer struct {
*actionRegistry
service *as.AccountService
}
// New creates a new APIServer.
func New(service *as.AccountService, backend as.Backend) *APIServer {
s := &APIServer{
actionRegistry: newActionRegistry(),
service: service,
}
s.Register("/api/user/get", &as.GetUserRequest{})
s.Register("/api/user/search", &as.SearchUserRequest{})
s.Register("/api/user/create", &as.CreateUserRequest{})
s.Register("/api/user/update", &as.UpdateUserRequest{})
s.Register("/api/user/admin_update", &as.AdminUpdateUserRequest{})
s.Register("/api/user/change_password", &as.ChangeUserPasswordRequest{})
s.Register("/api/user/reset_password", &as.ResetPasswordRequest{})
s.Register("/api/user/set_account_recovery_hint", &as.SetAccountRecoveryHintRequest{})
s.Register("/api/user/enable_otp", &as.EnableOTPRequest{})
s.Register("/api/user/disable_otp", &as.DisableOTPRequest{})
s.Register("/api/user/create_app_specific_password", &as.CreateApplicationSpecificPasswordRequest{})
s.Register("/api/user/delete_app_specific_password", &as.DeleteApplicationSpecificPasswordRequest{})
s.Register("/api/resource/get", &as.GetResourceRequest{})
s.Register("/api/resource/search", &as.SearchResourceRequest{})
s.Register("/api/resource/check_availability", &as.CheckResourceAvailabilityRequest{})
s.Register("/api/resource/set_status", &as.SetResourceStatusRequest{})
s.Register("/api/resource/create", &as.CreateResourcesRequest{})
s.Register("/api/resource/move", &as.MoveResourceRequest{})
s.Register("/api/resource/reset_password", &as.ResetResourcePasswordRequest{})
s.Register("/api/resource/email/add_alias", &as.AddEmailAliasRequest{})
s.Register("/api/resource/email/delete_alias", &as.DeleteEmailAliasRequest{})
s.Register("/api/recover_account", &as.AccountRecoveryRequest{})
return s
}
var emptyResponse struct{}
type jsonError interface {
JSON() []byte
}
func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (r *actionRegistry) ServeHTTP(w http.ResponseWriter, httpReq *http.Request) {
// Create a new empty request object based on the request
// path, then decode the HTTP request JSON body onto it.
r, ok := s.newRequest(req.URL.Path)
req, ok := r.newRequest(httpReq.URL.Path)
if !ok {
http.NotFound(w, req)
http.NotFound(w, httpReq)
return
}
if !serverutil.DecodeJSONRequest(w, req, r) {
if !serverutil.DecodeJSONRequest(w, httpReq, req) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
resp, err := s.service.Handle(req.Context(), r)
resp, err := r.service.Handle(httpReq.Context(), req)
if err != nil {
// Handle structured errors, serve a JSON response.
status := errToStatus(err)
......@@ -113,13 +75,151 @@ func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Now that all is done, we can log the request/response
// (sanitization might modify the objects in place).
reqData := marshalJSONSanitized(r)
reqData := marshalJSONSanitized(req)
if err != nil {
log.Printf("request: %s %s -> ERROR: %v", req.URL.Path, reqData, err)
log.Printf("request: %s %s -> ERROR: %v", httpReq.URL.Path, reqData, err)
} else {
respData := marshalJSONSanitized(resp)
log.Printf("request: %s %s -> %s", req.URL.Path, reqData, respData)
log.Printf("request: %s %s -> %s", httpReq.URL.Path, reqData, respData)
}
}
// APIServer is the HTTP API interface to AccountService. It
// implements the http.Handler interface.
type APIServer struct {
*http.ServeMux
//*actionRegistry
}
type apiEndpoint struct {
path string
handler as.Request
}
var (
readOnlyEndpoints = []apiEndpoint{
{"/api/user/get", &as.GetUserRequest{}},
{"/api/user/search", &as.SearchUserRequest{}},
{"/api/resource/get", &as.GetResourceRequest{}},
{"/api/resource/search", &as.SearchResourceRequest{}},
{"/api/resource/check_availability", &as.CheckResourceAvailabilityRequest{}},
}
writeEndpoints = []apiEndpoint{
{"/api/user/create", &as.CreateUserRequest{}},
{"/api/user/update", &as.UpdateUserRequest{}},
{"/api/user/admin_update", &as.AdminUpdateUserRequest{}},
{"/api/user/change_password", &as.ChangeUserPasswordRequest{}},
{"/api/user/reset_password", &as.ResetPasswordRequest{}},
{"/api/user/set_account_recovery_hint", &as.SetAccountRecoveryHintRequest{}},
{"/api/user/enable_otp", &as.EnableOTPRequest{}},
{"/api/user/disable_otp", &as.DisableOTPRequest{}},
{"/api/user/create_app_specific_password", &as.CreateApplicationSpecificPasswordRequest{}},
{"/api/user/delete_app_specific_password", &as.DeleteApplicationSpecificPasswordRequest{}},
{"/api/resource/set_status", &as.SetResourceStatusRequest{}},
{"/api/resource/create", &as.CreateResourcesRequest{}},
{"/api/resource/move", &as.MoveResourceRequest{}},
{"/api/resource/reset_password", &as.ResetResourcePasswordRequest{}},
{"/api/resource/email/add_alias", &as.AddEmailAliasRequest{}},
{"/api/resource/email/delete_alias", &as.DeleteEmailAliasRequest{}},
{"/api/recover_account", &as.AccountRecoveryRequest{}},
}
)
// New creates a new APIServer. If leaderAddr is not the empty string,
// write requests will be forwarded to that address.
func New(service *as.AccountService, backend as.Backend, leaderAddr string, clientTLS *clientutil.TLSClientConfig) (*APIServer, error) {
registry := newActionRegistry(service)
mux := http.NewServeMux()
for _, ep := range readOnlyEndpoints {
registry.Register(ep.path, ep.handler)
}
var fs *forwardServer
if leaderAddr != "" {
var err error
fs, err = newForwardServer(leaderAddr, clientTLS)
if err != nil {
return nil, err
}
}
for _, ep := range writeEndpoints {
if leaderAddr == "" {
registry.Register(ep.path, ep.handler)
} else {
mux.Handle(ep.path, fs)
}
}
mux.Handle("/", registry)
return &APIServer{ServeMux: mux}, nil
}
var emptyResponse struct{}
type jsonError interface {
JSON() []byte
}
// A forwardServer is just a fancy httputil.ReverseProxy with loop
// detection (we don't want to receive proxied requests if we are not
// the leader).
type forwardServer struct {
proxy *httputil.ReverseProxy
}
const (
loopHdr = "X-Accountserver-Forwarded"
loopHdrValue = "true"
)
func newForwardServer(leaderURL string, tlsClientConf *clientutil.TLSClientConfig) (*forwardServer, error) {
leader, err := url.Parse(strings.TrimRight(leaderURL, "/"))
if err != nil {
return nil, err
}
var tlsConf *tls.Config
if tlsClientConf != nil {
tlsConf, err = tlsClientConf.TLSConfig()
if err != nil {
return nil, err
}
}
proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = leader.Scheme
req.URL.Host = leader.Host
req.URL.Path = leader.Path + req.URL.Path
// Loop protection.
req.Header.Set(loopHdr, loopHdrValue)
// Explicitly disable User-Agent so it's not set to default value.
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "")
}
},
Transport: &http.Transport{
TLSClientConfig: tlsConf,
},
BufferPool: newBufferPool(8192, 64),
}
return &forwardServer{proxy: proxy}, nil
}
func (s *forwardServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Simple forwarding loop checker, using out-of-band data in
// the form of a custom HTTP header.
if req.Header.Get(loopHdr) == loopHdrValue {
log.Printf("received leader request from %s, aborted", req.RemoteAddr)
http.Error(w, "This node is not the leader", http.StatusInternalServerError)
return
}
s.proxy.ServeHTTP(w, req)
}
func errToStatus(err error) int {
......@@ -152,3 +252,36 @@ func marshalJSONSanitized(obj interface{}) string {
}
return string(data)
}
// Simple buffer pool for httputil.ReverseProxy.
type bufferPool struct {
ch chan []byte
bufSize int
}
func newBufferPool(bufSize, poolSize int) *bufferPool {
pool := make(chan []byte, poolSize)
for i := 0; i < poolSize; i++ {
pool <- make([]byte, bufSize)
}
return &bufferPool{
ch: pool,
bufSize: bufSize,
}
}
func (p *bufferPool) Get() (b []byte) {
select {
case b = <-p.ch:
default:
b = make([]byte, p.bufSize)
}
return
}
func (p *bufferPool) Put(b []byte) {
select {
case p.ch <- b:
default:
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment