Skip to content
Snippets Groups Projects
Unverified Commit bddfb2a8 authored by Antoine Leroyer's avatar Antoine Leroyer Committed by GitHub
Browse files

Merge pull request #5 from filippog/omfwd

Add omfwd and mmkubernetes support
parents d721280f 37df97c6
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,8 @@ const (
rsyslogDynStat
rsyslogDynafileCache
rsyslogInputIMDUP
rsyslogForward
rsyslogKubernetes
)
type rsyslogExporter struct {
......@@ -112,6 +114,22 @@ func (re *rsyslogExporter) handleStatLine(rawbuf []byte) error {
for _, p := range d.toPoints() {
re.set(p)
}
case rsyslogForward:
f, err := newForwardFromJSON(buf)
if err != nil {
return err
}
for _, p := range f.toPoints() {
re.set(p)
}
case rsyslogKubernetes:
k, err := newKubernetesFromJSON(buf)
if err != nil {
return err
}
for _, p := range k.toPoints() {
re.set(p)
}
default:
return fmt.Errorf("unknown pstat type: %v", pstatType)
......
package main
import (
"encoding/json"
"fmt"
)
type forward struct {
Name string `json:"name"`
BytesSent int64 `json:"bytes.sent"`
}
func newForwardFromJSON(b []byte) (*forward, error) {
var pstat forward
err := json.Unmarshal(b, &pstat)
if err != nil {
return nil, fmt.Errorf("failed to decode forward stat `%v`: %v", string(b), err)
}
return &pstat, nil
}
func (f *forward) toPoints() []*point {
points := make([]*point, 1)
points[0] = &point{
Name: "forward_bytes_total",
Type: counter,
Value: f.BytesSent,
Description: "bytes forwarded to destination",
LabelName: "destination",
LabelValue: f.Name,
}
return points
}
package main
import "testing"
var (
forwardLog = []byte(`{ "name": "TCP-FQDN-6514", "origin": "omfwd", "bytes.sent": 666 }`)
)
func TestNewForwardFromJSON(t *testing.T) {
logType := getStatType(forwardLog)
if logType != rsyslogForward {
t.Errorf("detected pstat type should be %d but is %d", rsyslogForward, logType)
}
pstat, err := newForwardFromJSON([]byte(forwardLog))
if err != nil {
t.Fatalf("expected parsing action not to fail, got: %v", err)
}
if want, got := "TCP-FQDN-6514", pstat.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
if want, got := int64(666), pstat.BytesSent; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
}
func TestForwardToPoints(t *testing.T) {
pstat, err := newForwardFromJSON([]byte(forwardLog))
if err != nil {
t.Fatalf("expected parsing action not to fail, got: %v", err)
}
points := pstat.toPoints()
point := points[0]
if want, got := "forward_bytes_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
if want, got := int64(666), point.Value; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := counter, point.Type; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := "TCP-FQDN-6514", point.LabelValue; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
}
package main
import (
"encoding/json"
"fmt"
"regexp"
)
var (
apiNameRegexp = regexp.MustCompile(`mmkubernetes\((\S+)\)`)
)
type kubernetes struct {
Name string `json:"name"`
Url string
RecordSeen int64 `json:"recordseen"`
NamespaceMetaSuccess int64 `json:"namespacemetadatasuccess"`
NamespaceMetaNotFound int64 `json:"namespacemetadatanotfound"`
NamespaceMetaBusy int64 `json:"namespacemetadatabusy"`
NamespaceMetaError int64 `json:"namespacemetadataerror"`
PodMetaSuccess int64 `json:"podmetadatasuccess"`
PodMetaNotFound int64 `json:"podmetadatanotfound"`
PodMetaBusy int64 `json:"podmetadatabusy"`
PodMetaError int64 `json:"podmetadataerror"`
}
func newKubernetesFromJSON(b []byte) (*kubernetes, error) {
var pstat kubernetes
err := json.Unmarshal(b, &pstat)
if err != nil {
return nil, fmt.Errorf("failed to decode kubernetes stat `%v`: %v", string(b), err)
}
matches := apiNameRegexp.FindSubmatch([]byte(pstat.Name))
if matches != nil {
pstat.Url = string(matches[1])
}
return &pstat, nil
}
func (k *kubernetes) toPoints() []*point {
points := make([]*point, 9)
points[0] = &point{
Name: "kubernetes_namespace_metadata_success_total",
Type: counter,
Value: k.NamespaceMetaSuccess,
Description: "successful fetches of namespace metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[1] = &point{
Name: "kubernetes_namespace_metadata_notfound_total",
Type: counter,
Value: k.NamespaceMetaNotFound,
Description: "notfound fetches of namespace metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[2] = &point{
Name: "kubernetes_namespace_metadata_busy_total",
Type: counter,
Value: k.NamespaceMetaBusy,
Description: "busy fetches of namespace metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[3] = &point{
Name: "kubernetes_namespace_metadata_error_total",
Type: counter,
Value: k.NamespaceMetaError,
Description: "error fetches of namespace metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[4] = &point{
Name: "kubernetes_pod_metadata_success_total",
Type: counter,
Value: k.PodMetaSuccess,
Description: "successful fetches of pod metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[5] = &point{
Name: "kubernetes_pod_metadata_notfound_total",
Type: counter,
Value: k.PodMetaNotFound,
Description: "notfound fetches of pod metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[6] = &point{
Name: "kubernetes_pod_metadata_busy_total",
Type: counter,
Value: k.PodMetaBusy,
Description: "busy fetches of pod metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[7] = &point{
Name: "kubernetes_pod_metadata_error_total",
Type: counter,
Value: k.PodMetaError,
Description: "error fetches of pod metadata",
LabelName: "url",
LabelValue: k.Url,
}
points[8] = &point{
Name: "kubernetes_record_seen_total",
Type: counter,
Value: k.RecordSeen,
Description: "records fetched from the api",
LabelName: "url",
LabelValue: k.Url,
}
return points
}
package main
import "testing"
var (
kubernetesLog = []byte(`{ "name": "mmkubernetes(https://host.domain.tld:6443)", "origin": "mmkubernetes", "recordseen": 477943, "namespacemetadatasuccess": 7, "namespacemetadatanotfound": 0, "namespacemetadatabusy": 0, "namespacemetadataerror": 0, "podmetadatasuccess": 26, "podmetadatanotfound": 0, "podmetadatabusy": 0, "podmetadataerror": 0 }`)
)
func TestNewKubernetesFromJSON(t *testing.T) {
logType := getStatType(kubernetesLog)
if logType != rsyslogKubernetes {
t.Errorf("detected pstat type should be %d but is %d", rsyslogKubernetes, logType)
}
pstat, err := newKubernetesFromJSON([]byte(kubernetesLog))
if err != nil {
t.Fatalf("expected parsing action not to fail, got: %v", err)
}
if want, got := "mmkubernetes(https://host.domain.tld:6443)", pstat.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
if want, got := "https://host.domain.tld:6443", pstat.Url; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
if want, got := int64(477943), pstat.RecordSeen; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(7), pstat.NamespaceMetaSuccess; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.NamespaceMetaNotFound; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.NamespaceMetaBusy; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.NamespaceMetaError; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(26), pstat.PodMetaSuccess; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.PodMetaNotFound; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.PodMetaBusy; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
if want, got := int64(0), pstat.PodMetaError; want != got {
t.Errorf("wanted '%d', got '%d'", want, got)
}
}
func TestKubernetesToPoints(t *testing.T) {
pstat, err := newKubernetesFromJSON([]byte(kubernetesLog))
if err != nil {
t.Fatalf("expected parsing action not to fail, got: %v", err)
}
points := pstat.toPoints()
point := points[0]
if want, got := "kubernetes_namespace_metadata_success_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
if want, got := "https://host.domain.tld:6443", point.LabelValue; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[1]
if want, got := "kubernetes_namespace_metadata_notfound_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[2]
if want, got := "kubernetes_namespace_metadata_busy_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[3]
if want, got := "kubernetes_namespace_metadata_error_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[4]
if want, got := "kubernetes_pod_metadata_success_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[5]
if want, got := "kubernetes_pod_metadata_notfound_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[6]
if want, got := "kubernetes_pod_metadata_busy_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[7]
if want, got := "kubernetes_pod_metadata_error_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
point = points[8]
if want, got := "kubernetes_record_seen_total", point.Name; want != got {
t.Errorf("wanted '%s', got '%s'", want, got)
}
}
......@@ -18,6 +18,10 @@ func getStatType(buf []byte) rsyslogType {
return rsyslogDynStat
} else if strings.Contains(line, "dynafile cache") {
return rsyslogDynafileCache
} else if strings.Contains(line, "omfwd") {
return rsyslogForward
} else if strings.Contains(line, "mmkubernetes") {
return rsyslogKubernetes
}
return rsyslogUnknown
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment