diff --git a/frappe/email/doctype/email_queue/email_queue.py b/frappe/email/doctype/email_queue/email_queue.py index 221f3fbb31..dfad6d9ce8 100644 --- a/frappe/email/doctype/email_queue/email_queue.py +++ b/frappe/email/doctype/email_queue/email_queue.py @@ -16,6 +16,7 @@ from frappe.core.utils import html2text from frappe.email.doctype.email_account.email_account import EmailAccount from frappe.email.email_body import add_attachment, get_email, get_formatted_html from frappe.email.queue import get_unsubcribed_url, get_unsubscribe_message +from frappe.email.smtp import SMTPServer from frappe.model.document import Document from frappe.query_builder import DocType, Interval from frappe.query_builder.functions import Now @@ -99,7 +100,7 @@ class EmailQueue(Document): def get_email_account(self): if self.email_account: - return frappe.get_doc("Email Account", self.email_account) + return frappe.get_cached_doc("Email Account", self.email_account) return EmailAccount.find_outgoing( match_by_email=self.sender, match_by_doctype=self.reference_doctype @@ -115,12 +116,12 @@ class EmailQueue(Document): return True - def send(self, is_background_task=False): + def send(self, is_background_task: bool = False, smtp_server_instance: SMTPServer = None): """Send emails to recipients.""" if not self.can_send_now(): return - with SendMailContext(self, is_background_task) as ctx: + with SendMailContext(self, is_background_task, smtp_server_instance) as ctx: message = None for recipient in self.recipients: if not recipient.is_mail_to_be_sent(): @@ -169,21 +170,32 @@ class EmailQueue(Document): @task(queue="short") -def send_mail(email_queue_name, is_background_task=False): - """This is equalent to EmqilQueue.send. +def send_mail(email_queue_name, is_background_task=False, smtp_server_instance: SMTPServer = None): + """This is equivalent to EmailQueue.send. This provides a way to make sending mail as a background job. """ record = EmailQueue.find(email_queue_name) - record.send(is_background_task=is_background_task) + record.send(is_background_task=is_background_task, smtp_server_instance=smtp_server_instance) class SendMailContext: - def __init__(self, queue_doc: Document, is_background_task: bool = False): + def __init__( + self, + queue_doc: Document, + is_background_task: bool = False, + smtp_server_instance: SMTPServer = None, + ): self.queue_doc: EmailQueue = queue_doc self.is_background_task = is_background_task self.email_account_doc = queue_doc.get_email_account() - self.smtp_server = self.email_account_doc.get_smtp_server() + + self.smtp_server = smtp_server_instance or self.email_account_doc.get_smtp_server() + + # if smtp_server_instance is passed, then retain smtp session + # Note: smtp session will have to be manually closed + self.retain_smtp_session = bool(smtp_server_instance) + self.sent_to = [rec.recipient for rec in self.queue_doc.recipients if rec.is_main_sent()] def __enter__(self): @@ -200,11 +212,13 @@ class SendMailContext: JobTimeoutException, ] - self.smtp_server.quit() + if not self.retain_smtp_session: + self.smtp_server.quit() + self.log_exception(exc_type, exc_val, exc_tb) if exc_type in exceptions: - email_status = (self.sent_to and "Partially Sent") or "Not Sent" + email_status = "Partially Sent" if self.sent_to else "Not Sent" self.queue_doc.update_status(status=email_status, commit=True) elif exc_type: if self.queue_doc.retry < get_email_retry_limit(): @@ -216,12 +230,12 @@ class SendMailContext: email_status = self.is_mail_sent_to_all() and "Sent" email_status = email_status or (self.sent_to and "Partially Sent") or "Not Sent" - update_fields = {"status": email_status} - if self.email_account_doc.is_exists_in_db(): - update_fields["email_account"] = self.email_account_doc.name - else: - update_fields["email_account"] = None - + update_fields = { + "status": email_status, + "email_account": self.email_account_doc.name + if self.email_account_doc.is_exists_in_db() + else None, + } self.queue_doc.update_status(**update_fields, commit=True) def log_exception(self, exc_type, exc_val, exc_tb): @@ -249,6 +263,7 @@ class SendMailContext: return Parser(policy=SMTPUTF8).parsestr(message) def message_placeholder(self, placeholder_key): + # sourcery skip: avoid-builtin-shadow map = { "tracker": "", "unsubscribe_url": "", @@ -269,7 +284,7 @@ class SendMailContext: ) message = message.replace(self.message_placeholder("cc"), self.get_receivers_str()) message = message.replace( - self.message_placeholder("recipient"), self.get_receipient_str(recipient_email) + self.message_placeholder("recipient"), self.get_recipient_str(recipient_email) ) message = self.include_attachments(message) return message @@ -304,14 +319,11 @@ class SendMailContext: to_str = ", ".join(self.queue_doc.to) cc_str = ", ".join(self.queue_doc.cc) message = f"This email was sent to {to_str}" - message = message + f" and copied to {cc_str}" if cc_str else message + message = f"{message} and copied to {cc_str}" if cc_str else message return message - def get_receipient_str(self, recipient_email): - message = "" - if self.queue_doc.expose_recipients != "header": - message = recipient_email - return message + def get_recipient_str(self, recipient_email): + return recipient_email if self.queue_doc.expose_recipients != "header" else "" def include_attachments(self, message): message_obj = self.get_message_object(message) @@ -628,7 +640,6 @@ class QueueBuilder: if not (final_recipients + self.final_cc()): return [] - email_queues = [] queue_data = self.as_dict(include_recipients=False) if not queue_data: return [] @@ -636,17 +647,31 @@ class QueueBuilder: if not queue_separately: recipients = list(set(final_recipients + self.final_cc() + self.bcc)) q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True) - email_queues.append(q) + send_now and q.send() else: - for r in final_recipients: - recipients = [r] if email_queues else list(set([r] + self.final_cc() + self.bcc)) - q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True) - email_queues.append(q) + if send_now and len(recipients) >= 1000: + # force queueing if there are too many recipients to avoid timeouts + send_now = False + for recipients in frappe.utils.create_batch(final_recipients, 1000): + frappe.enqueue( + self.send_emails, + queue_data=queue_data, + final_recipients=recipients, + now=send_now, + ) - if send_now: - for doc in email_queues: - doc.send() - return email_queues + def send_emails(self, queue_data, final_recipients): + # This is used to bulk send emails from same sender to multiple recipients separately + # This re-uses smtp server instance to minimize the cost of new session creation + smtp_server_instance = None + for r in final_recipients: + recipients = list(set([r] + self.final_cc() + self.bcc)) + q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True) + if not smtp_server_instance: + email_account = q.get_email_account() + smtp_server_instance = email_account.get_smtp_server() + q.send(smtp_server_instance=smtp_server_instance) + smtp_server_instance.quit() def as_dict(self, include_recipients=True): email_account = self.get_outgoing_email_account()