diff --git a/gostatsd/gostatsd.go b/gostatsd/gostatsd.go index bdc59c1e1315b0f3b96157d9f5c754610d26e532..576d625dc4df71e4cbda732794195fc150f89acd 100644 --- a/gostatsd/gostatsd.go +++ b/gostatsd/gostatsd.go @@ -7,47 +7,45 @@ import ( "time" ) -var ( - metricsAddr string - consoleAddr string - graphiteAddr string - flushInterval time.Duration +const ( + defaultMetricsAddr = ":8125" + defaultConsoleAddr = ":8126" + defaultGraphiteAddr = "localhost:2003" + defaultFlushInterval = 10 * time.Second ) -func init() { - const ( - defaultMetricsAddr = ":8125" - defaultConsoleAddr = ":8126" - defaultGraphiteAddr = "localhost:2003" - defaultFlushInterval = 10 * time.Second - ) - flag.StringVar(&metricsAddr, "l", defaultMetricsAddr, "address on which to listen for metrics") - flag.StringVar(&consoleAddr, "c", defaultConsoleAddr, "address on which to listen for console sessions") - flag.StringVar(&graphiteAddr, "g", defaultGraphiteAddr, "address of the graphite server") - flag.DurationVar(&flushInterval, "f", defaultFlushInterval, "how often to flush metrics to the graphite server") -} - func main() { + metricsAddr := flag.String("l", defaultMetricsAddr, "address on which to listen for metrics") + graphiteAddr := flag.String("g", defaultGraphiteAddr, "address of the graphite server") + flushInterval := flag.Duration("f", defaultFlushInterval, "how often to flush metrics to the graphite server") + webConsoleAddr := flag.String("web", "", "if set, use as the address of the web-based console") + consoleAddr := flag.String("console", "", "if set, use as the address of the telnet-based console ") flag.Parse() // Start the metric aggregator - graphite, err := statsd.NewGraphiteClient(graphiteAddr) + graphite, err := statsd.NewGraphiteClient(*graphiteAddr) if err != nil { log.Fatal(err) } - aggregator := statsd.NewMetricAggregator(&graphite, flushInterval) + aggregator := statsd.NewMetricAggregator(&graphite, *flushInterval) go aggregator.Aggregate() // Start the metric receiver f := func(metric statsd.Metric) { aggregator.MetricChan <- metric } - receiver := statsd.MetricReceiver{metricsAddr, statsd.HandlerFunc(f)} + receiver := statsd.MetricReceiver{*metricsAddr, statsd.HandlerFunc(f)} go receiver.ListenAndReceive() - // Start the console - console := statsd.WebConsoleServer{consoleAddr, &aggregator} - go console.ListenAndServe() + // Start the console(s) + if *consoleAddr != "" { + console := statsd.ConsoleServer{*consoleAddr, &aggregator} + go console.ListenAndServe() + } + if *webConsoleAddr != "" { + console := statsd.WebConsoleServer{*webConsoleAddr, &aggregator} + go console.ListenAndServe() + } // Listen forever select {} diff --git a/statsd/console.go b/statsd/console.go index 98fffb9eb527fde274b7462d48ee7bf4ee69f7d5..60bea14a09cccaa51e0a62e28a1c0c478d68a334 100644 --- a/statsd/console.go +++ b/statsd/console.go @@ -2,8 +2,8 @@ package statsd import ( "fmt" + "github.com/kisielk/cmd.go" "net" - "strings" ) // DefaultConsoleAddr is the default address on which a ConsoleServer will listen @@ -53,33 +53,15 @@ type consoleConn struct { // serve reads from the consoleConn and responds to incoming requests func (c *consoleConn) serve() { defer c.conn.Close() - buf := make([]byte, 1024) - for { - c.conn.Write([]byte("console> ")) - nbytes, err := c.conn.Read(buf) - if err != nil { - // Connection has likely closed - return - } - - var command string - var args []string - var result string - - if parts := strings.Fields(string(buf[:nbytes])); len(parts) > 0 { - command = parts[0] - if len(parts) > 1 { - args = parts[1:] - } - } - - switch command { - case "help": - result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n" - case "stats": + commands := map[string]cmd.CmdFn{ + "help": func(args []string) (string, error) { + return "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n", nil + }, + "stats": func(args []string) (string, error) { c.server.Aggregator.Lock() - result = fmt.Sprintf( + defer c.server.Aggregator.Unlock() + return fmt.Sprintf( "Invalid messages received: %d\n"+ "Last message received: %s\n"+ "Last flush to Graphite: %s\n"+ @@ -87,47 +69,59 @@ func (c *consoleConn) serve() { c.server.Aggregator.Stats.BadLines, c.server.Aggregator.Stats.LastMessage, c.server.Aggregator.Stats.LastFlush, - c.server.Aggregator.Stats.LastFlushError) - c.server.Aggregator.Unlock() - case "counters": + c.server.Aggregator.Stats.LastFlushError), nil + }, + "counters": func(args []string) (string, error) { c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.Counters) - c.server.Aggregator.Unlock() - case "timers": + defer c.server.Aggregator.Unlock() + return fmt.Sprintln(c.server.Aggregator.Counters), nil + }, + "timers": func(args []string) (string, error) { c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.Timers) - c.server.Aggregator.Unlock() - case "gauges": + defer c.server.Aggregator.Unlock() + return fmt.Sprintln(c.server.Aggregator.Timers), nil + }, + "gauges": func(args []string) (string, error) { c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.Gauges) - c.server.Aggregator.Unlock() - case "delcounters": + defer c.server.Aggregator.Unlock() + return fmt.Sprintln(c.server.Aggregator.Gauges), nil + }, + "delcounters": func(args []string) (string, error) { c.server.Aggregator.Lock() + defer c.server.Aggregator.Unlock() + i := 0 for _, k := range args { delete(c.server.Aggregator.Counters, k) + i++ } - c.server.Aggregator.Unlock() - case "deltimers": + return fmt.Sprintf("deleted %d counters\n", i), nil + }, + "deltimers": func(args []string) (string, error) { c.server.Aggregator.Lock() + defer c.server.Aggregator.Unlock() + i := 0 for _, k := range args { delete(c.server.Aggregator.Timers, k) + i++ } - c.server.Aggregator.Unlock() - case "delgauges": + return fmt.Sprintf("deleted %d timers\n", i), nil + }, + "delgauges": func(args []string) (string, error) { c.server.Aggregator.Lock() + defer c.server.Aggregator.Unlock() + i := 0 for _, k := range args { delete(c.server.Aggregator.Gauges, k) + i++ } - c.server.Aggregator.Unlock() - case "quit": - result = "goodbye\n" - default: - result = fmt.Sprintf("unknown command: %s\n", command) - } - - c.conn.Write([]byte(result)) - if result == "goodbye\n" { - return - } + return fmt.Sprintf("deleted %d gauges\n", i), nil + }, + "quit": func(args []string) (string, error) { + return "goodbye\n", fmt.Errorf("client quit") + }, } + + console := cmd.New(commands, c.conn, c.conn) + console.Prompt = "console> " + console.Loop() }