From d5d0dfb58b801981d9a07201cf1a96af9eecb0ec Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 2 Nov 2023 19:36:01 +0530 Subject: [PATCH] perf: Reuse SMTP connection when flushing email queue This depends on https://github.com/frappe/frappe/pull/23070 --- .../doctype/email_account/email_account.py | 15 +++++++++++- .../email/doctype/email_queue/email_queue.py | 2 ++ frappe/email/queue.py | 24 ++++++++++++++++--- frappe/email/smtp.py | 4 ++++ 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/frappe/email/doctype/email_account/email_account.py b/frappe/email/doctype/email_account/email_account.py index 24d548cb37..e7faf6100c 100755 --- a/frappe/email/doctype/email_account/email_account.py +++ b/frappe/email/doctype/email_account/email_account.py @@ -348,7 +348,7 @@ class EmailAccount(Document): return frappe.get_doc(cls.DOCTYPE, name) @classmethod - def find_one_by_filters(cls, **kwargs): + def find_one_by_filters(cls, **kwargs) -> "EmailAccount": name = frappe.db.get_value(cls.DOCTYPE, kwargs) return cls.find(name) if name else None @@ -475,9 +475,22 @@ class EmailAccount(Document): } def get_smtp_server(self): + """Get SMTPServer (wrapper around actual smtplib object) for this account. + + Implementation Detail: Since SMTPServer is same for each email connection, the same *instance* + is returned every time this function is called from same EmailAccount object. + This enables reusabilty of connection for better performance.""" + return self._smtp_server_instance + + @functools.cached_property + def _smtp_server_instance(self): config = self.sendmail_config() return SMTPServer(**config) + def remove_unpicklable_values(self, state): + super().remove_unpicklable_values(state) + state.pop("_smtp_server_instance", None) + def handle_incoming_connect_error(self, description): if test_internet(): if self.get_failed_attempts_count() > 2: diff --git a/frappe/email/doctype/email_queue/email_queue.py b/frappe/email/doctype/email_queue/email_queue.py index 7dc5ef27a3..8ca3c94b40 100644 --- a/frappe/email/doctype/email_queue/email_queue.py +++ b/frappe/email/doctype/email_queue/email_queue.py @@ -31,6 +31,7 @@ from frappe.utils import ( sbool, split_emails, ) +from frappe.utils.deprecations import deprecated from frappe.utils.verified_command import get_signed_params @@ -213,6 +214,7 @@ class EmailQueue(Document): @task(queue="short") +@deprecated def send_mail(email_queue_name, smtp_server_instance: SMTPServer = None): """This is equivalent to EmailQueue.send. diff --git a/frappe/email/queue.py b/frappe/email/queue.py index b9e03a3e04..25c6a18cd0 100755 --- a/frappe/email/queue.py +++ b/frappe/email/queue.py @@ -3,6 +3,7 @@ import frappe from frappe import _, msgprint +from frappe.email.smtp import SMTPServer from frappe.utils import cint, cstr, get_url, now_datetime from frappe.utils.data import getdate from frappe.utils.verified_command import get_signed_params, verify_request @@ -130,8 +131,11 @@ def return_unsubscribed_page(email, doctype, name): def flush(): - """flush email queue, every time: called from scheduler""" - from frappe.email.doctype.email_queue.email_queue import send_mail + """flush email queue, every time: called from scheduler. + + This should not be called outside of background jobs. + """ + from frappe.email.doctype.email_queue.email_queue import EmailQueue # To avoid running jobs inside unit tests if frappe.are_emails_muted(): @@ -144,19 +148,33 @@ def flush(): if not email_queue_batch: return + opened_connections = set() + failed_email_queues = [] for row in email_queue_batch: try: - send_mail(email_queue_name=row.name) + email_queue: EmailQueue = frappe.get_doc("Email Queue", row.name) + smtp_server_instance = email_queue.get_email_account().get_smtp_server() + opened_connections.add(smtp_server_instance) + email_queue.send(smtp_server_instance=smtp_server_instance) except Exception: frappe.get_doc("Email Queue", row.name).log_error() failed_email_queues.append(row.name) + if ( len(failed_email_queues) / len(email_queue_batch) > EMAIL_QUEUE_BATCH_FAILURE_THRESHOLD_PERCENT and len(failed_email_queues) > EMAIL_QUEUE_BATCH_FAILURE_THRESHOLD_COUNT ): + _close_connections(opened_connections) frappe.throw(_("Email Queue flushing aborted due to too many failures.")) + _close_connections(opened_connections) + + +def _close_connections(smtp_connections: set[SMTPServer]): + for conn in smtp_connections: + conn.quit() + def get_queue(): batch_size = cint(frappe.conf.email_queue_batch_size) or 500 diff --git a/frappe/email/smtp.py b/frappe/email/smtp.py index 8b0a6d9416..ed213ed860 100644 --- a/frappe/email/smtp.py +++ b/frappe/email/smtp.py @@ -61,6 +61,10 @@ class SMTPServer: @property def session(self): + """Get SMTP session. + + We make best effort to revive connection if it's disconnected by checking the connection + health before returning it to user.""" if self.is_session_active(): return self._session