Commit dc35ba58 authored by ale

add an ipset StateMachine to support log compaction and snapshots; add a "del" command

parent fbc22ef2
......@@ -17,6 +17,7 @@ var (
serverStr = flag.String("server", "localhost:1313", "Comma-separated list of servers")
doCreate = flag.Bool("create", false, "Create a new ipset")
doAdd = flag.Bool("add", false, "Add an entry to an ipset")
doDel = flag.Bool("del", false, "Delete an entry from an ipset")
)
// HTTP client with a connect() timeout.
......@@ -35,6 +36,7 @@ func init() {
}
}
// Client talks to the ipsetd HTTP interface.
type Client struct {
	// servers holds the addresses of the known ipsetd servers; each one
	// is substituted into an "http://<server><path>" URL when a request
	// is sent.
	servers []string
}
......@@ -45,25 +47,80 @@ func NewClient(servers []string) *Client {
}
}
// Permutate list of known servers in random order.
func (c *Client) getServers() []string {
if len(c.servers) < 2 {
return c.servers
// serverList is a re-prioritizable list of servers that is iterated
// until every entry has been visited once.
type serverList struct {
	// servers is the working list; entries before pos have been visited.
	servers []string
	// cur is the entry most recently selected by Next.
	cur string
	// pos is the index of the entry Next will select on its next call.
	pos int
}
// Next advances to the next unvisited server. It returns false, and
// leaves the current value untouched, once the list is exhausted.
func (l *serverList) Next() bool {
	if l.pos >= len(l.servers) {
		return false
	}
	l.cur = l.servers[l.pos]
	l.pos++
	return true
}
// Permutate.
servers := make([]string, len(c.servers))
for idx, oldIdx := range rand.Perm(len(c.servers)) {
servers[idx] = c.servers[oldIdx]
// Value returns the server selected by the most recent successful call
// to Next (the empty string if Next has not selected anything yet).
func (l *serverList) Value() string {
	current := l.cur
	return current
}
// swap exchanges the entries at positions i and j of the server list.
func (l *serverList) swap(i, j int) {
	s := l.servers
	s[i], s[j] = s[j], s[i]
}
// Push moves 'server' to the front of the not-yet-visited part of the
// list, so that it is the next entry selected by Next.
//
// Three cases are handled: a previously unknown server is appended and
// then moved to the next slot; a server that has already been visited
// (index below pos) is left alone so it is never retried; a server that
// appears later in the list is swapped into the next slot.
func (l *serverList) Push(server string) {
	// Find the index of 'server', or -1 if it is not in the list.
	idx := -1
	for i, s := range l.servers {
		if s == server {
			idx = i
			break
		}
	}

	switch {
	case idx < 0:
		// Unknown server: extend the list. The new last index is always
		// >= pos, so the swap below is in range.
		l.servers = append(l.servers, server)
		idx = len(l.servers) - 1
	case idx < l.pos:
		// Already tried: do nothing.
		return
	}

	// Put the server on the next spot to be visited.
	l.swap(l.pos, idx)
}
// newServerList builds a serverList holding the given servers in random
// order. The entries are copied into a freshly allocated slice, so the
// caller's slice is not shared with the list.
func newServerList(servers []string) *serverList {
	order := rand.Perm(len(servers))
	shuffled := make([]string, len(servers))
	for dst := range order {
		shuffled[dst] = servers[order[dst]]
	}
	return &serverList{servers: shuffled}
}
func (c *Client) doRequest(url string, args url.Values) error {
var outErr error
// Iterate over servers.
for _, server := range c.getServers() {
l := newServerList(c.servers)
for l.Next() {
server := l.Value()
resp, err := httpClient.PostForm(fmt.Sprintf("http://%s%s", server, url), args)
if err != nil {
log.Printf("error for server %s: %s", server, err)
......@@ -71,6 +128,10 @@ func (c *Client) doRequest(url string, args url.Values) error {
continue
}
if resp.StatusCode == 409 {
leader := resp.Header.Get("X-Leader")
if leader != "" {
l.Push(leader)
}
continue
}
if resp.StatusCode != 200 {
......@@ -85,6 +146,13 @@ func (c *Client) doRequest(url string, args url.Values) error {
return outErr
}
// Create asks the cluster to create the set 'setName' of type
// 'typeName' by posting to the create endpoint.
func (c *Client) Create(setName, typeName string) error {
	form := url.Values{
		"set":  {setName},
		"type": {typeName},
	}
	return c.doRequest("/api/1/ipset/create", form)
}
func (c *Client) Add(setName, entry string) error {
values := url.Values{}
values.Set("set", setName)
......@@ -92,21 +160,26 @@ func (c *Client) Add(setName, entry string) error {
return c.doRequest("/api/1/ipset/add", values)
}
func (c *Client) Create(setName, typeName string) error {
// Del asks the cluster to delete 'entry' from the set 'setName' by
// posting to the del endpoint.
func (c *Client) Del(setName, entry string) error {
	values := url.Values{}
	values.Set("set", setName)
	values.Set("entry", entry)
	return c.doRequest("/api/1/ipset/del", values)
}
// btou converts a boolean to an integer: 1 for true, 0 for false.
// It is used to count how many boolean flags are set.
func btou(b bool) int {
	var n int
	if b {
		n = 1
	}
	return n
}
func main() {
flag.Parse()
if *doAdd && *doCreate {
log.Fatal("Specify only one of --add and --create")
}
if !*doAdd && !*doCreate {
log.Fatal("Specify one of --add or --create")
numOpts := btou(*doCreate) + btou(*doAdd) + btou(*doDel)
if numOpts != 1 {
log.Fatal("Specify only one of --add, --del and --create")
}
if flag.NArg() < 2 {
log.Fatal("Not enough arguments")
......@@ -117,6 +190,8 @@ func main() {
var err error
if *doAdd {
err = client.Add(flag.Arg(0), flag.Arg(1))
} else if *doDel {
err = client.Del(flag.Arg(0), flag.Arg(1))
} else if *doCreate {
err = client.Create(flag.Arg(0), flag.Arg(1))
}
......
......@@ -26,6 +26,7 @@ func (c *AddCommand) CommandName() string {
// Apply runs "ipset add" for this command's set and entry on the local
// node, using the IPSet stored as the raft server context.
//
// The command still reports success to raft when ipset fails (as it
// always did), but the failure is now logged instead of being dropped.
// NOTE(review): consider returning the error once the effect on callers
// of raftServer.Do has been checked.
func (c *AddCommand) Apply(server raft.Server) (interface{}, error) {
	log.Printf("Apply(add, '%s', '%s')", c.SetName, c.Entry)
	ips := server.Context().(*ipset.IPSet)
	if err := ips.Run("add", c.SetName, c.Entry); err != nil {
		log.Printf("ipset add '%s' '%s' failed: %s", c.SetName, c.Entry, err)
	}
	return nil, nil
}
......@@ -26,6 +26,7 @@ func (c *CreateCommand) CommandName() string {
// Apply runs "ipset create" for this command's set name and type on the
// local node, using the IPSet stored as the raft server context.
//
// The command still reports success to raft when ipset fails (as it
// always did), but the failure is now logged instead of being dropped.
// NOTE(review): consider returning the error once the effect on callers
// of raftServer.Do has been checked.
func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
	log.Printf("Apply(create, '%s', '%s')", c.SetName, c.TypeName)
	ips := server.Context().(*ipset.IPSet)
	if err := ips.Run("create", c.SetName, c.TypeName); err != nil {
		log.Printf("ipset create '%s' '%s' failed: %s", c.SetName, c.TypeName, err)
	}
	return nil, nil
}
package command
import (
"log"
"git.autistici.org/ale/ipsetd/ipset"
"github.com/goraft/raft"
)
// DelCommand is the replicated command that deletes an entry from a set.
type DelCommand struct {
	// SetName is the set to modify (JSON key "set").
	SetName string `json:"set"`
	// Entry is the entry to remove from the set (JSON key "entry").
	Entry string `json:"entry"`
}
// NewDelCommand returns a DelCommand that removes 'entry' from the set
// named 'setName'.
func NewDelCommand(setName, entry string) *DelCommand {
	cmd := new(DelCommand)
	cmd.SetName = setName
	cmd.Entry = entry
	return cmd
}
// CommandName returns the name under which this command type is known
// to raft.
func (c *DelCommand) CommandName() string {
	const name = "del"
	return name
}
// Apply runs "ipset del" for this command's set and entry on the local
// node, using the IPSet stored as the raft server context.
//
// The command still reports success to raft when ipset fails (as it
// always did), but the failure is now logged instead of being dropped.
// NOTE(review): consider returning the error once the effect on callers
// of raftServer.Do has been checked.
func (c *DelCommand) Apply(server raft.Server) (interface{}, error) {
	log.Printf("Apply(del, '%s', '%s')", c.SetName, c.Entry)
	ips := server.Context().(*ipset.IPSet)
	if err := ips.Run("del", c.SetName, c.Entry); err != nil {
		log.Printf("ipset del '%s' '%s' failed: %s", c.SetName, c.Entry, err)
	}
	return nil, nil
}
package ipset
import (
"bytes"
"encoding/json"
"os/exec"
)
var ipsetBinary = "/usr/sbin/ipset"
// IPSet is the interface to 'ipset' on the local system. It remembers
// which sets are managed through ipsetd, so that it knows which ones to
// dump and restore. IPSet satisfies the RAFT StateMachine interface.
type IPSet struct {
	// knownSets is the collection of set names seen so far; only the
	// keys carry information.
	knownSets map[string]struct{}
}
// NewIPSet returns an IPSet that does not know about any set yet.
func NewIPSet() *IPSet {
	ips := new(IPSet)
	ips.knownSets = map[string]struct{}{}
	return ips
}
// Run 'ipset' with the specified arguments. It's that simple.
func Run(args... string) error {
return exec.Command("/usr/sbin/ipset", args...).Run()
// Run executes the 'ipset' binary with the specified arguments and
// records the set it operates on, so that the set is included in later
// snapshots.
//
// Callers pass the ipset subcommand first and the set name second
// (e.g. "add", setName, entry), so the set name is args[1], not
// args[0]. Invocations with fewer than two arguments name no set:
// nothing is recorded and the binary is simply run.
func (ips *IPSet) Run(args ...string) error {
	if len(args) > 1 {
		ips.knownSets[args[1]] = struct{}{}
	}
	return exec.Command(ipsetBinary, args...).Run()
}
// ipsetDump is the snapshot payload: the saved contents of every known
// set, keyed by set name.
//
// encoding/json ignores unexported fields, so a json tag on 'sets' has
// no effect and the dump would serialize as an empty object. The field
// keeps its name for the code that fills and reads it; the custom
// (un)marshalers below put it on the wire under the "sets" key.
type ipsetDump struct {
	sets map[string][]byte
}

// ipsetDumpWire is the exported-field mirror of ipsetDump used for the
// JSON encoding.
type ipsetDumpWire struct {
	Sets map[string][]byte `json:"sets"`
}

// MarshalJSON encodes the dump as {"sets": {name: base64-data, ...}}.
func (d ipsetDump) MarshalJSON() ([]byte, error) {
	return json.Marshal(ipsetDumpWire{Sets: d.sets})
}

// UnmarshalJSON decodes a dump produced by MarshalJSON, replacing the
// current contents of d.
func (d *ipsetDump) UnmarshalJSON(data []byte) error {
	var wire ipsetDumpWire
	if err := json.Unmarshal(data, &wire); err != nil {
		return err
	}
	d.sets = wire.Sets
	return nil
}
// dumpSet returns the standard output of "ipset save <setName>" as an
// opaque blob. It returns an error if the command fails.
func (ips *IPSet) dumpSet(setName string) ([]byte, error) {
	stdout := new(bytes.Buffer)
	save := exec.Command(ipsetBinary, "save", setName)
	save.Stdout = stdout
	err := save.Run()
	if err != nil {
		return nil, err
	}
	return stdout.Bytes(), nil
}
// Save creates a snapshot of the current ipset state: every known set
// is dumped and the result is JSON-encoded.
//
// It returns an error if any set cannot be dumped or if the dump cannot
// be encoded; no partial snapshot is returned.
func (ips *IPSet) Save() ([]byte, error) {
	dump := ipsetDump{
		sets: make(map[string][]byte),
	}
	for setName := range ips.knownSets {
		data, err := ips.dumpSet(setName)
		if err != nil {
			return nil, err
		}
		dump.sets[setName] = data
	}

	var b bytes.Buffer
	if err := json.NewEncoder(&b).Encode(&dump); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}
// loadSet feeds a previously dumped blob to "ipset restore" on its
// standard input. The setName argument is not used by the command
// itself; only the blob is passed on.
func (ips *IPSet) loadSet(setName string, entries []byte) error {
	restore := exec.Command(ipsetBinary, "restore")
	restore.Stdin = bytes.NewReader(entries)
	return restore.Run()
}
// Recovery restores ipsets from a snapshot: it decodes the JSON dump,
// loads every set it contains and, only if all of them load, replaces
// the collection of known sets with the names found in the snapshot.
func (ips *IPSet) Recovery(data []byte) error {
	dump := ipsetDump{}
	decoder := json.NewDecoder(bytes.NewBuffer(data))
	if err := decoder.Decode(&dump); err != nil {
		return err
	}

	restored := make(map[string]struct{})
	for name, blob := range dump.sets {
		restored[name] = struct{}{}
		err := ips.loadSet(name, blob)
		if err != nil {
			return err
		}
	}

	ips.knownSets = restored
	return nil
}
......@@ -14,10 +14,10 @@ import (
var (
debug = flag.Bool("debug", false, "Show debugging messages")
peer = flag.String("peer", "", "host:port of a peer")
path = flag.String("path", "", "Log path")
host = flag.String("host", mustGetHostname(), "Name or IP of this host")
port = flag.Int("port", 1313, "Port to listen on")
peer = flag.String("peer", "", "host:port of a peer")
path = flag.String("path", "", "Log path")
host = flag.String("host", mustGetHostname(), "Name or IP of this host")
port = flag.Int("port", 1313, "Port to listen on")
)
func mustGetHostname() string {
......@@ -44,10 +44,11 @@ func main() {
// Set up known commands.
raft.RegisterCommand(&command.AddCommand{})
raft.RegisterCommand(&command.DelCommand{})
raft.RegisterCommand(&command.CreateCommand{})
os.MkdirAll(*path, 0755)
s := server.New(*path, *host, *port)
s := server.NewServer(*path, *host, *port)
log.Fatal(s.ListenAndServe(*peer))
}
......@@ -16,9 +16,10 @@ import (
"strings"
"sync"
"git.autistici.org/ale/ipsetd/command"
"git.autistici.org/ale/ipsetd/ipset"
"github.com/goraft/raft"
"github.com/gorilla/mux"
"git.autistici.org/ale/ipsetd/command"
)
type Server struct {
......@@ -30,15 +31,20 @@ type Server struct {
raftServer raft.Server
httpServer *http.Server
mutex sync.RWMutex
ipset *ipset.IPSet
}
func New(path, host string, port int) *Server {
// Create a new HTTP server supporting the RAFT protocol at
// 'host:port'. RAFT logs are saved in 'path'. Use a different path
// for every server instance.
func NewServer(path, host string, port int) *Server {
s := &Server{
name: host,
host: host,
port: port,
path: path,
router: mux.NewRouter(),
ipset: ipset.NewIPSet(),
}
// Autogenerate a name for every instance. Save it in a file
......@@ -64,35 +70,36 @@ func (s *Server) ListenAndServe(peer string) error {
// Create the RAFT HTTP transport.
transporter := raft.NewHTTPTransporter("/raft")
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, nil, "")
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, s.ipset, s.ipset, "")
if err != nil {
return err
}
transporter.Install(s.raftServer, s)
s.raftServer.Start()
// Initialize the RAFT cluster.
if peer != "" {
log.Printf("Attempting to join peer: %s", peer)
if !s.raftServer.IsLogEmpty() {
return errors.New("Cannot join new peer with existing log")
}
if err := s.Join(peer); err != nil {
return err
}
} else if s.raftServer.IsLogEmpty() {
log.Println("Initializing new cluster")
_, err := s.raftServer.Do(
&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
})
if err != nil {
return err
if s.raftServer.IsLogEmpty() {
if peer != "" {
log.Printf("Attempting to join peer: %s", peer)
if err := s.Join(peer); err != nil {
return err
}
} else {
log.Println("Initializing new cluster")
_, err := s.raftServer.Do(
&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
})
if err != nil {
return err
}
}
} else {
if peer != "" {
log.Println("Warning: log is not empty, ignoring --peer option")
}
log.Println("Recovering from log")
}
......@@ -105,6 +112,7 @@ func (s *Server) ListenAndServe(peer string) error {
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
s.router.HandleFunc("/api/1/ipset/create", s.createHandler).Methods("POST")
s.router.HandleFunc("/api/1/ipset/add", s.addHandler).Methods("POST")
s.router.HandleFunc("/api/1/ipset/del", s.delHandler).Methods("POST")
log.Printf("HTTP server listening at %s", s.connectionString())
......@@ -117,7 +125,8 @@ func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *h
s.router.HandleFunc(pattern, handler)
}
func (s *Server) Join(leader string) error {
// Join the current node to an existing cluster.
func (s *Server) Join(peer string) error {
command := &raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
......@@ -125,14 +134,43 @@ func (s *Server) Join(leader string) error {
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
resp, err := http.Post(fmt.Sprintf("http://%s/join", peer), "application/json", &b)
if err != nil {
return err
}
resp.Body.Close()
defer resp.Body.Close()
// If the peer is not the leader, but a leader hint is
// provided, forward the request to the leader.
if resp.StatusCode == 409 {
leader := resp.Header.Get("X-Leader")
if leader != "" {
lresp, lerr := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
if lerr != nil {
return lerr
}
defer lresp.Body.Close()
if lresp.StatusCode != 200 {
return errors.New(lresp.Status)
}
return nil
}
} else if resp.StatusCode != 200 {
return errors.New(resp.Status)
}
return nil
}
// handleError writes the HTTP error response for a failed raft command.
// A "not leader" error becomes a 409 (Conflict) whose X-Leader header
// carries whatever raftServer.Leader() reports, as a hint for the
// client; every other error becomes a 400 (Bad Request).
func (s *Server) handleError(w http.ResponseWriter, err error) {
	if err != raft.NotLeaderError {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// The header must be set before http.Error writes the response.
	w.Header().Set("X-Leader", s.raftServer.Leader())
	http.Error(w, err.Error(), http.StatusConflict)
}
func (s *Server) joinHandler(w http.ResponseWriter, r *http.Request) {
command := &raft.DefaultJoinCommand{}
if err := json.NewDecoder(r.Body).Decode(&command); err != nil {
......@@ -140,8 +178,7 @@ func (s *Server) joinHandler(w http.ResponseWriter, r *http.Request) {
return
}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
s.handleError(w, err)
}
}
......@@ -154,11 +191,20 @@ func (s *Server) addHandler(w http.ResponseWriter, r *http.Request) {
}
if _, err := s.raftServer.Do(command.NewAddCommand(setName, entry)); err != nil {
if err == raft.NotLeaderError {
http.Error(w, err.Error(), http.StatusConflict)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}
s.handleError(w, err)
}
}
// delHandler serves the del endpoint: it reads the "set" and "entry"
// form values, rejects the request with a 400 if either is empty, and
// otherwise submits a del command to raft. Failures of the command are
// reported through handleError.
func (s *Server) delHandler(w http.ResponseWriter, r *http.Request) {
	setName, entry := r.FormValue("set"), r.FormValue("entry")
	if setName == "" || entry == "" {
		http.Error(w, "Empty set/entry", http.StatusBadRequest)
		return
	}

	_, err := s.raftServer.Do(command.NewDelCommand(setName, entry))
	if err != nil {
		s.handleError(w, err)
	}
}
......@@ -171,10 +217,6 @@ func (s *Server) createHandler(w http.ResponseWriter, r *http.Request) {
}
if _, err := s.raftServer.Do(command.NewCreateCommand(setName, typeName)); err != nil {
if err == raft.NotLeaderError {
http.Error(w, err.Error(), http.StatusConflict)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}
s.handleError(w, err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment