Skip to content
Snippets Groups Projects
Select Git revision
  • e558cb36d0c9663d4e67dc60069cd2cdedba4a37
  • noblogs default
  • noblogs-5.7.1
  • upstream
  • noblogs-5.7
  • noblogs-5.6new
  • upstream5.5.1
  • noblogs28dic
  • upstream28dic
  • noblogs-5.5.1
  • noblogs-5.4.2
  • noblogs-5.4_seconda
  • noblogs-5.4
  • noblogs-7c
  • wp5.2.3p3
  • mergedbconf
  • noblogs-5.7.1
  • noblogs.5.7.0p1
  • noblogs-5.7.0
  • noblogs-5.6p3
  • noblogs5.6p2
  • noblogs-5.6p1
  • noblogs-5.6
  • noblogs-5.4.2p1
  • noblogs-5.4.2
  • noblogs-5.4.1
  • noblogs-5.4
  • noblogs-p5.4
  • noblogs-5.3.2p2
  • noblogs-5.3.2p1
  • noblogs-5.3.2
  • noblogs-5.3
  • noblogs-5.2.3p4
  • noblogs-5.2.3p3
  • noblogs-5.2.3p2
  • noblogs-5.2.3p1
36 results

customize-preview.js

Blame
  • agent.go 2.41 KiB
    package tabacco
    
    import (
    	"context"
    	"fmt"
    	"log"
    
    	"git.autistici.org/ai3/tools/tabacco/jobs"
    	"git.autistici.org/ai3/tools/tabacco/util"
    )
    
    // Agent holds a Manager and a Scheduler together, and runs periodic
    // backup jobs for all known sources.
    //
    // Instances are built by NewAgent and must be released with Close.
    type Agent struct {
    	// mgr is the Manager obtained from NewManager; it is used to build
    	// the backup jobs placed on the schedule and is closed by Close.
    	mgr    Manager
    	// sched is the job scheduler: NewAgent installs new schedules on
    	// it via SetSchedule, RunNow triggers it, and Close stops it.
    	sched  *jobs.Scheduler
    	// stopCh is closed by Close to terminate the goroutine that
    	// NewAgent starts to watch for configuration notifications.
    	stopCh chan struct{}
    }
    
    // NewAgent builds an Agent from the given configuration manager and
    // metadata store.
    //
    // It first creates a Manager via NewManager, returning that error
    // unchanged on failure. It then creates a jobs.Scheduler and starts
    // a background goroutine which, for every value received on the
    // channel returned by configMgr.Notify, rebuilds the schedule from
    // the current configuration and installs it on the scheduler. The
    // goroutine keeps running until Close closes the Agent's stop
    // channel.
    func NewAgent(ctx context.Context, configMgr *ConfigManager, ms MetadataStore) (*Agent, error) {
    	manager, err := NewManager(ctx, configMgr, ms)
    	if err != nil {
    		return nil, err
    	}
    
    	scheduler := jobs.NewScheduler()
    	stop := make(chan struct{})
    
    	// reload recomputes the schedule from the configuration that is
    	// current at call time. An error is always logged; the schedule
    	// is swapped in only when makeSchedule returned a non-nil one,
    	// so a partially valid configuration still takes effect.
    	reload := func() {
    		cfg := configMgr.current()
    		next, err := makeSchedule(ctx, manager, cfg.SourceSpecs(), cfg.Seed())
    		if err != nil {
    			log.Printf("error updating scheduler: %v", err)
    		}
    		if next == nil {
    			return
    		}
    		log.Printf("scheduler updated")
    		scheduler.SetSchedule(next)
    	}
    
    	// Register with the configMgr so that the schedule is reloaded
    	// on configuration changes.
    	updates := configMgr.Notify()
    	go func() {
    		for {
    			select {
    			case <-stop:
    				return
    			case <-updates:
    				reload()
    			}
    		}
    	}()
    
    	agent := &Agent{
    		mgr:    manager,
    		sched:  scheduler,
    		stopCh: stop,
    	}
    	return agent, nil
    }
    
    // Close the Agent and all associated resources.
    //
    // In order, it closes stopCh (which makes the goroutine started by
    // NewAgent return), closes the Manager, and stops the Scheduler.
    //
    // Close must not be called more than once on the same Agent: the
    // second close of stopCh would panic.
    func (a *Agent) Close() {
    	close(a.stopCh)
    	// Any result of closing the Manager is deliberately not inspected.
    	a.mgr.Close() // nolint
    	a.sched.Stop()
    }
    
    // makeSchedule assembles a jobs.Schedule containing one entry per
    // source spec that has a non-empty 'schedule' attribute; specs
    // without one are skipped. Each entry, when triggered, asks the
    // Manager for a fresh backup job for that source.
    //
    // Failures to add an entry are collected rather than aborting the
    // loop. If at least one entry failed and none succeeded, the result
    // is a nil Schedule together with the collected errors. Otherwise
    // the Schedule is returned along with merr.OrNil() of the collected
    // errors (presumably nil when nothing failed).
    func makeSchedule(ctx context.Context, m Manager, sourceSpecs []*SourceSpec, hostSeed int64) (*jobs.Schedule, error) {
    	var (
    		schedule = jobs.NewSchedule(ctx, hostSeed)
    		failures = new(util.MultiError)
    		added    int
    	)
    
    	for _, src := range sourceSpecs {
    		if src.Schedule == "" {
    			continue
    		}
    
    		// Give the factory closure its own copy of the loop variable.
    		src := src
    
    		// newJob is invoked by the schedule each time the entry fires.
    		// A failure to create the job is only logged; whatever job
    		// value BackupJob returned is handed back as-is.
    		newJob := func() jobs.Job {
    			_, job, err := m.BackupJob(ctx, src)
    			if err != nil {
    				log.Printf("%s: can't create backup job: %v", src.Name, err)
    			}
    			return job
    		}
    
    		if err := schedule.Add(src.Name, src.Schedule, newJob); err != nil {
    			failures.Add(fmt.Errorf("%s: %v", src.Name, err))
    			continue
    		}
    		added++
    	}
    
    	// Every schedulable source failing is fatal: report it with a nil
    	// Schedule so the caller does not install an empty one.
    	if added == 0 && !failures.IsNil() {
    		return nil, failures
    	}
    
    	return schedule, failures.OrNil()
    }
    
    // RunNow starts all jobs right now, regardless of their schedule.
    //
    // It simply delegates to the RunNow method of the Agent's Scheduler.
    func (a *Agent) RunNow() {
    	a.sched.RunNow()
    }