refactor LDAP access in tests
[matthijs/upstream/django-ldapdb.git] / examples / tests.py
index 1b4a42ca4ec11f514a2328c6adaabfd7c78a2874..e8f386e037fa6ec674f6004da9230965869245c6 100644 (file)
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #
 
+from django.db import connections, router
 from django.db.models import Q
 from django.test import TestCase
 
 import ldap
-import ldapdb
+
 from ldapdb.backends.ldap.compiler import query_as_ldap
 from examples.models import LdapUser, LdapGroup
 
 class BaseTestCase(TestCase):
+    def _add_base_dn(self, model):
+        using = router.db_for_write(model)
+        connection = connections[using]
+
+        rdn = model.base_dn.split(',')[0]
+        key, val = rdn.split('=')
+        attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
+        try:
+            connection.add_s(model.base_dn, attrs)
+        except ldap.ALREADY_EXISTS:
+            pass
+
+    def _remove_base_dn(self, model):
+        using = router.db_for_write(model)
+        connection = connections[using]
+
+        try:
+            results = connection.search_s(model.base_dn, ldap.SCOPE_SUBTREE)
+            for dn, attrs in reversed(results):
+                connection.delete_s(dn)
+        except ldap.NO_SUCH_OBJECT:
+            pass
+
     def setUp(self):
-        cursor = ldapdb.connection._cursor()
-        for dn in [LdapGroup.base_dn, LdapUser.base_dn]:
-            rdn = dn.split(',')[0]
-            key, val = rdn.split('=')
-            attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
-            try:
-                cursor.connection.add_s(dn, attrs)
-            except ldap.ALREADY_EXISTS:
-                pass
+        for model in [LdapGroup, LdapUser]:
+            self._add_base_dn(model)
 
     def tearDown(self):
-        cursor = ldapdb.connection._cursor()
-        for base in [LdapGroup.base_dn, LdapUser.base_dn]:
-            try:
-                results = cursor.connection.search_s(base, ldap.SCOPE_SUBTREE)
-                for dn, attrs in reversed(results):
-                    cursor.connection.delete_s(dn)
-            except ldap.NO_SUCH_OBJECT:
-                pass
+        for model in [LdapGroup, LdapUser]:
+            self._remove_base_dn(model)
 
 class GroupTestCase(BaseTestCase):
     def setUp(self):
@@ -311,13 +322,15 @@ class ScopedTestCase(BaseTestCase):
     def setUp(self):
         super(ScopedTestCase, self).setUp()
 
-        cursor = ldapdb.connection._cursor()
-        self.scoped_dn = "ou=contacts,%s" % LdapGroup.base_dn
-        attrs = [('objectClass', ['top', 'organizationalUnit']), ("ou", ["contacts"])]
-        cursor.connection.add_s(self.scoped_dn, attrs)
+        self.scoped_model = LdapGroup.scoped("ou=contacts,%s" % LdapGroup.base_dn)
+        self._add_base_dn(self.scoped_model)
+
+    def tearDown(self):
+        self._remove_base_dn(self.scoped_model)
+        super(ScopedTestCase, self).tearDown()
 
     def test_scope(self):
-        ScopedGroup = LdapGroup.scoped(self.scoped_dn)
+        ScopedGroup = self.scoped_model
 
         # create group
         g = LdapGroup()