// unmarshalRecordRecursively decodes a JSON-encoded record into a map and
// then gives the special "message" field a second decoding pass.
//
// If "message" is a string that, after stripping an optional "@cee:"
// prefix, starts with '{' and parses as a JSON object, the string value
// is replaced in the returned map by the decoded object. In every other
// case (field missing, not a string, not object-like, or not valid JSON)
// the field is left exactly as it was decoded the first time.
//
// An error is returned only when the outer record itself is not valid
// JSON; a failure to decode the nested message is deliberately ignored,
// since a plain-text message that merely starts with '{' is legitimate.
func unmarshalRecordRecursively(record []byte) (map[string]any, error) {
	var m map[string]any
	if err := json.Unmarshal(record, &m); err != nil {
		return nil, err
	}

	// Try to unmarshal 'message' if it looks like JSON (with optional @cee prefix).
	if s, ok := m["message"].(string); ok {
		// TrimPrefix already removes the prefix when present, so s must be
		// decoded as-is: slicing it again would cut into the JSON payload
		// and panic on strings shorter than the prefix.
		s = strings.TrimPrefix(s, "@cee:")
		if strings.HasPrefix(s, "{") {
			var m2 map[string]any
			if err := json.Unmarshal([]byte(s), &m2); err == nil {
				m["message"] = m2
			}
		}
	}

	return m, nil
}