# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
+from django.db import connections, router
+from django.db.models import Q
from django.test import TestCase
import ldap
import ldapdb
+from ldapdb.backends.ldap.compiler import query_as_ldap
from examples.models import LdapUser, LdapGroup
-
+
class BaseTestCase(TestCase):
def setUp(self):
- cursor = ldapdb.connection._cursor()
- for base in [LdapGroup.base_dn, LdapUser.base_dn]:
- rdn = base.split(',')[0]
+ for model in [LdapGroup, LdapUser]:
+ using = router.db_for_write(model)
+ connection = connections[using]
+
+ rdn = model.base_dn.split(',')[0]
key, val = rdn.split('=')
attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
try:
- cursor.connection.add_s(base, attrs)
+ connection.add_s(model.base_dn, attrs)
except ldap.ALREADY_EXISTS:
pass
def tearDown(self):
- cursor = ldapdb.connection._cursor()
- for base in [LdapGroup.base_dn, LdapUser.base_dn]:
+ for model in [LdapGroup, LdapUser]:
+ using = router.db_for_write(model)
+ connection = connections[using]
+
try:
- results = cursor.connection.search_s(base, ldap.SCOPE_SUBTREE)
+ results = connection.search_s(model.base_dn, ldap.SCOPE_SUBTREE)
for dn, attrs in reversed(results):
- cursor.connection.delete_s(dn)
+ connection.delete_s(dn)
except ldap.NO_SUCH_OBJECT:
pass
qs = LdapGroup.objects.all()
self.assertEquals(len(qs), 3)
+ def test_ldap_filter(self):
+ # single filter
+ qs = LdapGroup.objects.filter(name='foogroup')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
+
+ qs = LdapGroup.objects.filter(Q(name='foogroup'))
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
+
+ # AND filter
+ qs = LdapGroup.objects.filter(gid=1000, name='foogroup')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+
+ qs = LdapGroup.objects.filter(Q(gid=1000) & Q(name='foogroup'))
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+
+ # OR filter
+ qs = LdapGroup.objects.filter(Q(gid=1000) | Q(name='foogroup'))
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(|(gidNumber=1000)(cn=foogroup)))')
+
+ # single exclusion
+ qs = LdapGroup.objects.exclude(name='foogroup')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+
+ qs = LdapGroup.objects.filter(~Q(name='foogroup'))
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+
+ # multiple exclusion
+ qs = LdapGroup.objects.exclude(name='foogroup', gid=1000)
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(&(gidNumber=1000)(cn=foogroup))))')
+
+ qs = LdapGroup.objects.filter(name='foogroup').exclude(gid=1000)
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(cn=foogroup)(!(gidNumber=1000))))')
+
def test_filter(self):
qs = LdapGroup.objects.filter(name='foogroup')
self.assertEquals(qs.count(), 1)
u.save()
self.assertEquals(u.dn, 'uid=foouser2,%s' % LdapUser.base_dn)
+class ScopedTestCase(BaseTestCase):
+ def setUp(self):
+ super(ScopedTestCase, self).setUp()
+
+ self.scoped_dn = "ou=contacts,%s" % LdapGroup.base_dn
+ attrs = [('objectClass', ['top', 'organizationalUnit']), ("ou", ["contacts"])]
+ ldapdb.connection.add_s(self.scoped_dn, attrs)
+
+ def test_scope(self):
+ ScopedGroup = LdapGroup.scoped(self.scoped_dn)
+
+ # create group
+ g = LdapGroup()
+ g.name = "foogroup"
+ g.gid = 1000
+ g.save()
+
+ qs = LdapGroup.objects.all()
+ self.assertEquals(qs.count(), 1)
+
+ qs = ScopedGroup.objects.all()
+ self.assertEquals(qs.count(), 0)
+
+ # create scoped group
+ g2 = ScopedGroup()
+ g2.name = "scopedgroup"
+ g2.gid = 5000
+ g2.save()
+
+ qs = LdapGroup.objects.all()
+ self.assertEquals(qs.count(), 2)
+
+ qs = ScopedGroup.objects.all()
+ self.assertEquals(qs.count(), 1)
+
class AdminTestCase(BaseTestCase):
fixtures = ['test_users.json']
self.assertContains(response, "foogroup")
self.assertContains(response, "1000")
+ def test_group_add(self):
+ response = self.client.post('/admin/examples/ldapgroup/add/', {'gid': '1002', 'name': 'wizgroup'})
+ self.assertRedirects(response, '/admin/examples/ldapgroup/')
+ qs = LdapGroup.objects.all()
+ self.assertEquals(qs.count(), 3)
+
def test_group_delete(self):
response = self.client.post('/admin/examples/ldapgroup/foogroup/delete/', {'yes': 'post'})
self.assertRedirects(response, '/admin/examples/ldapgroup/')
+ qs = LdapGroup.objects.all()
+ self.assertEquals(qs.count(), 1)
def test_group_search(self):
response = self.client.get('/admin/examples/ldapgroup/?q=foo')