Merge pull request #17439 from gavindsouza/ldap-user_type

feat: Allow setting "User Type" for LDAP user creation
This commit is contained in:
gavin 2022-07-19 15:40:49 +05:30 committed by GitHub
commit cf7cb387f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 209 additions and 281 deletions

View file

@ -1,4 +1,5 @@
{
"actions": [],
"creation": "2019-05-29 01:24:29.585060",
"doctype": "DocType",
"editable_grid": 1,
@ -19,13 +20,14 @@
"fieldname": "erpnext_role",
"fieldtype": "Link",
"in_list_view": 1,
"label": "ERPNext Role",
"label": "User Role",
"options": "Role",
"reqd": 1
}
],
"istable": 1,
"modified": "2019-07-15 06:46:38.050408",
"links": [],
"modified": "2022-07-07 16:28:44.828514",
"modified_by": "Administrator",
"module": "Integrations",
"name": "LDAP Group Mapping",
@ -34,5 +36,6 @@
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "ASC",
"states": [],
"track_changes": 1
}

View file

@ -42,7 +42,10 @@
"column_break_33",
"ldap_group_member_attribute",
"ldap_group_mappings_section",
"default_user_type",
"column_break_38",
"default_role",
"section_break_40",
"ldap_groups",
"ldap_group_field"
],
@ -79,9 +82,11 @@
"reqd": 1
},
{
"depends_on": "eval: doc.default_user_type == \"System User\"",
"fieldname": "default_role",
"fieldtype": "Link",
"label": "Default Role on Creation",
"label": "Default User Role",
"mandatory_depends_on": "eval: doc.default_user_type == \"System User\"",
"options": "Role",
"reqd": 1
},
@ -249,10 +254,10 @@
"label": "Group Object Class"
},
{
"description": "string value, i.e. {0} or uid={0},ou=users,dc=example,dc=com",
"fieldname": "ldap_custom_group_search",
"fieldtype": "Data",
"label": "Custom Group Search"
"description": "string value, i.e. {0} or uid={0},ou=users,dc=example,dc=com",
"fieldname": "ldap_custom_group_search",
"fieldtype": "Data",
"label": "Custom Group Search"
},
{
"description": "Requires any valid fdn path. i.e. ou=users,dc=example,dc=com",
@ -268,12 +273,28 @@
"fieldtype": "Data",
"label": "LDAP search path for Groups",
"reqd": 1
},
{
"fieldname": "default_user_type",
"fieldtype": "Link",
"label": "Default User Type",
"options": "User Type",
"reqd": 1
},
{
"fieldname": "column_break_38",
"fieldtype": "Column Break"
},
{
"fieldname": "section_break_40",
"fieldtype": "Section Break",
"hide_border": 1
}
],
"in_create": 1,
"issingle": 1,
"links": [],
"modified": "2021-07-27 11:51:43.328271",
"modified": "2022-07-07 16:51:46.230793",
"modified_by": "Administrator",
"module": "Integrations",
"name": "LDAP Settings",
@ -294,5 +315,6 @@
"read_only": 1,
"sort_field": "modified",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}

View file

