Commit 1f0d3b61 authored by ale's avatar ale
Browse files

Update ES client library, and refactored output pipeline

Also some defaults have been changed.
parent 78d97cae
......@@ -6,7 +6,9 @@ package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
......@@ -17,18 +19,17 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"gopkg.in/olivere/elastic.v5"
"gopkg.in/olivere/elastic.v6"
)
var (
elasticURL = flag.String("url", "http://127.0.0.1:9200", "ES URL")
fromDate = flag.String("from", "", "start date (default 1 hour ago)")
toDate = flag.String("to", "", "end date (default now)")
syslogFacility = flag.String("facility", "", "filter syslog_facility")
doDumpQuery = flag.Bool("dump-query", false, "don't do anything, just print the ES query as JSON")
batchSize = flag.Int("size", 10000, "ES scroll batch size")
elasticURL = flag.String("url", "http://127.0.0.1:9200", "ES URL")
fromDate = flag.String("from", "-1h", "start date")
toDate = flag.String("to", "", "end date (default now)")
doDumpQuery = flag.Bool("dump-query", false, "don't do anything, just print the ES query as JSON")
batchSize = flag.Int("batch-size", 10000, "ES scroll batch size")
indexBase = flag.String("index", "logstash", "ES index base name")
objType = flag.String("obj-type", "events", "ES index object type")
)
var allowedTimeFormats = []string{
......@@ -39,69 +40,134 @@ var allowedTimeFormats = []string{
"2006/1/2",
}
func parseDate(s string) (t time.Time, err error) {
for _, f := range allowedTimeFormats {
t, err = time.Parse(f, s)
if err == nil {
break
// parseTime interprets s either as an offset relative to now or as an
// absolute timestamp.
//
// A leading "+" or "-" marks s as a relative offset: the remainder of the
// string is parsed with time.ParseDuration and then added to, or subtracted
// from, now. Any other input is tried against each layout listed in
// allowedTimeFormats, in order, and the first layout that parses wins.
//
// On failure the returned time is the zero value. For absolute timestamps the
// error names the rejected input and includes the parse error of the last
// layout that was tried.
func parseTime(now time.Time, s string) (t time.Time, err error) {
	switch {
	case strings.HasPrefix(s, "+"):
		d, perr := time.ParseDuration(s[1:])
		if perr != nil {
			return time.Time{}, perr
		}
		return now.Add(d), nil
	case strings.HasPrefix(s, "-"):
		d, perr := time.ParseDuration(s[1:])
		if perr != nil {
			return time.Time{}, perr
		}
		return now.Add(-d), nil
	}

	// Start from a non-nil error so that an empty layout list is reported as
	// a failure instead of silently yielding the zero time.
	err = errors.New("no time formats configured")
	for _, layout := range allowedTimeFormats {
		if t, err = time.Parse(layout, s); err == nil {
			return t, nil
		}
	}
	return time.Time{}, fmt.Errorf("unrecognized time %q: %v", s, err)
}
// parseTimeRange turns the --from and --to flag values into a pair of
// timestamps, both resolved against a single reading of the current time.
//
// An empty fromStr means one hour before now and an empty toStr means now;
// non-empty values are handed to parseTime. An error is returned when either
// value fails to parse or when the resulting end precedes the start.
func parseTimeRange(fromStr, toStr string) (from time.Time, to time.Time, err error) {
	ref := time.Now()

	// Lower bound: default first, then override when a value was given.
	from = ref.Add(-1 * time.Hour)
	if fromStr != "" {
		if from, err = parseTime(ref, fromStr); err != nil {
			return from, to, fmt.Errorf("error in --from: %v", err)
		}
	}

	// Upper bound, same approach.
	to = ref
	if toStr != "" {
		if to, err = parseTime(ref, toStr); err != nil {
			return from, to, fmt.Errorf("error in --to: %v", err)
		}
	}

	if to.Before(from) {
		return from, to, errors.New("--to comes before --from")
	}
	return from, to, nil
}
type logMessage struct {
Timestamp time.Time `json:"syslog_timestamp"`
Message string `json:"syslog_message"`
Timestamp time.Time `json:"@timestamp"`
Message string `json:"message"`
Host string `json:"host"`
Program string `json:"program"`
Pid string `json:"pid"`
Tag string `json:"tag"`
}
func (m *logMessage) format() string {
ts := m.Timestamp.Format(time.Stamp)
if m.Program == "" {
return fmt.Sprintf("%s %s %s\n", ts, m.Host, m.Message)
}
if m.Pid == "" {
return fmt.Sprintf("%s %s %s: %s\n", ts, m.Host, m.Program, m.Message)
if m.Tag == "" {
return fmt.Sprintf("%s %s%s\n", ts, m.Host, m.Message)
}
return fmt.Sprintf("%s %s %s[%s]: %s\n", ts, m.Host, m.Program, m.Pid, m.Message)
return fmt.Sprintf("%s %s %s%s\n", ts, m.Host, m.Tag, m.Message)
}
func logstashIndexes(from, to time.Time) []string {
func getIndexes(from, to time.Time) []string {
var idxs []string
t := time.Date(from.Year(), from.Month(), from.Day(), 0, 0, 0, 0, time.UTC)
idxFmt := fmt.Sprintf("%s-2006.01.02", *indexBase)
for t.Before(to) {
idxs = append(idxs, t.Format("logstash-2006.01.02"))
idxs = append(idxs, t.Format(idxFmt))
t = t.AddDate(0, 0, 1)
}
return idxs
}
func makeQuery(q, facility string, from, to time.Time) (elastic.Query, []string) {
func makeQuery(q string, from, to time.Time) (elastic.Query, []string) {
queries := []elastic.Query{
elastic.NewMatchQuery("type", "syslog"),
elastic.NewMatchQuery("_type", *objType),
elastic.NewRangeQuery("@timestamp").From(from).To(to),
}
if q != "" {
queries = append(queries, elastic.NewQueryStringQuery(q))
}
if facility != "" {
queries = append(queries, elastic.NewMatchQuery("syslog_facility", facility))
}
query := elastic.NewBoolQuery().Must(queries...)
indexes := logstashIndexes(from, to)
indexes := getIndexes(from, to)
return query, indexes
}
func runQuery(client *elastic.Client, query elastic.Query, indexes []string, w io.Writer) error {
c := make(chan json.RawMessage, 10000)
sc := make(chan string, 10000)
jsonCh := newBatchChannel(1000)
outCh := newBatchChannel(1000)
var wg sync.WaitGroup
// Start workers, reading from jsonCh and sending output to outCh.
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
w := outCh.Sender()
go func() {
defer w.Close()
defer wg.Done()
jsonCh.Foreach(func(obj interface{}) {
var m logMessage
if err := json.Unmarshal(obj.(json.RawMessage), &m); err != nil {
log.Printf("error decoding result")
return
}
// Empty message logs are probably structured logs, skip them.
if m.Message != "" {
w.Push(m.format())
}
})
}()
}
// Run the query, process the ES results and dispatch them to the jsonCh.
jsonW := jsonCh.Sender()
go func() {
defer close(c)
// Ordering is important here: close the jsonCh sender first.
defer jsonCh.Close()
defer jsonW.Close()
scroll := client.Scroll(indexes...).
Query(query).
Size(*batchSize).
......@@ -116,68 +182,91 @@ func runQuery(client *elastic.Client, query elastic.Query, indexes []string, w i
return
}
for _, hit := range result.Hits.Hits {
c <- *hit.Source
jsonW.Push(*hit.Source)
}
}
}()
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for hit := range c {
var m logMessage
if err := json.Unmarshal(hit, &m); err != nil {
log.Printf("error decoding result")
continue
}
sc <- m.format()
}
}()
}
// Synchronize.
go func() {
wg.Wait()
close(sc)
outCh.Close()
}()
for s := range sc {
io.WriteString(w, s)
}
outCh.Foreach(func(obj interface{}) {
io.WriteString(w, obj.(string)) // nolint
})
return nil
}
func main() {
log.SetFlags(0)
flag.Parse()
// batchChannel passes values between goroutines in slices ("batches") rather
// than one value at a time.
type batchChannel struct {
	ch chan []interface{} // carries the batches
	sz int                // batch size handed to senders created from this channel
}
now := time.Now()
if *toDate == "" {
*toDate = now.Format(time.RFC3339)
if *fromDate == "" {
*fromDate = now.Add(-1 * time.Hour).Format(time.RFC3339)
// newBatchChannel builds a batchChannel that records sz as its batch size.
// The underlying channel is buffered and holds up to 100 pending batches.
func newBatchChannel(sz int) *batchChannel {
	b := new(batchChannel)
	b.ch = make(chan []interface{}, 100)
	b.sz = sz
	return b
}
// Close closes the underlying channel. A Foreach loop running on b returns
// once it has consumed the batches that were already queued. Sending on a
// closed channel panics, so this must only run after every sender is done.
func (b *batchChannel) Close() {
	close(b.ch)
}
// Foreach receives batches from the channel until it is closed and drained,
// invoking f once for every value, in the order the values appear within
// each batch.
func (b *batchChannel) Foreach(f func(interface{})) {
	for {
		batch, open := <-b.ch
		if !open {
			return
		}
		for i := range batch {
			f(batch[i])
		}
	}
}
from, err := parseDate(*fromDate)
if err != nil {
log.Fatalf("error in --from: %v", err)
func (b *batchChannel) Sender() *batchSender {
return &batchSender{
ch: b.ch,
sz: b.sz,
wbuf: make([]interface{}, 0, b.sz),
}
to, err := parseDate(*toDate)
if err != nil {
log.Fatalf("error in --to: %v", err)
}
// batchSender accumulates values locally and forwards them to a channel in
// batches.
type batchSender struct {
	ch   chan []interface{} // destination for completed batches
	wbuf []interface{}      // values collected since the last send
	sz   int                // number of buffered values that triggers a send
}
func (s *batchSender) Push(obj interface{}) {
s.wbuf = append(s.wbuf, obj)
if len(s.wbuf) >= s.sz {
s.ch <- s.wbuf
s.wbuf = make([]interface{}, 0, s.sz)
}
if to.Before(from) {
log.Fatal("--to comes before --from")
}
// Close flushes any values still buffered in the sender as one final batch.
// It does not close the underlying channel.
//
// Close is safe to call more than once: the buffer is released after it has
// been sent, so a repeated call has nothing left to send.
func (s *batchSender) Close() {
	if len(s.wbuf) == 0 {
		return
	}
	s.ch <- s.wbuf
	// Drop the reference so the same batch cannot be delivered twice.
	s.wbuf = nil
}
func main() {
log.SetFlags(0)
flag.Parse()
from, to, err := parseTimeRange(*fromDate, *toDate)
if err != nil {
log.Fatalf("invalid time range: %v", err)
}
query, indexes := makeQuery(strings.Join(flag.Args(), " "), *syslogFacility, from, to)
query, indexes := makeQuery(strings.Join(flag.Args(), " "), from, to)
if *doDumpQuery {
src, _ := query.Source()
b, _ := json.MarshalIndent(src, "", " ")
log.Printf("indexes: %v", indexes)
log.Printf("query:\n%s", string(b))
src, _ := query.Source() // nolint
b, _ := json.MarshalIndent(src, "", " ") // nolint
fmt.Printf("indexes: %v\n", indexes)
fmt.Printf("query:\n%s\n", string(b))
return
}
......@@ -189,5 +278,5 @@ func main() {
if err := runQuery(client, query, indexes, w); err != nil {
log.Fatal(err)
}
w.Flush()
w.Flush() // nolint
}
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package context defines the Context type, which carries deadlines,
// cancelation signals, and other request-scoped values across API boundaries
// and between processes.
//
// Incoming requests to a server should create a Context, and outgoing calls to
// servers should accept a Context. The chain of function calls between must
// propagate the Context, optionally replacing it with a modified copy created
// using WithDeadline, WithTimeout, WithCancel, or WithValue.
//
// Programs that use Contexts should follow these rules to keep interfaces
// consistent across packages and enable static analysis tools to check context
// propagation:
//
// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
// 	func DoSomething(ctx context.Context, arg Arg) error {
// 		// ... use ctx ...
// 	}
//
// Do not pass a nil Context, even if a function permits it. Pass context.TODO
// if you are unsure about which Context to use.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The same Context may be passed to functions running in different goroutines;
// Contexts are safe for simultaneous use by multiple goroutines.
//
// See http://blog.golang.org/context for example code for a server that uses
// Contexts.
package context // import "golang.org/x/net/context"
import "time"
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
	// Deadline returns the time when work done on behalf of this context
	// should be canceled. Deadline returns ok==false when no deadline is
	// set. Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled. Done may return nil if this context can
	// never be canceled. Successive calls to Done return the same value.
	//
	// WithCancel arranges for Done to be closed when cancel is called;
	// WithDeadline arranges for Done to be closed when the deadline
	// expires; WithTimeout arranges for Done to be closed when the timeout
	// elapses.
	//
	// Done is provided for use in select statements:
	//
	//  // Stream generates values with DoSomething and sends them to out
	//  // until DoSomething returns an error or ctx.Done is closed.
	//  func Stream(ctx context.Context, out chan<- Value) error {
	//  	for {
	//  		v, err := DoSomething(ctx)
	//  		if err != nil {
	//  			return err
	//  		}
	//  		select {
	//  		case <-ctx.Done():
	//  			return ctx.Err()
	//  		case out <- v:
	//  		}
	//  	}
	//  }
	//
	// See http://blog.golang.org/pipelines for more examples of how to use
	// a Done channel for cancelation.
	Done() <-chan struct{}

	// Err returns a non-nil error value after Done is closed. Err returns
	// Canceled if the context was canceled or DeadlineExceeded if the
	// context's deadline passed. No other values for Err are defined.
	// After Done is closed, successive calls to Err return the same value.
	Err() error

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key. Successive calls to Value with
	// the same key return the same result.
	//
	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	//
	// A key identifies a specific value in a Context. Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value. A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	//
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	//
	// 	// Package user defines a User type that's stored in Contexts.
	// 	package user
	//
	// 	import "golang.org/x/net/context"
	//
	// 	// User is the type of value stored in the Contexts.
	// 	type User struct {...}
	//
	// 	// key is an unexported type for keys defined in this package.
	// 	// This prevents collisions with keys defined in other packages.
	// 	type key int
	//
	// 	// userKey is the key for user.User values in Contexts. It is
	// 	// unexported; clients use user.NewContext and user.FromContext
	// 	// instead of using this key directly.
	// 	var userKey key = 0
	//
	// 	// NewContext returns a new Context that carries value u.
	// 	func NewContext(ctx context.Context, u *User) context.Context {
	// 		return context.WithValue(ctx, userKey, u)
	// 	}
	//
	// 	// FromContext returns the User value stored in ctx, if any.
	// 	func FromContext(ctx context.Context) (*User, bool) {
	// 		u, ok := ctx.Value(userKey).(*User)
	// 		return u, ok
	// 	}
	Value(key interface{}) interface{}
}
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
//
// NOTE(review): the value returned is the identifier background, which is
// declared outside the portion of the package shown here.
func Background() Context {
	return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter). TODO is recognized by static analysis tools that determine
// whether Contexts are propagated correctly in a program.
//
// NOTE(review): the value returned is the identifier todo, which is declared
// outside the portion of the package shown here.
func TODO() Context {
	return todo
}
// A CancelFunc tells an operation to abandon its work. It takes no arguments
// and returns no value.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do issues req through client with ctx attached to the request, and returns
// the resulting HTTP response.
//
// A nil client selects http.DefaultClient.
//
// ctx must be non-nil. When the request fails and ctx is already done
// (canceled or timed out), the error returned is ctx.Err() rather than the
// error produced by the client.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
	httpClient := client
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	resp, err := httpClient.Do(req.WithContext(ctx))
	if err == nil {
		return resp, nil
	}

	// The request failed. If the context is finished, its error is probably
	// the more useful explanation; otherwise keep the client's error.
	select {
	case <-ctx.Done():
		return resp, ctx.Err()
	default:
		return resp, err
	}
}
// Get builds a body-less GET request for url and sends it with Do, using the
// given context and client. A request that cannot be constructed yields a
// nil response and the construction error.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
	req, reqErr := http.NewRequest("GET", url, nil)
	if reqErr == nil {
		return Do(ctx, client, req)
	}
	return nil, reqErr
}
// Head builds a body-less HEAD request for url and sends it with Do, using
// the given context and client. A request that cannot be constructed yields
// a nil response and the construction error.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
	req, reqErr := http.NewRequest("HEAD", url, nil)
	if reqErr == nil {
		return Do(ctx, client, req)
	}
	return nil, reqErr
}
// Post builds a POST request for url carrying body, sets its Content-Type
// header to bodyType, and sends it with Do using the given context and
// client. A request that cannot be constructed yields a nil response and the
// construction error.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
	req, reqErr := http.NewRequest("POST", url, body)
	if reqErr == nil {
		req.Header.Set("Content-Type", bodyType)
		return Do(ctx, client, req)
	}
	return nil, reqErr
}