Commit 2dc1ea31 authored by ale's avatar ale

Log user-level actions to the usermetadb

parent 3cbaad14
Pipeline #1433 passed with stages
in 1 minute and 27 seconds
......@@ -9,6 +9,7 @@ import (
"log"
"git.autistici.org/ai3/go-common/pwhash"
umdb "git.autistici.org/id/usermetadb"
"github.com/pquerna/otp/totp"
"github.com/sethvargo/go-password/password"
"github.com/tstranex/u2f"
......@@ -147,7 +148,7 @@ func (r *ChangeUserPasswordRequest) Validate(ctx context.Context, s *AccountServ
// care of re-encrypting the user encryption key, if present.
func (s *AccountService) ChangeUserPassword(ctx context.Context, tx TX, req *ChangeUserPasswordRequest) error {
return s.handleUserRequest(ctx, tx, req, s.authUserWithPassword(req.PrivilegedRequestBase), func(ctx context.Context, user *User) error {
return s.changeUserPasswordAndUpdateEncryptionKeys(ctx, tx, user, req.CurPassword, req.Password)
return s.changeUserPasswordAndUpdateEncryptionKeys(ctx, tx, user, req.CurPassword, req.Password, "password changed (user request)")
})
}
......@@ -204,10 +205,8 @@ func (s *AccountService) RecoverPassword(ctx context.Context, tx TX, req *Passwo
ctx = context.WithValue(ctx, authUserCtxKey, req.Username)
err = s.withRequest(ctx, tx, req, user, func(ctx context.Context) error {
s.audit.Log(ctx, ResourceID{}, "password reset via account recovery")
// Change the user password (the recovery password does not change).
if err := s.changeUserPasswordAndUpdateEncryptionKeys(ctx, tx, user, req.RecoveryPassword, req.Password); err != nil {
if err := s.changeUserPasswordAndUpdateEncryptionKeys(ctx, tx, user, req.RecoveryPassword, req.Password, "password changed (account recovery)"); err != nil {
return err
}
......@@ -239,7 +238,7 @@ func (s *AccountService) ResetPassword(ctx context.Context, tx TX, req *ResetPas
}
// Reset encryption keys and set the new password.
return s.changeUserPasswordAndResetEncryptionKeys(ctx, tx, user, req.Password)
return s.changeUserPasswordAndResetEncryptionKeys(ctx, tx, user, req.Password, "password reset (admin)")
})
}
......@@ -283,7 +282,7 @@ func (s *AccountService) SetPasswordRecoveryHint(ctx context.Context, tx TX, req
// Change the user password and update encryption keys, provided we
// have a password that we can use to decrypt them first.
func (s *AccountService) changeUserPasswordAndUpdateEncryptionKeys(ctx context.Context, tx TX, user *User, oldPassword, newPassword string) error {
func (s *AccountService) changeUserPasswordAndUpdateEncryptionKeys(ctx context.Context, tx TX, user *User, oldPassword, newPassword, logmsg string) error {
// If the user does not yet have an encryption key, generate one now.
var err error
......@@ -307,14 +306,15 @@ func (s *AccountService) changeUserPasswordAndUpdateEncryptionKeys(ctx context.C
return newBackendError(err)
}
s.audit.Log(ctx, ResourceID{}, "password changed")
s.audit.Log(ctx, ResourceID{}, logmsg)
s.logUserAction(user, umdb.LogTypePasswordChange, logmsg)
return nil
}
// Change the user password and reset all encryption keys. Existing email
// won't be readable anymore. Existing 2FA credentials will be deleted.
func (s *AccountService) changeUserPasswordAndResetEncryptionKeys(ctx context.Context, tx TX, user *User, newPassword string) error {
func (s *AccountService) changeUserPasswordAndResetEncryptionKeys(ctx context.Context, tx TX, user *User, newPassword, logmsg string) error {
// Calling initialize will wipe the current keys and replace
// them with a new one.
keys, _, err := s.initializeEncryptionKeys(ctx, tx, user, newPassword)
......@@ -332,7 +332,10 @@ func (s *AccountService) changeUserPasswordAndResetEncryptionKeys(ctx context.Co
return newBackendError(err)
}
s.audit.Log(ctx, ResourceID{}, "password reset")
if logmsg != "" {
s.audit.Log(ctx, ResourceID{}, logmsg)
s.logUserAction(user, umdb.LogTypePasswordReset, logmsg)
}
return nil
}
......@@ -873,9 +876,10 @@ func (s *AccountService) CreateUser(ctx context.Context, tx TX, req *CreateUserR
// Now set a password for the user and return it, and
// set random passwords for all the resources
// (currently, we don't care about those, the user
// will reset them later).
// will reset them later). However, we could return
// them in the response as well, if necessary.
newPassword := randomPassword()
if err := s.changeUserPasswordAndResetEncryptionKeys(ctx, tx, req.User, newPassword); err != nil {
if err := s.changeUserPasswordAndResetEncryptionKeys(ctx, tx, req.User, newPassword, ""); err != nil {
return err
}
resp.Password = newPassword
......@@ -942,7 +946,7 @@ func (s *AccountService) UpdateUser(ctx context.Context, tx TX, req *UpdateUserR
func randomBase64(n int) string {
b := make([]byte, n*3/4)
_, err := rand.Read(b[:])
_, err := rand.Read(b[:]) // #nosec
if err != nil {
panic(err)
}
......
......@@ -37,6 +37,7 @@ func (l *syslogAuditLogger) Log(ctx context.Context, resourceID ResourceID, what
}
}
data, _ := json.Marshal(&e)
log.Printf("@cee:%s", data)
if data, err := json.Marshal(&e); err == nil {
log.Printf("@cee:%s", data)
}
}
......@@ -3,6 +3,7 @@ package accountserver
import (
"io/ioutil"
"git.autistici.org/ai3/go-common/clientutil"
"git.autistici.org/id/go-sso"
)
......@@ -27,6 +28,8 @@ type Config struct {
Groups []string `yaml:"groups"`
AdminGroup string `yaml:"admin_group"`
} `yaml:"sso"`
UserMetaDB *clientutil.BackendConfig `yaml:"user_meta_server"`
}
func (c *Config) domainBackend() domainBackend {
......
......@@ -5,9 +5,12 @@ import (
"encoding/json"
"log"
"reflect"
"time"
"git.autistici.org/ai3/go-common/pwhash"
"git.autistici.org/id/go-sso"
umdb "git.autistici.org/id/usermetadb"
umdbc "git.autistici.org/id/usermetadb/client"
)
// Backend user database interface.
......@@ -79,6 +82,7 @@ type AccountService struct {
*authService
audit auditLogger
umdb umdbc.Client
fieldValidators *fieldValidators
resourceValidator *resourceValidator
......@@ -102,6 +106,14 @@ func newAccountServiceWithSSO(backend Backend, config *Config, ssoValidator sso.
audit: &syslogAuditLogger{},
}
if config.UserMetaDB != nil {
var err error
s.umdb, err = umdbc.New(config.UserMetaDB)
if err != nil {
return nil, err
}
}
vc, err := config.validationContext(backend)
if err != nil {
return nil, err
......@@ -431,7 +443,28 @@ func (s *AccountService) handleAdminRequest(ctx context.Context, tx TX, req inte
return s.withRequest(ctx, tx, req, nil, f)
}
var umdbLogTimeout = 30 * time.Second
func (s *AccountService) logUserAction(user *User, logtype, logmsg string) {
if s.umdb == nil {
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), umdbLogTimeout)
defer cancel()
if err := s.umdb.AddLog(ctx, "", &umdb.LogEntry{
Timestamp: time.Now(),
Username: user.Name,
Type: logtype,
Message: logmsg,
Service: "accountserver",
}); err != nil {
log.Printf("usermetadb.AddLog error for %s: %v", user.Name, err)
}
}()
}
func dumpRequest(req interface{}) string {
data, _ := json.Marshal(req)
data, _ := json.Marshal(req) // nolint
return string(data)
}
......@@ -107,7 +107,7 @@ func (d *staticShardBackend) IsAllowedShard(_ context.Context, kind, shard strin
}
func loadStringSetFromFile(path string) (stringSet, error) {
f, err := os.Open(path)
f, err := os.Open(path) // #nosec
if err != nil {
return stringSet{}, err
}
......
package clientutil
import (
"context"
)
// BackendConfig specifies the configuration of a service backend.
//
// Services with multiple backends can be replicated or partitioned,
// depending on a configuration switch, making it a deployment-time
// decision. Clients are expected to compute their own sharding
// function (either by database lookup or other methods), and expose a
// 'shard' parameter on their APIs.
type BackendConfig struct {
URL string `yaml:"url"`
TLSConfig *TLSClientConfig `yaml:"tls"`
Sharded bool `yaml:"sharded"`
Debug bool `yaml:"debug"`
}
// Backend is a runtime class that provides http Clients for use with
// a specific service backend. If the service can't be partitioned,
// pass an empty string to the Call method.
type Backend interface {
// Call a remote method. The sharding behavior is the following:
//
// Services that support sharding (partitioning) should always
// include the shard ID in their Call() requests. Users can
// then configure backends to be sharded or not in their
// Config. When invoking Call with a shard ID on a non-sharded
// service, the shard ID is simply ignored. Invoking Call
// *without* a shard ID on a sharded service is an error.
Call(context.Context, string, string, interface{}, interface{}) error
// Close all resources associated with the backend.
Close()
}
// NewBackend returns a new Backend with the given config.
func NewBackend(config *BackendConfig) (Backend, error) {
return newBalancedBackend(config, defaultResolver)
}
package clientutil
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff"
)
// Our own narrow logger interface.
type logger interface {
Printf(string, ...interface{})
}
// A nilLogger is used when Config.Debug is false.
type nilLogger struct{}
func (l nilLogger) Printf(_ string, _ ...interface{}) {}
// Parameters that define the exponential backoff algorithm used.
var (
ExponentialBackOffInitialInterval = 100 * time.Millisecond
ExponentialBackOffMultiplier = 1.4142
)
// newExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func newExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = ExponentialBackOffInitialInterval
b.Multiplier = ExponentialBackOffMultiplier
// Set MaxElapsedTime to 0 because we expect the overall
// timeout to be dictated by the request Context.
b.MaxElapsedTime = 0
return b
}
// Balancer for HTTP connections. It will round-robin across available
// backends, trying to avoid ones that are erroring out, until one
// succeeds or returns a permanent error.
//
// This object should not be used for load balancing of individual
// HTTP requests: it doesn't do anything smart beyond trying to avoid
// broken targets. It's meant to provide a *reliable* connection to a
// set of equivalent services for HA purposes.
type balancedBackend struct {
*backendTracker
*transportCache
baseURI *url.URL
sharded bool
resolver resolver
log logger
}
func newBalancedBackend(config *BackendConfig, resolver resolver) (*balancedBackend, error) {
u, err := url.Parse(config.URL)
if err != nil {
return nil, err
}
var tlsConfig *tls.Config
if config.TLSConfig != nil {
tlsConfig, err = config.TLSConfig.TLSConfig()
if err != nil {
return nil, err
}
}
var logger logger = &nilLogger{}
if config.Debug {
logger = log.New(os.Stderr, fmt.Sprintf("backend %s: ", u.Host), 0)
}
return &balancedBackend{
backendTracker: newBackendTracker(u.Host, resolver, logger),
transportCache: newTransportCache(tlsConfig),
sharded: config.Sharded,
baseURI: u,
resolver: resolver,
log: logger,
}, nil
}
// Call the backend. Makes an HTTP POST request to the specified uri,
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func (b *balancedBackend) Call(ctx context.Context, shard, path string, req, resp interface{}) error {
data, err := json.Marshal(req)
if err != nil {
return err
}
var tg targetGenerator = b.backendTracker
if b.sharded && shard != "" {
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
}
seq := newSequence(tg)
b.log.Printf("%016x: initialized", seq.ID())
var httpResp *http.Response
err = backoff.Retry(func() error {
req, rerr := b.newJSONRequest(path, shard, data)
if rerr != nil {
return rerr
}
httpResp, rerr = b.do(ctx, seq, req)
return rerr
}, backoff.WithContext(newExponentialBackOff(), ctx))
if err != nil {
return err
}
defer httpResp.Body.Close() // nolint
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}
// Return the URI to be used for the request. This is used both in the
// Host HTTP header and as the TLS server name used to pick a server
// certificate (if using TLS).
func (b *balancedBackend) getURIForRequest(shard, path string) string {
u := *b.baseURI
if b.sharded && shard != "" {
u.Host = fmt.Sprintf("%s.%s", shard, u.Host)
}
u.Path = appendPath(u.Path, path)
return u.String()
}
// Build a http.Request object.
func (b *balancedBackend) newJSONRequest(path, shard string, data []byte) (*http.Request, error) {
req, err := http.NewRequest("POST", b.getURIForRequest(shard, path), bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Length", strconv.FormatInt(int64(len(data)), 10))
return req, nil
}
// Select a new target from the given sequence and send the request to
// it. Wrap HTTP errors in a RemoteError object.
func (b *balancedBackend) do(ctx context.Context, seq *sequence, req *http.Request) (resp *http.Response, err error) {
target, terr := seq.Next()
if terr != nil {
return
}
b.log.Printf("sequence %016x: connecting to %s", seq.ID(), target)
client := &http.Client{
Transport: b.transportCache.getTransport(target),
}
resp, err = client.Do(req.WithContext(ctx))
if err == nil && resp.StatusCode != 200 {
err = remoteErrorFromResponse(resp)
if !isStatusTemporary(resp.StatusCode) {
err = backoff.Permanent(err)
}
resp.Body.Close() // nolint
resp = nil
}
seq.Done(target, err)
return
}
var errNoTargets = errors.New("no available backends")
type targetGenerator interface {
getTargets() []string
setStatus(string, bool)
}
// A replicatedSequence repeatedly iterates over available backends in order of
// preference. Once in a while it refreshes its list of available
// targets.
type sequence struct {
id uint64
tg targetGenerator
targets []string
pos int
}
func newSequence(tg targetGenerator) *sequence {
return &sequence{
id: rand.Uint64(),
tg: tg,
targets: tg.getTargets(),
}
}
func (s *sequence) ID() uint64 { return s.id }
func (s *sequence) reloadTargets() {
targets := s.tg.getTargets()
if len(targets) > 0 {
s.targets = targets
s.pos = 0
}
}
// Next returns the next target.
func (s *sequence) Next() (t string, err error) {
if s.pos >= len(s.targets) {
s.reloadTargets()
if len(s.targets) == 0 {
err = errNoTargets
return
}
}
t = s.targets[s.pos]
s.pos++
return
}
func (s *sequence) Done(t string, err error) {
s.tg.setStatus(t, err == nil)
}
// A shardedGenerator returns a single sharded target to a sequence.
type shardedGenerator struct {
id uint64
addrs []string
}
func newShardedGenerator(shard, base string, resolver resolver) *shardedGenerator {
return &shardedGenerator{
id: rand.Uint64(),
addrs: resolver.ResolveIP(fmt.Sprintf("%s.%s", shard, base)),
}
}
func (g *shardedGenerator) getTargets() []string { return g.addrs }
func (g *shardedGenerator) setStatus(_ string, _ bool) {}
// Concatenate two URI paths.
func appendPath(a, b string) string {
if strings.HasSuffix(a, "/") && strings.HasPrefix(b, "/") {
return a + b[1:]
}
return a + b
}
// Some HTTP status codes are treated are temporary errors.
func isStatusTemporary(code int) bool {
switch code {
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
package clientutil
import (
"log"
"net"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
type resolver interface {
ResolveIP(string) []string
}
type dnsResolver struct{}
func (r *dnsResolver) ResolveIP(hostport string) []string {
var resolved []string
host, port, err := net.SplitHostPort(hostport)
if err != nil {
log.Printf("error parsing %s: %v", hostport, err)
return nil
}
hostIPs, err := net.LookupIP(host)
if err != nil {
log.Printf("error resolving %s: %v", host, err)
return nil
}
for _, ip := range hostIPs {
resolved = append(resolved, net.JoinHostPort(ip.String(), port))
}
return resolved
}
var defaultResolver = newDNSCache(&dnsResolver{})
type cacheDatum struct {
addrs []string
deadline time.Time
}
var dnsCacheTTL = 1 * time.Minute
type dnsCache struct {
resolver resolver
sf singleflight.Group
mx sync.RWMutex
cache map[string]cacheDatum
}
func newDNSCache(resolver resolver) *dnsCache {
return &dnsCache{
resolver: resolver,
cache: make(map[string]cacheDatum),
}
}
func (c *dnsCache) get(host string) ([]string, bool) {
d, ok := c.cache[host]
if !ok {
return nil, false
}
return d.addrs, d.deadline.After(time.Now())
}
func (c *dnsCache) update(host string) []string {
v, _, _ := c.sf.Do(host, func() (interface{}, error) {
addrs := c.resolver.ResolveIP(host)
// By uncommenting this, we stop caching negative results.
// if len(addrs) == 0 {
// return nil, nil
// }
c.mx.Lock()
c.cache[host] = cacheDatum{
addrs: addrs,
deadline: time.Now().Add(dnsCacheTTL),
}
c.mx.Unlock()
return addrs, nil
})
return v.([]string)
}
func (c *dnsCache) ResolveIP(host string) []string {
c.mx.RLock()
addrs, ok := c.get(host)
c.mx.RUnlock()
if ok {
return addrs
}
if len(addrs) > 0 {
go c.update(host)
return addrs
}
return c.update(host)
}
// Package clientutil implements a very simple style of JSON RPC.
//
// Requests and responses are both encoded in JSON, and they should
// have the "application/json" Content-Type.
//
// HTTP response statuses other than 200 indicate an error: in this
// case, the response body may contain (in plain text) further details
// about the error. Some HTTP status codes are considered temporary
// errors (incl. 429 for throttling). The client will retry requests,
// if targets are available, until the context expires - so it's quite
// important to remember to set a timeout on the context given to the
// Call() function!
//
// The client handles both replicated services and sharded
// (partitioned) services. Users of this package that want to support
// sharded deployments are supposed to pass a shard ID to every
// Call(). At the deployment stage, sharding can be enabled via the
// configuration.
//
// For replicated services, the client will expect the provided
// hostname to resolve to one or more IP addresses, in which case it
// will pick a random IP address on every new request, while
// remembering which addresses have had errors and trying to avoid
// them. It will however send an occasional request to the failed
// targets, to see if they've come back.
//
// For sharded services, the client makes simple HTTP requests to the
// specific target identified by the shard. It does this by prepending
// the shard ID to the backend hostname (so a request to "example.com"
// with shard ID "1" becomes a request to "1.example.com").
//
// The difference with other JSON-RPC implementations is that we use a
// different URI for every method, and we force the usage of
// request/response types. This makes it easy for projects to
// eventually migrate to GRPC.
//
package clientutil
package clientutil
import (
"fmt"
"io/ioutil"
"net/http"
)
// RemoteError represents a HTTP error from the server. The status
// code and response body can be retrieved with the StatusCode() and
// Body() methods.
type RemoteError struct {
statusCode int
body string
}
func remoteErrorFromResponse(resp *http.Response) *RemoteError {
// Optimistically read the response body, ignoring errors.
var body string
if data, err := ioutil.ReadAll(resp.Body); err == nil {
body = string(data)
}
return &RemoteError{statusCode: resp.StatusCode, body: body}
}
// Error implements the error interface.
func (e *RemoteError) Error() string {
return fmt.Sprintf("%d - %s", e.statusCode, e.body)
}
// StatusCode returns the HTTP status code.
func (e *RemoteError) StatusCode() int { return e.statusCode }
// Body returns the response body.
func (e *RemoteError) Body() string { return e.body }
package clientutil
import (
"crypto/tls"
"errors"
common "git.autistici.org/ai3/go-common"
)
// TLSClientConfig defines the TLS parameters for a client connection
// that should use a client X509 certificate for authentication.
type TLSClientConfig struct {
Cert string `yaml:"cert"`
Key string `yaml:"key"`