Commit e770009a authored by Kamil Kisiel's avatar Kamil Kisiel

Merge branch 'master' of github.com:kisielk/gostatsd

parents 60fbc594 2d492398
...@@ -7,47 +7,45 @@ import ( ...@@ -7,47 +7,45 @@ import (
"time" "time"
) )
var ( const (
metricsAddr string defaultMetricsAddr = ":8125"
consoleAddr string defaultConsoleAddr = ":8126"
graphiteAddr string defaultGraphiteAddr = "localhost:2003"
flushInterval time.Duration defaultFlushInterval = 10 * time.Second
) )
func init() {
const (
defaultMetricsAddr = ":8125"
defaultConsoleAddr = ":8126"
defaultGraphiteAddr = "localhost:2003"
defaultFlushInterval = 10 * time.Second
)
flag.StringVar(&metricsAddr, "l", defaultMetricsAddr, "address on which to listen for metrics")
flag.StringVar(&consoleAddr, "c", defaultConsoleAddr, "address on which to listen for console sessions")
flag.StringVar(&graphiteAddr, "g", defaultGraphiteAddr, "address of the graphite server")
flag.DurationVar(&flushInterval, "f", defaultFlushInterval, "how often to flush metrics to the graphite server")
}
func main() { func main() {
metricsAddr := flag.String("l", defaultMetricsAddr, "address on which to listen for metrics")
graphiteAddr := flag.String("g", defaultGraphiteAddr, "address of the graphite server")
flushInterval := flag.Duration("f", defaultFlushInterval, "how often to flush metrics to the graphite server")
webConsoleAddr := flag.String("web", "", "if set, use as the address of the web-based console")
consoleAddr := flag.String("console", "", "if set, use as the address of the telnet-based console ")
flag.Parse() flag.Parse()
// Start the metric aggregator // Start the metric aggregator
graphite, err := statsd.NewGraphiteClient(graphiteAddr) graphite, err := statsd.NewGraphiteClient(*graphiteAddr)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
aggregator := statsd.NewMetricAggregator(&graphite, flushInterval) aggregator := statsd.NewMetricAggregator(&graphite, *flushInterval)
go aggregator.Aggregate() go aggregator.Aggregate()
// Start the metric receiver // Start the metric receiver
f := func(metric statsd.Metric) { f := func(metric statsd.Metric) {
aggregator.MetricChan <- metric aggregator.MetricChan <- metric
} }
receiver := statsd.MetricReceiver{metricsAddr, statsd.HandlerFunc(f)} receiver := statsd.MetricReceiver{*metricsAddr, statsd.HandlerFunc(f)}
go receiver.ListenAndReceive() go receiver.ListenAndReceive()
// Start the console // Start the console(s)
console := statsd.WebConsoleServer{consoleAddr, &aggregator} if *consoleAddr != "" {
go console.ListenAndServe() console := statsd.ConsoleServer{*consoleAddr, &aggregator}
go console.ListenAndServe()
}
if *webConsoleAddr != "" {
console := statsd.WebConsoleServer{*webConsoleAddr, &aggregator}
go console.ListenAndServe()
}
// Listen forever // Listen forever
select {} select {}
......
...@@ -2,8 +2,8 @@ package statsd ...@@ -2,8 +2,8 @@ package statsd
import ( import (
"fmt" "fmt"
"github.com/kisielk/cmd"
"net" "net"
"strings"
) )
// DefaultConsoleAddr is the default address on which a ConsoleServer will listen // DefaultConsoleAddr is the default address on which a ConsoleServer will listen
...@@ -53,33 +53,15 @@ type consoleConn struct { ...@@ -53,33 +53,15 @@ type consoleConn struct {
// serve reads from the consoleConn and responds to incoming requests // serve reads from the consoleConn and responds to incoming requests
func (c *consoleConn) serve() { func (c *consoleConn) serve() {
defer c.conn.Close() defer c.conn.Close()
buf := make([]byte, 1024)
for { commands := map[string]cmd.CmdFn{
c.conn.Write([]byte("console> ")) "help": func(args []string) (string, error) {
nbytes, err := c.conn.Read(buf) return "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n", nil
if err != nil { },
// Connection has likely closed "stats": func(args []string) (string, error) {
return
}
var command string
var args []string
var result string
if parts := strings.Fields(string(buf[:nbytes])); len(parts) > 0 {
command = parts[0]
if len(parts) > 1 {
args = parts[1:]
}
}
switch command {
case "help":
result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n"
case "stats":
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
result = fmt.Sprintf( defer c.server.Aggregator.Unlock()
return fmt.Sprintf(
"Invalid messages received: %d\n"+ "Invalid messages received: %d\n"+
"Last message received: %s\n"+ "Last message received: %s\n"+
"Last flush to Graphite: %s\n"+ "Last flush to Graphite: %s\n"+
...@@ -87,47 +69,59 @@ func (c *consoleConn) serve() { ...@@ -87,47 +69,59 @@ func (c *consoleConn) serve() {
c.server.Aggregator.Stats.BadLines, c.server.Aggregator.Stats.BadLines,
c.server.Aggregator.Stats.LastMessage, c.server.Aggregator.Stats.LastMessage,
c.server.Aggregator.Stats.LastFlush, c.server.Aggregator.Stats.LastFlush,
c.server.Aggregator.Stats.LastFlushError) c.server.Aggregator.Stats.LastFlushError), nil
c.server.Aggregator.Unlock() },
case "counters": "counters": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.Counters) defer c.server.Aggregator.Unlock()
c.server.Aggregator.Unlock() return fmt.Sprintln(c.server.Aggregator.Counters), nil
case "timers": },
"timers": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.Timers) defer c.server.Aggregator.Unlock()
c.server.Aggregator.Unlock() return fmt.Sprintln(c.server.Aggregator.Timers), nil
case "gauges": },
"gauges": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.Gauges) defer c.server.Aggregator.Unlock()
c.server.Aggregator.Unlock() return fmt.Sprintln(c.server.Aggregator.Gauges), nil
case "delcounters": },
"delcounters": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
defer c.server.Aggregator.Unlock()
i := 0
for _, k := range args { for _, k := range args {
delete(c.server.Aggregator.Counters, k) delete(c.server.Aggregator.Counters, k)
i++
} }
c.server.Aggregator.Unlock() return fmt.Sprintf("deleted %d counters\n", i), nil
case "deltimers": },
"deltimers": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
defer c.server.Aggregator.Unlock()
i := 0
for _, k := range args { for _, k := range args {
delete(c.server.Aggregator.Timers, k) delete(c.server.Aggregator.Timers, k)
i++
} }
c.server.Aggregator.Unlock() return fmt.Sprintf("deleted %d timers\n", i), nil
case "delgauges": },
"delgauges": func(args []string) (string, error) {
c.server.Aggregator.Lock() c.server.Aggregator.Lock()
defer c.server.Aggregator.Unlock()
i := 0
for _, k := range args { for _, k := range args {
delete(c.server.Aggregator.Gauges, k) delete(c.server.Aggregator.Gauges, k)
i++
} }
c.server.Aggregator.Unlock() return fmt.Sprintf("deleted %d gauges\n", i), nil
case "quit": },
result = "goodbye\n" "quit": func(args []string) (string, error) {
default: return "goodbye\n", fmt.Errorf("client quit")
result = fmt.Sprintf("unknown command: %s\n", command) },
}
c.conn.Write([]byte(result))
if result == "goodbye\n" {
return
}
} }
console := cmd.New(commands, c.conn, c.conn)
console.Prompt = "console> "
console.Loop()
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment