# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
+from django.db import connections, router
from django.db.models import Q
from django.test import TestCase
import ldap
-import ldapdb
+
+from ldapdb.backends.ldap.compiler import query_as_ldap
from examples.models import LdapUser, LdapGroup
class BaseTestCase(TestCase):
+ def _add_base_dn(self, model):
+ using = router.db_for_write(model)
+ connection = connections[using]
+
+ rdn = model.base_dn.split(',')[0]
+ key, val = rdn.split('=')
+ attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
+ try:
+ connection.add_s(model.base_dn, attrs)
+ except ldap.ALREADY_EXISTS:
+ pass
+
+ def _remove_base_dn(self, model):
+ using = router.db_for_write(model)
+ connection = connections[using]
+
+ try:
+ results = connection.search_s(model.base_dn, ldap.SCOPE_SUBTREE)
+ for dn, attrs in reversed(results):
+ connection.delete_s(dn)
+ except ldap.NO_SUCH_OBJECT:
+ pass
+
def setUp(self):
- cursor = ldapdb.connection._cursor()
- for dn in [LdapGroup.base_dn, LdapUser.base_dn]:
- rdn = dn.split(',')[0]
- key, val = rdn.split('=')
- attrs = [('objectClass', ['top', 'organizationalUnit']), (key, [val])]
- try:
- cursor.connection.add_s(dn, attrs)
- except ldap.ALREADY_EXISTS:
- pass
+ for model in [LdapGroup, LdapUser]:
+ self._add_base_dn(model)
def tearDown(self):
- cursor = ldapdb.connection._cursor()
- for base in [LdapGroup.base_dn, LdapUser.base_dn]:
- try:
- results = cursor.connection.search_s(base, ldap.SCOPE_SUBTREE)
- for dn, attrs in reversed(results):
- cursor.connection.delete_s(dn)
- except ldap.NO_SUCH_OBJECT:
- pass
+ for model in [LdapGroup, LdapUser]:
+ self._remove_base_dn(model)
class GroupTestCase(BaseTestCase):
def setUp(self):
def test_ldap_filter(self):
# single filter
qs = LdapGroup.objects.filter(name='foogroup')
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(cn=foogroup))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
qs = LdapGroup.objects.filter(Q(name='foogroup'))
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(cn=foogroup))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(cn=foogroup))')
# AND filter
qs = LdapGroup.objects.filter(gid=1000, name='foogroup')
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
qs = LdapGroup.objects.filter(Q(gid=1000) & Q(name='foogroup'))
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(gidNumber=1000)(cn=foogroup)))')
# OR filter
qs = LdapGroup.objects.filter(Q(gid=1000) | Q(name='foogroup'))
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(|(gidNumber=1000)(cn=foogroup)))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(|(gidNumber=1000)(cn=foogroup)))')
# single exclusion
qs = LdapGroup.objects.exclude(name='foogroup')
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
qs = LdapGroup.objects.filter(~Q(name='foogroup'))
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(cn=foogroup)))')
# multiple exclusion
qs = LdapGroup.objects.exclude(name='foogroup', gid=1000)
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(!(&(gidNumber=1000)(cn=foogroup))))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(!(&(gidNumber=1000)(cn=foogroup))))')
qs = LdapGroup.objects.filter(name='foogroup').exclude(gid=1000)
- self.assertEquals(qs.query._ldap_filter(), '(&(objectClass=posixGroup)(&(cn=foogroup)(!(gidNumber=1000))))')
+ self.assertEquals(query_as_ldap(qs.query), '(&(objectClass=posixGroup)(&(cn=foogroup)(!(gidNumber=1000))))')
def test_filter(self):
qs = LdapGroup.objects.filter(name='foogroup')
def setUp(self):
super(ScopedTestCase, self).setUp()
- cursor = ldapdb.connection._cursor()
- self.scoped_dn = "ou=contacts,%s" % LdapGroup.base_dn
- attrs = [('objectClass', ['top', 'organizationalUnit']), ("ou", ["contacts"])]
- cursor.connection.add_s(self.scoped_dn, attrs)
+ self.scoped_model = LdapGroup.scoped("ou=contacts,%s" % LdapGroup.base_dn)
+ self._add_base_dn(self.scoped_model)
+
+ def tearDown(self):
+ self._remove_base_dn(self.scoped_model)
+ super(ScopedTestCase, self).tearDown()
def test_scope(self):
- ScopedGroup = LdapGroup.scoped(self.scoped_dn)
+ ScopedGroup = self.scoped_model
# create group
g = LdapGroup()