@ -1,19 +1,37 @@
# Copyright (c) 2015, Frappe Technologies and contributors
# Copyright (c) 2022, Frappe Technologies and contributors
# License: MIT. See LICENSE
import ssl
from typing import TYPE_CHECKING
import ldap3
from ldap3 import AUTO_BIND_TLS_BEFORE_BIND, HASHED_SALTED_SHA, MODIFY_REPLACE
from ldap3.abstract.entry import Entry
from ldap3.core.exceptions import (
LDAPAttributeError,
LDAPInvalidCredentialsResult,
LDAPInvalidFilterError,
LDAPNoSuchObjectResult,
)
from ldap3.utils.hashed import hashed
import frappe
from frappe import _, safe_encode
from frappe.model.document import Document
from frappe.twofactor import authenticate_for_2factor, confirm_otp_token, should_run_2fa
if TYPE_CHECKING:
from frappe.core.doctype.user.user import User
class LDAPSettings(Document):
def validate(self):
self.default_user_type = self.default_user_type or "System User"
if not self.enabled:
return
if not self.flags.ignore_mandatory:
if (
self.ldap_search_string.count("(") == self.ldap_search_string.count(")")
and self.ldap_search_string.startswith("(")
@ -28,8 +46,6 @@ class LDAPSettings(Document):
try:
if conn.result["type"] == "bindResponse" and self.base_dn:
import ldap3
conn.search(
search_base=self.ldap_search_path_user,
search_filter="(objectClass=*)",
@ -40,13 +56,13 @@ class LDAPSettings(Document):
search_base=self.ldap_search_path_group, search_filter="(objectClass=*)", attributes=["cn"]
)
except ldap3.core.exceptions.LDAPAttributeError as ex:
except LDAPAttributeError as ex:
frappe.throw(
_("LDAP settings incorrect. validation response was: {0}").format(ex),
title=_("Misconfigured"),
)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
except LDAPNoSuchObjectResult:
frappe.throw(
_("Ensure the user and group search paths are correct."), title=_("Misconfigured")
)
@ -75,12 +91,8 @@ class LDAPSettings(Document):
)
)
def connect_to_ldap(self, base_dn, password, read_only=True):
def connect_to_ldap(self, base_dn, password, read_only=True) -> ldap3.Connection:
try:
import ssl
import ldap3
if self.require_trusted_certificate == "Yes":
tls_configuration = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLS_CLIENT)
else:
@ -94,9 +106,9 @@ class LDAPSettings(Document):
tls_configuration.ca_certs_file = self.local_ca_certs_file
server = ldap3.Server(host=self.ldap_server_url, tls=tls_configuration)
bind_type = ldap3.AUTO_BIND_TLS_BEFORE_BIND if self.ssl_tls_mode == "StartTLS" else True
bind_type = AUTO_BIND_TLS_BEFORE_BIND if self.ssl_tls_mode == "StartTLS" else True
conn = ldap3.Connection(
return ldap3.Connection(
server=server,
user=base_dn,
password=password,
@ -105,18 +117,16 @@ class LDAPSettings(Document):
raise_exceptions=True,
)
return conn
except ImportError:
msg = _("Please Install the ldap3 library via pip to use ldap functionality.")
frappe.throw(msg, title=_("LDAP Not Installed"))
except ldap3.core.exceptions.LDAPInvalidCredentialsResult:
except LDAPInvalidCredentialsResult:
frappe.throw(_("Invalid username or password"))
except Exception as ex:
frappe.throw(_(str(ex)))
@staticmethod
def get_ldap_client_settings():
def get_ldap_client_settings() -> dict:
# return the settings to be used on the client side.
result = {"enabled": False}
ldap = frappe.get_cached_doc("LDAP Settings")
@ -126,21 +136,19 @@ class LDAPSettings(Document):
return result
@classmethod
def update_user_fields(cls, user, user_data):
def update_user_fields(cls, user: "User", user_data: dict):
updatable_data = {key: value for key, value in user_data.items() if key != "email"}
for key, value in updatable_data.items():
setattr(user, key, value)
user.save(ignore_permissions=True)
def sync_roles(self, user, additional_groups=None):
def sync_roles(self, user: "User", additional_groups: list = None):
current_roles = {d.role for d in user.get("roles")}
needed_roles = set()
needed_roles.add(self.default_role)
if self.default_user_type == "System User":
needed_roles = {self.default_role}
else:
needed_roles = set()
lower_groups = [g.lower() for g in additional_groups or []]
all_mapped_roles = {r.erpnext_role for r in self.ldap_groups}
@ -157,28 +165,31 @@ class LDAPSettings(Document):
user.remove_roles(*roles_to_remove)
def create_or_update_user(self, user_data, groups=None):
user = None
def create_or_update_user(self, user_data: dict, groups: list = None):
user: "User" = None
role: str = None
if frappe.db.exists("User", user_data["email"]):
user = frappe.get_doc("User", user_data["email"])
LDAPSettings.update_user_fields(user=user, user_data=user_data)
else:
doc = user_data
doc.update(
{
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": "System User",
# "roles": [{
# "role": self.default_role
# }]
}
)
doc = user_data | {
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": self.default_user_type,
}
user = frappe.get_doc(doc)
user.insert(ignore_permissions=True)
# always add default role.
user.add_roles(self.default_role)
if self.default_user_type == "System User":
role = self.default_role
else:
role = frappe.db.get_value("User Type", user.user_type, "role")
if role:
user.add_roles(role)
self.sync_roles(user, groups)
return user
@ -203,38 +214,28 @@ class LDAPSettings(Document):
return ldap_attributes
def fetch_ldap_groups(self, user, conn):
import ldap3
def fetch_ldap_groups(self, user: Entry, conn: ldap3.Connection) -> list:
if not isinstance(user, Entry):
raise TypeError("Invalid type, attribute 'user' must be of type 'ldap3.abstract.entry.Entry'")
if type(user) is not ldap3.abstract.entry.Entry:
raise TypeError(
"Invalid type, attribute {} must be of type '{}'".format("user", "ldap3.abstract.entry.Entry")
)
if type(conn) is not ldap3.core.connection.Connection:
raise TypeError(
"Invalid type, attribute {} must be of type '{}'".format("conn", "ldap3.Connection")
)
if not isinstance(conn, ldap3.Connection):
raise TypeError("Invalid type, attribute 'conn' must be of type 'ldap3.Connection'")
fetch_ldap_groups = None
ldap_object_class = None
ldap_group_members_attribute = None
if self.ldap_directory_server.lower() == "active directory":
ldap_object_class = "Group"
ldap_group_members_attribute = "member"
user_search_str = user.entry_dn
elif self.ldap_directory_server.lower() == "openldap":
ldap_object_class = "posixgroup"
ldap_group_members_attribute = "memberuid"
user_search_str = getattr(user, self.ldap_username_field).value
elif self.ldap_directory_server.lower() == "custom":
ldap_object_class = self.ldap_group_objectclass
ldap_group_members_attribute = self.ldap_group_member_attribute
ldap_custom_group_search = self.ldap_custom_group_search or "{0}"
@ -245,39 +246,31 @@ class LDAPSettings(Document):
# this path will be hit for everyone with preconfigured ldap settings. this must be taken into account so as not to break ldap for those users.
if self.ldap_group_field:
fetch_ldap_groups = getattr(user, self.ldap_group_field).values
if ldap_object_class is not None:
conn.search(
search_base=self.ldap_search_path_group,
search_filter="(&(objectClass={})({}={}))".format(
ldap_object_class, ldap_group_members_attribute, user_search_str
),
search_filter=f"(&(objectClass={ldap_object_class})({ldap_group_members_attribute}={user_search_str}))",
attributes=["cn"],
) # Build search query
if len(conn.entries) >= 1:
fetch_ldap_groups = []
for group in conn.entries:
fetch_ldap_groups.append(group["cn"].value)
return fetch_ldap_groups
def authenticate(self, username, password):
def authenticate(self, username: str, password: str):
if not self.enabled:
frappe.throw(_("LDAP is not enabled."))
user_filter = self.ldap_search_string.format(username)
ldap_attributes = self.get_ldap_attributes()
conn = self.connect_to_ldap(self.base_dn, self.get_password(raise_exception=False))
try:
import ldap3
conn.search(
search_base=self.ldap_search_path_user,
search_filter=f"{user_filter}",
@ -286,26 +279,21 @@ class LDAPSettings(Document):
if len(conn.entries) == 1 and conn.entries[0]:
user = conn.entries[0]
groups = self.fetch_ldap_groups(user, conn)
# only try and connect as the user, once we have their fqdn entry.
if user.entry_dn and password and conn.rebind(user=user.entry_dn, password=password):
return self.create_or_update_user(self.convert_ldap_entry_to_dict(user), groups=groups)
raise ldap3.core.exceptions.LDAPInvalidCredentialsResult # even though nothing foundor failed authentication raise invalid credentials
raise LDAPInvalidCredentialsResult # even though nothing foundor failed authentication raise invalid credentials
except ldap3.core.exceptions.LDAPInvalidFilterError:
except LDAPInvalidFilterError:
frappe.throw(_("Please use a valid LDAP search filter"), title=_("Misconfigured"))
except ldap3.core.exceptions.LDAPInvalidCredentialsResult:
except LDAPInvalidCredentialsResult:
frappe.throw(_("Invalid username or password"))
def reset_password(self, user, password, logout_sessions=False):
from ldap3 import HASHED_SALTED_SHA, MODIFY_REPLACE
from ldap3.utils.hashed import hashed
search_filter = f"({self.ldap_email_field}={user})"
conn = self.connect_to_ldap(
@ -334,8 +322,7 @@ class LDAPSettings(Document):
else:
frappe.throw(_("No LDAP User found for email: {0}").format(user))
def convert_ldap_entry_to_dict(self, user_entry):
def convert_ldap_entry_to_dict(self, user_entry: Entry):
# support multiple email values
email = user_entry[self.ldap_email_field]
@ -346,7 +333,6 @@ class LDAPSettings(Document):
}
# optional fields
if self.ldap_middle_name_field:
data["middle_name"] = user_entry[self.ldap_middle_name_field].value
@ -366,7 +352,7 @@ class LDAPSettings(Document):
def login():
# LDAP LOGIN LOGIC
args = frappe.form_dict
ldap = frappe.get_doc("LDAP Settings")
ldap: LDAPSettings = frappe.get_doc("LDAP Settings")
user = ldap.authenticate(frappe.as_unicode(args.usr), frappe.as_unicode(args.pwd))
@ -383,7 +369,7 @@ def login():
@frappe.whitelist()
def reset_password(user, password, logout):
ldap = frappe.get_doc("LDAP Settings")
ldap: LDAPSettings = frappe.get_doc("LDAP Settings")
if not ldap.enabled:
frappe.throw(_("LDAP is not enabled."))
ldap.reset_password(user, password, logout_sessions=int(logout))

View file

@ -1,15 +1,16 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# Copyright (c) 2022, Frappe Technologies and Contributors
# License: MIT. See LICENSE
import contextlib
import functools
import os
import ssl
import unittest
from unittest import mock
from unittest import TestCase, mock
import ldap3
from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, OFFLINE_SLAPD_2_4, Connection, Server
import frappe
from frappe.exceptions import MandatoryError, ValidationError
from frappe.integrations.doctype.ldap_settings.ldap_settings import LDAPSettings
@ -22,15 +23,19 @@ class LDAP_TestCase:
LDAP_LDIF_JSON = None
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING = None
# for adding type hints during development ^_^
assertTrue = TestCase.assertTrue
assertEqual = TestCase.assertEqual
assertIn = TestCase.assertIn
def mock_ldap_connection(f):
@functools.wraps(f)
def wrapped(self, *args, **kwargs):
with mock.patch(
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap"
) as mock_connection:
mock_connection.return_value = self.connection
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap",
return_value=self.connection,
):
self.test_class = LDAPSettings(self.doc)
# Create a clean doc
@ -47,80 +52,66 @@ class LDAP_TestCase:
return wrapped
def clean_test_users():
try: # clean up test user 1
with contextlib.suppress(Exception):
frappe.get_doc("User", "posix.user1@unit.testing").delete()
except Exception:
pass
try: # clean up test user 2
with contextlib.suppress(Exception):
frappe.get_doc("User", "posix.user2@unit.testing").delete()
except Exception:
pass
with contextlib.suppress(Exception):
frappe.get_doc("User", "website_ldap_user@test.com").delete()
@classmethod
def setUpClass(self, ldapServer="OpenLDAP"):
self.clean_test_users()
def setUpClass(cls):
cls.clean_test_users()
# Save user data for restoration in tearDownClass()
self.user_ldap_settings = frappe.get_doc("LDAP Settings")
cls.user_ldap_settings = frappe.get_doc("LDAP Settings")
# Create test user1
self.user1doc = {
cls.user1doc = {
"username": "posix.user",
"email": "posix.user1@unit.testing",
"first_name": "posix",
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": "System User",
}
self.user1doc.update(
{
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": "System User",
}
)
user = frappe.get_doc(self.user1doc)
user = frappe.get_doc(cls.user1doc)
user.insert(ignore_permissions=True)
# Create test user1
self.user2doc = {
cls.user2doc = {
"username": "posix.user2",
"email": "posix.user2@unit.testing",
"first_name": "posix",
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": "System User",
}
self.user2doc.update(
{
"doctype": "User",
"send_welcome_email": 0,
"language": "",
"user_type": "System User",
}
)
user = frappe.get_doc(self.user2doc)
user = frappe.get_doc(cls.user2doc)
user.insert(ignore_permissions=True)
# Setup Mock OpenLDAP Directory
self.ldap_dc_path = "dc=unit,dc=testing"
self.ldap_user_path = "ou=users," + self.ldap_dc_path
self.ldap_group_path = "ou=groups," + self.ldap_dc_path
self.base_dn = "cn=base_dn_user," + self.ldap_dc_path
self.base_password = "my_password"
self.ldap_server = "ldap://my_fake_server:389"
cls.ldap_dc_path = "dc=unit,dc=testing"
cls.ldap_user_path = f"ou=users,{cls.ldap_dc_path}"
cls.ldap_group_path = f"ou=groups,{cls.ldap_dc_path}"
cls.base_dn = f"cn=base_dn_user,{cls.ldap_dc_path}"
cls.base_password = "my_password"
cls.ldap_server = "ldap://my_fake_server:389"
self.doc = {
cls.doc = {
"doctype": "LDAP Settings",
"enabled": True,
"ldap_directory_server": self.TEST_LDAP_SERVER,
"ldap_server_url": self.ldap_server,
"base_dn": self.base_dn,
"password": self.base_password,
"ldap_search_path_user": self.ldap_user_path,
"ldap_search_string": self.TEST_LDAP_SEARCH_STRING,
"ldap_search_path_group": self.ldap_group_path,
"ldap_directory_server": cls.TEST_LDAP_SERVER,
"ldap_server_url": cls.ldap_server,
"base_dn": cls.base_dn,
"password": cls.base_password,
"ldap_search_path_user": cls.ldap_user_path,
"ldap_search_string": cls.TEST_LDAP_SEARCH_STRING,
"ldap_search_path_group": cls.ldap_group_path,
"ldap_user_creation_and_mapping_section": "",
"ldap_email_field": "mail",
"ldap_username_field": self.LDAP_USERNAME_FIELD,
"ldap_username_field": cls.LDAP_USERNAME_FIELD,
"ldap_first_name_field": "givenname",
"ldap_middle_name_field": "",
"ldap_last_name_field": "sn",
@ -135,50 +126,41 @@ class LDAP_TestCase:
"ldap_group_objectclass": "",
"ldap_group_member_attribute": "",
"default_role": "Newsletter Manager",
"ldap_groups": self.DOCUMENT_GROUP_MAPPINGS,
"ldap_groups": cls.DOCUMENT_GROUP_MAPPINGS,
"ldap_group_field": "",
"default_user_type": "System User",
}
self.server = Server(host=self.ldap_server, port=389, get_info=self.LDAP_SCHEMA)
self.connection = Connection(
self.server,
user=self.base_dn,
password=self.base_password,
cls.server = Server(host=cls.ldap_server, port=389, get_info=cls.LDAP_SCHEMA)
cls.connection = Connection(
cls.server,
user=cls.base_dn,
password=cls.base_password,
read_only=True,
client_strategy=MOCK_SYNC,
)
self.connection.strategy.entries_from_json(
os.path.abspath(os.path.dirname(__file__)) + "/" + self.LDAP_LDIF_JSON
cls.connection.strategy.entries_from_json(
f"{os.path.abspath(os.path.dirname(__file__))}/{cls.LDAP_LDIF_JSON}"
)
self.connection.bind()
cls.connection.bind()
@classmethod
def tearDownClass(self):
try:
def tearDownClass(cls):
with contextlib.suppress(Exception):
frappe.get_doc("LDAP Settings").delete()
except Exception:
pass
try:
# return doc back to user data
self.user_ldap_settings.save()
except Exception:
pass
# return doc back to user data
with contextlib.suppress(Exception):
cls.user_ldap_settings.save()
# Clean-up test users
self.clean_test_users()
cls.clean_test_users()
# Clear OpenLDAP connection
self.connection = None
cls.connection = None
@mock_ldap_connection
def test_mandatory_fields(self):
mandatory_fields = [
"ldap_server_url",
"ldap_directory_server",
@ -195,26 +177,14 @@ class LDAP_TestCase:
] # fields that are required to have ldap functioning need to be mandatory
for mandatory_field in mandatory_fields:
localdoc = self.doc.copy()
localdoc[mandatory_field] = ""
try:
with contextlib.suppress(MandatoryError, ValidationError):
frappe.get_doc(localdoc).save()
self.fail(f"Document LDAP Settings field [{mandatory_field}] is not mandatory")
except frappe.exceptions.MandatoryError:
pass
except frappe.exceptions.ValidationError:
if mandatory_field == "ldap_search_string":
# additional validation is done on this field, pass in this instance
pass
for non_mandatory_field in self.doc: # Ensure remaining fields have not been made mandatory
if non_mandatory_field == "doctype" or non_mandatory_field in mandatory_fields:
continue
@ -222,15 +192,12 @@ class LDAP_TestCase:
localdoc[non_mandatory_field] = ""
try:
frappe.get_doc(localdoc).save()
except frappe.exceptions.MandatoryError:
except MandatoryError:
self.fail(f"Document LDAP Settings field [{non_mandatory_field}] should not be mandatory")
@mock_ldap_connection
def test_validation_ldap_search_string(self):
invalid_ldap_search_strings = [
"",
"uid={0}",
@ -242,19 +209,26 @@ class LDAP_TestCase:
] # ldap search string must be enclosed in '()' for ldap search to work for finding user and have the same number of opening and closing brackets.
for invalid_search_string in invalid_ldap_search_strings:
localdoc = self.doc.copy()
localdoc["ldap_search_string"] = invalid_search_string
try:
with contextlib.suppress(ValidationError):
frappe.get_doc(localdoc).save()
self.fail(f"LDAP search string [{invalid_search_string}] should not validate")
except frappe.exceptions.ValidationError:
pass
def test_connect_to_ldap(self):
# prevent these parameters for security or lack of the und user from being able to configure
prevent_connection_parameters = {
"mode": {
"IP_V4_ONLY": "Locks the user to IPv4 without frappe providing a way to configure",
"IP_V6_ONLY": "Locks the user to IPv6 without frappe providing a way to configure",
},
"auto_bind": {
"NONE": "ldap3.Connection must autobind with base_dn",
"NO_TLS": "ldap3.Connection must have TLS",
"TLS_AFTER_BIND": "[Security] ldap3.Connection TLS bind must occur before bind",
},
}
# setup a clean doc with ldap disabled so no validation occurs (this is tested seperatly)
local_doc = self.doc.copy()
@ -262,48 +236,25 @@ class LDAP_TestCase:
self.test_class = LDAPSettings(self.doc)
with mock.patch("ldap3.Server") as ldap3_server_method:
with mock.patch("ldap3.Connection") as ldap3_connection_method:
ldap3_connection_method.return_value = self.connection
with mock.patch("ldap3.Connection", return_value=self.connection) as ldap3_connection_method:
with mock.patch("ldap3.Tls") as ldap3_Tls_method:
function_return = self.test_class.connect_to_ldap(
base_dn=self.base_dn, password=self.base_password
)
args, kwargs = ldap3_connection_method.call_args
prevent_connection_parameters = {
# prevent these parameters for security or lack of the und user from being able to configure
"mode": {
"IP_V4_ONLY": "Locks the user to IPv4 without frappe providing a way to configure",
"IP_V6_ONLY": "Locks the user to IPv6 without frappe providing a way to configure",
},
"auto_bind": {
"NONE": "ldap3.Connection must autobind with base_dn",
"NO_TLS": "ldap3.Connection must have TLS",
"TLS_AFTER_BIND": "[Security] ldap3.Connection TLS bind must occur before bind",
},
}
for connection_arg in kwargs:
if (
connection_arg in prevent_connection_parameters
and kwargs[connection_arg] in prevent_connection_parameters[connection_arg]
):
self.fail(
"ldap3.Connection was called with {}, failed reason: [{}]".format(
kwargs[connection_arg],
prevent_connection_parameters[connection_arg][kwargs[connection_arg]],
)
f"ldap3.Connection was called with {kwargs[connection_arg]}, failed reason: [{prevent_connection_parameters[connection_arg][kwargs[connection_arg]]}]"
)
tls_version = ssl.PROTOCOL_TLS_CLIENT
if local_doc["require_trusted_certificate"] == "Yes":
tls_validate = ssl.CERT_REQUIRED
tls_version = ssl.PROTOCOL_TLS_CLIENT
tls_configuration = ldap3.Tls(validate=tls_validate, version=tls_version)
self.assertTrue(
@ -313,7 +264,6 @@ class LDAP_TestCase:
else:
tls_validate = ssl.CERT_NONE
tls_version = ssl.PROTOCOL_TLS_CLIENT
tls_configuration = ldap3.Tls(validate=tls_validate, version=tls_version)
self.assertTrue(kwargs["auto_bind"], "ldap3.Connection must autobind")
@ -347,7 +297,7 @@ class LDAP_TestCase:
)
self.assertTrue(
type(function_return) is ldap3.core.connection.Connection,
type(function_return) is Connection,
"The return type must be of ldap3.Connection",
)
@ -364,24 +314,20 @@ class LDAP_TestCase:
@mock_ldap_connection
def test_get_ldap_client_settings(self):
result = self.test_class.get_ldap_client_settings()
self.assertIsInstance(result, dict)
self.assertTrue(result["enabled"] == self.doc["enabled"]) # settings should match doc
localdoc = self.doc.copy()
localdoc["enabled"] = False
frappe.get_doc(localdoc).save()
result = self.test_class.get_ldap_client_settings()
self.assertFalse(result["enabled"]) # must match the edited doc
@mock_ldap_connection
def test_update_user_fields(self):
test_user_data = {
"username": "posix.user",
"email": "posix.user1@unit.testing",
@ -391,11 +337,8 @@ class LDAP_TestCase:
"phone": "08 1234 5678",
"mobile_no": "0421 123 456",
}
test_user = frappe.get_doc("User", test_user_data["email"])
self.test_class.update_user_fields(test_user, test_user_data)
updated_user = frappe.get_doc("User", test_user_data["email"])
self.assertTrue(updated_user.middle_name == test_user_data["middle_name"])
@ -403,9 +346,23 @@ class LDAP_TestCase:
self.assertTrue(updated_user.phone == test_user_data["phone"])
self.assertTrue(updated_user.mobile_no == test_user_data["mobile_no"])
self.assertEqual(updated_user.user_type, self.test_class.default_user_type)
self.assertIn(self.test_class.default_role, frappe.get_roles(updated_user.name))
@mock_ldap_connection
def test_create_website_user(self):
new_test_user_data = {
"username": "website_ldap_user.test",
"email": "website_ldap_user@test.com",
"first_name": "Website User - LDAP Test",
}
self.test_class.default_user_type = "Website User"
self.test_class.create_or_update_user(user_data=new_test_user_data, groups=[])
new_user = frappe.get_doc("User", new_test_user_data["email"])
self.assertEqual(new_user.user_type, "Website User")
@mock_ldap_connection
def test_sync_roles(self):
if self.TEST_LDAP_SERVER.lower() == "openldap":
test_user_data = {
"posix.user1": [
@ -457,9 +414,8 @@ class LDAP_TestCase:
user.insert(ignore_permissions=True)
for test_user in test_user_data:
test_user_doc = frappe.get_doc("User", test_user + "@unit.testing")
test_user_roles = frappe.get_roles(test_user + "@unit.testing")
test_user_doc = frappe.get_doc("User", f"{test_user}@unit.testing")
test_user_roles = frappe.get_roles(f"{test_user}@unit.testing")
self.assertTrue(
len(test_user_roles) == 2, "User should only be a part of the All and Guest roles"
@ -467,28 +423,22 @@ class LDAP_TestCase:
self.test_class.sync_roles(test_user_doc, test_user_data[test_user]) # update user roles
frappe.get_doc("User", test_user + "@unit.testing")
updated_user_roles = frappe.get_roles(test_user + "@unit.testing")
frappe.get_doc("User", f"{test_user}@unit.testing")
updated_user_roles = frappe.get_roles(f"{test_user}@unit.testing")
self.assertTrue(
len(updated_user_roles) == len(test_user_data[test_user]),
"syncing of the user roles failed. {} != {} for user {}".format(
len(updated_user_roles), len(test_user_data[test_user]), test_user
),
f"syncing of the user roles failed. {len(updated_user_roles)} != {len(test_user_data[test_user])} for user {test_user}",
)
for user_role in updated_user_roles: # match each users role mapped to ldap groups
self.assertTrue(
role_to_group_map[user_role] in test_user_data[test_user],
"during sync_roles(), the user was given role {} which should not have occured".format(
user_role
),
f"during sync_roles(), the user was given role {user_role} which should not have occured",
)
@mock_ldap_connection
def test_create_or_update_user(self):
test_user_data = {
"posix.user1": [
"Users",
@ -498,28 +448,21 @@ class LDAP_TestCase:
"frappe_default_guest",
],
}
test_user = "posix.user1"
frappe.get_doc("User", test_user + "@unit.testing").delete() # remove user 1
frappe.get_doc("User", f"{test_user}@unit.testing").delete()
with self.assertRaises(
frappe.exceptions.DoesNotExistError
): # ensure user deleted so function can be tested
frappe.get_doc("User", test_user + "@unit.testing")
frappe.get_doc("User", f"{test_user}@unit.testing")
with mock.patch(
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.update_user_fields"
) as update_user_fields_method:
update_user_fields_method.return_value = None
with mock.patch(
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.sync_roles"
) as sync_roles_method:
sync_roles_method.return_value = None
# New user
self.test_class.create_or_update_user(self.user1doc, test_user_data[test_user])
@ -539,14 +482,11 @@ class LDAP_TestCase:
@mock_ldap_connection
def test_get_ldap_attributes(self):
method_return = self.test_class.get_ldap_attributes()
self.assertTrue(type(method_return) is list)
@mock_ldap_connection
def test_fetch_ldap_groups(self):
if self.TEST_LDAP_SERVER.lower() == "openldap":
test_users = {"posix.user": ["Users", "Administrators"], "posix.user2": ["Users", "Group3"]}
elif self.TEST_LDAP_SERVER.lower() == "active directory":
@ -556,7 +496,6 @@ class LDAP_TestCase:
}
for test_user in test_users:
self.connection.search(
search_base=self.ldap_user_path,
search_filter=self.TEST_LDAP_SEARCH_STRING.format(test_user),
@ -569,18 +508,13 @@ class LDAP_TestCase:
self.assertTrue(len(method_return) == len(test_users[test_user]))
for returned_group in method_return:
self.assertTrue(returned_group in test_users[test_user])
@mock_ldap_connection
def test_authenticate(self):
with mock.patch(
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.fetch_ldap_groups"
) as fetch_ldap_groups_function:
fetch_ldap_groups_function.return_value = None
self.assertTrue(self.test_class.authenticate("posix.user", "posix_user_password"))
self.assertTrue(
@ -599,25 +533,19 @@ class LDAP_TestCase:
] # All invalid users should return 'invalid username or password'
for username, password in enumerate(invalid_users):
with self.assertRaises(frappe.exceptions.ValidationError) as display_massage:
self.test_class.authenticate(username, password)
self.assertTrue(
str(display_massage.exception).lower() == "invalid username or password",
"invalid credentials passed authentication [user: {}, password: {}]".format(
username, password
),
f"invalid credentials passed authentication [user: {username}, password: {password}]",
)
@mock_ldap_connection
def test_complex_ldap_search_filter(self):
ldap_search_filters = self.TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING
for search_filter in ldap_search_filters:
self.test_class.ldap_search_string = search_filter
if (
@ -634,55 +562,44 @@ class LDAP_TestCase:
self.assertTrue(self.test_class.authenticate("posix.user", "posix_user_password"))
def test_reset_password(self):
self.test_class = LDAPSettings(self.doc)
# Create a clean doc
localdoc = self.doc.copy()
localdoc["enabled"] = False
frappe.get_doc(localdoc).save()
with mock.patch(
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap"
"frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap",
return_value=self.connection,
) as connect_to_ldap:
connect_to_ldap.return_value = self.connection
with self.assertRaises(
frappe.exceptions.ValidationError
) as validation: # Fail if username string used
self.test_class.reset_password("posix.user", "posix_user_password")
self.assertTrue(str(validation.exception) == "No LDAP User found for email: posix.user")
try:
with contextlib.suppress(Exception):
self.test_class.reset_password(
"posix.user1@unit.testing", "posix_user_password"
) # Change Password
except Exception: # An exception from the tested class is ok, as long as the connection to LDAP was made writeable
pass
connect_to_ldap.assert_called_with(self.base_dn, self.base_password, read_only=False)
@mock_ldap_connection
def test_convert_ldap_entry_to_dict(self):
self.connection.search(
search_base=self.ldap_user_path,
search_filter=self.TEST_LDAP_SEARCH_STRING.format("posix.user"),
attributes=self.test_class.get_ldap_attributes(),
)
test_ldap_entry = self.connection.entries[0]
method_return = self.test_class.convert_ldap_entry_to_dict(test_ldap_entry)
self.assertTrue(type(method_return) is dict) # must be dict
self.assertTrue(len(method_return) == 6) # there are 6 fields in mock_ldap for use
class Test_OpenLDAP(LDAP_TestCase, unittest.TestCase):
class Test_OpenLDAP(LDAP_TestCase, TestCase):
TEST_LDAP_SERVER = "OpenLDAP"
TEST_LDAP_SEARCH_STRING = "(uid={0})"
DOCUMENT_GROUP_MAPPINGS = [
@ -706,7 +623,7 @@ class Test_OpenLDAP(LDAP_TestCase, unittest.TestCase):
]
class Test_ActiveDirectory(LDAP_TestCase, unittest.TestCase):
class Test_ActiveDirectory(LDAP_TestCase, TestCase):
TEST_LDAP_SERVER = "Active Directory"
TEST_LDAP_SEARCH_STRING = "(samaccountname={0})"
DOCUMENT_GROUP_MAPPINGS = [