Skip to content
Snippets Groups Projects
Verified Commit 180b3d6a authored by blallo's avatar blallo
Browse files

Init

parents
No related branches found
No related tags found
No related merge requests found
go.mod 0 → 100644
module git.autistici.org/blallo/justsendit
go 1.18
require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/nxadm/tail v1.4.8 // indirect
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)
main.go 0 → 100644
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/nxadm/tail"
)
const (
JSONContent = "application/json"
defaultBufLen = 10
defaultBufMaxSize = 16 * (1 << 20)
)
var (
defaultTimeout = 30 * time.Second
filePath = flag.String("filepath", "", "Path to file to listen to")
destinations = flag.String("destinations", "", "Comma-separated list of elasticsearch targets")
index = flag.String("index", "", "The elasticsearch index to send the data to")
bufLen = flag.Int("buf-len", defaultBufLen, "Internal buffer length")
bufSize = flag.Int("buf-max-size", int(defaultBufMaxSize), "Internal buffer max size")
)
type ErrSendFailed struct {
statusCode int
destination string
body io.Reader
}
func (e *ErrSendFailed) Error() string {
body, err := ioutil.ReadAll(e.body)
if err != nil {
return fmt.Sprintf("[%s] failed with: %d", e.destination, e.statusCode)
}
return fmt.Sprintf("[%s] failed (code %d): %s", e.destination, e.statusCode, string(body))
}
type sendErrors struct {
errors []error
}
func (e *sendErrors) add(err error) {
e.errors = append(e.errors, err)
}
func (e *sendErrors) isEmpty() bool {
return len(e.errors) == 0
}
func (e *sendErrors) Error() string {
var errs []string
for _, err := range e.errors {
errs = append(errs, err.Error())
}
return strings.Join(errs, " | ")
}
func getTimestamp() string {
return time.Now().Format(time.RFC3339)
}
type client struct {
index string
destinations []*url.URL
clients []http.Client
errors chan error
}
func newClient(index string, destinations ...*url.URL) *client {
errors := make(chan error, len(destinations))
c := client{errors: errors, destinations: destinations, index: index}
for range destinations {
c.clients = append(c.clients, http.Client{Timeout: defaultTimeout})
}
c.formatDestinations()
return &c
}
func (c *client) formatDestinations() {
for _, dst := range c.destinations {
dst.Path = filepath.Join(dst.Path, c.index, "_bulk")
}
}
func (c *client) send(ctx context.Context, body string) error {
for i, dst := range c.destinations {
go func(i int, dst *url.URL) {
bodyReader := strings.NewReader(body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, dst.String(), bodyReader)
if err != nil {
c.errors <- err
return
}
req.Header["Content-Type"] = []string{JSONContent}
resp, err := c.clients[i].Do(req)
if resp.StatusCode != http.StatusOK {
c.errors <- &ErrSendFailed{statusCode: resp.StatusCode, destination: dst.String(), body: resp.Body}
return
}
c.errors <- nil
}(i, dst)
}
errCount := 0
errors := &sendErrors{}
for err := range c.errors {
errCount++
if err != nil {
errors.add(err)
}
if errCount == len(c.destinations) {
break
}
}
if errors.isEmpty() {
return nil
}
return errors
}
type forwarder struct {
client *client
filepath string
tail *tail.Tail
buffer []string
bufLen int64
bufMaxSize int64
bufCurLen int64
bufCurSize int64
}
func newForwarder(filepath, index string, bufLen, bufMaxSize int64, destinations ...*url.URL) (*forwarder, error) {
t, err := tail.TailFile(filepath, tail.Config{
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd},
})
if err != nil {
return nil, err
}
client := newClient(index, destinations...)
return &forwarder{
client: client,
filepath: filepath,
tail: t,
buffer: make([]string, 0, bufLen),
bufLen: bufLen,
bufMaxSize: bufMaxSize,
}, nil
}
func (f *forwarder) send(ctx context.Context) error {
tmp := []string{}
head := fmt.Sprintf("{\"index\":{\"_index\":\"%s\"}}", f.client.index)
for _, line := range f.buffer {
tmp = append(tmp, head, line)
}
payload := strings.Join(tmp, "\n") + "\n"
log.Println("Sending:", payload)
err := f.client.send(ctx, payload)
if err != nil {
return err
}
f.buffer = make([]string, 0, f.bufLen)
f.bufCurLen = 0
f.bufCurSize = 0
return nil
}
func (f *forwarder) enqueue(ctx context.Context, line string) error {
log.Println("Enqueueing:", line)
var lineJSON map[string]interface{}
if err := json.Unmarshal([]byte(line), &lineJSON); err != nil {
return err
}
lineJSON["@timestamp"] = getTimestamp()
lineB, err := json.Marshal(lineJSON)
if err != nil {
return err
}
f.buffer = append(f.buffer, string(lineB))
f.bufCurLen++
f.bufCurSize += int64(len(line))
if f.bufCurLen == f.bufLen {
return f.send(ctx)
}
if f.bufCurSize >= f.bufMaxSize {
return f.send(ctx)
}
return nil
}
func (f *forwarder) run(ctx context.Context) error {
for {
select {
case line := <-f.tail.Lines:
if line.Err != nil {
return line.Err
}
if err := f.enqueue(ctx, line.Text); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func parseDestinations(destinations string) ([]*url.URL, error) {
var results []*url.URL
for _, dst := range strings.Split(destinations, ",") {
u, err := url.Parse(dst)
if err != nil {
return nil, err
}
results = append(results, u)
}
return results, nil
}
func main() {
flag.Parse()
if *filePath == "" {
fmt.Fprintln(os.Stderr, "Missing -filepath")
os.Exit(1)
}
if *index == "" {
fmt.Fprintln(os.Stderr, "Missing -index")
os.Exit(1)
}
if *destinations == "" {
fmt.Fprintln(os.Stderr, "Missing -destinations")
os.Exit(1)
}
log.Println("filePath:", *filePath)
log.Println("index:", *index)
log.Println("destinations:", *destinations)
log.Println("bufLen:", *bufLen)
log.Println("bufSize:", *bufSize)
parsed, err := parseDestinations(*destinations)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed parsing destinations: %s\n", err)
os.Exit(2)
}
f, err := newForwarder(*filePath, *index, int64(*bufLen), int64(*bufSize), parsed...)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-2)
}
ctx := context.Background()
if err := f.run(ctx); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment