diff --git a/noblogsmv/driver.py b/noblogsmv/main.py similarity index 80% rename from noblogsmv/driver.py rename to noblogsmv/main.py index ef22b1235ee343d05b3c1e6569ff78da62b4df05..f7b5e8c0b509592a3a3496edfe2e5e8c045fa2c2 100644 --- a/noblogsmv/driver.py +++ b/noblogsmv/main.py @@ -3,6 +3,7 @@ import optparse import os import re import subprocess +import sys from noblogsmv import state from noblogsmv import webapp @@ -62,7 +63,7 @@ def process_move_db(blog_id, value, progress): tables = ' '.join(t.strip() for t in raw_tables.splitlines()) move_db = ['mysqldump', '--opt', old_mysql_opts, value['old_db'], tables, '|', 'mysql', new_mysql_opts, value['new_db']] - cmd = ['ssh', 'root@%s' value['old_host'], '"%s"' % ' '.join(move_db)] + cmd = ['ssh', 'root@%s' % value['old_host'], '"%s"' % ' '.join(move_db)] log.debug('Calling %s', ' '.join(cmd)) try: raw_tables = subprocess.check_output(cmd) @@ -94,7 +95,19 @@ class NoblogsStateMachine(state.StateMachine): def main(): - parser = optparse.OptionParser() + parser = optparse.OptionParser(usage='''%prog [<OPTIONS>] + +Noblogs rebalancing driver. Given a set of blog diffs on standard +input, it will perform the necessary migration tasks to ensure the +system reaches the desired state. An HTTP server allows the user to +monitor the migration progress. + +The program keeps its state checkpointed to a local database, so +that its execution can be resumed if interrupted. In this case, +rather than passing the data again on standard input, one should +use the `--recover' option. + +''') parser.add_option('--state', dest='state_file', default='noblogsmv.state', help='Location of the state file (default: %default)') @@ -106,6 +119,8 @@ def main(): help='Number of workers (default: %default)') parser.add_option('--recover', dest='recover', action='store_true', help='Recover a previous run') + parser.add_option('--noexit', action='store_true', + help='Do not exit when done') parser.add_option('--debug', action='store_true') opts, args = parser.parse_args() @@ -115,17 +130,22 @@ def main(): logging.basicConfig( level=(logging.DEBUG if opts.debug else logging.INFO)) - sm = NoblogsStateMachine.new_with_fd( + inputfd = None + if not opts.recover: + inputfd = sys.stdin + sm, httpserver = NoblogsStateMachine.new_with_fd( opts.state_file, - inputfd=opts.recover and sys.stdin or None, - num_workers=opts.num_workers) - - httpserver = webapp.WsgiThread(sm, opts.port) - httpserver.setDaemon(True) - httpserver.start() + inputfd=inputfd, + num_workers=opts.num_workers, + enable_http=True, + http_port=opts.port) sm.run() + if opts.noexit: + # This will never return. + httpserver.join() + if __name__ == '__main__': main() diff --git a/noblogsmv/state.py b/noblogsmv/state.py index df6c859133268f613760b0950e24f9b534c6064c..f8defddb66ecba0161571d46ece3c349fc63dc73 100644 --- a/noblogsmv/state.py +++ b/noblogsmv/state.py @@ -101,6 +101,16 @@ create table kv ( break yield row[0] + def dump(self): + """Return every object in the db.""" + cursor = self.conn.cursor() + cursor.execute('select key, state, value from kv') + while True: + row = cursor.fetchone() + if not row: + break + yield WorkUnit(row[0], row[1], self.codec.loads(row[2])) + def are_we_done(self): """Check if all the entries are in a final state.""" cursor = self.conn.cursor() @@ -112,6 +122,14 @@ create table kv ( return False # error, default to false return row[0] == 0 + def count_by_state(self): + cursor = self.conn.cursor() + cursor.execute('select state, count(*) from kv group by state') + counts = {} + for row in cursor.fetchall(): + counts[row[0]] = row[1] + return counts + @contextlib.contextmanager def work_transaction(db, key): @@ -194,6 +212,22 @@ class StateMachineWorker(threading.Thread): return info +class StatsThread(threading.Thread): + + def __init__(self, sm): + threading.Thread.__init__(self) + self.sm = sm + self._stop = threading.Event() + + def stop(self): + self._stop.set() + + def run(self): + while not self._stop.isSet(): + self._stop.wait(10) + self.sm.compute_stats() + + class StateMachine(object): # Subclasses should override this, retaining the 'error' rule. @@ -206,6 +240,7 @@ class StateMachine(object): self.running = False self.num_workers = num_workers self.threads = [] + self.state_count = {} def load_data(self, input_stream): for key, value in input_stream: @@ -238,6 +273,12 @@ class StateMachine(object): def get_worker_info(self): return [t.info for t in self.threads] + def get_state(self): + return self.db.dump() + + def compute_stats(self): + self.state_count = self.db.count_by_state() + def run(self): input_queue = Queue.Queue() @@ -245,11 +286,12 @@ class StateMachine(object): StateMachineWorker(self, input_queue, i + 1) for i in xrange(self.num_workers)] [x.start() for x in self.threads] + stats_thread = StatsThread(self) + stats_thread.start() self.running = True # Inject initial state. for key in self.db.scan(): - print 'queuing', key input_queue.put(key) # Wait until everything is done. @@ -260,17 +302,31 @@ class StateMachine(object): for i in xrange(self.num_workers): input_queue.put(None) [x.join() for x in self.threads] + stats_thread.stop() + stats_thread.join() self.running = False @classmethod - def new_with_fd(cls, dbfile, inputfd=None, num_workers=10): + def new_with_fd(cls, dbfile, inputfd=None, num_workers=10, + enable_http=False, http_port=3030): sm = cls(num_workers, dbfile) + + if enable_http: + from noblogsmv import webapp + httpserver = webapp.WsgiThread(sm, http_port) + httpserver.setDaemon(True) + httpserver.start() + if inputfd: def _read_values(): for line in inputfd: key, value = line.strip().split(' ', 1) yield key, json.loads(value) sm.load_data(_read_values()) + + if enable_http: + return sm, httpserver + return sm diff --git a/noblogsmv/test/test_state.py b/noblogsmv/test/test_state.py index f39a75aa5997da670a1a018804ab4fc4884ae630..71db635a3f16c3143f045ecf83e1145ec68aff60 100644 --- a/noblogsmv/test/test_state.py +++ b/noblogsmv/test/test_state.py @@ -56,6 +56,13 @@ class DatabaseTest(TestBase): result = set(self.db.scan()) self.assertEquals(set(['key1', 'key3']), result) + def test_dump(self): + self.db.put(state.WorkUnit('key1', state.STATE_INIT, {})) + self.db.put(state.WorkUnit('key2', state.STATE_DONE, {})) + self.db.put(state.WorkUnit('key3', 'mystate', {})) + result = list(self.db.dump()) + self.assertEquals(3, len(result)) + def test_are_we_done(self): self.db.put(TEST_WORK_UNIT) self.assertFalse(self.db.are_we_done()) diff --git a/noblogsmv/webapp.py b/noblogsmv/webapp.py index 15151c10e0ec3f6273c6afa16d8b2491db9357e4..ef38da8e8adc11d5ad4cfe3e677c57c85e0a18ec 100644 --- a/noblogsmv/webapp.py +++ b/noblogsmv/webapp.py @@ -1,5 +1,6 @@ import collections import logging +import os import socket import threading import time @@ -20,10 +21,7 @@ def index(): now = time.time() cur_state = g.sm.get_state() worker_info = g.sm.get_worker_info() - # Compute some nice aggregates. - state_count = collections.defaultdict(int) - for data in cur_state.itervalues(): - state_count[data.get('state', 'unknown')] += 1 + state_count = g.sm.state_count return render_template('index.html', state=cur_state, state_count=state_count, @@ -32,6 +30,8 @@ def index(): def create_app(sm, port, config={}): + app.config.update( + SECRET_KEY=os.urandom(8)) app.config.from_pyfile('APP_CONFIG', silent=True) app.config.update( HOST_NAME=socket.gethostname(), diff --git a/setup.py b/setup.py index f38d4590b4301418af18c95315ceda78003821cb..2cda674f4203e5ab3fb5e7e7ea7b22e872760f9c 100644 --- a/setup.py +++ b/setup.py @@ -4,11 +4,11 @@ from setuptools import setup, find_packages setup( name="noblogsmv", - version="0.2", + version="0.2.1", description="Parallel FSM for Noblogs operations", author="ale", author_email="ale@incal.net", - url="http://git.autistici.org/ai/noblogsmv", + url="https://git.autistici.org/ai/noblogsmv", install_requires=['Flask'], setup_requires=[], zip_safe=True, @@ -17,7 +17,7 @@ setup( entry_points={ "console_scripts": [ "noblogs-map-diff = noblogsmv.noblogs_map_diff:main", - "noblogsmv-driver = noblogsmv.driver:main", + "noblogsmv = noblogsmv.main:main", ], }, )