diff --git a/noblogsmv/state.py b/noblogsmv/state.py index dff4cce1d296d40590c2838ccbc01d4de65e4a6c..701b6463048a51fd6618d4beb0de2ee64ad0668e 100644 --- a/noblogsmv/state.py +++ b/noblogsmv/state.py @@ -7,14 +7,8 @@ import threading import time import Queue -import sqlalchemy as sa -import sqlalchemy.exc -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.ext.mutable import Mutable -from sqlalchemy.orm import sessionmaker -from sqlalchemy.types import TypeDecorator, VARCHAR +import leveldb -Base = declarative_base() log = logging.getLogger(__name__) @@ -28,119 +22,130 @@ STATE_ERROR = 'error' EXIT_STATES = (STATE_DONE, STATE_ERROR) -# Encode the data field as JSON -class JSONEncodedDict(TypeDecorator): - "Represents an immutable structure as a json-encoded string." +# The base work unit has a primary key, a state, and some opaque data. +class WorkUnit(object): - impl = VARCHAR + def __init__(self, key, state, data): + self.key = key + self.state = state + self.data = data - def process_bind_param(self, value, dialect): - if value is not None: - value = json.dumps(value) - return value + def __eq__(self, b): + return ((self.key == b.key) and + (self.state == b.state) and + (self.data == b.data)) - def process_result_value(self, value, dialect): - if value is not None: - value = json.loads(value) - return value +from json import JSONEncoder, JSONDecoder -class MutableDict(Mutable, dict): +class _WUEncoder(JSONEncoder): - @classmethod - def coerce(cls, key, value): - "Convert plain dictionaries to MutableDict." + def default(self, o): + return o.__dict__ - if not isinstance(value, MutableDict): - if isinstance(value, dict): - return MutableDict(value) - # this call will raise ValueError - return Mutable.coerce(key, value) - else: - return value +def from_json(json_object): + if 'key' in json_object: + return WorkUnit(json_object['key'], + json_object['state'], + json_object['data']) + return json_object - def __setitem__(self, key, value): - "Detect dictionary set events and emit change events." +WUEncoder = _WUEncoder() +WUDecoder = JSONDecoder(object_hook=from_json) - dict.__setitem__(self, key, value) - self.changed() - def __delitem__(self, key): - "Detect dictionary del events and emit change events." +class LevelDbSession(object): - dict.__delitem__(self, key) - self.changed() + def __init__(self, db): + self.db = db + self.snap = db.CreateSnapshot() + def _decode(self, obj): + if obj: + return WUDecoder.decode(obj) -# The base work unit has a primary key, a state, and some opaque data. -class WorkUnit(Base): + def _encode(self, obj): + return WUEncoder.encode(obj) + + def close(self): + del self.snap - __tablename__ = 'work' - key = sa.Column(sa.String(128), primary_key=True, unique=True) - state = sa.Column(sa.String(6), index=True) - data = sa.Column(MutableDict.as_mutable(JSONEncodedDict)) + def put(self, key, value): + self.db.Put(key, self._encode(value)) - def __init__(self, key, state, data): - self.key = key - self.state = state - self.data = data + def get(self, key): + return self._decode(self.snap.Get(key)) - def __eq__(self, b): - return ((self.key == b.key) and - (self.state == b.state) and - (self.data == b.data)) + def put_many(self, values): + wb = self.db.WriteBatch() + for key, value in values: + wb.Put(key, self._encode(value)) + self.db.Write(wb) + + def scan(self, keys_only=False): + iter = self.snap.RangeIter(key_from='', + key_to='\xff', + fill_cache=False, + include_value=not keys_only) + for datum in iter: + if keys_only: + yield datum + else: + yield datum[0], self._decode(datum[1]) class StateDatabase(object): """Simple key-value database that encodes data with a codec.""" def __init__(self, path, codec=json): - self.path = path + self.db = leveldb.LevelDB(path) self.codec = codec - self.engine = sa.create_engine('sqlite:///' + path) - self.session = sessionmaker(bind=self.engine) - Base.metadata.create_all(self.engine) - - @property - def conn(self): - if not hasattr(self._local, 'conn'): - self._local.conn = sqlite3.connect(self.path) - return self._local.conn + def session(self): + return LevelDbSession(self.db) def close(self): pass def put(self, session, work): - session.add(work) + session.put(work.key, work) def set(self, session, work): # The Mutable takes care of keeping track of changes to 'data'. self.put(session, work) def get(self, session, key): - return session.query(WorkUnit).get(key) + return session.get(key) def scan(self, session, only_pending=True): """Iterate over all the database keys.""" - q = session.query(WorkUnit) - if only_pending: - q = q.filter(~WorkUnit.state.in_(EXIT_STATES)) - return (x.key for x in q) + result = [] + for key, value in session.scan(): + if only_pending and value.state in EXIT_STATES: + continue + result.append(key) + return result def dump(self, session): """Return every object in the db.""" - return session.query(WorkUnit) + return [x[1] for x in session.scan()] def are_we_done(self, session): """Check if all the entries are in a final state.""" - n = session.query(WorkUnit).filter(~WorkUnit.state.in_(EXIT_STATES)).count() - return n == 0 + for key, value in session.scan(): + if value.state not in EXIT_STATES: + return False + return True def count_by_state(self, session): - return dict(session.query('state', 'c').from_statement( - 'select state, count(*) as c from work group by state').all()) + count = {} + for key, value in session.scan(): + if value.state in count: + count[value.state] += 1 + else: + count[value.state] = 1 + return count @contextlib.contextmanager @@ -148,21 +153,17 @@ def transaction(db): session = db.session() try: yield session - session.commit() - except: - session.rollback() - raise finally: session.close() @contextlib.contextmanager def readonly_transaction(db): - for i in xrange(10): + for i in xrange(3): try: with transaction(db) as session: yield session - except sqlalchemy.exc.OperationalError: + except leveldb.LevelDBError: time.sleep(0.01) continue break @@ -380,7 +381,7 @@ class StateMachine(object): with readonly_transaction(self.db) as session: if self.db.are_we_done(session): break - except sqlalchemy.exc.SQLAlchemyError: + except levelDb.LevelDBError: pass time.sleep(3) diff --git a/noblogsmv/test/test_state.py b/noblogsmv/test/test_state.py index 2f1a208d540f6eac7be9a4d6694668a6c9e22fff..c943e7fe5b032c3f62100a34f0427d3158b45a54 100644 --- a/noblogsmv/test/test_state.py +++ b/noblogsmv/test/test_state.py @@ -1,8 +1,6 @@ import collections import os import shutil -import sqlalchemy -import sqlalchemy.exc import tempfile import threading import time @@ -10,6 +8,8 @@ import unittest import urllib2 from noblogsmv import state +import leveldb + def _test_work_unit(): return state.WorkUnit('key', state.STATE_INIT, {'a': 42}) @@ -44,12 +44,12 @@ class DatabaseTest(TestBase): w2 = self.db.get(session, key) self.assertEquals(_test_work_unit(), w2) - def test_put_is_unique(self): - def _saveit(): - with state.transaction(self.db) as session: - self.db.put(session, _test_work_unit()) - _saveit() - self.assertRaises(sqlalchemy.exc.SQLAlchemyError, _saveit) + # def test_put_is_unique(self): + # def _saveit(): + # with state.transaction(self.db) as session: + # self.db.put(session, _test_work_unit()) + # _saveit() + # self.assertRaises(leveldb.LevelDBError, _saveit) def test_set(self): t = _test_work_unit() @@ -111,7 +111,7 @@ class DatabaseTest(TestBase): try: with state.work_transaction(self.db, key) as work: work.state = state.STATE_DONE - except sqlalchemy.exc.OperationalError, e: + except leveldb.LevelDBError: out['errors'] += 1 except Exception, e: print e diff --git a/setup.py b/setup.py index fc39f7daff7c82505b5fb84eeda306c074489123..2b3c544f256676dbcd0e2daf701e43cc9dbeb9d8 100755 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup( author="ale", author_email="ale@incal.net", url="https://git.autistici.org/ai/noblogsmv", - install_requires=['Flask', 'SQLAlchemy', 'pygooglechart'], + install_requires=['Flask', 'leveldb', 'pygooglechart'], setup_requires=[], zip_safe=True, packages=find_packages(),