refactor db access
[matthijs/upstream/django-ldapdb.git] / examples / tests.py
index 51c4183e9ef12b9f1771275c3121681cf38c4607..65f051b80b20cbc2d99e4c8fd50d31605212b470 100644 (file)
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #
 
+from django.db import connections, router
+from django.db.models import Q
 from django.test import TestCase
 
 import ldap
 import ldapdb
+from ldapdb.backends.ldap.compiler import query_as_ldap
 from examples.models import LdapUser, LdapGroup
-
+      
 class BaseTestCase(TestCase):
     def setUp(self):
-        cursor = ldapdb.connection._cursor()
-        for base in [LdapGroup.base_dn, LdapUser.base_dn]:
-            rdn = base.split(',')[0]
+        for model in [LdapGroup, LdapUser]:
+            using = router.db_for_write(model)
+            connection = connections[using]
+
+            rdn = model.base_dn.split(',')[0]
             key, val = rdn.split('=')
             attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
             try:
-                cursor.connection.add_s(base, attrs)
+                connection.add_s(model.base_dn, attrs)
             except ldap.ALREADY_EXISTS:
                 pass
 
     def tearDown(self):
-        cursor = ldapdb.connection._cursor()
-        for base in [LdapGroup.base_dn, LdapUser.base_dn]:
+        for model in [LdapGroup, LdapUser]:
+            using = router.db_for_write(model)
+            connection = connections[using]
+
             try:
-                results = cursor.connection.search_s(base, ldap.SCOPE_SUBTREE)
+                results = connection.search_s(model.base_dn, ldap.SCOPE_SUBTREE)
                 for dn, attrs in reversed(results):
-                    cursor.connection.delete_s(dn)
+                    connection.delete_s(dn)
             except ldap.NO_SUCH_OBJECT:
                 pass
 
@@ -97,6 +104,39 @@ class GroupTestCase(BaseTestCase):
         qs = LdapGroup.objects.all()
         self.assertEquals(len(qs), 3)
 
+    def test_ldap_filter(self):
+        # single filter
+        qs = LdapGroup.objects.filter(name='foogroup')
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
+
+        qs = LdapGroup.objects.filter(Q(name='foogroup'))
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
+
+        # AND filter
+        qs = LdapGroup.objects.filter(gid=1000, name='foogroup')
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+
+        qs = LdapGroup.objects.filter(Q(gid=1000) & Q(name='foogroup'))
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+
+        # OR filter
+        qs = LdapGroup.objects.filter(Q(gid=1000) | Q(name='foogroup'))
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(|(gidNumber=1000)(cn=foogroup)))')
+
+        # single exclusion
+        qs = LdapGroup.objects.exclude(name='foogroup')
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+        
+        qs = LdapGroup.objects.filter(~Q(name='foogroup'))
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+
+        # multiple exclusion
+        qs = LdapGroup.objects.exclude(name='foogroup', gid=1000)
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(&(gidNumber=1000)(cn=foogroup))))')
+
+        qs = LdapGroup.objects.filter(name='foogroup').exclude(gid=1000)
+        self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(cn=foogroup)(!(gidNumber=1000))))')
+
     def test_filter(self):
         qs = LdapGroup.objects.filter(name='foogroup')
         self.assertEquals(qs.count(), 1)
@@ -272,6 +312,41 @@ class UserTestCase(BaseTestCase):
         u.save()
         self.assertEquals(u.dn, 'uid=foouser2,%s' % LdapUser.base_dn)
 
+class ScopedTestCase(BaseTestCase):
+    def setUp(self):
+        super(ScopedTestCase, self).setUp()
+
+        self.scoped_dn = "ou=contacts,%s" % LdapGroup.base_dn
+        attrs = [('objectClass', ['top', 'organizationalUnit']), ("ou", ["contacts"])]
+        ldapdb.connection.add_s(self.scoped_dn, attrs)
+
+    def test_scope(self):
+        ScopedGroup = LdapGroup.scoped(self.scoped_dn)
+
+        # create group
+        g = LdapGroup()
+        g.name = "foogroup"
+        g.gid = 1000
+        g.save()
+
+        qs = LdapGroup.objects.all()
+        self.assertEquals(qs.count(), 1)
+
+        qs = ScopedGroup.objects.all()
+        self.assertEquals(qs.count(), 0)
+
+        # create scoped group
+        g2 = ScopedGroup()
+        g2.name = "scopedgroup"
+        g2.gid = 5000
+        g2.save()
+
+        qs = LdapGroup.objects.all()
+        self.assertEquals(qs.count(), 2)
+
+        qs = ScopedGroup.objects.all()
+        self.assertEquals(qs.count(), 1)
+
 class AdminTestCase(BaseTestCase):
     fixtures = ['test_users.json']
 
@@ -340,9 +415,17 @@ class AdminTestCase(BaseTestCase):
         self.assertContains(response, "foogroup")
         self.assertContains(response, "1000")
 
+    def test_group_add(self):
+        response = self.client.post('/admin/examples/ldapgroup/add/', {'gid': '1002', 'name': 'wizgroup'})
+        self.assertRedirects(response, '/admin/examples/ldapgroup/')
+        qs = LdapGroup.objects.all()
+        self.assertEquals(qs.count(), 3)
+
     def test_group_delete(self):
         response = self.client.post('/admin/examples/ldapgroup/foogroup/delete/', {'yes': 'post'})
         self.assertRedirects(response, '/admin/examples/ldapgroup/')
+        qs = LdapGroup.objects.all()
+        self.assertEquals(qs.count(), 1)
 
     def test_group_search(self):
         response = self.client.get('/admin/examples/ldapgroup/?q=foo')