Skip to content
Snippets Groups Projects
Commit 49c4b7df authored by ale's avatar ale
Browse files

work around some sqlite concurrency issues

parent cc85594e
Branches
No related tags found
No related merge requests found
......@@ -8,10 +8,11 @@ import time
import Queue
import sqlalchemy as sa
import sqlalchemy.exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.orm import sessionmaker
from sqlalchemy.types import TypeDecorator, VARCHAR
from sqlalchemy.ext.mutable import Mutable
Base = declarative_base()
......@@ -115,11 +116,7 @@ class StateDatabase(object):
session.add(work)
def set(self, session, work):
# The PickleField does not detect changes in the contained
# object, so we have to create a new one and add that instead.
#tmp = WorkUnit(work.key, work.state, work.data)
#self.put(session, tmp)
work.data = work.data
# The Mutable takes care of keeping track of changes to 'data'.
self.put(session, work)
def get(self, session, key):
......@@ -159,6 +156,17 @@ def transaction(db):
session.close()
@contextlib.contextmanager
def readonly_transaction(db):
for i in xrange(3):
try:
with transaction(db) as session:
yield session
except sqlalchemy.exc.OperationalError:
continue
break
@contextlib.contextmanager
def work_transaction(db, key):
with transaction(db) as session:
......@@ -206,20 +214,25 @@ class StateMachineWorker(threading.Thread):
break
log.debug('worker %d received key "%s"', self.thread_id, key)
with work_transaction(self.sm.db, key) as work:
self.set_info({'state': 'running',
'key': key,
'value': work.data,
'started_at': time.time()})
progress = WorkerProgressReporter(self)
new_state = self.sm.process(key, work.state, work.data, progress)
log.debug('worker %d processed "%s": %s -> %s',
self.thread_id, key, work.state, new_state)
work.state = new_state
# Reinject if not done.
if new_state not in EXIT_STATES:
try:
with work_transaction(self.sm.db, key) as work:
self.set_info({'state': 'running',
'key': key,
'value': work.data,
'started_at': time.time()})
progress = WorkerProgressReporter(self)
new_state = self.sm.process(key, work.state, work.data, progress)
log.debug('worker %d processed "%s": %s -> %s',
self.thread_id, key, work.state, new_state)
work.state = new_state
# Reinject if not done.
if new_state not in EXIT_STATES:
self.input_queue.put(key)
except Exception, e:
log.exception('Unexpected exception')
self.input_queue.put(key)
continue
log.debug('worker %d exiting', self.thread_id)
......@@ -301,11 +314,11 @@ class StateMachine(object):
return [t.info for t in self.threads]
def get_state(self):
with transaction(self.db) as session:
with readonly_transaction(self.db) as session:
return self.db.dump(session)
def compute_stats(self):
with transaction(self.db) as session:
with readonly_transaction(self.db) as session:
self.state_count = self.db.count_by_state(session)
def run(self):
......@@ -320,15 +333,18 @@ class StateMachine(object):
self.running = True
# Inject initial state.
with transaction(self.db) as session:
with readonly_transaction(self.db) as session:
for key in self.db.scan(session):
input_queue.put(key)
# Wait until everything is done.
while True:
with transaction(self.db) as session:
if self.db.are_we_done(session):
break
try:
with readonly_transaction(self.db) as session:
if self.db.are_we_done(session):
break
except sqlalchemy.exc.SQLAlchemyError:
pass
time.sleep(3)
# Kill the workers.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment