diff --git a/frappe/core/doctype/user/user.py b/frappe/core/doctype/user/user.py index 5665ec1c84..37abbcca91 100644 --- a/frappe/core/doctype/user/user.py +++ b/frappe/core/doctype/user/user.py @@ -1462,6 +1462,17 @@ def impersonate(user: str, reason: str): ) notification.set("type", "Alert") notification.insert(ignore_permissions=True) + # notify user via email too + user_email = frappe.db.get_value("User", user, "email") + email_message = _( + "User {0} has started an impersonation session as you.

Reason provided: {1}" + ).format(escape_html(impersonator), escape_html(reason)) + + frappe.sendmail( + recipients=[user_email], + subject=_("Security Alert: Your account is being impersonated"), + content=email_message, + ) frappe.local.login_manager.impersonate(user) diff --git a/frappe/tests/test_email.py b/frappe/tests/test_email.py index 6d7a7b2c16..788fe1e476 100644 --- a/frappe/tests/test_email.py +++ b/frappe/tests/test_email.py @@ -308,6 +308,27 @@ class TestEmail(IntegrationTestCase): if changed_flag: email_account.enable_incoming = False + def test_impersonation_alert_queue(self): + """Verifies that impersonation alerts are sent as mail too""" + from frappe.core.doctype.user.user import impersonate + + target_user = "testimpersonate@example.com" + frappe.db.delete("Email Queue Recipient", {"recipient": target_user}) # sanity + if not frappe.db.exists("User", target_user): + frappe.get_doc({"doctype": "User", "email": target_user, "first_name": "Target"}).insert( + ignore_permissions=True + ) + reason = "Testing Security Alert" + impersonate(user=target_user, reason=reason) + self.assertEqual(frappe.session.user, target_user) # test if impersonation worked + self.assertTrue(frappe.db.exists("Activity Log", {"user": target_user, "operation": "Impersonate"})) + email_queued = frappe.db.exists( + "Email Queue Recipient", {"recipient": target_user, "status": "Not Sent"} + ) + self.assertTrue(email_queued, f"Impersonation email was not queued for {target_user}") + + frappe.db.delete("User", {"email": target_user}) + class TestVerifiedRequests(IntegrationTestCase): def test_round_trip(self):