X-Git-Url: https://git.stderr.nl/gitweb?p=matthijs%2Fupstream%2Fdjango-ldapdb.git;a=blobdiff_plain;f=examples%2Ftests.py;h=65f051b80b20cbc2d99e4c8fd50d31605212b470;hp=51c4183e9ef12b9f1771275c3121681cf38c4607;hb=649f74436527abbe004a04856a1be212b972057b;hpb=baba34a193fe7d304d86003b19f36ed638c7ec80 diff --git a/examples/tests.py b/examples/tests.py index 51c4183..65f051b 100644 --- a/examples/tests.py +++ b/examples/tests.py @@ -32,31 +32,38 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # +from django.db import connections, router +from django.db.models import Q from django.test import TestCase import ldap import ldapdb +from ldapdb.backends.ldap.compiler import query_as_ldap from examples.models import LdapUser, LdapGroup - + class BaseTestCase(TestCase): def setUp(self): - cursor = ldapdb.connection._cursor() - for base in [LdapGroup.base_dn, LdapUser.base_dn]: - rdn = base.split(',')[0] + for model in [LdapGroup, LdapUser]: + using = router.db_for_write(model) + connection = connections[using] + + rdn = model.base_dn.split(',')[0] key, val = rdn.split('=') attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])] try: - cursor.connection.add_s(base, attrs) + connection.add_s(model.base_dn, attrs) except ldap.ALREADY_EXISTS: pass def tearDown(self): - cursor = ldapdb.connection._cursor() - for base in [LdapGroup.base_dn, LdapUser.base_dn]: + for model in [LdapGroup, LdapUser]: + using = router.db_for_write(model) + connection = connections[using] + try: - results = cursor.connection.search_s(base, ldap.SCOPE_SUBTREE) + results = connection.search_s(model.base_dn, ldap.SCOPE_SUBTREE) for dn, attrs in reversed(results): - cursor.connection.delete_s(dn) + connection.delete_s(dn) except ldap.NO_SUCH_OBJECT: pass @@ -97,6 +104,39 @@ class GroupTestCase(BaseTestCase): qs = LdapGroup.objects.all() self.assertEquals(len(qs), 3) + def test_ldap_filter(self): + # single filter + qs = LdapGroup.objects.filter(name='foogroup') + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))') + + qs = LdapGroup.objects.filter(Q(name='foogroup')) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))') + + # AND filter + qs = LdapGroup.objects.filter(gid=1000, name='foogroup') + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))') + + qs = LdapGroup.objects.filter(Q(gid=1000) & Q(name='foogroup')) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))') + + # OR filter + qs = LdapGroup.objects.filter(Q(gid=1000) | Q(name='foogroup')) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(|(gidNumber=1000)(cn=foogroup)))') + + # single exclusion + qs = LdapGroup.objects.exclude(name='foogroup') + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))') + + qs = LdapGroup.objects.filter(~Q(name='foogroup')) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))') + + # multiple exclusion + qs = LdapGroup.objects.exclude(name='foogroup', gid=1000) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(&(gidNumber=1000)(cn=foogroup))))') + + qs = LdapGroup.objects.filter(name='foogroup').exclude(gid=1000) + self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(cn=foogroup)(!(gidNumber=1000))))') + def test_filter(self): qs = LdapGroup.objects.filter(name='foogroup') self.assertEquals(qs.count(), 1) @@ -272,6 +312,41 @@ class UserTestCase(BaseTestCase): u.save() self.assertEquals(u.dn, 'uid=foouser2,%s' % LdapUser.base_dn) +class ScopedTestCase(BaseTestCase): + def setUp(self): + super(ScopedTestCase, self).setUp() + + self.scoped_dn = "ou=contacts,%s" % LdapGroup.base_dn + attrs = [('objectClass', ['top', 'organizationalUnit']), ("ou", ["contacts"])] + ldapdb.connection.add_s(self.scoped_dn, attrs) + + def test_scope(self): + ScopedGroup = LdapGroup.scoped(self.scoped_dn) + + # create group + g = LdapGroup() + g.name = "foogroup" + g.gid = 1000 + g.save() + + qs = LdapGroup.objects.all() + self.assertEquals(qs.count(), 1) + + qs = ScopedGroup.objects.all() + self.assertEquals(qs.count(), 0) + + # create scoped group + g2 = ScopedGroup() + g2.name = "scopedgroup" + g2.gid = 5000 + g2.save() + + qs = LdapGroup.objects.all() + self.assertEquals(qs.count(), 2) + + qs = ScopedGroup.objects.all() + self.assertEquals(qs.count(), 1) + class AdminTestCase(BaseTestCase): fixtures = ['test_users.json'] @@ -340,9 +415,17 @@ class AdminTestCase(BaseTestCase): self.assertContains(response, "foogroup") self.assertContains(response, "1000") + def test_group_add(self): + response = self.client.post('/admin/examples/ldapgroup/add/', {'gid': '1002', 'name': 'wizgroup'}) + self.assertRedirects(response, '/admin/examples/ldapgroup/') + qs = LdapGroup.objects.all() + self.assertEquals(qs.count(), 3) + def test_group_delete(self): response = self.client.post('/admin/examples/ldapgroup/foogroup/delete/', {'yes': 'post'}) self.assertRedirects(response, '/admin/examples/ldapgroup/') + qs = LdapGroup.objects.all() + self.assertEquals(qs.count(), 1) def test_group_search(self): response = self.client.get('/admin/examples/ldapgroup/?q=foo')