Commit 78d97cae by ale

fix an issue with pids; parallelize json decoding

1 parent 9348b3c3
Pipeline #242 passed
in 1 minute 6 seconds
Showing with 64 additions and 32 deletions
......@@ -5,14 +5,16 @@
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"reflect"
"runtime"
"strings"
"sync"
"time"
"golang.org/x/net/context"
......@@ -26,6 +28,7 @@ var (
toDate = flag.String("to", "", "end date (default now)")
syslogFacility = flag.String("facility", "", "filter syslog_facility")
doDumpQuery = flag.Bool("dump-query", false, "don't do anything, just print the ES query as JSON")
batchSize = flag.Int("size", 10000, "ES scroll batch size")
)
var allowedTimeFormats = []string{
......@@ -51,20 +54,18 @@ type logMessage struct {
Message string `json:"syslog_message"`
Host string `json:"host"`
Program string `json:"program"`
Pid int `json:"pid"`
Pid string `json:"pid"`
}
func (m logMessage) write(w io.Writer) {
func (m *logMessage) format() string {
ts := m.Timestamp.Format(time.Stamp)
if m.Program != "" {
if m.Pid != 0 {
fmt.Fprintf(w, "%s %s %s[%s]: %s\n", ts, m.Host, m.Program, m.Pid, m.Message)
} else {
fmt.Fprintf(w, "%s %s %s: %s\n", ts, m.Host, m.Program, m.Message)
}
} else {
fmt.Fprintf(w, "%s %s %s\n", ts, m.Host, m.Message)
if m.Program == "" {
return fmt.Sprintf("%s %s %s\n", ts, m.Host, m.Message)
}
if m.Pid == "" {
return fmt.Sprintf("%s %s %s: %s\n", ts, m.Host, m.Program, m.Message)
}
return fmt.Sprintf("%s %s %s[%s]: %s\n", ts, m.Host, m.Program, m.Pid, m.Message)
}
func logstashIndexes(from, to time.Time) []string {
......@@ -79,14 +80,14 @@ func logstashIndexes(from, to time.Time) []string {
func makeQuery(q, facility string, from, to time.Time) (elastic.Query, []string) {
queries := []elastic.Query{
elastic.NewTermQuery("type", "syslog"),
elastic.NewMatchQuery("type", "syslog"),
elastic.NewRangeQuery("@timestamp").From(from).To(to),
}
if q != "" {
queries = append(queries, elastic.NewQueryStringQuery(q))
}
if facility != "" {
queries = append(queries, elastic.NewTermQuery("syslog_facility", facility))
queries = append(queries, elastic.NewMatchQuery("syslog_facility", facility))
}
query := elastic.NewBoolQuery().Must(queries...)
......@@ -94,27 +95,56 @@ func makeQuery(q, facility string, from, to time.Time) (elastic.Query, []string)
return query, indexes
}
func runQuery(client *elastic.Client, query elastic.Query, indexes []string) error {
scroll := elastic.NewScrollService(client).
Index(indexes...).
Query(query).
KeepAlive("10m")
for {
result, err := scroll.Do(context.Background())
if err == io.EOF {
break
} else if err != nil {
return err
}
var logType logMessage
for _, item := range result.Each(reflect.TypeOf(logType)) {
if l, ok := item.(logMessage); ok {
l.write(os.Stdout)
// runQuery scrolls through every hit matching query in the given indexes,
// decodes the hits into logMessage values on runtime.NumCPU() worker
// goroutines, and writes each formatted message to w.
//
// Decoding is spread over several workers, so lines are not guaranteed to be
// written in the order the scroll returned them.
//
// It returns the first error encountered while writing to w; otherwise the
// error that stopped the scroll (io.EOF marks a normal end and is not
// reported); otherwise nil. Hits that cannot be decoded are logged and
// skipped.
func runQuery(client *elastic.Client, query elastic.Query, indexes []string, w io.Writer) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := make(chan json.RawMessage, 10000)
	sc := make(chan string, 10000)
	// errc carries at most one scroll error; the buffer lets the producer
	// report it without waiting for a reader.
	errc := make(chan error, 1)

	// Producer: pull batches from the scroll and feed raw documents to the
	// decoders.
	go func() {
		defer close(errc)
		defer close(c)
		scroll := client.Scroll(indexes...).
			Query(query).
			Size(*batchSize).
			KeepAlive("10m")
		for {
			result, err := scroll.Do(ctx)
			if err == io.EOF {
				return
			}
			if err != nil {
				// Hand the failure back to the caller instead of only
				// logging it, so a broken scroll is not reported as success.
				errc <- err
				return
			}
			for _, hit := range result.Hits.Hits {
				if hit.Source == nil {
					// Nothing to decode for this hit.
					continue
				}
				c <- *hit.Source
			}
		}
	}()

	// Decoders: one per CPU, turning raw JSON into formatted log lines.
	var wg sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for hit := range c {
				var m logMessage
				if err := json.Unmarshal(hit, &m); err != nil {
					log.Printf("error decoding result: %v", err)
					continue
				}
				sc <- m.format()
			}
		}()
	}

	// Close the output channel once every decoder has finished.
	go func() {
		wg.Wait()
		close(sc)
	}()

	var writeErr error
	for s := range sc {
		if writeErr != nil {
			// Keep draining so the producer and decoders never block on a
			// full channel after the writer has failed.
			continue
		}
		if _, err := io.WriteString(w, s); err != nil {
			writeErr = err
			// Cancel the context passed to scroll.Do so the producer can
			// stop fetching further batches.
			cancel()
		}
	}
	if writeErr != nil {
		return writeErr
	}
	// sc is closed only after the decoders saw c closed, i.e. after the
	// producer returned, so this receive yields its error or nil promptly.
	return <-errc
}
......@@ -155,7 +185,9 @@ func main() {
if err != nil {
log.Fatal(err)
}
if err := runQuery(client, query, indexes); err != nil {
w := bufio.NewWriter(os.Stdout)
if err := runQuery(client, query, indexes, w); err != nil {
log.Fatal(err)
}
w.Flush()
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!