perf: Batch email queuing while sending emails to large no. of recipients separately

- Batch email queue
- Re-use smtp_session while sending bulk emails (from same sender)
instead of creating new session for everytime.
Note: creation of new email session is very expensive.
This commit is contained in:
Suraj Shetty 2022-07-11 12:24:25 +05:30
parent fc99c22ea2
commit da52098bd3

View file

@ -16,6 +16,7 @@ from frappe.core.utils import html2text
from frappe.email.doctype.email_account.email_account import EmailAccount
from frappe.email.email_body import add_attachment, get_email, get_formatted_html
from frappe.email.queue import get_unsubcribed_url, get_unsubscribe_message
from frappe.email.smtp import SMTPServer
from frappe.model.document import Document
from frappe.query_builder import DocType, Interval
from frappe.query_builder.functions import Now
@ -99,7 +100,7 @@ class EmailQueue(Document):
def get_email_account(self):
if self.email_account:
return frappe.get_doc("Email Account", self.email_account)
return frappe.get_cached_doc("Email Account", self.email_account)
return EmailAccount.find_outgoing(
match_by_email=self.sender, match_by_doctype=self.reference_doctype
@ -115,12 +116,12 @@ class EmailQueue(Document):
return True
def send(self, is_background_task=False):
def send(self, is_background_task: bool = False, smtp_server_instance: SMTPServer = None):
"""Send emails to recipients."""
if not self.can_send_now():
return
with SendMailContext(self, is_background_task) as ctx:
with SendMailContext(self, is_background_task, smtp_server_instance) as ctx:
message = None
for recipient in self.recipients:
if not recipient.is_mail_to_be_sent():
@ -169,21 +170,32 @@ class EmailQueue(Document):
@task(queue="short")
def send_mail(email_queue_name, is_background_task=False):
"""This is equalent to EmqilQueue.send.
def send_mail(email_queue_name, is_background_task=False, smtp_server_instance: SMTPServer = None):
"""This is equivalent to EmailQueue.send.
This provides a way to make sending mail as a background job.
"""
record = EmailQueue.find(email_queue_name)
record.send(is_background_task=is_background_task)
record.send(is_background_task=is_background_task, smtp_server_instance=smtp_server_instance)
class SendMailContext:
def __init__(self, queue_doc: Document, is_background_task: bool = False):
def __init__(
self,
queue_doc: Document,
is_background_task: bool = False,
smtp_server_instance: SMTPServer = None,
):
self.queue_doc: EmailQueue = queue_doc
self.is_background_task = is_background_task
self.email_account_doc = queue_doc.get_email_account()
self.smtp_server = self.email_account_doc.get_smtp_server()
self.smtp_server = smtp_server_instance or self.email_account_doc.get_smtp_server()
# if smtp_server_instance is passed, then retain smtp session
# Note: smtp session will have to be manually closed
self.retain_smtp_session = bool(smtp_server_instance)
self.sent_to = [rec.recipient for rec in self.queue_doc.recipients if rec.is_main_sent()]
def __enter__(self):
@ -200,11 +212,13 @@ class SendMailContext:
JobTimeoutException,
]
self.smtp_server.quit()
if not self.retain_smtp_session:
self.smtp_server.quit()
self.log_exception(exc_type, exc_val, exc_tb)
if exc_type in exceptions:
email_status = (self.sent_to and "Partially Sent") or "Not Sent"
email_status = "Partially Sent" if self.sent_to else "Not Sent"
self.queue_doc.update_status(status=email_status, commit=True)
elif exc_type:
if self.queue_doc.retry < get_email_retry_limit():
@ -216,12 +230,12 @@ class SendMailContext:
email_status = self.is_mail_sent_to_all() and "Sent"
email_status = email_status or (self.sent_to and "Partially Sent") or "Not Sent"
update_fields = {"status": email_status}
if self.email_account_doc.is_exists_in_db():
update_fields["email_account"] = self.email_account_doc.name
else:
update_fields["email_account"] = None
update_fields = {
"status": email_status,
"email_account": self.email_account_doc.name
if self.email_account_doc.is_exists_in_db()
else None,
}
self.queue_doc.update_status(**update_fields, commit=True)
def log_exception(self, exc_type, exc_val, exc_tb):
@ -249,6 +263,7 @@ class SendMailContext:
return Parser(policy=SMTPUTF8).parsestr(message)
def message_placeholder(self, placeholder_key):
# sourcery skip: avoid-builtin-shadow
map = {
"tracker": "<!--email_open_check-->",
"unsubscribe_url": "<!--unsubscribe_url-->",
@ -269,7 +284,7 @@ class SendMailContext:
)
message = message.replace(self.message_placeholder("cc"), self.get_receivers_str())
message = message.replace(
self.message_placeholder("recipient"), self.get_receipient_str(recipient_email)
self.message_placeholder("recipient"), self.get_recipient_str(recipient_email)
)
message = self.include_attachments(message)
return message
@ -304,14 +319,11 @@ class SendMailContext:
to_str = ", ".join(self.queue_doc.to)
cc_str = ", ".join(self.queue_doc.cc)
message = f"This email was sent to {to_str}"
message = message + f" and copied to {cc_str}" if cc_str else message
message = f"{message} and copied to {cc_str}" if cc_str else message
return message
def get_receipient_str(self, recipient_email):
message = ""
if self.queue_doc.expose_recipients != "header":
message = recipient_email
return message
def get_recipient_str(self, recipient_email):
return recipient_email if self.queue_doc.expose_recipients != "header" else ""
def include_attachments(self, message):
message_obj = self.get_message_object(message)
@ -628,7 +640,6 @@ class QueueBuilder:
if not (final_recipients + self.final_cc()):
return []
email_queues = []
queue_data = self.as_dict(include_recipients=False)
if not queue_data:
return []
@ -636,17 +647,31 @@ class QueueBuilder:
if not queue_separately:
recipients = list(set(final_recipients + self.final_cc() + self.bcc))
q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True)
email_queues.append(q)
send_now and q.send()
else:
for r in final_recipients:
recipients = [r] if email_queues else list(set([r] + self.final_cc() + self.bcc))
q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True)
email_queues.append(q)
if send_now and len(recipients) >= 1000:
# force queueing if there are too many recipients to avoid timeouts
send_now = False
for recipients in frappe.utils.create_batch(final_recipients, 1000):
frappe.enqueue(
self.send_emails,
queue_data=queue_data,
final_recipients=recipients,
now=send_now,
)
if send_now:
for doc in email_queues:
doc.send()
return email_queues
def send_emails(self, queue_data, final_recipients):
# This is used to bulk send emails from same sender to multiple recipients separately
# This re-uses smtp server instance to minimize the cost of new session creation
smtp_server_instance = None
for r in final_recipients:
recipients = list(set([r] + self.final_cc() + self.bcc))
q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True)
if not smtp_server_instance:
email_account = q.get_email_account()
smtp_server_instance = email_account.get_smtp_server()
q.send(smtp_server_instance=smtp_server_instance)
smtp_server_instance.quit()
def as_dict(self, include_recipients=True):
email_account = self.get_outgoing_email_account()