# seitime-frappe/frappe/tests/test_email.py
# 2026-02-22 11:35:19 +05:30

# 493 lines
# 16 KiB
# Python

# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import email
import re
from unittest.mock import patch
import requests
import frappe
from frappe.core.doctype.communication.email import make
from frappe.desk.form.load import get_attachments
from frappe.email.doctype.email_account.test_email_account import TestEmailAccount
from frappe.email.doctype.email_queue.email_queue import QueueBuilder
from frappe.query_builder.utils import db_type_is
from frappe.tests import IntegrationTestCase
from frappe.tests.test_query_builder import run_only_if
# NOTE(review): presumably read by the Frappe test runner so that "Email Account"
# test records exist before the tests in this module run — confirm against the runner.
EXTRA_TEST_RECORD_DEPENDENCIES = ["Email Account"]
class TestEmail(IntegrationTestCase):
    """Integration tests for queueing, sending and rendering outgoing email."""

    def setUp(self):
        """Start every test with empty unsubscribe and email-queue tables."""
        frappe.db.delete("Email Unsubscribe")
        frappe.db.delete("Email Queue")
        frappe.db.delete("Email Queue Recipient")

    def _get_queue(self, status):
        """Return the ``name`` and ``message`` of every Email Queue row with ``status``."""
        return frappe.db.sql(
            """select name, message from `tabEmail Queue` where status=%s""",
            (status,),
            as_dict=1,
        )

    def _get_queue_recipients(self, status):
        """Return the recipient address of every Email Queue Recipient row with ``status``."""
        rows = frappe.db.sql(
            """select recipient from `tabEmail Queue Recipient` where status=%s""",
            (status,),
            as_dict=1,
        )
        return [row.recipient for row in rows]

    def test_email_queue(self, send_after=None):
        """Queue one mail for two recipients and check the resulting queue rows.

        Also used as a fixture by ``test_send_after`` and ``test_flush``;
        ``send_after`` is forwarded unchanged to ``frappe.sendmail``.
        """
        frappe.sendmail(
            recipients=["test@example.com", "test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing Queue",
            message="This mail is queued!",
            unsubscribe_message="Unsubscribe",
            send_after=send_after,
        )

        email_queue = self._get_queue("Not Sent")
        self.assertEqual(len(email_queue), 1)

        # Exactly these two recipients, no more and no fewer.
        self.assertCountEqual(
            self._get_queue_recipients("Not Sent"),
            ["test@example.com", "test1@example.com"],
        )
        self.assertIn("<!--unsubscribe_url-->", email_queue[0]["message"])

    def test_send_after(self):
        """A mail queued with ``send_after`` set must not be marked Sent by ``flush()``."""
        from frappe.email.queue import flush

        self.test_email_queue(send_after=1)
        flush()

        self.assertEqual(len(self._get_queue("Sent")), 0)

    def test_flush(self):
        """``flush()`` marks the queued mail and both of its recipients as Sent."""
        from frappe.email.queue import flush

        self.test_email_queue()
        flush()

        self.assertEqual(len(self._get_queue("Sent")), 1)
        self.assertCountEqual(
            self._get_queue_recipients("Sent"),
            ["test@example.com", "test1@example.com"],
        )
        self.assertIn("Unsubscribe", frappe.safe_decode(frappe.flags.sent_mail))

    def test_cc_header_always_visible(self):
        """Test that CC header is always visible regardless of expose_recipients setting.

        CC (Carbon Copy) should always be visible to all recipients as per email semantics.
        This enables 'Reply All' functionality. If sender wants hidden recipients, they
        should use BCC.
        """
        frappe.sendmail(
            recipients=["test@example.com"],
            cc=["test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing CC Header Visibility",
            message="CC should be visible without expose_recipients",
            unsubscribe_message="Unsubscribe",
            # No expose_recipients set - CC should still be visible
        )

        email_queue = self._get_queue("Not Sent")
        self.assertEqual(len(email_queue), 1)

        queue_recipients = self._get_queue_recipients("Not Sent")
        self.assertIn("test@example.com", queue_recipients)
        self.assertIn("test1@example.com", queue_recipients)

        message = email_queue[0].message
        # CC should be visible even without expose_recipients
        self.assertIn("CC: test1@example.com", message)
        # TO should use placeholder (hidden) when expose_recipients is not set
        self.assertIn("To: <!--recipient-->", message)

    def test_cc_header_with_expose_recipients(self):
        """Test CC and TO visibility when expose_recipients='header' is set.

        With expose_recipients='header', both TO and CC should be visible in headers.
        """
        frappe.sendmail(
            recipients=["test@example.com"],
            cc=["test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing Email Queue",
            message="This is mail is queued!",
            unsubscribe_message="Unsubscribe",
            expose_recipients="header",
        )

        email_queue = self._get_queue("Not Sent")
        self.assertEqual(len(email_queue), 1)

        queue_recipients = self._get_queue_recipients("Not Sent")
        self.assertIn("test@example.com", queue_recipients)
        self.assertIn("test1@example.com", queue_recipients)

        message = email_queue[0].message
        # Both TO and CC should be visible with expose_recipients="header"
        self.assertIn("To: test@example.com", message)
        self.assertIn("CC: test1@example.com", message)

    def test_cc_footer(self):
        """With expose_recipients='footer' the sent mail names the TO and CC addresses."""
        frappe.sendmail(
            recipients=["test@example.com"],
            cc=["test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing Email Queue",
            message="This is mail is queued!",
            unsubscribe_message="Unsubscribe",
            expose_recipients="footer",
            now=True,
        )
        frappe.db.commit()

        self.assertEqual(len(self._get_queue("Sent")), 1)

        queue_recipients = self._get_queue_recipients("Sent")
        self.assertIn("test@example.com", queue_recipients)
        self.assertIn("test1@example.com", queue_recipients)

        self.assertIn(
            "This email was sent to test@example.com and copied to test1@example.com",
            frappe.safe_decode(frappe.flags.sent_mail),
        )

    def test_expose(self):
        """The unsubscribe link embedded in a sent mail carries a verifiable signature."""
        from frappe.utils import set_request
        from frappe.utils.verified_command import verify_request

        frappe.sendmail(
            recipients=["test@example.com"],
            cc=["test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing Email Queue",
            message="This is mail is queued!",
            unsubscribe_message="Unsubscribe",
            now=True,
        )
        frappe.db.commit()

        email_queue = self._get_queue("Sent")
        self.assertEqual(len(email_queue), 1)

        queue_recipients = self._get_queue_recipients("Sent")
        self.assertIn("test@example.com", queue_recipients)
        self.assertIn("test1@example.com", queue_recipients)

        self.assertIn("<!--recipient-->", email_queue[0].message)

        email_obj = email.message_from_string(frappe.safe_decode(frappe.flags.sent_mail))
        for part in email_obj.walk():
            content = part.get_payload(decode=True)
            if not content:
                continue

            # Everything between the unsubscribe endpoint's "?" and the end of that line
            # is the signed query string.
            match = re.search(
                r"(?<=/api/method/frappe\.email\.queue\.unsubscribe\?).*(?=\r\n)",
                content.decode(),
            )
            # Fail with a readable message instead of AttributeError on `None.group`.
            self.assertIsNotNone(match, "Unsubscribe link not found in the first non-empty mail part")

            set_request(method="GET", query_string=match.group(0))
            self.assertTrue(verify_request())
            # Only the first part that has content is inspected.
            break
        else:
            # Without this the test would pass without verifying anything.
            self.fail("Sent mail has no part with content to verify")

    def test_sender(self):
        """The stored sender follows the Email Account's name/email override flags."""

        def _patched_assertion(email_account, assertion):
            """Send a mail through ``email_account`` and compare the stored sender."""
            with patch.object(QueueBuilder, "get_outgoing_email_account", return_value=email_account):
                frappe.sendmail(
                    recipients=["test1@example.com"],
                    sender="admin@example.com",
                    subject="Test Email Queue",
                    message="This mail is queued!",
                    now=True,
                )
            frappe.db.commit()
            # NOTE(review): several "Sent" rows exist after the first call; this relies on
            # get_value's default ordering returning the newest one — confirm.
            email_queue_sender = frappe.db.get_value("Email Queue", {"status": "Sent"}, "sender")
            self.assertEqual(email_queue_sender, assertion)

        email_account = frappe.get_doc("Email Account", "_Test Email Account 1")
        email_account.default_outgoing = 1

        # Neither override: sender is passed through unchanged.
        email_account.always_use_account_name_as_sender_name = 0
        email_account.always_use_account_email_id_as_sender = 0
        _patched_assertion(email_account, "admin@example.com")

        # Name override only.
        email_account.always_use_account_name_as_sender_name = 1
        _patched_assertion(email_account, "_Test Email Account 1 <admin@example.com>")

        # Email override only.
        email_account.always_use_account_name_as_sender_name = 0
        email_account.always_use_account_email_id_as_sender = 1
        _patched_assertion(email_account, '"admin@example.com" <test@example.com>')

        # Both overrides.
        email_account.always_use_account_name_as_sender_name = 1
        _patched_assertion(email_account, "_Test Email Account 1 <test@example.com>")

    def test_unsubscribe(self):
        """An unsubscribed address is left out of the queue for the same reference document."""
        from frappe.email.queue import unsubscribe

        unsubscribe(doctype="User", name="Administrator", email="test@example.com")

        self.assertTrue(
            frappe.db.get_value(
                "Email Unsubscribe",
                {"reference_doctype": "User", "reference_name": "Administrator", "email": "test@example.com"},
            )
        )

        builder = QueueBuilder(
            recipients=["test@example.com", "test1@example.com"],
            sender="admin@example.com",
            reference_doctype="User",
            reference_name="Administrator",
            subject="Testing Email Queue",
            message="This is mail is queued!",
            unsubscribe_message="Unsubscribe",
        )
        # don't send right now
        builder.process()

        email_queue = frappe.db.get_value("Email Queue", {"status": "Not Sent"})

        # Only the address that did not unsubscribe is queued.
        self.assertEqual(self._get_queue_recipients("Not Sent"), ["test1@example.com"])

        frappe.get_doc("Email Queue", email_queue).send()
        self.assertIn("Unsubscribe", frappe.safe_decode(frappe.flags.sent_mail))

    def test_image_parsing(self):
        """Inline images of an inbound mail end up as private-file ``<img>`` tags."""
        frappe.db.delete("Communication", {"sender": "sukh@yyy.com"})

        with open(frappe.get_app_path("frappe", "tests", "data", "email_with_image.txt")) as raw:
            raw_mail = raw.read()

        messages = {
            '"INBOX"': {"latest_messages": [raw_mail], "seen_status": {2: "UNSEEN"}, "uid_list": [2]}
        }

        email_account = frappe.get_doc("Email Account", "_Test Email Account 1")
        changed_flag = False
        if not email_account.enable_incoming:
            email_account.enable_incoming = True
            changed_flag = True

        try:
            mails = TestEmailAccount.mocked_get_inbound_mails(email_account, messages)

            # TODO: fix this flaky test! - 'IndexError: list index out of range' for `.process()` line
            if not mails:
                # skipTest() raises by itself; no `raise` needed.
                self.skipTest("No inbound mails found / Email Account wasn't patched properly")

            communication = mails[0].process()

            self.assertRegex(
                communication.content, """<img[^>]*src=["']/private/files/rtco1.png[^>]*>"""
            )
            self.assertRegex(
                communication.content, """<img[^>]*src=["']/private/files/rtco2.png[^>]*>"""
            )
        finally:
            # Restore the in-memory flag even when the test is skipped or fails.
            if changed_flag:
                email_account.enable_incoming = False

    def test_impersonation_alert_queue(self):
        """Verifies that impersonation alerts are sent as mail too"""
        from frappe.core.doctype.user.user import impersonate

        target_user = "testimpersonate@example.com"
        frappe.db.delete("Email Queue Recipient", {"recipient": target_user})  # sanity

        if not frappe.db.exists("User", target_user):
            frappe.get_doc(
                {"doctype": "User", "email": target_user, "first_name": "Target", "enabled": 1}
            ).insert(ignore_permissions=True)

        try:
            with (
                patch("frappe.sendmail") as mocked_sendmail,
                patch("frappe.local.login_manager", create=True) as mocked_lm,
            ):
                # `exists` is forced to True only while impersonate() itself runs.
                with patch("frappe.db.exists", return_value=True):
                    reason = "Testing Security Alert"
                    impersonate(user=target_user, reason=reason)

                self.assertTrue(mocked_sendmail.called)
                _, kwargs = mocked_sendmail.call_args
                self.assertIn(target_user, kwargs.get("recipients"))
                self.assertIn(reason, kwargs.get("content"))
                mocked_lm.impersonate.assert_called_with(target_user)
        finally:
            # Cleanup must run even when an assertion above fails.
            frappe.db.delete("User", {"email": target_user})
class TestVerifiedRequests(IntegrationTestCase):
    """Checks that signed query strings pass verification after a round trip."""

    def test_round_trip(self):
        """Sign each parameter set, install it as the current request and verify it."""
        from frappe.utils import set_request
        from frappe.utils.verified_command import get_signed_params, verify_request

        test_cases = [{"xyz": "abc"}, {"email": "a@b.com", "user": "xyz"}]

        try:
            for params in test_cases:
                # subTest reports every failing parameter set, not just the first.
                with self.subTest(params=params):
                    signed_url = get_signed_params(params)
                    set_request(method="GET", query_string=signed_url)
                    self.assertTrue(verify_request())
        finally:
            # Always drop the fake request so it cannot leak into later tests.
            frappe.local.request = None
class TestEmailIntegrationTest(IntegrationTestCase):
    """Sends email to local SMTP server and verifies correctness.

    SMTP4Dev runs as a service in unit test CI job.
    If you need to run this test locally, you must setup SMTP4dev locally.

    WARNING: SMTP4dev doesn't have stable API, it can break anytime.
    """

    SMTP4DEV_WEB = "http://localhost:3000"
    # Seconds to wait on any SMTP4dev HTTP call, so an unreachable or hung server
    # fails the test instead of blocking the whole run.
    SMTP4DEV_TIMEOUT = 10

    def setUp(self) -> None:
        """Set the ``testing_email`` flag and clear all messages stored in SMTP4dev."""
        # Frappe code is configured not to attempt sending emails during tests.
        # NOTE(review): this flag presumably opts in to real delivery — confirm.
        frappe.flags.testing_email = True
        requests.delete(f"{self.SMTP4DEV_WEB}/api/Messages/*", timeout=self.SMTP4DEV_TIMEOUT)
        return super().setUp()

    def tearDown(self) -> None:
        """Reset the ``testing_email`` flag set in ``setUp``."""
        frappe.flags.testing_email = False
        return super().tearDown()

    @classmethod
    def get_last_sent_emails(cls):
        """Return the ``results`` list of SMTP4dev's message listing."""
        response = requests.get(f"{cls.SMTP4DEV_WEB}/api/Messages", timeout=cls.SMTP4DEV_TIMEOUT)
        return response.json().get("results")

    @classmethod
    def get_message(cls, message_id):
        """Return the JSON document SMTP4dev holds for ``message_id``."""
        response = requests.get(
            f"{cls.SMTP4DEV_WEB}/api/Messages/{message_id}", timeout=cls.SMTP4DEV_TIMEOUT
        )
        return response.json()

    def test_send_email(self):
        """A mail to two recipients is stored as Sent and reaches the SMTP server twice."""
        sender = "a@example.io"
        recipients = "b@example.io,c@example.io"
        subject = "checking if email works"
        content = "is email working?"

        # Named `email_queue` so the imported `email` module is not shadowed.
        email_queue = frappe.sendmail(
            sender=sender, recipients=recipients, subject=subject, content=content, now=True
        )
        frappe.db.commit()
        email_queue.reload()

        self.assertEqual(email_queue.sender, sender)
        self.assertEqual(len(email_queue.recipients), 2)
        self.assertEqual(email_queue.status, "Sent")

        sent_mails = self.get_last_sent_emails()
        self.assertEqual(len(sent_mails), 2)

        for sent_mail in sent_mails:
            self.assertEqual(sent_mail["from"], sender)
            self.assertEqual(sent_mail["subject"], subject)

        self.assertSetEqual(set(recipients.split(",")), {m["to"][0] for m in sent_mails})

    @IntegrationTestCase.change_settings("System Settings", store_attached_pdf_document=1)
    def test_store_attachments(self):
        """ "attach print" feature just tells email queue which document to attach, this is not
        actually stored unless system setting says so."""
        name = make(
            sender="test_sender@example.com",
            recipients="test_recipient@example.com,test_recipient2@example.com",
            content="test mail 001",
            subject="test-mail-002",
            doctype="Email Account",
            name="_Test Email Account 1",
            print_format="Standard",
            send_email=True,
            now=True,
        ).get("name")
        frappe.db.commit()

        communication = frappe.get_doc("Communication", name)

        # The generated PDF must be stored as a File attached to the Communication.
        attachments = get_attachments(communication.doctype, communication.name)
        self.assertEqual(len(attachments), 1)

        file = frappe.get_doc("File", attachments[0].name)
        self.assertGreater(file.file_size, 1000)
        self.assertIn("pdf", file.file_name.lower())

        sent_mails = self.get_last_sent_emails()
        self.assertEqual(len(sent_mails), 2)

        # Every delivered copy must carry the PDF as an attachment.
        for mail in sent_mails:
            email_content = self.get_message(mail["id"])

            attachment_found = False
            for part in email_content["parts"]:
                for child_part in part["childParts"]:
                    if child_part["isAttachment"]:
                        attachment_found = True
                        self.assertIn("pdf", child_part["name"])
                        break

            if not attachment_found:
                # TestCase.fail() accepts a single message; a second positional
                # argument would raise TypeError instead of failing the test.
                self.fail(f"Attachment not found: {email_content}")