Skip to content
Snippets Groups Projects
Commit 07c93aaf authored by ale's avatar ale
Browse files

use strings for message timestamps

parent 7d422064
No related branches found
No related tags found
No related merge requests found
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
type Message map[string]interface{} type Message map[string]interface{}
var mandatoryAttributes = []string{ var mandatoryAttributes = []string{
"user", "message", "stamp", "user", "message", "timestamp",
} }
func (m Message) IsValid() bool { func (m Message) IsValid() bool {
...@@ -30,15 +30,9 @@ func (m Message) GetString(key string) string { ...@@ -30,15 +30,9 @@ func (m Message) GetString(key string) string {
return "" return ""
} }
func (m Message) GetInt(key string) int64 {
if value, ok := m[key]; ok {
return int64(value.(float64))
}
return 0
}
func (m Message) Stamp() time.Time { func (m Message) Stamp() time.Time {
return time.Unix(m.GetInt("stamp"), 0) t, _ := time.Parse(m.GetString("timestamp"), time.RFC3339)
return t
} }
func (m Message) ToJSON() []byte { func (m Message) ToJSON() []byte {
......
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
var okTestData = ` var okTestData = `
{ {
"stamp": 1394393566, "timestamp": "2009-11-10T23:00:00Z",
"user": "test", "user": "test",
"message": "sample message" "message": "sample message"
} }
......
...@@ -31,6 +31,9 @@ import ( ...@@ -31,6 +31,9 @@ import (
// and store the message id under index/<key>/<value>/<id> (yes, the // and store the message id under index/<key>/<value>/<id> (yes, the
// id appears twice, it's part of the key just to ensure uniqueness). // id appears twice, it's part of the key just to ensure uniqueness).
// //
// Keys and values can be anything as long as they don't contain a
// null byte (which is used internally as the primary key separator).
//
type DB struct { type DB struct {
leveldb *levigo.DB leveldb *levigo.DB
cache *levigo.Cache cache *levigo.Cache
...@@ -80,7 +83,8 @@ func NewDB(path string, dbopts *DBOptions) *DB { ...@@ -80,7 +83,8 @@ func NewDB(path string, dbopts *DBOptions) *DB {
// Exclude some keys by default. // Exclude some keys by default.
db.excludedKeys["message"] = struct{}{} db.excludedKeys["message"] = struct{}{}
db.excludedKeys["stamp"] = struct{}{} db.excludedKeys["stamp"] = struct{}{} // old field for timestamp
db.excludedKeys["timestamp"] = struct{}{}
for _, key := range dbopts.ExcludedKeys { for _, key := range dbopts.ExcludedKeys {
db.excludedKeys[key] = struct{}{} db.excludedKeys[key] = struct{}{}
} }
...@@ -99,7 +103,7 @@ func (db *DB) isExcluded(key string) bool { ...@@ -99,7 +103,7 @@ func (db *DB) isExcluded(key string) bool {
return ok return ok
} }
var keySeparator = byte('|') var keySeparator = byte(0)
func makeKey(prefix string, parts ...[]byte) []byte { func makeKey(prefix string, parts ...[]byte) []byte {
allParts := make([][]byte, 0, len(parts)+1) allParts := make([][]byte, 0, len(parts)+1)
...@@ -305,7 +309,7 @@ func (db *DB) Query(q map[string]string) ([]audit.Message, error) { ...@@ -305,7 +309,7 @@ func (db *DB) Query(q map[string]string) ([]audit.Message, error) {
} }
} }
if len(errors) > 0 { if len(errors) > 0 {
return nil, fmt.Errorf("Query errors: %s", strings.Join(errors, "; ")) return nil, fmt.Errorf("query errors: %s", strings.Join(errors, "; "))
} }
// Retrieve rows from the key set. // Retrieve rows from the key set.
......
...@@ -19,6 +19,17 @@ func NewHttpServer(db *DB) *HttpServer { ...@@ -19,6 +19,17 @@ func NewHttpServer(db *DB) *HttpServer {
return &HttpServer{database: db} return &HttpServer{database: db}
} }
func remoteHost(r *http.Request) string {
if host, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
return host
}
return r.RemoteAddr
}
func dumpMsg(msg audit.Message) string {
return strings.TrimRight(string(msg.ToJSON()), "\n")
}
// Write handler: the input data is already a JSON message. // Write handler: the input data is already a JSON message.
func (h *HttpServer) writeHandler(w http.ResponseWriter, r *http.Request) { func (h *HttpServer) writeHandler(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/json" { if r.Header.Get("Content-Type") != "application/json" {
...@@ -32,18 +43,22 @@ func (h *HttpServer) writeHandler(w http.ResponseWriter, r *http.Request) { ...@@ -32,18 +43,22 @@ func (h *HttpServer) writeHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// Log all received messages, even if there is an error later.
log.Printf("%s: received message: %s", remoteHost(r), dumpMsg(msg))
if !msg.IsValid() {
log.Printf("%s: invalid message", remoteHost(r))
http.Error(w, "Invalid Message", http.StatusBadRequest)
return
}
if err := h.database.Write(msg); err != nil { if err := h.database.Write(msg); err != nil {
log.Printf("%s: error writing to database: %v", remoteHost(r), err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
// Request was successful, log it and return empty 200 OK reply. // Request was successful, return an empty 200 OK reply.
var hostStr string
if host, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
hostStr = host
}
logText := strings.TrimRight(string(msg.ToJSON()), "\n")
log.Printf("%s: received message: %s", hostStr, logText)
w.WriteHeader(200) w.WriteHeader(200)
} }
......
package audit package audit
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"math/rand" "math/rand"
"time" "time"
...@@ -17,12 +16,13 @@ func init() { ...@@ -17,12 +16,13 @@ func init() {
// resulting IDs can be sorted lexicographically preserving the time // resulting IDs can be sorted lexicographically preserving the time
// ordering. // ordering.
func NewUniqueId(t time.Time) []byte { func NewUniqueId(t time.Time) []byte {
var b bytes.Buffer var b [16]byte
// The key consists of serialized time + sufficiently large // The key consists of serialized time + sufficiently large
// random number. This should allow a high insertion rate. // random number. This should allow a high insertion rate, and
binary.Write(&b, binary.BigEndian, t.UnixNano()) // hopefully prevent conflicts considering the rand global lock.
binary.Write(&b, binary.BigEndian, rand.Int63()) binary.BigEndian.PutUint64(b[:8], uint64(t.UnixNano()))
binary.BigEndian.PutUint64(b[8:16], uint64(rand.Int63()))
return b.Bytes() return b[:]
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment