From e7023fa74ddf927a6947b2b0b68b1e8b0d00af1b Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Thu, 7 Jul 2022 17:07:40 +0530 Subject: [PATCH 1/7] feat(ldap): Allow setting "User Type" for new users Prior to this, every user found in LDAP would mean a System User is created - now pick the type and role you want to give newly created users. For the given user types, the role may be picked from: System User | ldap_settings.default_user_role (Fetched from LDAP settings) Website User | N/A {{ Custom Type }} | user_type.role (Fetched from User Type record) --- .../ldap_group_mapping.json | 7 ++-- .../doctype/ldap_settings/ldap_settings.json | 34 +++++++++++++++---- .../doctype/ldap_settings/ldap_settings.py | 27 ++++++++++----- 3 files changed, 52 insertions(+), 16 deletions(-) diff --git a/frappe/integrations/doctype/ldap_group_mapping/ldap_group_mapping.json b/frappe/integrations/doctype/ldap_group_mapping/ldap_group_mapping.json index 92db68e962..9bfe1eac56 100644 --- a/frappe/integrations/doctype/ldap_group_mapping/ldap_group_mapping.json +++ b/frappe/integrations/doctype/ldap_group_mapping/ldap_group_mapping.json @@ -1,4 +1,5 @@ { + "actions": [], "creation": "2019-05-29 01:24:29.585060", "doctype": "DocType", "editable_grid": 1, @@ -19,13 +20,14 @@ "fieldname": "erpnext_role", "fieldtype": "Link", "in_list_view": 1, - "label": "ERPNext Role", + "label": "User Role", "options": "Role", "reqd": 1 } ], "istable": 1, - "modified": "2019-07-15 06:46:38.050408", + "links": [], + "modified": "2022-07-07 16:28:44.828514", "modified_by": "Administrator", "module": "Integrations", "name": "LDAP Group Mapping", @@ -34,5 +36,6 @@ "quick_entry": 1, "sort_field": "modified", "sort_order": "ASC", + "states": [], "track_changes": 1 } \ No newline at end of file diff --git a/frappe/integrations/doctype/ldap_settings/ldap_settings.json b/frappe/integrations/doctype/ldap_settings/ldap_settings.json index fd45a71538..f5472a5097 100644 --- a/frappe/integrations/doctype/ldap_settings/ldap_settings.json +++ b/frappe/integrations/doctype/ldap_settings/ldap_settings.json @@ -42,7 +42,10 @@ "column_break_33", "ldap_group_member_attribute", "ldap_group_mappings_section", + "default_user_type", + "column_break_38", "default_role", + "section_break_40", "ldap_groups", "ldap_group_field" ], @@ -79,9 +82,11 @@ "reqd": 1 }, { + "depends_on": "eval: doc.default_user_type == \"System User\"", "fieldname": "default_role", "fieldtype": "Link", - "label": "Default Role on Creation", + "label": "Default User Role", + "mandatory_depends_on": "eval: doc.default_user_type == \"System User\"", "options": "Role", "reqd": 1 }, @@ -249,10 +254,10 @@ "label": "Group Object Class" }, { - "description": "string value, i.e. {0} or uid={0},ou=users,dc=example,dc=com", - "fieldname": "ldap_custom_group_search", - "fieldtype": "Data", - "label": "Custom Group Search" + "description": "string value, i.e. {0} or uid={0},ou=users,dc=example,dc=com", + "fieldname": "ldap_custom_group_search", + "fieldtype": "Data", + "label": "Custom Group Search" }, { "description": "Requires any valid fdn path. i.e. ou=users,dc=example,dc=com", @@ -268,12 +273,28 @@ "fieldtype": "Data", "label": "LDAP search path for Groups", "reqd": 1 + }, + { + "fieldname": "default_user_type", + "fieldtype": "Link", + "label": "Default User Type", + "options": "User Type", + "reqd": 1 + }, + { + "fieldname": "column_break_38", + "fieldtype": "Column Break" + }, + { + "fieldname": "section_break_40", + "fieldtype": "Section Break", + "hide_border": 1 } ], "in_create": 1, "issingle": 1, "links": [], - "modified": "2021-07-27 11:51:43.328271", + "modified": "2022-07-07 16:51:46.230793", "modified_by": "Administrator", "module": "Integrations", "name": "LDAP Settings", @@ -294,5 +315,6 @@ "read_only": 1, "sort_field": "modified", "sort_order": "DESC", + "states": [], "track_changes": 1 } \ No newline at end of file diff --git a/frappe/integrations/doctype/ldap_settings/ldap_settings.py b/frappe/integrations/doctype/ldap_settings/ldap_settings.py index ef6493717f..249d4e31f3 100644 --- a/frappe/integrations/doctype/ldap_settings/ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/ldap_settings.py @@ -1,11 +1,16 @@ # Copyright (c) 2015, Frappe Technologies and contributors # License: MIT. See LICENSE +from typing import TYPE_CHECKING + import frappe from frappe import _, safe_encode from frappe.model.document import Document from frappe.twofactor import authenticate_for_2factor, confirm_otp_token, should_run_2fa +if TYPE_CHECKING: + from frappe.core.doctype.user.user import User + class LDAPSettings(Document): def validate(self): @@ -134,7 +139,7 @@ class LDAPSettings(Document): setattr(user, key, value) user.save(ignore_permissions=True) - def sync_roles(self, user, additional_groups=None): + def sync_roles(self, user: "User", additional_groups=None): current_roles = {d.role for d in user.get("roles")} @@ -158,7 +163,9 @@ class LDAPSettings(Document): user.remove_roles(*roles_to_remove) def create_or_update_user(self, user_data, groups=None): - user = None + user: "User" = None + role: str = None + if frappe.db.exists("User", user_data["email"]): user = frappe.get_doc("User", user_data["email"]) LDAPSettings.update_user_fields(user=user, user_data=user_data) @@ -169,16 +176,20 @@ class LDAPSettings(Document): "doctype": "User", "send_welcome_email": 0, "language": "", - "user_type": "System User", - # "roles": [{ - # "role": self.default_role - # }] + "user_type": self.default_user_type, } ) user = frappe.get_doc(doc) user.insert(ignore_permissions=True) - # always add default role. - user.add_roles(self.default_role) + + if self.default_user_type == "System User": + role = self.default_role + else: + role = frappe.db.get_value("User Type", user.user_type, "role") + + if role: + user.add_roles(role) + self.sync_roles(user, groups) return user From b20f77b9b965ba5fb03f3f7b88883413ef3e2251 Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Fri, 8 Jul 2022 11:39:37 +0530 Subject: [PATCH 2/7] fix(ldap): Set default user type to System User --- frappe/integrations/doctype/ldap_settings/ldap_settings.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/frappe/integrations/doctype/ldap_settings/ldap_settings.py b/frappe/integrations/doctype/ldap_settings/ldap_settings.py index 249d4e31f3..23f4965438 100644 --- a/frappe/integrations/doctype/ldap_settings/ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/ldap_settings.py @@ -14,6 +14,8 @@ if TYPE_CHECKING: class LDAPSettings(Document): def validate(self): + self.default_user_type = self.default_user_type or "System User" + if not self.enabled: return From ee97038c71e5a72dde107105d6868062c57e02fb Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Thu, 7 Jul 2022 17:34:08 +0530 Subject: [PATCH 3/7] chore: Add typing + reduce import paths --- .../doctype/ldap_settings/ldap_settings.py | 101 +++++++----------- 1 file changed, 37 insertions(+), 64 deletions(-) diff --git a/frappe/integrations/doctype/ldap_settings/ldap_settings.py b/frappe/integrations/doctype/ldap_settings/ldap_settings.py index 23f4965438..f9d083a12f 100644 --- a/frappe/integrations/doctype/ldap_settings/ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/ldap_settings.py @@ -1,8 +1,20 @@ -# Copyright (c) 2015, Frappe Technologies and contributors +# Copyright (c) 2022, Frappe Technologies and contributors # License: MIT. See LICENSE +import ssl from typing import TYPE_CHECKING +import ldap3 +from ldap3 import AUTO_BIND_TLS_BEFORE_BIND, HASHED_SALTED_SHA, MODIFY_REPLACE +from ldap3.abstract.entry import Entry +from ldap3.core.exceptions import ( + LDAPAttributeError, + LDAPInvalidCredentialsResult, + LDAPInvalidFilterError, + LDAPNoSuchObjectResult, +) +from ldap3.utils.hashed import hashed + import frappe from frappe import _, safe_encode from frappe.model.document import Document @@ -20,7 +32,6 @@ class LDAPSettings(Document): return if not self.flags.ignore_mandatory: - if ( self.ldap_search_string.count("(") == self.ldap_search_string.count(")") and self.ldap_search_string.startswith("(") @@ -35,8 +46,6 @@ class LDAPSettings(Document): try: if conn.result["type"] == "bindResponse" and self.base_dn: - import ldap3 - conn.search( search_base=self.ldap_search_path_user, search_filter="(objectClass=*)", @@ -47,13 +56,13 @@ class LDAPSettings(Document): search_base=self.ldap_search_path_group, search_filter="(objectClass=*)", attributes=["cn"] ) - except ldap3.core.exceptions.LDAPAttributeError as ex: + except LDAPAttributeError as ex: frappe.throw( _("LDAP settings incorrect. validation response was: {0}").format(ex), title=_("Misconfigured"), ) - except ldap3.core.exceptions.LDAPNoSuchObjectResult: + except LDAPNoSuchObjectResult: frappe.throw( _("Ensure the user and group search paths are correct."), title=_("Misconfigured") ) @@ -82,12 +91,8 @@ class LDAPSettings(Document): ) ) - def connect_to_ldap(self, base_dn, password, read_only=True): + def connect_to_ldap(self, base_dn, password, read_only=True) -> ldap3.Connection: try: - import ssl - - import ldap3 - if self.require_trusted_certificate == "Yes": tls_configuration = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLS_CLIENT) else: @@ -101,9 +106,9 @@ class LDAPSettings(Document): tls_configuration.ca_certs_file = self.local_ca_certs_file server = ldap3.Server(host=self.ldap_server_url, tls=tls_configuration) - bind_type = ldap3.AUTO_BIND_TLS_BEFORE_BIND if self.ssl_tls_mode == "StartTLS" else True + bind_type = AUTO_BIND_TLS_BEFORE_BIND if self.ssl_tls_mode == "StartTLS" else True - conn = ldap3.Connection( + return ldap3.Connection( server=server, user=base_dn, password=password, @@ -112,18 +117,16 @@ class LDAPSettings(Document): raise_exceptions=True, ) - return conn - except ImportError: msg = _("Please Install the ldap3 library via pip to use ldap functionality.") frappe.throw(msg, title=_("LDAP Not Installed")) - except ldap3.core.exceptions.LDAPInvalidCredentialsResult: + except LDAPInvalidCredentialsResult: frappe.throw(_("Invalid username or password")) except Exception as ex: frappe.throw(_(str(ex))) @staticmethod - def get_ldap_client_settings(): + def get_ldap_client_settings() -> dict: # return the settings to be used on the client side. result = {"enabled": False} ldap = frappe.get_cached_doc("LDAP Settings") @@ -133,21 +136,16 @@ class LDAPSettings(Document): return result @classmethod - def update_user_fields(cls, user, user_data): - + def update_user_fields(cls, user: "User", user_data: dict): updatable_data = {key: value for key, value in user_data.items() if key != "email"} for key, value in updatable_data.items(): setattr(user, key, value) user.save(ignore_permissions=True) - def sync_roles(self, user: "User", additional_groups=None): - + def sync_roles(self, user: "User", additional_groups: list = None): current_roles = {d.role for d in user.get("roles")} - - needed_roles = set() - needed_roles.add(self.default_role) - + needed_roles = {self.default_role} lower_groups = [g.lower() for g in additional_groups or []] all_mapped_roles = {r.erpnext_role for r in self.ldap_groups} @@ -164,7 +162,7 @@ class LDAPSettings(Document): user.remove_roles(*roles_to_remove) - def create_or_update_user(self, user_data, groups=None): + def create_or_update_user(self, user_data: dict, groups: list = None): user: "User" = None role: str = None @@ -216,38 +214,28 @@ class LDAPSettings(Document): return ldap_attributes - def fetch_ldap_groups(self, user, conn): - import ldap3 + def fetch_ldap_groups(self, user: Entry, conn: ldap3.Connection) -> list: + if not isinstance(user, Entry): + raise TypeError("Invalid type, attribute 'user' must be of type 'ldap3.abstract.entry.Entry'") - if type(user) is not ldap3.abstract.entry.Entry: - raise TypeError( - "Invalid type, attribute {} must be of type '{}'".format("user", "ldap3.abstract.entry.Entry") - ) - - if type(conn) is not ldap3.core.connection.Connection: - raise TypeError( - "Invalid type, attribute {} must be of type '{}'".format("conn", "ldap3.Connection") - ) + if not isinstance(conn, ldap3.Connection): + raise TypeError("Invalid type, attribute 'conn' must be of type 'ldap3.Connection'") fetch_ldap_groups = None - ldap_object_class = None ldap_group_members_attribute = None if self.ldap_directory_server.lower() == "active directory": - ldap_object_class = "Group" ldap_group_members_attribute = "member" user_search_str = user.entry_dn elif self.ldap_directory_server.lower() == "openldap": - ldap_object_class = "posixgroup" ldap_group_members_attribute = "memberuid" user_search_str = getattr(user, self.ldap_username_field).value elif self.ldap_directory_server.lower() == "custom": - ldap_object_class = self.ldap_group_objectclass ldap_group_members_attribute = self.ldap_group_member_attribute ldap_custom_group_search = self.ldap_custom_group_search or "{0}" @@ -258,39 +246,31 @@ class LDAPSettings(Document): # this path will be hit for everyone with preconfigured ldap settings. this must be taken into account so as not to break ldap for those users. if self.ldap_group_field: - fetch_ldap_groups = getattr(user, self.ldap_group_field).values if ldap_object_class is not None: conn.search( search_base=self.ldap_search_path_group, - search_filter="(&(objectClass={})({}={}))".format( - ldap_object_class, ldap_group_members_attribute, user_search_str - ), + search_filter=f"(&(objectClass={ldap_object_class})({ldap_group_members_attribute}={user_search_str}))", attributes=["cn"], ) # Build search query if len(conn.entries) >= 1: - fetch_ldap_groups = [] for group in conn.entries: fetch_ldap_groups.append(group["cn"].value) return fetch_ldap_groups - def authenticate(self, username, password): - + def authenticate(self, username: str, password: str): if not self.enabled: frappe.throw(_("LDAP is not enabled.")) user_filter = self.ldap_search_string.format(username) ldap_attributes = self.get_ldap_attributes() - conn = self.connect_to_ldap(self.base_dn, self.get_password(raise_exception=False)) try: - import ldap3 - conn.search( search_base=self.ldap_search_path_user, search_filter=f"{user_filter}", @@ -299,26 +279,21 @@ class LDAPSettings(Document): if len(conn.entries) == 1 and conn.entries[0]: user = conn.entries[0] - groups = self.fetch_ldap_groups(user, conn) # only try and connect as the user, once we have their fqdn entry. if user.entry_dn and password and conn.rebind(user=user.entry_dn, password=password): - return self.create_or_update_user(self.convert_ldap_entry_to_dict(user), groups=groups) - raise ldap3.core.exceptions.LDAPInvalidCredentialsResult # even though nothing foundor failed authentication raise invalid credentials + raise LDAPInvalidCredentialsResult # even though nothing foundor failed authentication raise invalid credentials - except ldap3.core.exceptions.LDAPInvalidFilterError: + except LDAPInvalidFilterError: frappe.throw(_("Please use a valid LDAP search filter"), title=_("Misconfigured")) - except ldap3.core.exceptions.LDAPInvalidCredentialsResult: + except LDAPInvalidCredentialsResult: frappe.throw(_("Invalid username or password")) def reset_password(self, user, password, logout_sessions=False): - from ldap3 import HASHED_SALTED_SHA, MODIFY_REPLACE - from ldap3.utils.hashed import hashed - search_filter = f"({self.ldap_email_field}={user})" conn = self.connect_to_ldap( @@ -347,8 +322,7 @@ class LDAPSettings(Document): else: frappe.throw(_("No LDAP User found for email: {0}").format(user)) - def convert_ldap_entry_to_dict(self, user_entry): - + def convert_ldap_entry_to_dict(self, user_entry: Entry): # support multiple email values email = user_entry[self.ldap_email_field] @@ -359,7 +333,6 @@ class LDAPSettings(Document): } # optional fields - if self.ldap_middle_name_field: data["middle_name"] = user_entry[self.ldap_middle_name_field].value @@ -379,7 +352,7 @@ class LDAPSettings(Document): def login(): # LDAP LOGIN LOGIC args = frappe.form_dict - ldap = frappe.get_doc("LDAP Settings") + ldap: LDAPSettings = frappe.get_doc("LDAP Settings") user = ldap.authenticate(frappe.as_unicode(args.usr), frappe.as_unicode(args.pwd)) @@ -396,7 +369,7 @@ def login(): @frappe.whitelist() def reset_password(user, password, logout): - ldap = frappe.get_doc("LDAP Settings") + ldap: LDAPSettings = frappe.get_doc("LDAP Settings") if not ldap.enabled: frappe.throw(_("LDAP is not enabled.")) ldap.reset_password(user, password, logout_sessions=int(logout)) From a588879094ec997cd1f134aac72e64bb409ab05d Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Mon, 18 Jul 2022 17:30:04 +0530 Subject: [PATCH 4/7] refactor(minor): LDAP Settings Test Suite * Re-write blocks for better readability * De-indent everything * Add typing, etc --- .../ldap_settings/test_ldap_settings.py | 308 ++++++------------ 1 file changed, 101 insertions(+), 207 deletions(-) diff --git a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py index f53b5291b3..8587c6c656 100644 --- a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py @@ -1,15 +1,16 @@ -# Copyright (c) 2019, Frappe Technologies and Contributors +# Copyright (c) 2022, Frappe Technologies and Contributors # License: MIT. See LICENSE +import contextlib import functools import os import ssl -import unittest -from unittest import mock +from unittest import TestCase, mock import ldap3 from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, OFFLINE_SLAPD_2_4, Connection, Server import frappe +from frappe.exceptions import MandatoryError, ValidationError from frappe.integrations.doctype.ldap_settings.ldap_settings import LDAPSettings @@ -27,10 +28,9 @@ class LDAP_TestCase: def wrapped(self, *args, **kwargs): with mock.patch( - "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap" - ) as mock_connection: - mock_connection.return_value = self.connection - + "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap", + return_value=self.connection, + ): self.test_class = LDAPSettings(self.doc) # Create a clean doc @@ -47,80 +47,64 @@ class LDAP_TestCase: return wrapped def clean_test_users(): - try: # clean up test user 1 + with contextlib.suppress(Exception): frappe.get_doc("User", "posix.user1@unit.testing").delete() - except Exception: - pass - - try: # clean up test user 2 + with contextlib.suppress(Exception): frappe.get_doc("User", "posix.user2@unit.testing").delete() - except Exception: - pass @classmethod - def setUpClass(self, ldapServer="OpenLDAP"): - - self.clean_test_users() + def setUpClass(cls): + cls.clean_test_users() # Save user data for restoration in tearDownClass() - self.user_ldap_settings = frappe.get_doc("LDAP Settings") + cls.user_ldap_settings = frappe.get_doc("LDAP Settings") # Create test user1 - self.user1doc = { + cls.user1doc = { "username": "posix.user", "email": "posix.user1@unit.testing", "first_name": "posix", + "doctype": "User", + "send_welcome_email": 0, + "language": "", + "user_type": "System User", } - self.user1doc.update( - { - "doctype": "User", - "send_welcome_email": 0, - "language": "", - "user_type": "System User", - } - ) - user = frappe.get_doc(self.user1doc) + user = frappe.get_doc(cls.user1doc) user.insert(ignore_permissions=True) - # Create test user1 - self.user2doc = { + cls.user2doc = { "username": "posix.user2", "email": "posix.user2@unit.testing", "first_name": "posix", + "doctype": "User", + "send_welcome_email": 0, + "language": "", + "user_type": "System User", } - self.user2doc.update( - { - "doctype": "User", - "send_welcome_email": 0, - "language": "", - "user_type": "System User", - } - ) - - user = frappe.get_doc(self.user2doc) + user = frappe.get_doc(cls.user2doc) user.insert(ignore_permissions=True) # Setup Mock OpenLDAP Directory - self.ldap_dc_path = "dc=unit,dc=testing" - self.ldap_user_path = "ou=users," + self.ldap_dc_path - self.ldap_group_path = "ou=groups," + self.ldap_dc_path - self.base_dn = "cn=base_dn_user," + self.ldap_dc_path - self.base_password = "my_password" - self.ldap_server = "ldap://my_fake_server:389" + cls.ldap_dc_path = "dc=unit,dc=testing" + cls.ldap_user_path = f"ou=users,{cls.ldap_dc_path}" + cls.ldap_group_path = f"ou=groups,{cls.ldap_dc_path}" + cls.base_dn = f"cn=base_dn_user,{cls.ldap_dc_path}" + cls.base_password = "my_password" + cls.ldap_server = "ldap://my_fake_server:389" - self.doc = { + cls.doc = { "doctype": "LDAP Settings", "enabled": True, - "ldap_directory_server": self.TEST_LDAP_SERVER, - "ldap_server_url": self.ldap_server, - "base_dn": self.base_dn, - "password": self.base_password, - "ldap_search_path_user": self.ldap_user_path, - "ldap_search_string": self.TEST_LDAP_SEARCH_STRING, - "ldap_search_path_group": self.ldap_group_path, + "ldap_directory_server": cls.TEST_LDAP_SERVER, + "ldap_server_url": cls.ldap_server, + "base_dn": cls.base_dn, + "password": cls.base_password, + "ldap_search_path_user": cls.ldap_user_path, + "ldap_search_string": cls.TEST_LDAP_SEARCH_STRING, + "ldap_search_path_group": cls.ldap_group_path, "ldap_user_creation_and_mapping_section": "", "ldap_email_field": "mail", - "ldap_username_field": self.LDAP_USERNAME_FIELD, + "ldap_username_field": cls.LDAP_USERNAME_FIELD, "ldap_first_name_field": "givenname", "ldap_middle_name_field": "", "ldap_last_name_field": "sn", @@ -135,50 +119,40 @@ class LDAP_TestCase: "ldap_group_objectclass": "", "ldap_group_member_attribute": "", "default_role": "Newsletter Manager", - "ldap_groups": self.DOCUMENT_GROUP_MAPPINGS, + "ldap_groups": cls.DOCUMENT_GROUP_MAPPINGS, "ldap_group_field": "", } - self.server = Server(host=self.ldap_server, port=389, get_info=self.LDAP_SCHEMA) - - self.connection = Connection( - self.server, - user=self.base_dn, - password=self.base_password, + cls.server = Server(host=cls.ldap_server, port=389, get_info=cls.LDAP_SCHEMA) + cls.connection = Connection( + cls.server, + user=cls.base_dn, + password=cls.base_password, read_only=True, client_strategy=MOCK_SYNC, ) - - self.connection.strategy.entries_from_json( - os.path.abspath(os.path.dirname(__file__)) + "/" + self.LDAP_LDIF_JSON + cls.connection.strategy.entries_from_json( + f"{os.path.abspath(os.path.dirname(__file__))}/{cls.LDAP_LDIF_JSON}" ) - - self.connection.bind() + cls.connection.bind() @classmethod - def tearDownClass(self): - try: + def tearDownClass(cls): + with contextlib.suppress(Exception): frappe.get_doc("LDAP Settings").delete() - except Exception: - pass - - try: - # return doc back to user data - self.user_ldap_settings.save() - - except Exception: - pass + # return doc back to user data + with contextlib.suppress(Exception): + cls.user_ldap_settings.save() # Clean-up test users - self.clean_test_users() + cls.clean_test_users() # Clear OpenLDAP connection - self.connection = None + cls.connection = None @mock_ldap_connection - def test_mandatory_fields(self): - + def test_mandatory_fields(self: TestCase): mandatory_fields = [ "ldap_server_url", "ldap_directory_server", @@ -195,26 +169,14 @@ class LDAP_TestCase: ] # fields that are required to have ldap functioning need to be mandatory for mandatory_field in mandatory_fields: - localdoc = self.doc.copy() localdoc[mandatory_field] = "" - try: - + with contextlib.suppress(MandatoryError, ValidationError): frappe.get_doc(localdoc).save() - self.fail(f"Document LDAP Settings field [{mandatory_field}] is not mandatory") - except frappe.exceptions.MandatoryError: - pass - - except frappe.exceptions.ValidationError: - if mandatory_field == "ldap_search_string": - # additional validation is done on this field, pass in this instance - pass - for non_mandatory_field in self.doc: # Ensure remaining fields have not been made mandatory - if non_mandatory_field == "doctype" or non_mandatory_field in mandatory_fields: continue @@ -222,15 +184,12 @@ class LDAP_TestCase: localdoc[non_mandatory_field] = "" try: - frappe.get_doc(localdoc).save() - - except frappe.exceptions.MandatoryError: + except MandatoryError: self.fail(f"Document LDAP Settings field [{non_mandatory_field}] should not be mandatory") @mock_ldap_connection - def test_validation_ldap_search_string(self): - + def test_validation_ldap_search_string(self: TestCase): invalid_ldap_search_strings = [ "", "uid={0}", @@ -242,19 +201,26 @@ class LDAP_TestCase: ] # ldap search string must be enclosed in '()' for ldap search to work for finding user and have the same number of opening and closing brackets. for invalid_search_string in invalid_ldap_search_strings: - localdoc = self.doc.copy() localdoc["ldap_search_string"] = invalid_search_string - try: + with contextlib.suppress(ValidationError): frappe.get_doc(localdoc).save() - self.fail(f"LDAP search string [{invalid_search_string}] should not validate") - except frappe.exceptions.ValidationError: - pass - - def test_connect_to_ldap(self): + def test_connect_to_ldap(self: TestCase): + # prevent these parameters for security or lack of the und user from being able to configure + prevent_connection_parameters = { + "mode": { + "IP_V4_ONLY": "Locks the user to IPv4 without frappe providing a way to configure", + "IP_V6_ONLY": "Locks the user to IPv6 without frappe providing a way to configure", + }, + "auto_bind": { + "NONE": "ldap3.Connection must autobind with base_dn", + "NO_TLS": "ldap3.Connection must have TLS", + "TLS_AFTER_BIND": "[Security] ldap3.Connection TLS bind must occur before bind", + }, + } # setup a clean doc with ldap disabled so no validation occurs (this is tested seperatly) local_doc = self.doc.copy() @@ -262,48 +228,25 @@ class LDAP_TestCase: self.test_class = LDAPSettings(self.doc) with mock.patch("ldap3.Server") as ldap3_server_method: - - with mock.patch("ldap3.Connection") as ldap3_connection_method: - ldap3_connection_method.return_value = self.connection - + with mock.patch("ldap3.Connection", return_value=self.connection) as ldap3_connection_method: with mock.patch("ldap3.Tls") as ldap3_Tls_method: - function_return = self.test_class.connect_to_ldap( base_dn=self.base_dn, password=self.base_password ) - args, kwargs = ldap3_connection_method.call_args - prevent_connection_parameters = { - # prevent these parameters for security or lack of the und user from being able to configure - "mode": { - "IP_V4_ONLY": "Locks the user to IPv4 without frappe providing a way to configure", - "IP_V6_ONLY": "Locks the user to IPv6 without frappe providing a way to configure", - }, - "auto_bind": { - "NONE": "ldap3.Connection must autobind with base_dn", - "NO_TLS": "ldap3.Connection must have TLS", - "TLS_AFTER_BIND": "[Security] ldap3.Connection TLS bind must occur before bind", - }, - } - for connection_arg in kwargs: - if ( connection_arg in prevent_connection_parameters and kwargs[connection_arg] in prevent_connection_parameters[connection_arg] ): - self.fail( - "ldap3.Connection was called with {}, failed reason: [{}]".format( - kwargs[connection_arg], - prevent_connection_parameters[connection_arg][kwargs[connection_arg]], - ) + f"ldap3.Connection was called with {kwargs[connection_arg]}, failed reason: [{prevent_connection_parameters[connection_arg][kwargs[connection_arg]]}]" ) + tls_version = ssl.PROTOCOL_TLS_CLIENT if local_doc["require_trusted_certificate"] == "Yes": tls_validate = ssl.CERT_REQUIRED - tls_version = ssl.PROTOCOL_TLS_CLIENT tls_configuration = ldap3.Tls(validate=tls_validate, version=tls_version) self.assertTrue( @@ -313,7 +256,6 @@ class LDAP_TestCase: else: tls_validate = ssl.CERT_NONE - tls_version = ssl.PROTOCOL_TLS_CLIENT tls_configuration = ldap3.Tls(validate=tls_validate, version=tls_version) self.assertTrue(kwargs["auto_bind"], "ldap3.Connection must autobind") @@ -347,7 +289,7 @@ class LDAP_TestCase: ) self.assertTrue( - type(function_return) is ldap3.core.connection.Connection, + type(function_return) is Connection, "The return type must be of ldap3.Connection", ) @@ -363,25 +305,21 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_get_ldap_client_settings(self): - + def test_get_ldap_client_settings(self: TestCase): result = self.test_class.get_ldap_client_settings() self.assertIsInstance(result, dict) - self.assertTrue(result["enabled"] == self.doc["enabled"]) # settings should match doc localdoc = self.doc.copy() localdoc["enabled"] = False frappe.get_doc(localdoc).save() - result = self.test_class.get_ldap_client_settings() self.assertFalse(result["enabled"]) # must match the edited doc @mock_ldap_connection - def test_update_user_fields(self): - + def test_update_user_fields(self: TestCase): test_user_data = { "username": "posix.user", "email": "posix.user1@unit.testing", @@ -391,11 +329,8 @@ class LDAP_TestCase: "phone": "08 1234 5678", "mobile_no": "0421 123 456", } - test_user = frappe.get_doc("User", test_user_data["email"]) - self.test_class.update_user_fields(test_user, test_user_data) - updated_user = frappe.get_doc("User", test_user_data["email"]) self.assertTrue(updated_user.middle_name == test_user_data["middle_name"]) @@ -404,8 +339,7 @@ class LDAP_TestCase: self.assertTrue(updated_user.mobile_no == test_user_data["mobile_no"]) @mock_ldap_connection - def test_sync_roles(self): - + def test_sync_roles(self: TestCase): if self.TEST_LDAP_SERVER.lower() == "openldap": test_user_data = { "posix.user1": [ @@ -457,9 +391,8 @@ class LDAP_TestCase: user.insert(ignore_permissions=True) for test_user in test_user_data: - - test_user_doc = frappe.get_doc("User", test_user + "@unit.testing") - test_user_roles = frappe.get_roles(test_user + "@unit.testing") + test_user_doc = frappe.get_doc("User", f"{test_user}@unit.testing") + test_user_roles = frappe.get_roles(f"{test_user}@unit.testing") self.assertTrue( len(test_user_roles) == 2, "User should only be a part of the All and Guest roles" @@ -467,28 +400,22 @@ class LDAP_TestCase: self.test_class.sync_roles(test_user_doc, test_user_data[test_user]) # update user roles - frappe.get_doc("User", test_user + "@unit.testing") - updated_user_roles = frappe.get_roles(test_user + "@unit.testing") + frappe.get_doc("User", f"{test_user}@unit.testing") + updated_user_roles = frappe.get_roles(f"{test_user}@unit.testing") self.assertTrue( len(updated_user_roles) == len(test_user_data[test_user]), - "syncing of the user roles failed. {} != {} for user {}".format( - len(updated_user_roles), len(test_user_data[test_user]), test_user - ), + f"syncing of the user roles failed. {len(updated_user_roles)} != {len(test_user_data[test_user])} for user {test_user}", ) for user_role in updated_user_roles: # match each users role mapped to ldap groups - self.assertTrue( role_to_group_map[user_role] in test_user_data[test_user], - "during sync_roles(), the user was given role {} which should not have occured".format( - user_role - ), + f"during sync_roles(), the user was given role {user_role} which should not have occured", ) @mock_ldap_connection - def test_create_or_update_user(self): - + def test_create_or_update_user(self: TestCase): test_user_data = { "posix.user1": [ "Users", @@ -498,28 +425,21 @@ class LDAP_TestCase: "frappe_default_guest", ], } - test_user = "posix.user1" - frappe.get_doc("User", test_user + "@unit.testing").delete() # remove user 1 + frappe.get_doc("User", f"{test_user}@unit.testing").delete() with self.assertRaises( frappe.exceptions.DoesNotExistError ): # ensure user deleted so function can be tested - frappe.get_doc("User", test_user + "@unit.testing") + frappe.get_doc("User", f"{test_user}@unit.testing") with mock.patch( "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.update_user_fields" ) as update_user_fields_method: - - update_user_fields_method.return_value = None - with mock.patch( "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.sync_roles" ) as sync_roles_method: - - sync_roles_method.return_value = None - # New user self.test_class.create_or_update_user(self.user1doc, test_user_data[test_user]) @@ -538,15 +458,12 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_get_ldap_attributes(self): - + def test_get_ldap_attributes(self: TestCase): method_return = self.test_class.get_ldap_attributes() - self.assertTrue(type(method_return) is list) @mock_ldap_connection - def test_fetch_ldap_groups(self): - + def test_fetch_ldap_groups(self: TestCase): if self.TEST_LDAP_SERVER.lower() == "openldap": test_users = {"posix.user": ["Users", "Administrators"], "posix.user2": ["Users", "Group3"]} elif self.TEST_LDAP_SERVER.lower() == "active directory": @@ -556,7 +473,6 @@ class LDAP_TestCase: } for test_user in test_users: - self.connection.search( search_base=self.ldap_user_path, search_filter=self.TEST_LDAP_SEARCH_STRING.format(test_user), @@ -569,18 +485,13 @@ class LDAP_TestCase: self.assertTrue(len(method_return) == len(test_users[test_user])) for returned_group in method_return: - self.assertTrue(returned_group in test_users[test_user]) @mock_ldap_connection - def test_authenticate(self): - + def test_authenticate(self: TestCase): with mock.patch( "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.fetch_ldap_groups" ) as fetch_ldap_groups_function: - - fetch_ldap_groups_function.return_value = None - self.assertTrue(self.test_class.authenticate("posix.user", "posix_user_password")) self.assertTrue( @@ -599,25 +510,19 @@ class LDAP_TestCase: ] # All invalid users should return 'invalid username or password' for username, password in enumerate(invalid_users): - with self.assertRaises(frappe.exceptions.ValidationError) as display_massage: - self.test_class.authenticate(username, password) self.assertTrue( str(display_massage.exception).lower() == "invalid username or password", - "invalid credentials passed authentication [user: {}, password: {}]".format( - username, password - ), + f"invalid credentials passed authentication [user: {username}, password: {password}]", ) @mock_ldap_connection - def test_complex_ldap_search_filter(self): - + def test_complex_ldap_search_filter(self: TestCase): ldap_search_filters = self.TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING for search_filter in ldap_search_filters: - self.test_class.ldap_search_string = search_filter if ( @@ -633,56 +538,45 @@ class LDAP_TestCase: else: self.assertTrue(self.test_class.authenticate("posix.user", "posix_user_password")) - def test_reset_password(self): - + def test_reset_password(self: TestCase): self.test_class = LDAPSettings(self.doc) # Create a clean doc localdoc = self.doc.copy() - localdoc["enabled"] = False frappe.get_doc(localdoc).save() with mock.patch( - "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap" + "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.connect_to_ldap", + return_value=self.connection, ) as connect_to_ldap: - connect_to_ldap.return_value = self.connection - with self.assertRaises( frappe.exceptions.ValidationError ) as validation: # Fail if username string used self.test_class.reset_password("posix.user", "posix_user_password") - self.assertTrue(str(validation.exception) == "No LDAP User found for email: posix.user") - try: + with contextlib.suppress(Exception): self.test_class.reset_password( "posix.user1@unit.testing", "posix_user_password" ) # Change Password - - except Exception: # An exception from the tested class is ok, as long as the connection to LDAP was made writeable - pass - connect_to_ldap.assert_called_with(self.base_dn, self.base_password, read_only=False) @mock_ldap_connection - def test_convert_ldap_entry_to_dict(self): - + def test_convert_ldap_entry_to_dict(self: TestCase): self.connection.search( search_base=self.ldap_user_path, search_filter=self.TEST_LDAP_SEARCH_STRING.format("posix.user"), attributes=self.test_class.get_ldap_attributes(), ) - test_ldap_entry = self.connection.entries[0] - method_return = self.test_class.convert_ldap_entry_to_dict(test_ldap_entry) self.assertTrue(type(method_return) is dict) # must be dict self.assertTrue(len(method_return) == 6) # there are 6 fields in mock_ldap for use -class Test_OpenLDAP(LDAP_TestCase, unittest.TestCase): +class Test_OpenLDAP(LDAP_TestCase, TestCase): TEST_LDAP_SERVER = "OpenLDAP" TEST_LDAP_SEARCH_STRING = "(uid={0})" DOCUMENT_GROUP_MAPPINGS = [ @@ -706,7 +600,7 @@ class Test_OpenLDAP(LDAP_TestCase, unittest.TestCase): ] -class Test_ActiveDirectory(LDAP_TestCase, unittest.TestCase): +class Test_ActiveDirectory(LDAP_TestCase, TestCase): TEST_LDAP_SERVER = "Active Directory" TEST_LDAP_SEARCH_STRING = "(samaccountname={0})" DOCUMENT_GROUP_MAPPINGS = [ From 26f4654b31a72b3b1233ae0ca83b3aa1b5a7d20b Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Mon, 18 Jul 2022 18:22:12 +0530 Subject: [PATCH 5/7] test: Check user role & type updated via LDAP --- .../integrations/doctype/ldap_settings/test_ldap_settings.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py index 8587c6c656..a158d42d61 100644 --- a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py @@ -121,6 +121,7 @@ class LDAP_TestCase: "default_role": "Newsletter Manager", "ldap_groups": cls.DOCUMENT_GROUP_MAPPINGS, "ldap_group_field": "", + "default_user_type": "System User", } cls.server = Server(host=cls.ldap_server, port=389, get_info=cls.LDAP_SCHEMA) @@ -338,6 +339,9 @@ class LDAP_TestCase: self.assertTrue(updated_user.phone == test_user_data["phone"]) self.assertTrue(updated_user.mobile_no == test_user_data["mobile_no"]) + self.assertEqual(updated_user.user_type, self.test_class.default_user_type) + self.assertIn(self.test_class.default_role, frappe.get_roles(updated_user.name)) + @mock_ldap_connection def test_sync_roles(self: TestCase): if self.TEST_LDAP_SERVER.lower() == "openldap": From c7726d6394d26f7e72a5abb3266b576ef9c4c87c Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Tue, 19 Jul 2022 15:00:23 +0530 Subject: [PATCH 6/7] fix: Pick default_role for Sytem User type only --- .../doctype/ldap_settings/ldap_settings.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/frappe/integrations/doctype/ldap_settings/ldap_settings.py b/frappe/integrations/doctype/ldap_settings/ldap_settings.py index f9d083a12f..735b96968c 100644 --- a/frappe/integrations/doctype/ldap_settings/ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/ldap_settings.py @@ -145,7 +145,10 @@ class LDAPSettings(Document): def sync_roles(self, user: "User", additional_groups: list = None): current_roles = {d.role for d in user.get("roles")} - needed_roles = {self.default_role} + if self.default_user_type == "System User": + needed_roles = {self.default_role} + else: + needed_roles = set() lower_groups = [g.lower() for g in additional_groups or []] all_mapped_roles = {r.erpnext_role for r in self.ldap_groups} @@ -170,15 +173,12 @@ class LDAPSettings(Document): user = frappe.get_doc("User", user_data["email"]) LDAPSettings.update_user_fields(user=user, user_data=user_data) else: - doc = user_data - doc.update( - { - "doctype": "User", - "send_welcome_email": 0, - "language": "", - "user_type": self.default_user_type, - } - ) + doc = user_data | { + "doctype": "User", + "send_welcome_email": 0, + "language": "", + "user_type": self.default_user_type, + } user = frappe.get_doc(doc) user.insert(ignore_permissions=True) From c55bb98482bb32589b46380373de8edd62ab980a Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Tue, 19 Jul 2022 15:01:16 +0530 Subject: [PATCH 7/7] test: LDAP test for website user creation --- .../ldap_settings/test_ldap_settings.py | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py index a158d42d61..9080e0c82a 100644 --- a/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py +++ b/frappe/integrations/doctype/ldap_settings/test_ldap_settings.py @@ -23,6 +23,11 @@ class LDAP_TestCase: LDAP_LDIF_JSON = None TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING = None + # for adding type hints during development ^_^ + assertTrue = TestCase.assertTrue + assertEqual = TestCase.assertEqual + assertIn = TestCase.assertIn + def mock_ldap_connection(f): @functools.wraps(f) def wrapped(self, *args, **kwargs): @@ -51,6 +56,8 @@ class LDAP_TestCase: frappe.get_doc("User", "posix.user1@unit.testing").delete() with contextlib.suppress(Exception): frappe.get_doc("User", "posix.user2@unit.testing").delete() + with contextlib.suppress(Exception): + frappe.get_doc("User", "website_ldap_user@test.com").delete() @classmethod def setUpClass(cls): @@ -153,7 +160,7 @@ class LDAP_TestCase: cls.connection = None @mock_ldap_connection - def test_mandatory_fields(self: TestCase): + def test_mandatory_fields(self): mandatory_fields = [ "ldap_server_url", "ldap_directory_server", @@ -190,7 +197,7 @@ class LDAP_TestCase: self.fail(f"Document LDAP Settings field [{non_mandatory_field}] should not be mandatory") @mock_ldap_connection - def test_validation_ldap_search_string(self: TestCase): + def test_validation_ldap_search_string(self): invalid_ldap_search_strings = [ "", "uid={0}", @@ -209,7 +216,7 @@ class LDAP_TestCase: frappe.get_doc(localdoc).save() self.fail(f"LDAP search string [{invalid_search_string}] should not validate") - def test_connect_to_ldap(self: TestCase): + def test_connect_to_ldap(self): # prevent these parameters for security or lack of the und user from being able to configure prevent_connection_parameters = { "mode": { @@ -306,7 +313,7 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_get_ldap_client_settings(self: TestCase): + def test_get_ldap_client_settings(self): result = self.test_class.get_ldap_client_settings() self.assertIsInstance(result, dict) @@ -320,7 +327,7 @@ class LDAP_TestCase: self.assertFalse(result["enabled"]) # must match the edited doc @mock_ldap_connection - def test_update_user_fields(self: TestCase): + def test_update_user_fields(self): test_user_data = { "username": "posix.user", "email": "posix.user1@unit.testing", @@ -343,7 +350,19 @@ class LDAP_TestCase: self.assertIn(self.test_class.default_role, frappe.get_roles(updated_user.name)) @mock_ldap_connection - def test_sync_roles(self: TestCase): + def test_create_website_user(self): + new_test_user_data = { + "username": "website_ldap_user.test", + "email": "website_ldap_user@test.com", + "first_name": "Website User - LDAP Test", + } + self.test_class.default_user_type = "Website User" + self.test_class.create_or_update_user(user_data=new_test_user_data, groups=[]) + new_user = frappe.get_doc("User", new_test_user_data["email"]) + self.assertEqual(new_user.user_type, "Website User") + + @mock_ldap_connection + def test_sync_roles(self): if self.TEST_LDAP_SERVER.lower() == "openldap": test_user_data = { "posix.user1": [ @@ -419,7 +438,7 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_create_or_update_user(self: TestCase): + def test_create_or_update_user(self): test_user_data = { "posix.user1": [ "Users", @@ -462,12 +481,12 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_get_ldap_attributes(self: TestCase): + def test_get_ldap_attributes(self): method_return = self.test_class.get_ldap_attributes() self.assertTrue(type(method_return) is list) @mock_ldap_connection - def test_fetch_ldap_groups(self: TestCase): + def test_fetch_ldap_groups(self): if self.TEST_LDAP_SERVER.lower() == "openldap": test_users = {"posix.user": ["Users", "Administrators"], "posix.user2": ["Users", "Group3"]} elif self.TEST_LDAP_SERVER.lower() == "active directory": @@ -492,7 +511,7 @@ class LDAP_TestCase: self.assertTrue(returned_group in test_users[test_user]) @mock_ldap_connection - def test_authenticate(self: TestCase): + def test_authenticate(self): with mock.patch( "frappe.integrations.doctype.ldap_settings.ldap_settings.LDAPSettings.fetch_ldap_groups" ) as fetch_ldap_groups_function: @@ -523,7 +542,7 @@ class LDAP_TestCase: ) @mock_ldap_connection - def test_complex_ldap_search_filter(self: TestCase): + def test_complex_ldap_search_filter(self): ldap_search_filters = self.TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING for search_filter in ldap_search_filters: @@ -542,7 +561,7 @@ class LDAP_TestCase: else: self.assertTrue(self.test_class.authenticate("posix.user", "posix_user_password")) - def test_reset_password(self: TestCase): + def test_reset_password(self): self.test_class = LDAPSettings(self.doc) # Create a clean doc @@ -567,7 +586,7 @@ class LDAP_TestCase: connect_to_ldap.assert_called_with(self.base_dn, self.base_password, read_only=False) @mock_ldap_connection - def test_convert_ldap_entry_to_dict(self: TestCase): + def test_convert_ldap_entry_to_dict(self): self.connection.search( search_base=self.ldap_user_path, search_filter=self.TEST_LDAP_SEARCH_STRING.format("posix.user"),