Commit 7443b86a authored by ale's avatar ale
Browse files

Merge branch 'disk-usage' into 'master'

Disk usage

See merge request !27
parents bd3ca21d d25deeaa
Pipeline #13092 passed with stages
in 1 minute and 32 seconds
package webappdbbackend
package auxdbbackend
import (
"context"
"encoding/json"
"log"
"strings"
"sync"
"time"
as "git.autistici.org/ai3/accountserver"
"git.autistici.org/ai3/go-common/clientutil"
......@@ -14,8 +15,19 @@ import (
// Data type of the webapp-related resources in aux-db.
const auxDbWebappDataType = "cms_info"
// Data type of disk usage resources in aux-db.
const auxDbDiskUsageDataType = "disk_usage"
// AuxWebappBackend looks up website information (cms_info data type) in
// the aux-db service.
//
// A lookup spawns a number of requests for each resource that might
// be involved, but we would like to group requests to aux-db such as
// to minimize their number (and parallelize them). So we generate a
// list of resource IDs to lookup, along with an associated callback
// that will modify the original resource. Then we group them by
// shard, fan out the requests in parallel, and invoke the callbacks
// with the results.
type AuxWebappBackend struct {
as.Backend
......@@ -56,16 +68,7 @@ func (tx *wdbTX) GetUser(ctx context.Context, name string) (*as.RawUser, error)
}
// Find all web resources.
var sites []*as.Resource
for _, r := range user.Resources {
if r.Type == as.ResourceTypeWebsite || r.Type == as.ResourceTypeDomain {
sites = append(sites, r)
}
}
if len(sites) > 0 {
tx.lookup(ctx, sites)
}
tx.lookup(ctx, lookupsForResources(user.Resources))
return user, nil
}
......@@ -76,79 +79,121 @@ func (tx *wdbTX) GetResource(ctx context.Context, id as.ResourceID) (*as.RawReso
return rsrc, err
}
if rsrc.Type == as.ResourceTypeWebsite || rsrc.Type == as.ResourceTypeDomain {
tx.lookup(ctx, []*as.Resource{&rsrc.Resource})
}
tx.lookup(ctx, lookupsForResources([]*as.Resource{&rsrc.Resource}))
return rsrc, nil
}
// Specialization of auxpb.Entry that we can use to deserialize the
// value_json attribute right away into the type we are expecting to
// see. We just need a tiny amount of further processing to wrap back
// some fields into the result.
type webappsGetResponseEntry struct {
Key auxpb.Key `json:"key"`
Timestamp time.Time `json:"timestamp"`
ValueJSON *as.App `json:"value_json"`
func decodeJSONValue(value auxpb.EncodedJSON, out interface{}) error {
return json.NewDecoder(strings.NewReader(string(value))).Decode(out)
}
func setCMSInfo(rsrc *as.Resource) func(*auxpb.Entry) error {
return func(entry *auxpb.Entry) error {
var app as.App
if err := decodeJSONValue(entry.ValueJSON, &app); err != nil {
return err
}
app.Timestamp = entry.Timestamp
app.Path = entry.Key.AppKey
app.Site = entry.Key.ResourceID
rsrc.Website.CMSInfo = append(rsrc.Website.CMSInfo, &app)
return nil
}
}
func (w *webappsGetResponseEntry) App() *as.App {
app := w.ValueJSON
app.Timestamp = w.Timestamp
app.Path = w.Key.AppKey
app.Site = w.Key.ResourceID
return app
func setDiskUsage(rsrc *as.Resource) func(*auxpb.Entry) error {
return func(entry *auxpb.Entry) error {
var datum struct {
Usage int64 `json:"usage"`
}
if err := decodeJSONValue(entry.ValueJSON, &datum); err != nil {
return err
}
rsrc.UsageBytes = datum.Usage
return nil
}
}
type webappsGetResponse struct {
Results []*webappsGetResponseEntry `json:"results"`
type callbackFunc func(*auxpb.Entry) error
type lookupEntry struct {
key auxpb.Key
callback callbackFunc
}
// Lookup resources in the webapp DB, modifying them in-place with any
// eventual CMSInfo data returned.
func (tx *wdbTX) lookup(ctx context.Context, resources []*as.Resource) {
// Build a list of site names, and a site name -> resource map
// so we can modify the original objects with our results.
byShard := make(map[string][]auxpb.Key)
byKey := make(map[string]*as.Resource)
func lookupsForResources(resources []*as.Resource) (lookups []*lookupEntry) {
for _, r := range resources {
// Create the lookup key for aux-db.
auxKey := auxpb.Key{
Type: auxDbWebappDataType,
Shard: r.Shard,
ResourceID: r.ID.String(),
if r.Type == as.ResourceTypeWebsite || r.Type == as.ResourceTypeDomain {
lookups = append(lookups, &lookupEntry{
key: auxpb.Key{
Type: auxDbWebappDataType,
Shard: r.Shard,
ResourceID: r.ID.String(),
},
callback: setCMSInfo(r),
})
}
if r.Type == as.ResourceTypeWebsite || r.Type == as.ResourceTypeDomain || r.Type == as.ResourceTypeEmail || r.Type == as.ResourceTypeDatabase {
lookups = append(lookups, &lookupEntry{
key: auxpb.Key{
Type: auxDbDiskUsageDataType,
Shard: r.Shard,
ResourceID: r.ID.String(),
},
callback: setDiskUsage(r),
})
}
byKey[r.ID.String()] = r
}
return
}
// Group resource keys by shard.
byShard[r.Shard] = append(byShard[r.Shard], auxKey)
func (tx *wdbTX) lookup(ctx context.Context, lookups []*lookupEntry) {
// Group the keys by shard, and build a reverse index of keys
// -> callbacks, that we'll use to process the results.
keysByShard := make(map[string][]auxpb.Key)
callbacks := make(map[string]callbackFunc)
for _, l := range lookups {
keysByShard[l.key.Shard] = append(keysByShard[l.key.Shard], l.key)
callbacks[l.key.String()] = l.callback
}
// Query each shard in parallel, modify the resulting
// Resources in-place, ignoring errors.
// Query each shard in parallel, but send results over a
// channel so that we can invoke all callbacks from the main
// goroutine and avoid worrying about locking.
var wg sync.WaitGroup
for shardID, keys := range byShard {
ch := make(chan *auxpb.Entry, 10)
for shardID, keys := range keysByShard {
wg.Add(1)
go func(shardID string, keys []auxpb.Key) {
defer wg.Done()
var resp webappsGetResponse
var resp auxpb.GetResponse
if err := tx.auxdbbe.Call(ctx, shardID, "/api/get", &auxpb.GetRequest{Keys: keys}, &resp); err != nil {
// Errors are non-fatal, just log them.
log.Printf("aux-db lookup error (shard %s): %v", shardID, err)
return
}
// Now append the cms_info data to the associated Resource object.
for _, entry := range resp.Results {
r, ok := byKey[entry.Key.ResourceID]
if !ok || r.Website == nil {
continue
}
r.Website.CMSInfo = append(r.Website.CMSInfo, entry.App())
ch <- entry
}
}(shardID, keys)
}
wg.Wait()
go func() {
wg.Wait()
close(ch)
}()
for entry := range ch {
cb, ok := callbacks[entry.Key.String()]
if !ok {
// Violation of the aux-db Get API contract.
log.Printf("unexpected aux-db entry %s", entry.Key.String())
continue
}
if err := cb(entry); err != nil {
log.Printf("error parsing aux-db entry %s: %v", entry.Key.String(), err)
}
}
}
......@@ -14,10 +14,10 @@ import (
"git.autistici.org/ai3/go-common/serverutil"
"gopkg.in/yaml.v2"
auxdbbackend "git.autistici.org/ai3/accountserver/backend/auxdb"
cachebackend "git.autistici.org/ai3/accountserver/backend/cache"
insbackend "git.autistici.org/ai3/accountserver/backend/instrumented"
ldapbackend "git.autistici.org/ai3/accountserver/backend/ldap"
webappdbbackend "git.autistici.org/ai3/accountserver/backend/webappdb"
"git.autistici.org/ai3/accountserver/server"
)
......@@ -192,14 +192,14 @@ func main() {
be = insbackend.Wrap(cacheBE, "cache")
}
// Enable lookups to the webappdb (FreeWVS) service. Errors
// are not fatal, the service is optional.
// Enable lookups to the aux-db service. Errors are not fatal,
// the service is optional.
if config.AuxDB != nil {
wdb, err := webappdbbackend.Wrap(be, config.AuxDB)
xdb, err := auxdbbackend.Wrap(be, config.AuxDB)
if err != nil {
log.Printf("warning: could not initialize webappdb backend: %v", err)
} else {
be = wdb
be = xdb
}
}
......
......@@ -482,6 +482,9 @@ type Resource struct {
// arbitrary strings.
Group string `json:"group,omitempty"`
// Usage (for filesystem-based resources).
UsageBytes int64 `json:"usage_bytes"`
// Details about the specific type (only one of these can be
// set, depending on the value of 'type').
Email *Email `json:"email,omitempty"`
......@@ -538,7 +541,6 @@ type Email struct {
Aliases []string `json:"aliases,omitempty"`
Maildir string `json:"maildir"`
QuotaLimit int `json:"quota_limit"`
QuotaUsage int `json:"quota_usage"`
}
// MailingList resource attributes.
......@@ -583,7 +585,6 @@ type Website struct {
Options []string `json:"options,omitempty"`
Categories []string `json:"categories,omitempty"`
Description map[string]string `json:"description,omitempty"`
QuotaUsage int `json:"quota_usage"`
DocumentRoot string `json:"document_root"`
StatsID int `json:"stats_id"`
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment