diff --git a/configdb/db/interface/etcd_interface.py b/configdb/db/interface/etcd_interface.py index 64f6e12687dfe8bb0cab961880ceabab7e9da4c0..bc5a9ea3b1b6335a7f9234679b5cc5f395c44d7b 100644 --- a/configdb/db/interface/etcd_interface.py +++ b/configdb/db/interface/etcd_interface.py @@ -14,12 +14,15 @@ from configdb import exceptions from configdb.db.interface import base from configdb.db.interface import inmemory_interface +import logging +log = logging.getLogger(__name__) class EtcdSession(inmemory_interface.InMemorySession): """A EtcdInterface session.""" def __init__(self,db): self.db = db + self.revisions = {} def _escape(self,s): return s.encode('hex') @@ -36,18 +39,22 @@ class EtcdSession(inmemory_interface.InMemorySession): def add(self, obj): path = self._mkpath(obj._entity_name, obj.name) - try: - idx = self.db.conn.read(path).modifiedIndex - opts = {'prevIndex': idx} - except KeyError: + # If we don't have a revision, + rev = self.revisions.get(path, None) + log.debug("Path %s, rev %s", path, rev) + if rev is None: opts = {'prevExist': False} + else: + opts = {'prevIndex': rev} # Will raise ValueError if the test fails. try: - self.db.conn.write(path, self.db._serialize(obj), **opts) - except ValueError: + r = self.db.conn.write(path, self.db._serialize(obj), **opts) + self.revisions[path] = r.modifiedIndex + except (ValueError, KeyError): raise exceptions.IntegrityError('Bad revision') + def delete(self, obj): self._delte_by_name(obj._entity_name, obj.name) @@ -55,11 +62,16 @@ class EtcdSession(inmemory_interface.InMemorySession): def _delete_by_name(self, entity_name, obj_name): path = self._mkpath(entity_name, obj_name) try: - #etcd has no way to atomically delete objects. Meh! + rev = self.revisions.pop(path, 0) + # etcd has no way to atomically delete objects depending on their index. Meh! + # we simulate (coarsely) the correct behaviour + self.db.conn.write(path, '__to_delete',prevIndex = rev) self.db.conn.delete(path) except KeyError: pass - + except ValueError: + # CAS has failed + raise exceptions.IntegrityError('Bad revision') def _deserialize_if_not_none(self, data): if data: @@ -70,8 +82,10 @@ class EtcdSession(inmemory_interface.InMemorySession): def _get(self, entity_name, obj_name): path = self._mkpath(entity_name, obj_name) try: - data = self.db.conn.read(path).value - return self._deserialize_if_not_none(data) + # Again, reads are not atomic in etcd and watchIndex is not useful. + data = self.db.conn.read(path) + self.revisions[path] = data.modifiedIndex + return self._deserialize_if_not_none(data.value) except KeyError: pass @@ -79,6 +93,8 @@ class EtcdSession(inmemory_interface.InMemorySession): path = self._mkpath(entity_name) for r in self.db.conn.read(path, recursive = True).kvs: if not r.dir: + curpath = r.key.replace(self.db.conn.key_endpoint,'') + self.revisions[curpath] = r.modifiedIndex yield self._deserialize_if_not_none(r.value) def commit(self): @@ -195,7 +211,11 @@ class EtcdInterface(base.DbInterface): """Query the audit log.""" # This is actually very expensive and this is why we have a limited number of slots path = os.path.join(self.root, '_audit') - data = self.conn.read(path, recursive=True) + try: + data = self.conn.read(path, recursive=True) + except KeyError: + # special case: no audit log present! + return [] log = [] for result in data.kvs: diff --git a/configdb/tests/db_api_test_base.py b/configdb/tests/db_api_test_base.py index e5302ec8688568461a6846097606d42f81228a2c..d987c1d941666bc202ec11ba5396e7001264a65f 100644 --- a/configdb/tests/db_api_test_base.py +++ b/configdb/tests/db_api_test_base.py @@ -185,6 +185,11 @@ class DbApiTestBase(object): 'host', {'name': 'host2', 'ip': '299.0.0.1'}, self.ctx) + def test_create_existing_object(self): + host_data = {'name': 'obz', 'ip': '2.3.4.5'} + self.assertRaises( + Exception, self.api.create,'host', host_data, self.ctx) + def test_update_ok(self): result = self.api.update('host', 'obz', {'ip': '2.3.4.5'}, self.ctx) self.assertTrue(result) diff --git a/configdb/tests/test_db_api_etcd.py b/configdb/tests/test_db_api_etcd.py index 525fa5dbf0864c13f43d6f094b1612b093de1d0b..3f11a4dc0c8dbcbec9de1586992910779fcba3ff 100644 --- a/configdb/tests/test_db_api_etcd.py +++ b/configdb/tests/test_db_api_etcd.py @@ -19,3 +19,9 @@ class EtcdInterfaceTest(DbApiTestBase, TestBase): def init_db(self): return etcd_interface.EtcdInterface( 'http://127.0.0.1:4001', self.get_schema(), self.TESTROOT) + + def tearDown(self): + try: + self.db.conn.delete(self.TESTROOT, recursive = True) + except KeyError: + pass diff --git a/configdb/tests/test_etcd_interface.py b/configdb/tests/test_etcd_interface.py index c4c4817927aa12e90d514d3d6cf33a637659d9e4..d9f1ee5e8ab48dc744ab879491ec9763113ea72d 100644 --- a/configdb/tests/test_etcd_interface.py +++ b/configdb/tests/test_etcd_interface.py @@ -19,3 +19,11 @@ class EtcdInterfaceTest(DbInterfaceTestBase, TestBase): def init_db(self): return etcd_interface.EtcdInterface( 'http://127.0.0.1:4001', self.get_schema(), self.TESTROOT) + + + def tearDown(self): + db = self.init_db() + try: + db.conn.delete(self.TESTROOT, recursive = True) + except KeyError: + pass