diff --git a/partition/debug.go b/partition/debug.go index 221b1bbc95a89f19b275a997ccb4f77bbd29ad9c..ac487760bdcf0f7f11d8236251166fcb0b0fc07d 100644 --- a/partition/debug.go +++ b/partition/debug.go @@ -8,13 +8,17 @@ import ( "net/rpc" "strings" "sync" + "time" "git.autistici.org/ale/djrandom/util/status" ) -const debugText = `<html> -<body> +const debugDumpPartitionText = `<html> +<head> <title>Partition Map: {{.Name}}</title> +</head> +<body> +<h1>Partition "{{.Name}}" (raw)</h1> <table> <thead> <tr> @@ -31,12 +35,50 @@ const debugText = `<html> </tbody> </table> <p>Version: <tt>{{.Pmap.GetVersion}}</tt></p> +<p><a href="/debug/part/{{.Name}}">Back to overview</a></p> </body> </html>` -const debugListText = `<html> +const debugShowPartitionText = `<html> +<head> +<title>Partition Map: {{.Name}}</title> +</head> <body> +<h1>Partition "{{.Name}}" (overview)</h1> +<p> +Versions: +{{range .GlobalStatus.AllVersions}}{{.}} {{end}} +</p> +<table> + <thead> + <tr> + <th>Host</th> + <th>Version</th> + <th>Stamp</th> + <th>#Items</th> + </tr> + </thead> + <tbody> + {{range .GlobalStatus.Sort}} + <tr> + <td>{{.Name}}</td> + <td>{{.Version}}</td> + <td>{{.Stamp}}</td> + <td>{{.Total}}</td> + </tr> + {{end}} + </tbody> +</table> +<p><a href="/debug/part/raw/{{.Name}}">Raw partition map</a></p> +</body> +</html>` + +const debugListPartitionsText = `<html> +<head> <title>Partition Maps</title> +</head> +<body> +<h1>Partition Maps</h1> <p> {{range .}} <a href="/debug/part/{{.}}">{{.}}</a><br> @@ -52,85 +94,124 @@ func httpize(s string) string { return s } -var debug = template.Must(template.New("PartitionMap debug").Funcs(template.FuncMap{"httpize": httpize}).Parse(debugText)) -var debugList = template.Must(template.New("PartitionMap debug list").Parse(debugListText)) +var ( + debugDumpPartitionTpl = template.Must(template.New("dump_partition").Funcs(template.FuncMap{"httpize": httpize}).Parse(debugDumpPartitionText)) + debugShowPartitionTpl = template.Must(template.New("show_partition").Funcs(template.FuncMap{"httpize": httpize}).Parse(debugShowPartitionText)) + debugListPartitionsTpl = template.Must(template.New("list_partitions").Parse(debugListPartitionsText)) -var debugPartitionMap = make(map[string]*PartitionMap) -var debugPartitionMapLock sync.Mutex + knownPartitionsMap = make(map[string]*PartitionMap) + knownPartitionsMapLock sync.Mutex +) -func registerPartitionMap(name string, pmap *PartitionMap) { - debugPartitionMapLock.Lock() - defer debugPartitionMapLock.Unlock() - debugPartitionMap[name] = pmap +func Register(pmap *PartitionMap, name string) { + knownPartitionsMapLock.Lock() + defer knownPartitionsMapLock.Unlock() + if _, ok := knownPartitionsMap[name]; ok { + panic(fmt.Sprintf("Partition map '%s' registered more than once", name)) + } + knownPartitionsMap[name] = pmap } type debugRPC struct{} type GetVersionRequest struct { - Partition string + Name string } type GetVersionResponse struct { - Partition string - Version string + Name string + Version string + Stamp time.Time } func (server debugRPC) GetVersion(req *GetVersionRequest, resp *GetVersionResponse) error { - debugPartitionMapLock.Lock() - pmap, ok := debugPartitionMap[req.Partition] - debugPartitionMapLock.Unlock() + knownPartitionsMapLock.Lock() + pmap, ok := knownPartitionsMap[req.Name] + knownPartitionsMapLock.Unlock() if !ok { return errors.New("Not found") } - resp.Partition = req.Partition + resp.Name = req.Name resp.Version = pmap.GetVersion() + resp.Stamp = pmap.GetStamp() return nil } -type debugHTTP struct{} - -func (server debugHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func debugHTTPDumpPartition(w http.ResponseWriter, r *http.Request) { name := r.URL.Path + if name == "" { + http.Error(w, "No such partition", http.StatusNotFound) + return + } + + knownPartitionsMapLock.Lock() + pmap, ok := knownPartitionsMap[name] + knownPartitionsMapLock.Unlock() + if !ok { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + ctx := struct { + Pmap *PartitionMap + Name string + }{pmap, name} + if err := debugDumpPartitionTpl.Execute(w, &ctx); err != nil { + fmt.Fprintf(w, "debug: error executing template: %v", err) + } +} + +func debugHTTPShowPartition(w http.ResponseWriter, name string) { + if name == "" { + http.Error(w, "No such partition", http.StatusNotFound) + return + } + + knownPartitionsMapLock.Lock() + pmap, ok := knownPartitionsMap[name] + knownPartitionsMapLock.Unlock() + if !ok { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + + status := getGlobalPartitionStatus(pmap, name) + + ctx := struct { + Name string + Pmap *PartitionMap + GlobalStatus statusMap + }{name, pmap, status} + if err := debugShowPartitionTpl.Execute(w, &ctx); err != nil { + fmt.Fprintf(w, "debug: error executing template: %v", err) + } +} + +func debugHTTPMain(w http.ResponseWriter, r *http.Request) { + name := strings.TrimPrefix(r.URL.Path, "/") + var err error if name == "" { - pmapList := make([]string, 0, len(debugPartitionMap)) - debugPartitionMapLock.Lock() - for name, _ := range debugPartitionMap { + pmapList := make([]string, 0, len(knownPartitionsMap)) + knownPartitionsMapLock.Lock() + for name, _ := range knownPartitionsMap { pmapList = append(pmapList, name) } - debugPartitionMapLock.Unlock() - err = debugList.Execute(w, pmapList) - } else { - debugPartitionMapLock.Lock() - pmap, ok := debugPartitionMap[name] - debugPartitionMapLock.Unlock() - if !ok { - http.Error(w, "Not Found", http.StatusNotFound) - return + knownPartitionsMapLock.Unlock() + if err = debugListPartitionsTpl.Execute(w, pmapList); err != nil { + fmt.Fprintf(w, "debug: error executing template: %v", err) } - ctx := struct { - Pmap *PartitionMap - Name string - }{pmap, name} - err = debug.Execute(w, &ctx) - } - - if err != nil { - fmt.Fprintln(w, "rpc: error executing template: ", err.Error()) + } else { + debugHTTPShowPartition(w, name) } } -var debugRpcServer = &debugRPC{} -var debugHttpServer = &debugHTTP{} - func init() { - http.Handle("/debug/part/", - http.StripPrefix("/debug/part/", debugHttpServer)) - rpc.RegisterName("Partition", debugRpcServer) - status.RegisterDebugEndpoint("/debug/part/") -} + mux := http.NewServeMux() + mux.Handle("/raw/", http.StripPrefix("/raw/", http.HandlerFunc(debugHTTPDumpPartition))) + mux.Handle("/", http.HandlerFunc(debugHTTPMain)) -func (pmap *PartitionMap) HandleDebug(name string) { - registerPartitionMap(name, pmap) + http.Handle("/debug/part/", http.StripPrefix("/debug/part", mux)) + rpc.RegisterName("Partition", &debugRPC{}) + status.RegisterDebugEndpoint("/debug/part/") } diff --git a/partition/partition.go b/partition/partition.go index 2336c6559c348ff532e75c6144a00ed3e40ea8f2..a210ac078907bf040778c3f260eee54632235a14 100644 --- a/partition/partition.go +++ b/partition/partition.go @@ -5,11 +5,12 @@ import ( "encoding/hex" "encoding/json" "io" - "io/ioutil" "log" "math/big" "math/rand" + "os" "sort" + "time" ) type Partition struct { @@ -26,6 +27,7 @@ func (pl partitionList) Less(i, j int) bool { return pl[i].EndKey < pl[j].EndKey type PartitionMap struct { P partitionList version string + stamp time.Time } // GetVersion lazily computes a fingerprint of the partition map. @@ -43,6 +45,10 @@ func (pmap *PartitionMap) GetVersion() string { return pmap.version } +func (pmap *PartitionMap) GetStamp() time.Time { + return pmap.stamp +} + // GetPartition returns the Partition for a key. func (pmap *PartitionMap) GetPartition(key string) Partition { // Run a binary search to find the partition. @@ -79,18 +85,20 @@ func (pmap *PartitionMap) AllTargets() []string { return result } -// Save stores the partition map in a file, in JSON-encoded format. +// Save stores the partition map in a file, in JSON format. func (pmap *PartitionMap) Save(path string) error { - buf, err := json.Marshal(pmap) + file, err := os.Create(path) if err != nil { return err } - return ioutil.WriteFile(path, buf, 0644) + defer file.Close() + return json.NewEncoder(file).Encode(pmap) } func NewPartitionMap() *PartitionMap { return &PartitionMap{ - P: []Partition{}, + P: []Partition{}, + stamp: time.Now(), } } @@ -101,19 +109,25 @@ func NewSimplePartitionMap(target string) *PartitionMap { } func LoadPartitionMap(path string) (*PartitionMap, error) { - data, err := ioutil.ReadFile(path) + file, err := os.Open(path) if err != nil { return nil, err } + defer file.Close() - pmap := NewPartitionMap() - err = json.Unmarshal(data, pmap) + var pmap PartitionMap + if err = json.NewDecoder(file).Decode(&pmap); err != nil { + return nil, err + } + + stat, err := file.Stat() if err != nil { return nil, err } + pmap.stamp = stat.ModTime() sort.Sort(pmap.P) - return pmap, nil + return &pmap, nil } func MustLoadPartitionMap(path string) *PartitionMap { diff --git a/partition/status.go b/partition/status.go new file mode 100644 index 0000000000000000000000000000000000000000..b5e52d2065d84d19e1f4c92c97aba2c83a58d9a9 --- /dev/null +++ b/partition/status.go @@ -0,0 +1,118 @@ +package partition + +import ( + "net/rpc" + "net/url" + "sort" + "strings" + "time" +) + +type PartitionStatus struct { + Name string + Version string + Stamp time.Time + Counters map[string]int + Err error +} + +func (s PartitionStatus) Total() int { + var total int + for _, v := range s.Counters { + total += v + } + return total +} + +type statusMap map[string]*PartitionStatus + +func (m statusMap) AllVersions() []string { + tmp := make(map[string]struct{}) + for _, s := range m { + if s.Err == nil { + tmp[s.Version] = struct{}{} + } + } + var versions []string + for v := range tmp { + versions = append(versions, v) + } + return versions +} + +type partitionStatusList []*PartitionStatus + +func (l partitionStatusList) Len() int { return len(l) } +func (l partitionStatusList) Swap(i, j int) { l[i], l[j] = l[j], l[i] } +func (l partitionStatusList) Less(i, j int) bool { + if l[i].Version > l[j].Version { + return true + } + if l[i].Stamp.After(l[j].Stamp) { + return true + } + return false +} + +func (m statusMap) Sort() []*PartitionStatus { + var out []*PartitionStatus + for _, v := range m { + out = append(out, v) + } + sort.Sort(partitionStatusList(out)) + return out +} + +func target2addr(target string) string { + if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") { + if u, err := url.Parse(target); err == nil { + return u.Host + } + } + return target +} + +func getPartitionStatus(name, target string) *PartitionStatus { + client, err := rpc.DialHTTP("tcp", target2addr(target)) + if err != nil { + return &PartitionStatus{Name: name, Err: err} + } + defer client.Close() + + req := &GetVersionRequest{Name: name} + var resp GetVersionResponse + if err := client.Call("Partition.GetVersion", req, &resp); err != nil { + return &PartitionStatus{Name: name, Err: err} + } + var sreq struct{} + var sresp PartitionStatsResponse + if err := client.Call("PartitionedService.GetStats", &sreq, &sresp); err != nil { + return &PartitionStatus{Name: name, Err: err} + } + + return &PartitionStatus{ + Name: name, + Version: resp.Version, + Stamp: resp.Stamp, + Counters: sresp.Counters, + } +} + +func getGlobalPartitionStatus(pmap *PartitionMap, name string) statusMap { + c := make(chan *PartitionStatus) + defer close(c) + + targets := pmap.AllTargets() + for _, t := range targets { + go func(t string) { + c <- getPartitionStatus(name, t) + }(t) + } + + status := make(statusMap) + for i := 0; i < len(targets); i++ { + result := <-c + status[result.Name] = result + } + return status +} diff --git a/services/index/client.go b/services/index/client.go index 5e2086703e2a35508f6fbcd5a0fe42ac601f1680..d7b08a7f02c2fc73f9e9ad631e2a38767e87c9ef 100644 --- a/services/index/client.go +++ b/services/index/client.go @@ -31,7 +31,7 @@ func NewIndexClient(pmap *partition.PartitionMap) *IndexClient { func NewIndexClientFromConfig() *IndexClient { pmap := partition.MustLoadPartitionMap( config.MustString("index_pmap", *IndexPmapFile)) - pmap.HandleDebug("index") + partition.Register(pmap, "index") return NewIndexClient(pmap) } diff --git a/services/index/index_server/index_server.go b/services/index/index_server/index_server.go index a361c39f48be07c0391c973056edb3f9f16c9a8c..c0b1ab87b64297352be2d04ec79c6e9afb67edd7 100644 --- a/services/index/index_server/index_server.go +++ b/services/index/index_server/index_server.go @@ -27,7 +27,7 @@ func main() { indexPmap := partition.MustLoadPartitionMap( config.MustString("index_pmap", *index.IndexPmapFile)) - indexPmap.HandleDebug("index") + partition.Register(indexPmap, "index") idxsvc := index.NewIndexService( config.MustString("index_file", *indexFile), diff --git a/services/storage/client.go b/services/storage/client.go index 17ced1237fccd7a23c87a587199cf099e8add549..4346f83c49d5250807da14b08d4d7062bd73dcfb 100644 --- a/services/storage/client.go +++ b/services/storage/client.go @@ -50,7 +50,7 @@ type DistributedStorageClient struct { func NewPartitionMapFromConfig() *partition.PartitionMap { pmap := partition.MustLoadPartitionMap( config.MustString("storage_pmap", *storagePmap)) - pmap.HandleDebug("storage") + partition.Register(pmap, "storage") return pmap }