Select Git revision
README.mm-handler
Forked from
ai3 / thirdparty / mailman-lmtp
Source project has a limited visibility.
-
Thorsten Glaser authoredThorsten Glaser authored
auxdb.go 5.31 KiB
package auxdbbackend
import (
"context"
"encoding/json"
"log"
"strings"
"sync"
as "git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
auxpb "git.autistici.org/ai3/tools/aux-db/proto"
)
// Data types under which aux-db stores the records read by this package.
const (
	// auxDbWebappDataType is the data type of the webapp-related resources.
	auxDbWebappDataType = "cms_info"

	// auxDbDiskUsageDataType is the data type of the disk usage resources.
	auxDbDiskUsageDataType = "disk_usage"
)
// AuxWebappBackend wraps an accountserver backend and enriches the
// resources it returns with data stored in the aux-db service: website
// information (cms_info data type) and disk usage (disk_usage data
// type).
//
// A lookup spawns a number of requests for each resource that might
// be involved, but we would like to group requests to aux-db so as
// to minimize their number (and parallelize them). So we generate a
// list of resource IDs to look up, along with an associated callback
// that will modify the original resource. Then we group them by
// shard, fan out the requests in parallel, and invoke the callbacks
// with the results.
type AuxWebappBackend struct {
// Embedded backend being wrapped; NewTransaction delegates to it.
as.Backend
// Client used to send requests to aux-db.
auxdbbe clientutil.Backend
}
// wdbTX wraps an accountserver transaction together with the aux-db
// client. Its GetUser and GetResource methods call the wrapped
// transaction first and then run aux-db lookups on the result.
type wdbTX struct {
// Embedded transaction being wrapped.
as.TX
// Client used to send requests to aux-db.
auxdbbe clientutil.Backend
}
// Wrap returns an AuxWebappBackend that embeds b and reaches aux-db
// through a client built from config. The error from creating that
// client, if any, is returned unchanged.
func Wrap(b as.Backend, config *clientutil.BackendConfig) (*AuxWebappBackend, error) {
	auxdb, err := clientutil.NewBackend(config)
	if err != nil {
		return nil, err
	}

	wrapped := &AuxWebappBackend{Backend: b, auxdbbe: auxdb}
	return wrapped, nil
}
// NewTransaction opens a transaction on the wrapped backend and returns
// it wrapped in a wdbTX that shares this backend's aux-db client.
func (b *AuxWebappBackend) NewTransaction() (as.TX, error) {
	inner, err := b.Backend.NewTransaction()
	if err != nil {
		return nil, err
	}

	wrapped := &wdbTX{TX: inner, auxdbbe: b.auxdbbe}
	return wrapped, nil
}
// GetUser fetches the user from the wrapped transaction and, when one
// is found, runs the aux-db lookups that apply to its resources. The
// lookups never fail the call: lookup only logs its errors.
func (tx *wdbTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) {
	user, err := tx.TX.GetUser(ctx, name)
	if err != nil {
		return user, err
	}
	if user == nil {
		// No such user: nothing to enrich.
		return user, nil
	}

	// Collect the aux-db lookups for all of the user's resources and
	// resolve them in one go.
	pending := lookupsForResources(user.Resources)
	tx.lookup(ctx, pending)
	return user, nil
}
// GetResource fetches the resource from the wrapped transaction and,
// when one is found, runs the aux-db lookups that apply to it. The
// lookups never fail the call: lookup only logs its errors.
func (tx *wdbTX) GetResource(ctx context.Context, id as.ResourceID) (*as.RawResource, error) {
	rsrc, err := tx.TX.GetResource(ctx, id)
	if err != nil {
		return rsrc, err
	}
	if rsrc == nil {
		// No such resource: nothing to enrich.
		return rsrc, nil
	}

	// The callbacks modify the embedded Resource in place, so pass a
	// pointer into rsrc rather than a copy.
	single := []*as.Resource{&rsrc.Resource}
	tx.lookup(ctx, lookupsForResources(single))
	return rsrc, nil
}
// decodeJSONValue decodes the JSON document held in value into out,
// which must be a pointer as required by encoding/json.
func decodeJSONValue(value auxpb.EncodedJSON, out interface{}) error {
	src := strings.NewReader(string(value))
	dec := json.NewDecoder(src)
	return dec.Decode(out)
}
// setCMSInfo returns a callback that decodes an aux-db entry into an
// as.App, fills in its Timestamp, Path and Site from the entry, and
// appends it to rsrc.Website.CMSInfo. The callback returns the decode
// error, if any, and leaves rsrc untouched in that case.
func setCMSInfo(rsrc *as.Resource) func(*auxpb.Entry) error {
	return func(entry *auxpb.Entry) error {
		app := new(as.App)
		if err := decodeJSONValue(entry.ValueJSON, app); err != nil {
			return err
		}

		// These fields come from the entry itself and override
		// whatever the JSON payload may have set.
		app.Timestamp = entry.Timestamp
		app.Path = entry.Key.AppKey
		app.Site = entry.Key.ResourceID

		rsrc.Website.CMSInfo = append(rsrc.Website.CMSInfo, app)
		return nil
	}
}
// setDiskUsage returns a callback that decodes the "usage" field of an
// aux-db entry's JSON payload and stores it in rsrc.UsageBytes. The
// callback returns the decode error, if any, and leaves rsrc untouched
// in that case.
func setDiskUsage(rsrc *as.Resource) func(*auxpb.Entry) error {
	// usagePayload holds the only part of the payload read here.
	type usagePayload struct {
		Usage int64 `json:"usage"`
	}

	return func(entry *auxpb.Entry) error {
		var payload usagePayload
		err := decodeJSONValue(entry.ValueJSON, &payload)
		if err != nil {
			return err
		}
		rsrc.UsageBytes = payload.Usage
		return nil
	}
}
// callbackFunc processes a single aux-db entry. The callbacks built in
// this file return an error when the entry's JSON payload fails to
// decode.
type callbackFunc func(*auxpb.Entry) error
// lookupEntry pairs an aux-db key to request with the callback to
// invoke for each entry returned for that key.
type lookupEntry struct {
// Key to request from aux-db.
key auxpb.Key
// Invoked with each entry in the response that matches key.
callback callbackFunc
}
// lookupsForResources builds the list of aux-db lookups that apply to
// the given resources:
//
//   - website and domain resources get a cms_info lookup whose callback
//     appends to the resource's CMS info;
//   - DAV, email and database resources get a disk_usage lookup whose
//     callback sets the resource's usage.
//
// Resources of any other type produce no lookup. The result is nil
// when no resource qualifies.
func lookupsForResources(resources []*as.Resource) (lookups []*lookupEntry) {
	for _, r := range resources {
		var entry *lookupEntry

		switch r.Type {
		case as.ResourceTypeWebsite, as.ResourceTypeDomain:
			entry = &lookupEntry{
				key: auxpb.Key{
					Type:       auxDbWebappDataType,
					Shard:      r.Shard,
					ResourceID: r.ID.String(),
				},
				callback: setCMSInfo(r),
			}
		case as.ResourceTypeDAV, as.ResourceTypeEmail, as.ResourceTypeDatabase:
			entry = &lookupEntry{
				key: auxpb.Key{
					Type:       auxDbDiskUsageDataType,
					Shard:      r.Shard,
					ResourceID: r.ID.String(),
				},
				callback: setDiskUsage(r),
			}
		default:
			// No aux-db data is attached to this resource type.
			continue
		}

		lookups = append(lookups, entry)
	}
	return lookups
}
// lookup resolves the given lookups against aux-db and invokes each
// lookup's callback once per entry returned for its key.
//
// Keys are grouped by shard and every shard is queried concurrently
// with a single /api/get call. Results are funneled through a channel
// so that all callbacks run on the calling goroutine and need no
// locking. The function returns once every shard request has finished
// and every received entry has been processed.
//
// Nothing is reported to the caller: request errors, unexpected or
// nil entries and callback errors are only logged.
func (tx *wdbTX) lookup(ctx context.Context, lookups []*lookupEntry) {
	// Nothing to ask aux-db: skip setting up the fan-out machinery.
	if len(lookups) == 0 {
		return
	}

	// Group the keys by shard, and build a reverse index of keys
	// -> callbacks, that we'll use to process the results.
	keysByShard := make(map[string][]auxpb.Key)
	callbacks := make(map[string]callbackFunc, len(lookups))
	for _, l := range lookups {
		keysByShard[l.key.Shard] = append(keysByShard[l.key.Shard], l.key)
		callbacks[l.key.String()] = l.callback
	}

	// Query each shard in parallel, but send results over a
	// channel so that we can invoke all callbacks from the main
	// goroutine and avoid worrying about locking.
	var wg sync.WaitGroup
	ch := make(chan *auxpb.Entry, 10)
	for shardID, keys := range keysByShard {
		wg.Add(1)
		go func(shardID string, keys []auxpb.Key) {
			defer wg.Done()

			var resp auxpb.GetResponse
			if err := tx.auxdbbe.Call(ctx, shardID, "/api/get", &auxpb.GetRequest{Keys: keys}, &resp); err != nil {
				// Errors are non-fatal, just log them.
				log.Printf("aux-db lookup error (shard %s): %v", shardID, err)
				return
			}
			for _, entry := range resp.Results {
				// A nil entry would panic when its Key is read
				// by the consumer below, so drop it here where
				// the shard is still known.
				if entry == nil {
					log.Printf("aux-db returned a nil entry (shard %s)", shardID)
					continue
				}
				ch <- entry
			}
		}(shardID, keys)
	}

	// Close the channel once every shard worker is done, which ends
	// the receive loop below.
	go func() {
		wg.Wait()
		close(ch)
	}()

	// We might receive multiple entries for the same lookup key:
	// they will have different AppKeys. To look up the callback,
	// we ignore the AppKey part of the key.
	for entry := range ch {
		lookupKey := entry.Key
		lookupKey.AppKey = ""
		cb, ok := callbacks[lookupKey.String()]
		if !ok {
			// Violation of the aux-db Get API contract.
			log.Printf("unexpected aux-db entry %s", entry.Key.String())
			continue
		}
		if err := cb(entry); err != nil {
			log.Printf("error parsing aux-db entry %s: %v", entry.Key.String(), err)
		}
	}
}