diff --git a/configdb/tests/__init__.py b/configdb/tests/__init__.py index 2a523b0b54eb1c0d695ec4155de59081863be6d7..fde0f3f0f90dcb4e138a319341f38a9d29b2c04a 100644 --- a/configdb/tests/__init__.py +++ b/configdb/tests/__init__.py @@ -1,3 +1,4 @@ +import json import os import unittest import shutil @@ -51,3 +52,16 @@ class WsgiTestBase(TestBase): schema_file = os.path.join( os.path.dirname(__file__), schema_file) return self.create_app(SCHEMA_FILE=schema_file, **kwargs) + + def _login(self): + rv = self.app.post('/login', + data=json.dumps({'username': 'admin', + 'password': 'admin'}), + content_type='application/json') + self.assertEquals(200, rv.status_code) + + def _parse(self, rv): + self.assertEquals(200, rv.status_code) + data = json.loads(rv.data) + self.assertTrue(data['ok']) + return data['result'] diff --git a/configdb/tests/schema-large-noacl.json b/configdb/tests/schema-large-noacl.json new file mode 100644 index 0000000000000000000000000000000000000000..d61fc0d267b22cfe3b687b2b658275ecd2756125 --- /dev/null +++ b/configdb/tests/schema-large-noacl.json @@ -0,0 +1,189 @@ +{ + "user": { + "_help": "user model", + "name": { + "type": "string", + "index": true, + "nullable": false + }, + "uid": { + "type": "int", + "index": true + }, + "password": { + "type": "password", + "nullable": false + }, + "groups": { + "type": "relation", + "rel": "group", + "identifier": 1 + }, + "ssh_key": { + "type": "text" + }, + "home": { + "type": "string" + }, + "email": { + "type": "string" + }, + "shell": { + "type": "string", + "size": 32, + "default": "'/bin/bash'" + }, + "enabled": { + "type": "bool", + "default": true + }, + "created_at": { + "type": "datetime" + }, + "updated_at": { + "type": "datetime" + } + }, + "group": { + + "name": { + "type": "string", + "index": true, + "nullable": false + }, + "gid": { + "type": "int", + "index": true + }, + "created_at": { + "type": "datetime" + }, + "updated_at": { + "type": "datetime" + }, + "owners": { + "type": "relation", + "rel": "user" + }, + "users": { + "type": "relation", + "rel": "user", + "identifier": 1 + } + }, + "role": { + "name": { + "type": "string" + } + }, + "host_ssh_key": { + "name": { + "type": "string" + }, + "key": { + "type": "text" + }, + "type": { + "type": "string" + } + }, + "host": { + "name": { + "type" : "string" + }, + "server_type" :{ + "type" : "string", + "size": 16 + }, + "ip": { + "type": "string", + "size": 16, + "nullable": true, + "validator": "ip" + }, + "public_id": { + "type": "int" + }, + "cfengine_key": { + "type": "text" + }, + "root_ssh_key": { + "type": "text" + }, + "host_ssh_keys": { + "type": "relation", + "rel": "host_ssh_key" + }, + "created_at": { + "type": "datetime" + }, + "updated_at": { + "type": "datetime" + }, + "roles": { + "type": "relation", + "rel": "role" + }, + "sudo_users": { + "type": "relation", + "rel": "user", + "identifier": 1 + }, + "login_users": { + "type": "relation", + "rel": "user", + "identifier": 2 + }, + "sudo_groups": { + "type": "relation", + "rel": "group" + }, + "vpn_interfaces": { + "type": "relation", + "rel": "vpn_interface" + }, + "location": { + "type": "string", + "size": 64 + }, + "notes": { + "type": "text" + } + }, + "vpn_interface": { + "name": { + "type": "string", + "size": 128 + }, + "ip": { + "type": "string", + "size": 16, + "nullable": false, + "validator": "ip" + }, + "mtu": { + "type": "int" + }, + "rsa_key": { + "type": "text" + }, + "vpns": { + "type": "relation", + "rel": "vpn" + } + }, + "vpn": { + "name": { + "type": "string", + "size": 64 + }, + "network": { + "type": "string", + "size": 32 + }, + "port": { + "type": "int" + } + } + +} diff --git a/configdb/tests/test_wsgiapp.py b/configdb/tests/test_wsgiapp.py index 2da09763b2ada01be9d269079542a769a09847e9..40631b7034651a4c0e3a8a5864d3767f3f520bc5 100644 --- a/configdb/tests/test_wsgiapp.py +++ b/configdb/tests/test_wsgiapp.py @@ -36,19 +36,6 @@ class WsgiTest(WsgiTestBase): 'name': 'obz'}, acl.AuthContext('admin'), s) - def _parse(self, rv): - self.assertEquals(200, rv.status_code) - data = json.loads(rv.data) - self.assertTrue(data['ok']) - return data['result'] - - def _login(self): - rv = self.app.post('/login', - data=json.dumps({'username': 'admin', - 'password': 'admin'}), - content_type='application/json') - self.assertEquals(200, rv.status_code) - def test_config_without_schema_file_raises_exception(self): self.assertRaises(Exception, wsgiapp.make_app, {}) @@ -227,3 +214,4 @@ class WsgiTest(WsgiTestBase): data = json.loads(rv.data) self.assertEquals( {'ok': False, 'error': 'Authentication error'}, data) + diff --git a/configdb/tests/test_wsgiapp_complex.py b/configdb/tests/test_wsgiapp_complex.py new file mode 100644 index 0000000000000000000000000000000000000000..3ab176655b8f2823a2cdb3c12ba622c530e18223 --- /dev/null +++ b/configdb/tests/test_wsgiapp_complex.py @@ -0,0 +1,102 @@ +import json +import os +from werkzeug.exceptions import Forbidden +from datetime import datetime +from configdb.db import acl +from configdb.tests import * + + + +class WsgiComplexTest(WsgiTestBase): + + def setUp(self): + WsgiTestBase.setUp(self) + + app = self.create_app_with_schema('schema-large-noacl.json') + self.wsgiapp = app + self.app = app.test_client() + + db = app.api.db + with db.session() as s: + u1 = db.create('user', {'name': 'user1', + 'uid': 666, + 'password': 'pw', + 'email': 'user@example.com', + 'created_at': datetime(2006, 1, 1), + 'updated_at': datetime(2006, 1, 1), + }, s) + u2 = db.create('user', {'name': 'user2', + 'uid': 777, + 'password': 'pw', + 'email': 'user2@example.com', + 'created_at': datetime(2006, 2, 2), + 'updated_at': datetime(2006, 2, 2), + }, s) + g = db.create('group', {'name': 'group1', + 'gid': 2}, s) + g.owners.append(u1) + g.users.append(u1) + + r = db.create('role', {'name': 'server'}, s) + h = db.create('host', {'name': 'hosto', + 'server_type': 'vm', + 'ip': '1.2.3.4', + 'public_id': 1, + 'created_at': datetime(2007, 1, 1), + 'updated_at': datetime(2007, 1, 1), + 'notes': 'a test host', + }, s) + h.roles.append(r) + h.sudo_groups.append(g) + h.login_users.append(u1) + + def test_get_host(self): + self._login() + result = self._parse(self.app.get('/get/host/hosto')) + self.assertEquals('hosto', result['name']) + self.assertEquals('1.2.3.4', result['ip']) + self.assertEquals([u'server'], result['roles']) + + def test_host_add_and_remove_role(self): + self._login() + + # Create the new role. + rv = self._parse( + self.app.post('/create/role', + data=json.dumps({'name': 'role2'}), + content_type='application/json')) + self.assertTrue(rv) + + # Add it to the existing test host. Read/modify/update cycle. + host = self._parse(self.app.get('/get/host/hosto')) + host['roles'].append(u'role2') + rv = self._parse( + self.app.post('/update/host/hosto', + data=json.dumps(host), + content_type='application/json')) + self.assertTrue(rv) + + # Check the reference by using find() on the relation. + rv = self._parse( + self.app.post('/find/host', + data=json.dumps({'roles': {'type': 'eq', 'value': 'role2'}}), + content_type='application/json')) + + self.assertEquals(1, len(rv)) + self.assertEquals(u'hosto', rv[0]['name']) + + # Remove the role (skip reading back the host for simplicity). + host['roles'] = [u'server'] + rv = self._parse( + self.app.post('/update/host/hosto', + data=json.dumps(host), + content_type='application/json')) + self.assertTrue(rv) + + # Verify that the association is gone by running find() again. + rv = self._parse( + self.app.post('/find/host', + data=json.dumps({'roles': {'type': 'eq', 'value': 'role2'}}), + content_type='application/json')) + + self.assertEquals(0, len(rv))