Skip to content
Snippets Groups Projects
Commit 005f3f23 authored by ale's avatar ale
Browse files

use leveldb

parent 0e88b058
Branches
No related tags found
No related merge requests found
......@@ -7,14 +7,8 @@ import threading
import time
import Queue
import sqlalchemy as sa
import sqlalchemy.exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.orm import sessionmaker
from sqlalchemy.types import TypeDecorator, VARCHAR
import leveldb
Base = declarative_base()
log = logging.getLogger(__name__)
......@@ -28,119 +22,130 @@ STATE_ERROR = 'error'
EXIT_STATES = (STATE_DONE, STATE_ERROR)
# Encode the data field as JSON
class JSONEncodedDict(TypeDecorator):
"Represents an immutable structure as a json-encoded string."
# The base work unit has a primary key, a state, and some opaque data.
class WorkUnit(object):
impl = VARCHAR
def __init__(self, key, state, data):
self.key = key
self.state = state
self.data = data
def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value)
return value
def __eq__(self, b):
return ((self.key == b.key) and
(self.state == b.state) and
(self.data == b.data))
def process_result_value(self, value, dialect):
if value is not None:
value = json.loads(value)
return value
from json import JSONEncoder, JSONDecoder
class MutableDict(Mutable, dict):
class _WUEncoder(JSONEncoder):
@classmethod
def coerce(cls, key, value):
"Convert plain dictionaries to MutableDict."
def default(self, o):
return o.__dict__
if not isinstance(value, MutableDict):
if isinstance(value, dict):
return MutableDict(value)
# this call will raise ValueError
return Mutable.coerce(key, value)
else:
return value
def from_json(json_object):
if 'key' in json_object:
return WorkUnit(json_object['key'],
json_object['state'],
json_object['data'])
return json_object
def __setitem__(self, key, value):
"Detect dictionary set events and emit change events."
WUEncoder = _WUEncoder()
WUDecoder = JSONDecoder(object_hook=from_json)
dict.__setitem__(self, key, value)
self.changed()
def __delitem__(self, key):
"Detect dictionary del events and emit change events."
class LevelDbSession(object):
dict.__delitem__(self, key)
self.changed()
def __init__(self, db):
self.db = db
self.snap = db.CreateSnapshot()
def _decode(self, obj):
if obj:
return WUDecoder.decode(obj)
# The base work unit has a primary key, a state, and some opaque data.
class WorkUnit(Base):
def _encode(self, obj):
return WUEncoder.encode(obj)
def close(self):
del self.snap
__tablename__ = 'work'
key = sa.Column(sa.String(128), primary_key=True, unique=True)
state = sa.Column(sa.String(6), index=True)
data = sa.Column(MutableDict.as_mutable(JSONEncodedDict))
def put(self, key, value):
self.db.Put(key, self._encode(value))
def __init__(self, key, state, data):
self.key = key
self.state = state
self.data = data
def get(self, key):
return self._decode(self.snap.Get(key))
def __eq__(self, b):
return ((self.key == b.key) and
(self.state == b.state) and
(self.data == b.data))
def put_many(self, values):
wb = self.db.WriteBatch()
for key, value in values:
wb.Put(key, self._encode(value))
self.db.Write(wb)
def scan(self, keys_only=False):
iter = self.snap.RangeIter(key_from='',
key_to='\xff',
fill_cache=False,
include_value=not keys_only)
for datum in iter:
if keys_only:
yield datum
else:
yield datum[0], self._decode(datum[1])
class StateDatabase(object):
"""Simple key-value database that encodes data with a codec."""
def __init__(self, path, codec=json):
self.path = path
self.db = leveldb.LevelDB(path)
self.codec = codec
self.engine = sa.create_engine('sqlite:///' + path)
self.session = sessionmaker(bind=self.engine)
Base.metadata.create_all(self.engine)
@property
def conn(self):
if not hasattr(self._local, 'conn'):
self._local.conn = sqlite3.connect(self.path)
return self._local.conn
def session(self):
return LevelDbSession(self.db)
def close(self):
pass
def put(self, session, work):
session.add(work)
session.put(work.key, work)
def set(self, session, work):
# The Mutable takes care of keeping track of changes to 'data'.
self.put(session, work)
def get(self, session, key):
return session.query(WorkUnit).get(key)
return session.get(key)
def scan(self, session, only_pending=True):
"""Iterate over all the database keys."""
q = session.query(WorkUnit)
if only_pending:
q = q.filter(~WorkUnit.state.in_(EXIT_STATES))
return (x.key for x in q)
result = []
for key, value in session.scan():
if only_pending and value.state in EXIT_STATES:
continue
result.append(key)
return result
def dump(self, session):
"""Return every object in the db."""
return session.query(WorkUnit)
return [x[1] for x in session.scan()]
def are_we_done(self, session):
"""Check if all the entries are in a final state."""
n = session.query(WorkUnit).filter(~WorkUnit.state.in_(EXIT_STATES)).count()
return n == 0
for key, value in session.scan():
if value.state not in EXIT_STATES:
return False
return True
def count_by_state(self, session):
return dict(session.query('state', 'c').from_statement(
'select state, count(*) as c from work group by state').all())
count = {}
for key, value in session.scan():
if value.state in count:
count[value.state] += 1
else:
count[value.state] = 1
return count
@contextlib.contextmanager
......@@ -148,21 +153,17 @@ def transaction(db):
session = db.session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
@contextlib.contextmanager
def readonly_transaction(db):
for i in xrange(10):
for i in xrange(3):
try:
with transaction(db) as session:
yield session
except sqlalchemy.exc.OperationalError:
except leveldb.LevelDBError:
time.sleep(0.01)
continue
break
......@@ -380,7 +381,7 @@ class StateMachine(object):
with readonly_transaction(self.db) as session:
if self.db.are_we_done(session):
break
except sqlalchemy.exc.SQLAlchemyError:
except levelDb.LevelDBError:
pass
time.sleep(3)
......
import collections
import os
import shutil
import sqlalchemy
import sqlalchemy.exc
import tempfile
import threading
import time
......@@ -10,6 +8,8 @@ import unittest
import urllib2
from noblogsmv import state
import leveldb
def _test_work_unit():
return state.WorkUnit('key', state.STATE_INIT, {'a': 42})
......@@ -44,12 +44,12 @@ class DatabaseTest(TestBase):
w2 = self.db.get(session, key)
self.assertEquals(_test_work_unit(), w2)
def test_put_is_unique(self):
def _saveit():
with state.transaction(self.db) as session:
self.db.put(session, _test_work_unit())
_saveit()
self.assertRaises(sqlalchemy.exc.SQLAlchemyError, _saveit)
# def test_put_is_unique(self):
# def _saveit():
# with state.transaction(self.db) as session:
# self.db.put(session, _test_work_unit())
# _saveit()
# self.assertRaises(leveldb.LevelDBError, _saveit)
def test_set(self):
t = _test_work_unit()
......@@ -111,7 +111,7 @@ class DatabaseTest(TestBase):
try:
with state.work_transaction(self.db, key) as work:
work.state = state.STATE_DONE
except sqlalchemy.exc.OperationalError, e:
except leveldb.LevelDBError:
out['errors'] += 1
except Exception, e:
print e
......
......@@ -9,7 +9,7 @@ setup(
author="ale",
author_email="ale@incal.net",
url="https://git.autistici.org/ai/noblogsmv",
install_requires=['Flask', 'SQLAlchemy', 'pygooglechart'],
install_requires=['Flask', 'leveldb', 'pygooglechart'],
setup_requires=[],
zip_safe=True,
packages=find_packages(),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment