diff --git a/mapreduce/mr_clean_tasks/mr_clean_tasks.go b/mapreduce/mr_clean_tasks/mr_clean_tasks.go index b7f017d3c9e4f5b2d97d039f095042325b215010..665a582181b6c7ba6bf2f2e9454d7af706cee76f 100644 --- a/mapreduce/mr_clean_tasks/mr_clean_tasks.go +++ b/mapreduce/mr_clean_tasks/mr_clean_tasks.go @@ -10,37 +10,26 @@ import ( "github.com/dgryski/dmrgo" "git.autistici.org/ale/djrandom/api/task_api" - "git.autistici.org/ale/djrandom/services" - db_client "git.autistici.org/ale/djrandom/services/database/client" - "git.autistici.org/ale/djrandom/util/config" + "git.autistici.org/ale/djrandom/mapreduce" ) var ( - configFile = flag.String("config", "/etc/djrandom/server.conf", "Config file location") cleanAllTasks = flag.Bool("all", false, "Clear all tasks (dangerous)") maxJobAge = flag.Int64("max-age", 86400, "Task age cutoff") ) type MRCleanTasks struct { - db services.Database - proto dmrgo.StreamProtocol - now int64 + mapreduce.MapReduce + now int64 } func (mr *MRCleanTasks) Map(key string, value string, emitter dmrgo.Emitter) { jobId := value - // Create a database session. - session, err := mr.db.NewSession() - if err != nil { - return - } - defer session.Close() - // Fetch the job and delete it if it's too old. if !*cleanAllTasks { var job task_api.Job - if err := mr.db.Get(session, "tasks", jobId, &job); err != nil { + if err := mr.Db.Get(mr.Session, "tasks", jobId, &job); err != nil { return } if !job.Final || (mr.now-job.CompletedAt) < *maxJobAge { @@ -48,7 +37,7 @@ func (mr *MRCleanTasks) Map(key string, value string, emitter dmrgo.Emitter) { } } - mr.db.Del(session, "tasks", jobId) + mr.Db.Del(mr.Session, "tasks", jobId) log.Printf("removed job %s", jobId) } @@ -59,14 +48,5 @@ func (mr *MRCleanTasks) Reduce(key string, values []string, emitter dmrgo.Emitte } func main() { - flag.Parse() - config.Parse(*configFile) - - db := db_client.NewDatabaseImplFromConfig() - mr := &MRCleanTasks{ - db: db, - proto: &dmrgo.JSONProtocol{}, - now: time.Now().Unix(), - } - dmrgo.Main(mr) + mapreduce.Main(&MRCleanTasks{now: time.Now().Unix()}, nil) } diff --git a/mapreduce/mr_dedup_meta/mr_dedup_meta.go b/mapreduce/mr_dedup_meta/mr_dedup_meta.go index 16bb3e53b2078dea57e153c72f87785399ee65a2..cf3cfca30526b4c12e5f8029c5b35ae4abaa529f 100644 --- a/mapreduce/mr_dedup_meta/mr_dedup_meta.go +++ b/mapreduce/mr_dedup_meta/mr_dedup_meta.go @@ -18,7 +18,6 @@ import ( ) var ( - configFile = flag.String("config", "/etc/djrandom/server.conf", "Config file location") memProfile = flag.String("memprofile", "", "save memory profile to file") apply = flag.Bool("apply", false, "Really apply deduplication changes") ) diff --git a/mapreduce/mr_reprocess/mr_reprocess.go b/mapreduce/mr_reprocess/mr_reprocess.go index 44c474553fa6df77b54ad14e1fe2b48d929517b8..42e0d87ec08ef2df5956bb080eadeae98c40edcb 100644 --- a/mapreduce/mr_reprocess/mr_reprocess.go +++ b/mapreduce/mr_reprocess/mr_reprocess.go @@ -13,9 +13,8 @@ import ( ) var ( - configFile = flag.String("config", "/etc/djrandom/server.conf", "Config file location") - planName = flag.String("plan", "incoming", "Plan to execute") - state = flag.String("state", "", "Filter object with this state") + planName = flag.String("plan", "incoming", "Plan to execute") + state = flag.String("state", "", "Filter object with this state") ) type MRReprocess struct { diff --git a/mapreduce/mr_stats/mr_stats.go b/mapreduce/mr_stats/mr_stats.go index 4322fb26a31c6c8ddd40ee13199499a1a5f8f113..120314307013a03188b0e37bd82a96751b7de123 100644 --- a/mapreduce/mr_stats/mr_stats.go +++ b/mapreduce/mr_stats/mr_stats.go @@ -3,7 +3,6 @@ package main import ( - "flag" "fmt" "log" "strings" @@ -16,10 +15,6 @@ import ( "git.autistici.org/ale/djrandom/util/instrumentation" ) -var ( - configFile = flag.String("config", "/etc/djrandom/server.conf", "Config file location") -) - type MRStats struct { mapreduce.MapReduce }