Skip to content
Snippets Groups Projects
Commit 55b3fa00 authored by ale's avatar ale
Browse files

Refactor the Watcher -> TriggerManager relation

It is now possible to run a Watcher without triggers.
parent b12fa516
No related branches found
No related tags found
No related merge requests found
......@@ -75,6 +75,18 @@ func (c *pullCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...inte
return subcommands.ExitUsageError
}
// Load all triggers from their JSON configuration directory.
var triggers watcher.TriggerManager
if c.triggersPath != "" {
var err error
triggers, err = watcher.LoadTriggersFromDir(c.triggersPath)
if err != nil {
log.Printf("error loading triggers: %v", err)
return subcommands.ExitFailure
}
}
// Create a file-backed store for the local target path.
store, err := mem.NewMemFileStore(c.storePath)
if err != nil {
log.Printf("error initializing storage: %v", err)
......@@ -82,6 +94,7 @@ func (c *pullCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...inte
}
defer store.Close()
// Dial the GRPC server.
opts, err := c.grpcDialOptions()
if err != nil {
log.Printf("error in GRPC options: %v", err)
......@@ -94,15 +107,9 @@ func (c *pullCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...inte
return subcommands.ExitFailure
}
triggers := make(map[string]watcher.Trigger)
if c.triggersPath != "" {
triggers, err = watcher.LoadTriggersFromDir(c.triggersPath)
if err != nil {
log.Printf("error loading triggers: %v", err)
return subcommands.ExitFailure
}
}
// Create the Watcher, and run it with a controlling Context
// that allows us to stop everything upon receiving a
// termination signal.
prefix := f.Arg(0)
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
......
......@@ -12,22 +12,22 @@ import (
pb "git.autistici.org/ai3/tools/replds2/proto"
)
type Trigger interface {
Run([]*pb.Node) error
}
// The nullTriggerManager does nothing.
type nullTriggerManager struct{}
func (nullTriggerManager) Has(string) bool { return false }
func (nullTriggerManager) Notify(*common.NotifyBatch) {}
type triggerManager map[string]Trigger
// The scriptTriggerManager runs user-configured trigger scripts.
type scriptTriggerManager map[string]*scriptTrigger
func (m triggerManager) Has(path string) bool {
func (m scriptTriggerManager) Has(path string) bool {
_, ok := m[path]
return ok
}
func (m triggerManager) newBatch() *common.NotifyBatch {
return common.NewNotifyBatch(m)
}
func (m triggerManager) Notify(b *common.NotifyBatch) {
func (m scriptTriggerManager) Notify(b *common.NotifyBatch) {
b.Apply(func(path string, nodes []*pb.Node) {
trigger := m[path]
trigger.Run(nodes)
......@@ -43,14 +43,14 @@ func (t *scriptTrigger) Run(nodes []*pb.Node) error {
return nil
}
func LoadTriggersFromDir(dir string) (map[string]Trigger, error) {
func LoadTriggersFromDir(dir string) (TriggerManager, error) {
d, err := os.Open(dir)
if err != nil {
return nil, err
}
defer d.Close()
m := make(map[string]Trigger)
m := make(map[string]*scriptTrigger)
files, err := d.Readdir(0)
if err != nil {
return nil, err
......@@ -76,9 +76,8 @@ func LoadTriggersFromDir(dir string) (map[string]Trigger, error) {
log.Printf("invalid trigger specification in %s", f.Name())
continue
}
m[trig.Path] = &trig
}
return m, nil
return scriptTriggerManager(m), nil
}
......@@ -7,25 +7,42 @@ import (
"time"
replds "git.autistici.org/ai3/tools/replds2"
"git.autistici.org/ai3/tools/replds2/common"
pb "git.autistici.org/ai3/tools/replds2/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// A TriggerManager is both source and executor of triggers
// (notifications in common/notify.go terms).
type TriggerManager interface {
common.NotifySource
Notify(*common.NotifyBatch)
}
// The Watcher is an asynchronous replication client that can operate
// on a subtree of the remote database. It will watch for changes and
// propagate them to the local database with low latency.
//
// It also has the capability of executing path-specific triggers upon
// changes.
type Watcher struct {
path string
store replds.Store
triggers triggerManager
triggers TriggerManager
conn *grpc.ClientConn
}
func New(conn *grpc.ClientConn, store replds.Store, path string, triggers map[string]Trigger) *Watcher {
func New(conn *grpc.ClientConn, store replds.Store, path string, triggers TriggerManager) *Watcher {
if triggers == nil {
triggers = new(nullTriggerManager)
}
return &Watcher{
path: path,
conn: conn,
store: store,
triggers: triggerManager(triggers),
triggers: triggers,
}
}
......@@ -57,7 +74,7 @@ func (w *Watcher) Run(ctx context.Context) {
}
// Run triggers for each batch.
tb := w.triggers.newBatch()
tb := common.NewNotifyBatch(w.triggers)
for _, node := range resp.Nodes {
w.store.AddNode(node)
tb.Add(node)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment