diff --git a/noblogsmv/state.py b/noblogsmv/state.py
index 06ce4ca1bb4ebb33d548e573840d0fb3cfe689aa..644f25a821aeb3ec27318f1ec995932b6dc6165a 100644
--- a/noblogsmv/state.py
+++ b/noblogsmv/state.py
@@ -8,10 +8,11 @@
 import time
 import Queue
 import sqlalchemy as sa
+import sqlalchemy.exc
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.ext.mutable import Mutable
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.types import TypeDecorator, VARCHAR
-from sqlalchemy.ext.mutable import Mutable
 
 
 Base = declarative_base()
@@ -115,11 +116,7 @@ class StateDatabase(object):
         session.add(work)
 
     def set(self, session, work):
-        # The PickleField does not detect changes in the contained
-        # object, so we have to create a new one and add that instead.
-        #tmp = WorkUnit(work.key, work.state, work.data)
-        #self.put(session, tmp)
-        work.data = work.data
+        # The Mutable takes care of keeping track of changes to 'data'.
         self.put(session, work)
 
     def get(self, session, key):
@@ -159,6 +156,17 @@ def transaction(db):
         session.close()
 
 
+@contextlib.contextmanager
+def readonly_transaction(db):
+    for i in xrange(3):
+        try:
+            with transaction(db) as session:
+                yield session
+        except sqlalchemy.exc.OperationalError:
+            continue
+        break
+
+
 @contextlib.contextmanager
 def work_transaction(db, key):
     with transaction(db) as session:
@@ -206,20 +214,25 @@ class StateMachineWorker(threading.Thread):
                 break
 
             log.debug('worker %d received key "%s"', self.thread_id, key)
-            with work_transaction(self.sm.db, key) as work:
-                self.set_info({'state': 'running',
-                               'key': key,
-                               'value': work.data,
-                               'started_at': time.time()})
-                progress = WorkerProgressReporter(self)
-                new_state = self.sm.process(key, work.state, work.data, progress)
-                log.debug('worker %d processed "%s": %s -> %s',
-                          self.thread_id, key, work.state, new_state)
-                work.state = new_state
-
-            # Reinject if not done.
-            if new_state not in EXIT_STATES:
+            try:
+                with work_transaction(self.sm.db, key) as work:
+                    self.set_info({'state': 'running',
+                                   'key': key,
+                                   'value': work.data,
+                                   'started_at': time.time()})
+                    progress = WorkerProgressReporter(self)
+                    new_state = self.sm.process(key, work.state, work.data, progress)
+                    log.debug('worker %d processed "%s": %s -> %s',
+                              self.thread_id, key, work.state, new_state)
+                    work.state = new_state
+
+                # Reinject if not done.
+                if new_state not in EXIT_STATES:
+                    self.input_queue.put(key)
+            except Exception, e:
+                log.exception('Unexpected exception')
                 self.input_queue.put(key)
+                continue
 
         log.debug('worker %d exiting', self.thread_id)
 
@@ -301,11 +314,11 @@ class StateMachine(object):
         return [t.info for t in self.threads]
 
     def get_state(self):
-        with transaction(self.db) as session:
+        with readonly_transaction(self.db) as session:
             return self.db.dump(session)
 
     def compute_stats(self):
-        with transaction(self.db) as session:
+        with readonly_transaction(self.db) as session:
             self.state_count = self.db.count_by_state(session)
 
     def run(self):
@@ -320,15 +333,18 @@ class StateMachine(object):
         self.running = True
 
         # Inject initial state.
-        with transaction(self.db) as session:
+        with readonly_transaction(self.db) as session:
             for key in self.db.scan(session):
                 input_queue.put(key)
 
         # Wait until everything is done.
        while True:
-            with transaction(self.db) as session:
-                if self.db.are_we_done(session):
-                    break
+            try:
+                with readonly_transaction(self.db) as session:
+                    if self.db.are_we_done(session):
+                        break
+            except sqlalchemy.exc.SQLAlchemyError:
+                pass
             time.sleep(3)
 
         # Kill the workers.
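
Note on the set() change: it relies on SQLAlchemy's mutation-tracking extension, so that in-place edits to the pickled 'data' attribute mark the row dirty on their own. Below is a minimal sketch of that standard pattern, assuming a JSON-over-VARCHAR TypeDecorator; the actual column type and the WorkUnit model in state.py are not shown in this diff, so JSONEncodedDict and ExampleUnit are illustrative names only.

```python
# Sketch of the standard SQLAlchemy mutation-tracking pattern that the new
# set() relies on.  JSONEncodedDict / ExampleUnit are illustrative stand-ins
# for the real column type and WorkUnit model in state.py.
import json

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.types import TypeDecorator, VARCHAR

Base = declarative_base()


class JSONEncodedDict(TypeDecorator):
    """Store a dict in a VARCHAR column as JSON."""
    impl = VARCHAR

    def process_bind_param(self, value, dialect):
        return json.dumps(value) if value is not None else None

    def process_result_value(self, value, dialect):
        return json.loads(value) if value is not None else None


class ExampleUnit(Base):
    __tablename__ = 'example_units'
    key = sa.Column(sa.String(128), primary_key=True)
    # MutableDict.as_mutable() makes in-place edits (unit.data['x'] = 1)
    # flag the attribute as dirty, so a plain session.add() is enough and
    # the old "work.data = work.data" trick is no longer needed.
    data = sa.Column(MutableDict.as_mutable(JSONEncodedDict))
```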
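
Note on readonly_transaction(): with SQLite, a busy database surfaces as sqlalchemy.exc.OperationalError ("database is locked"), a subclass of SQLAlchemyError; the new helper retries session setup a few times, and the polling loop in run() additionally tolerates any remaining SQLAlchemyError. A usage sketch mirroring StateMachine.compute_stats(); safe_stats() itself is hypothetical and not part of the module.

```python
# Hypothetical caller of the new readonly_transaction() helper: a read-only
# status query guarded the same way StateMachine.run() guards its polling
# loop.  safe_stats() is illustrative only.
import logging

import sqlalchemy.exc

from noblogsmv.state import readonly_transaction

log = logging.getLogger(__name__)


def safe_stats(db):
    try:
        # readonly_transaction() retries session setup when SQLite reports
        # the database as locked (OperationalError).
        with readonly_transaction(db) as session:
            return db.count_by_state(session)
    except sqlalchemy.exc.SQLAlchemyError:
        # Any remaining database error: skip this round and report nothing.
        log.debug('stats query skipped, database busy')
        return None
```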
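
Note on the worker change: it follows the usual work-queue pattern of logging unexpected failures and requeueing the key instead of dropping it. A generic, self-contained sketch of that pattern in the module's Python 2 style; drain(), the None stop sentinel and the process(key) callback are assumptions for illustration, not part of noblogsmv.

```python
# Generic sketch of the requeue-on-error pattern adopted by
# StateMachineWorker.run().  None as a stop sentinel and the process(key)
# callback are assumptions for this sketch.
import logging
import Queue

log = logging.getLogger(__name__)


def drain(input_queue, process):
    # Pull keys until the None sentinel; requeue keys that fail or are not
    # finished so a later pass can retry them.
    while True:
        key = input_queue.get()
        if key is None:
            break
        try:
            if not process(key):
                input_queue.put(key)
        except Exception:
            log.exception('unexpected exception processing %r', key)
            # Requeue rather than lose the work unit.
            input_queue.put(key)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    q = Queue.Queue()
    q.put('blog-1')
    q.put(None)
    drain(q, lambda key: True)
```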