Commit 96c16085 authored by Kamil Kisiel's avatar Kamil Kisiel

Initial implementation of TCP management server

Implemented some of the commands.
parent 44925910
......@@ -55,33 +55,18 @@ func (m Metric) String() string {
return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value)
}
func main () {
var metricChan = make(chan Metric)
go metricListener(metricChan)
go metricAggregator(metricChan)
// Run forever
select {}
}
type MetricMap map[string]float64
type MetricListMap map[string][]float64
func metricListener(metricChan chan Metric) {
conn, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatal(err)
return
}
msg := make([]byte, 1024)
for {
nbytes, _, err := conn.ReadFrom(msg)
if err != nil {
log.Printf("%s", err)
continue
}
go handleMessage(metricChan, string(msg[:nbytes]))
}
type ConsoleRequest struct {
Command string
ResultChan chan string
}
type MetricMap map[string]float64
type MetricListMap map[string][]float64
type ConsoleSession struct {
RequestChan chan string
ResultChan chan string
}
func round(v float64) float64 {
return math.Floor(v + 0.5)
......@@ -156,7 +141,7 @@ func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, fl
}
func metricAggregator(metricChan chan Metric) {
func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
var counters = make(MetricMap)
var gauges = make(MetricMap)
var timers = make(MetricListMap)
......@@ -212,6 +197,37 @@ func metricAggregator(metricChan chan Metric) {
gauges = new_gauges
flushTimer = time.NewTimer(flushInterval)
case consoleRequest := <-consoleChan:
var result string
switch parts := strings.Split(strings.TrimSpace(consoleRequest.Command), " "); parts[0] {
case "help":
result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n"
case "stats":
result = "stats:\n"
case "counters":
result = "counters:\n"
case "timers":
result = "timers:\n"
case "gauges":
result = "gauges:\n"
case "delcounters":
for _, k := range parts[1:] {
delete(counters, k)
}
case "deltimers":
for _, k := range parts[1:] {
delete(timers, k)
}
case "delgauges":
for _, k := range parts[1:] {
delete(gauges, k)
}
case "quit":
result = "quit"
default:
result = fmt.Sprintf("unknown command: %s\n", parts[0])
}
consoleRequest.ResultChan <- result
}
}
}
......@@ -296,3 +312,68 @@ func handleMessage(metricChan chan Metric, msg string) {
}
}
}
func metricListener(metricChan chan Metric) {
conn, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatal(err)
return
}
msg := make([]byte, 1024)
for {
nbytes, _, err := conn.ReadFrom(msg)
if err != nil {
log.Printf("%s", err)
continue
}
go handleMessage(metricChan, string(msg[:nbytes]))
}
}
func consoleClient(conn net.Conn, consoleChan chan ConsoleRequest) {
defer conn.Close()
command := make([]byte, 1024)
resultChan := make(chan string)
for {
nbytes, err := conn.Read(command)
if err != nil {
// Connection has likely closed
return
}
consoleChan <- ConsoleRequest{string(command[:nbytes]), resultChan}
result := <-resultChan
if result == "quit" {
return
}
conn.Write([]byte(result))
}
}
func consoleServer(consoleChan chan ConsoleRequest) {
ln, err := net.Listen("tcp", ":8126")
if err != nil {
log.Fatal(err)
return
}
for {
conn, err := ln.Accept()
if err != nil {
log.Printf("%s", err)
continue
}
go consoleClient(conn, consoleChan)
}
}
func main () {
var metricChan = make(chan Metric)
var consoleChan = make(chan ConsoleRequest)
go metricListener(metricChan)
go metricAggregator(metricChan, consoleChan)
go consoleServer(consoleChan)
// Run forever
select {}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment