Skip to content
Snippets Groups Projects
Commit d6a3fdc1 authored by ale's avatar ale
Browse files

a few fixes to the main noblogsmv driver code

parent 0175e3b9
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@ import optparse
import os
import re
import subprocess
import sys
from noblogsmv import state
from noblogsmv import webapp
......@@ -62,7 +63,7 @@ def process_move_db(blog_id, value, progress):
tables = ' '.join(t.strip() for t in raw_tables.splitlines())
move_db = ['mysqldump', '--opt', old_mysql_opts, value['old_db'], tables,
'|', 'mysql', new_mysql_opts, value['new_db']]
cmd = ['ssh', 'root@%s' value['old_host'], '"%s"' % ' '.join(move_db)]
cmd = ['ssh', 'root@%s' % value['old_host'], '"%s"' % ' '.join(move_db)]
log.debug('Calling %s', ' '.join(cmd))
try:
raw_tables = subprocess.check_output(cmd)
......@@ -94,7 +95,19 @@ class NoblogsStateMachine(state.StateMachine):
def main():
parser = optparse.OptionParser()
parser = optparse.OptionParser(usage='''%prog [<OPTIONS>]
Noblogs rebalancing driver. Given a set of blog diffs on standard
input, it will perform the necessary migration tasks to ensure the
system reaches the desired state. An HTTP server allows the user to
monitor the migration progress.
The program keeps its state checkpointed to a local database, so
that its execution can be resumed if interrupted. In this case,
rather than passing the data again on standard input, one should
use the `--recover' option.
''')
parser.add_option('--state', dest='state_file',
default='noblogsmv.state',
help='Location of the state file (default: %default)')
......@@ -106,6 +119,8 @@ def main():
help='Number of workers (default: %default)')
parser.add_option('--recover', dest='recover', action='store_true',
help='Recover a previous run')
parser.add_option('--noexit', action='store_true',
help='Do not exit when done')
parser.add_option('--debug', action='store_true')
opts, args = parser.parse_args()
......@@ -115,17 +130,22 @@ def main():
logging.basicConfig(
level=(logging.DEBUG if opts.debug else logging.INFO))
sm = NoblogsStateMachine.new_with_fd(
inputfd = None
if not opts.recover:
inputfd = sys.stdin
sm, httpserver = NoblogsStateMachine.new_with_fd(
opts.state_file,
inputfd=opts.recover and sys.stdin or None,
num_workers=opts.num_workers)
httpserver = webapp.WsgiThread(sm, opts.port)
httpserver.setDaemon(True)
httpserver.start()
inputfd=inputfd,
num_workers=opts.num_workers,
enable_http=True,
http_port=opts.port)
sm.run()
if opts.noexit:
# This will never return.
httpserver.join()
if __name__ == '__main__':
main()
......@@ -101,6 +101,16 @@ create table kv (
break
yield row[0]
def dump(self):
"""Return every object in the db."""
cursor = self.conn.cursor()
cursor.execute('select key, state, value from kv')
while True:
row = cursor.fetchone()
if not row:
break
yield WorkUnit(row[0], row[1], self.codec.loads(row[2]))
def are_we_done(self):
"""Check if all the entries are in a final state."""
cursor = self.conn.cursor()
......@@ -112,6 +122,14 @@ create table kv (
return False # error, default to false
return row[0] == 0
def count_by_state(self):
cursor = self.conn.cursor()
cursor.execute('select state, count(*) from kv group by state')
counts = {}
for row in cursor.fetchall():
counts[row[0]] = row[1]
return counts
@contextlib.contextmanager
def work_transaction(db, key):
......@@ -194,6 +212,22 @@ class StateMachineWorker(threading.Thread):
return info
class StatsThread(threading.Thread):
def __init__(self, sm):
threading.Thread.__init__(self)
self.sm = sm
self._stop = threading.Event()
def stop(self):
self._stop.set()
def run(self):
while not self._stop.isSet():
self._stop.wait(10)
self.sm.compute_stats()
class StateMachine(object):
# Subclasses should override this, retaining the 'error' rule.
......@@ -206,6 +240,7 @@ class StateMachine(object):
self.running = False
self.num_workers = num_workers
self.threads = []
self.state_count = {}
def load_data(self, input_stream):
for key, value in input_stream:
......@@ -238,6 +273,12 @@ class StateMachine(object):
def get_worker_info(self):
return [t.info for t in self.threads]
def get_state(self):
return self.db.dump()
def compute_stats(self):
self.state_count = self.db.count_by_state()
def run(self):
input_queue = Queue.Queue()
......@@ -245,11 +286,12 @@ class StateMachine(object):
StateMachineWorker(self, input_queue, i + 1)
for i in xrange(self.num_workers)]
[x.start() for x in self.threads]
stats_thread = StatsThread(self)
stats_thread.start()
self.running = True
# Inject initial state.
for key in self.db.scan():
print 'queuing', key
input_queue.put(key)
# Wait until everything is done.
......@@ -260,17 +302,31 @@ class StateMachine(object):
for i in xrange(self.num_workers):
input_queue.put(None)
[x.join() for x in self.threads]
stats_thread.stop()
stats_thread.join()
self.running = False
@classmethod
def new_with_fd(cls, dbfile, inputfd=None, num_workers=10):
def new_with_fd(cls, dbfile, inputfd=None, num_workers=10,
enable_http=False, http_port=3030):
sm = cls(num_workers, dbfile)
if enable_http:
from noblogsmv import webapp
httpserver = webapp.WsgiThread(sm, http_port)
httpserver.setDaemon(True)
httpserver.start()
if inputfd:
def _read_values():
for line in inputfd:
key, value = line.strip().split(' ', 1)
yield key, json.loads(value)
sm.load_data(_read_values())
if enable_http:
return sm, httpserver
return sm
......
......@@ -56,6 +56,13 @@ class DatabaseTest(TestBase):
result = set(self.db.scan())
self.assertEquals(set(['key1', 'key3']), result)
def test_dump(self):
self.db.put(state.WorkUnit('key1', state.STATE_INIT, {}))
self.db.put(state.WorkUnit('key2', state.STATE_DONE, {}))
self.db.put(state.WorkUnit('key3', 'mystate', {}))
result = list(self.db.dump())
self.assertEquals(3, len(result))
def test_are_we_done(self):
self.db.put(TEST_WORK_UNIT)
self.assertFalse(self.db.are_we_done())
......
import collections
import logging
import os
import socket
import threading
import time
......@@ -20,10 +21,7 @@ def index():
now = time.time()
cur_state = g.sm.get_state()
worker_info = g.sm.get_worker_info()
# Compute some nice aggregates.
state_count = collections.defaultdict(int)
for data in cur_state.itervalues():
state_count[data.get('state', 'unknown')] += 1
state_count = g.sm.state_count
return render_template('index.html',
state=cur_state,
state_count=state_count,
......@@ -32,6 +30,8 @@ def index():
def create_app(sm, port, config={}):
app.config.update(
SECRET_KEY=os.urandom(8))
app.config.from_pyfile('APP_CONFIG', silent=True)
app.config.update(
HOST_NAME=socket.gethostname(),
......
......@@ -4,11 +4,11 @@ from setuptools import setup, find_packages
setup(
name="noblogsmv",
version="0.2",
version="0.2.1",
description="Parallel FSM for Noblogs operations",
author="ale",
author_email="ale@incal.net",
url="http://git.autistici.org/ai/noblogsmv",
url="https://git.autistici.org/ai/noblogsmv",
install_requires=['Flask'],
setup_requires=[],
zip_safe=True,
......@@ -17,7 +17,7 @@ setup(
entry_points={
"console_scripts": [
"noblogs-map-diff = noblogsmv.noblogs_map_diff:main",
"noblogsmv-driver = noblogsmv.driver:main",
"noblogsmv = noblogsmv.main:main",
],
},
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment