Skip to content
Snippets Groups Projects
Select Git revision
  • master
  • lintian-fixes
  • latest
3 results

README.mm-handler

Blame
  • Forked from ai3 / thirdparty / mailman-lmtp
    Source project has a limited visibility.
    auxdb.go 5.31 KiB
    package auxdbbackend
    
    import (
    	"context"
    	"encoding/json"
    	"log"
    	"strings"
    	"sync"
    
    	as "git.autistici.org/ai3/accountserver"
    	"git.autistici.org/ai3/go-common/clientutil"
    	auxpb "git.autistici.org/ai3/tools/aux-db/proto"
    )
    
    // auxDbWebappDataType is the aux-db data type of the webapp-related
    // (CMS) entries. It is used as the key type when looking up website
    // and domain resources.
    const auxDbWebappDataType = "cms_info"
    
    // auxDbDiskUsageDataType is the aux-db data type of the disk usage
    // entries. It is used as the key type when looking up DAV, email and
    // database resources.
    const auxDbDiskUsageDataType = "disk_usage"
    
    // AuxWebappBackend wraps an accountserver Backend and augments the
    // users and resources read through its transactions with data stored
    // in the aux-db service: website information (cms_info data type) and
    // disk usage (disk_usage data type).
    //
    // A lookup spawns a number of requests for each resource that might
    // be involved, but we would like to group requests to aux-db so as
    // to minimize their number (and parallelize them). So we generate a
    // list of resource IDs to look up, along with an associated callback
    // that will modify the original resource. Then we group them by
    // shard, fan out the requests in parallel, and invoke the callbacks
    // with the results.
    type AuxWebappBackend struct {
    	// The wrapped backend; methods not overridden here are promoted
    	// from it through embedding.
    	as.Backend
    
    	// Client used to send requests to aux-db.
    	auxdbbe clientutil.Backend
    }
    
    // wdbTX wraps an accountserver transaction, overriding GetUser and
    // GetResource so that the returned objects are enriched with aux-db
    // data. All other methods are promoted from the embedded TX.
    type wdbTX struct {
    	// The wrapped transaction.
    	as.TX
    	// Client used to send requests to aux-db (shared with the
    	// AuxWebappBackend that created this transaction).
    	auxdbbe clientutil.Backend
    }
    
    // Wrap returns an AuxWebappBackend that delegates to b and talks to
    // aux-db through a client built from config. It fails if the aux-db
    // client cannot be created.
    func Wrap(b as.Backend, config *clientutil.BackendConfig) (*AuxWebappBackend, error) {
    	auxdb, err := clientutil.NewBackend(config)
    	if err != nil {
    		return nil, err
    	}
    
    	wrapped := new(AuxWebappBackend)
    	wrapped.Backend = b
    	wrapped.auxdbbe = auxdb
    	return wrapped, nil
    }
    
    // NewTransaction opens a transaction on the wrapped backend and
    // returns it wrapped in a wdbTX, so that reads performed through it
    // are enriched with aux-db data.
    func (b *AuxWebappBackend) NewTransaction() (as.TX, error) {
    	inner, err := b.Backend.NewTransaction()
    	if err != nil {
    		return nil, err
    	}
    
    	wrapped := &wdbTX{TX: inner, auxdbbe: b.auxdbbe}
    	return wrapped, nil
    }
    
    // GetUser fetches the user from the wrapped transaction and, when one
    // is found, fills in the aux-db data for its resources in place
    // before returning it. An error or a nil user from the wrapped
    // transaction is passed through unchanged.
    func (tx *wdbTX) GetUser(ctx context.Context, name string) (*as.RawUser, error) {
    	u, err := tx.TX.GetUser(ctx, name)
    	if err != nil {
    		return u, err
    	}
    	if u == nil {
    		return nil, nil
    	}
    
    	// Collect the aux-db lookups for every resource owned by the
    	// user, then resolve them all in one go.
    	pending := lookupsForResources(u.Resources)
    	tx.lookup(ctx, pending)
    
    	return u, nil
    }
    
    // GetResource fetches the resource from the wrapped transaction and,
    // when one is found, fills in its aux-db data in place before
    // returning it. An error or a nil resource from the wrapped
    // transaction is passed through unchanged.
    func (tx *wdbTX) GetResource(ctx context.Context, id as.ResourceID) (*as.RawResource, error) {
    	raw, err := tx.TX.GetResource(ctx, id)
    	if err != nil {
    		return raw, err
    	}
    	if raw == nil {
    		return nil, nil
    	}
    
    	// The lookup machinery works on slices of resources, so wrap the
    	// single embedded resource in one.
    	targets := []*as.Resource{&raw.Resource}
    	tx.lookup(ctx, lookupsForResources(targets))
    
    	return raw, nil
    }
    
    // decodeJSONValue decodes the JSON-encoded aux-db value into out,
    // which must be a pointer as required by encoding/json.
    func decodeJSONValue(value auxpb.EncodedJSON, out interface{}) error {
    	src := strings.NewReader(string(value))
    	dec := json.NewDecoder(src)
    	return dec.Decode(out)
    }
    
    // setCMSInfo returns a callback that decodes an aux-db entry as an
    // as.App and appends it to the CMSInfo list of rsrc's Website. The
    // app's Timestamp, Path and Site are taken from the entry metadata
    // (timestamp, app key and resource ID respectively), overriding
    // whatever the JSON payload contained for those fields.
    func setCMSInfo(rsrc *as.Resource) func(*auxpb.Entry) error {
    	appendApp := func(e *auxpb.Entry) error {
    		info := new(as.App)
    		err := decodeJSONValue(e.ValueJSON, info)
    		if err != nil {
    			return err
    		}
    
    		info.Timestamp = e.Timestamp
    		info.Path = e.Key.AppKey
    		info.Site = e.Key.ResourceID
    
    		rsrc.Website.CMSInfo = append(rsrc.Website.CMSInfo, info)
    		return nil
    	}
    	return appendApp
    }
    
    // setDiskUsage returns a callback that decodes the "usage" field of
    // an aux-db entry's JSON payload and stores it in rsrc.UsageBytes.
    // The resource is left untouched if the payload cannot be decoded.
    func setDiskUsage(rsrc *as.Resource) func(*auxpb.Entry) error {
    	// Shape of the JSON payload of disk usage entries.
    	type diskUsage struct {
    		Usage int64 `json:"usage"`
    	}
    
    	return func(e *auxpb.Entry) error {
    		var du diskUsage
    		err := decodeJSONValue(e.ValueJSON, &du)
    		if err == nil {
    			rsrc.UsageBytes = du.Usage
    		}
    		return err
    	}
    }
    
    // callbackFunc processes a single aux-db entry returned by a lookup,
    // returning an error if the entry cannot be handled.
    type callbackFunc func(*auxpb.Entry) error
    
    // lookupEntry pairs an aux-db key to fetch with the callback that
    // should receive each entry returned for that key.
    type lookupEntry struct {
    	// Key to request from aux-db.
    	key      auxpb.Key
    	// Invoked once per matching entry in the response.
    	callback callbackFunc
    }
    
    // lookupsForResources builds the list of aux-db lookups needed to
    // enrich the given resources: a cms_info lookup for website and
    // domain resources, and a disk_usage lookup for DAV, email and
    // database resources. Resources of any other type produce no lookup.
    // The result is nil when no resource needs a lookup.
    func lookupsForResources(resources []*as.Resource) (lookups []*lookupEntry) {
    	for _, rsrc := range resources {
    		switch rsrc.Type {
    		case as.ResourceTypeWebsite, as.ResourceTypeDomain:
    			item := &lookupEntry{callback: setCMSInfo(rsrc)}
    			item.key.Type = auxDbWebappDataType
    			item.key.Shard = rsrc.Shard
    			item.key.ResourceID = rsrc.ID.String()
    			lookups = append(lookups, item)
    
    		case as.ResourceTypeDAV, as.ResourceTypeEmail, as.ResourceTypeDatabase:
    			item := &lookupEntry{callback: setDiskUsage(rsrc)}
    			item.key.Type = auxDbDiskUsageDataType
    			item.key.Shard = rsrc.Shard
    			item.key.ResourceID = rsrc.ID.String()
    			lookups = append(lookups, item)
    		}
    	}
    	return lookups
    }
    
    // lookup resolves the given lookup entries against aux-db and invokes
    // each entry's callback for every result returned for its key.
    //
    // Keys are grouped by shard and each shard is queried in parallel
    // with a single "/api/get" call. Results are funneled through a
    // channel so that all callbacks run on the calling goroutine, which
    // lets them modify their target resources without locking.
    //
    // Nothing is returned: failed shard requests, unexpected entries and
    // callback errors are logged and otherwise ignored.
    func (tx *wdbTX) lookup(ctx context.Context, lookups []*lookupEntry) {
    	// Nothing to look up (e.g. a resource type with no aux-db data):
    	// skip allocating the channel and spawning the closer goroutine.
    	if len(lookups) == 0 {
    		return
    	}
    
    	// Group the keys by shard, and build a reverse index of keys
    	// -> callbacks, that we'll use to process the results.
    	keysByShard := make(map[string][]auxpb.Key)
    	callbacks := make(map[string]callbackFunc, len(lookups))
    	for _, l := range lookups {
    		keysByShard[l.key.Shard] = append(keysByShard[l.key.Shard], l.key)
    		callbacks[l.key.String()] = l.callback
    	}
    
    	// Query each shard in parallel, but send results over a
    	// channel so that we can invoke all callbacks from the main
    	// goroutine and avoid worrying about locking.
    	var wg sync.WaitGroup
    	ch := make(chan *auxpb.Entry, 10)
    
    	for shardID, keys := range keysByShard {
    		wg.Add(1)
    		// shardID and keys are passed as arguments so that each
    		// goroutine works on its own copy of the loop variables.
    		go func(shardID string, keys []auxpb.Key) {
    			defer wg.Done()
    			var resp auxpb.GetResponse
    			if err := tx.auxdbbe.Call(ctx, shardID, "/api/get", &auxpb.GetRequest{Keys: keys}, &resp); err != nil {
    				// Errors are non-fatal, just log them.
    				log.Printf("aux-db lookup error (shard %s): %v", shardID, err)
    				return
    			}
    			for _, entry := range resp.Results {
    				ch <- entry
    			}
    		}(shardID, keys)
    	}
    
    	// Close the channel once every shard query has finished, so
    	// that the receive loop below terminates.
    	go func() {
    		wg.Wait()
    		close(ch)
    	}()
    
    	// We might receive multiple entries for the same lookup key:
    	// they will have different AppKeys. To lookup the callback,
    	// we ignore the AppKey part of the key.
    	for entry := range ch {
    		lookupKey := entry.Key
    		lookupKey.AppKey = ""
    		cb, ok := callbacks[lookupKey.String()]
    		if !ok {
    			// Violation of the aux-db Get API contract.
    			log.Printf("unexpected aux-db entry %s", entry.Key.String())
    			continue
    		}
    		if err := cb(entry); err != nil {
    			log.Printf("error parsing aux-db entry %s: %v", entry.Key.String(), err)
    		}
    	}
    }