import frappe import frappe.utils from frappe import _ from frappe.core.doctype.user_invitation.user_invitation import UserInvitation @frappe.whitelist(methods=["POST"]) def invite_by_email( emails: str, roles: list[str], redirect_to_path: str, app_name: str = "frappe", **kwargs ) -> dict[str, list[str]]: UserInvitation.validate_role(app_name) # validate emails frappe.utils.validate_email_address(emails, throw=True) email_list = frappe.utils.split_emails(emails) if not email_list: frappe.throw(title=_("Invalid input"), msg=_("No email addresses to invite")) # get relevant data from the database disabled_user_emails = frappe.db.get_all( "User", filters={"email": ["in", email_list], "enabled": 0}, pluck="email", ) accepted_invite_emails = frappe.db.get_all( "User Invitation", filters={ "email": ["in", email_list], "status": "Accepted", "app_name": app_name, "user": ["is", "set"], }, pluck="email", ) pending_invite_emails = frappe.db.get_all( "User Invitation", filters={"email": ["in", email_list], "status": "Pending", "app_name": app_name}, pluck="email", ) # create invitation documents to_invite = list( set(email_list) - set(disabled_user_emails) - set(accepted_invite_emails) - set(pending_invite_emails) ) extra_args = get_allowed_invite_params(app_name, kwargs) for email in to_invite: frappe.get_doc( doctype="User Invitation", email=email, roles=[dict(role=role) for role in roles], app_name=app_name, redirect_to_path=redirect_to_path, **extra_args, ).insert(ignore_permissions=True) return { "disabled_user_emails": disabled_user_emails, "accepted_invite_emails": accepted_invite_emails, "pending_invite_emails": pending_invite_emails, "invited_emails": to_invite, } def get_allowed_invite_params(app_name: str, kwargs: dict) -> dict: # get extra args based on app_name allowed_params = frappe._dict() user_invitation_hook = frappe.get_hooks("user_invitation", app_name=app_name) if not isinstance(user_invitation_hook, dict): return {} extra_invite_params = user_invitation_hook.get("extra_invite_params", []) for param in extra_invite_params: if param in kwargs: allowed_params[param] = kwargs[param] return allowed_params @frappe.whitelist(allow_guest=True, methods=["GET"]) def accept_invitation(key: str) -> None: _accept_invitation(key, False) # `app_name` is required for security @frappe.whitelist(methods=["PATCH", "POST"]) def cancel_invitation(name: str, app_name: str): UserInvitation.validate_role(app_name) if not frappe.db.exists("User Invitation", name): frappe.throw(title=_("Error"), msg=_("Invitation not found")) invitation = frappe.get_doc("User Invitation", name) if invitation.app_name != app_name: # message is not specific enough for security frappe.throw(title=_("Error"), msg=_("Invitation not found")) if invitation.status == "Cancelled": return {"cancelled_now": False} if invitation.status != "Pending": frappe.throw(title=_("Error"), msg=_("Invitation cannot be cancelled")) invitation.flags.ignore_permissions = True return {"cancelled_now": invitation.cancel_invite()} @frappe.whitelist(methods=["GET"]) def get_pending_invitations(app_name: str): UserInvitation.validate_role(app_name) pending_invitations = frappe.db.get_all( "User Invitation", fields=["name", "email"], filters={"status": "Pending", "app_name": app_name} ) res = [] for pending_invitation in pending_invitations: roles = frappe.db.get_all("User Role", fields=["role"], filters={"parent": pending_invitation.name}) res.append( { "name": pending_invitation.name, "email": pending_invitation.email, "roles": [r.role for r in roles], } ) return res def _accept_invitation(key: str, in_test: bool) -> None: # get invitation hashed_key = frappe.utils.sha256_hash(key) invitation_name = frappe.db.get_value("User Invitation", filters={"key": hashed_key}) if not invitation_name: frappe.throw(title=_("Error"), msg=_("Invalid or expired key")) invitation = frappe.get_doc("User Invitation", invitation_name) # accept invitation invitation.accept(ignore_permissions=True) user = frappe.get_doc("User", invitation.email) should_update_password = not user.last_password_reset_date and not bool( frappe.get_system_settings("disable_user_pass_login") ) # set redirect_to redirect_to = frappe.utils.get_url(invitation.get_redirect_to_path()) if should_update_password: redirect_to = f"{user._reset_password()}&redirect_to=/{invitation.get_redirect_to_path()}" # GET requests do not cause an implicit commit frappe.db.commit() # nosemgrep if not in_test and not should_update_password: frappe.local.login_manager.login_as(invitation.email) # set response frappe.local.response["type"] = "redirect" frappe.local.response["location"] = redirect_to