perf: Reuse SMTP connection when flushing email queue

This depends on https://github.com/frappe/frappe/pull/23070
This commit is contained in:
Ankush Menat 2023-11-02 19:36:01 +05:30
parent 4e318a0280
commit d5d0dfb58b
4 changed files with 41 additions and 4 deletions

View file

@ -348,7 +348,7 @@ class EmailAccount(Document):
return frappe.get_doc(cls.DOCTYPE, name)
@classmethod
def find_one_by_filters(cls, **kwargs):
def find_one_by_filters(cls, **kwargs) -> "EmailAccount":
name = frappe.db.get_value(cls.DOCTYPE, kwargs)
return cls.find(name) if name else None
@ -475,9 +475,22 @@ class EmailAccount(Document):
}
def get_smtp_server(self):
"""Get SMTPServer (wrapper around actual smtplib object) for this account.
Implementation Detail: Since SMTPServer is same for each email connection, the same *instance*
is returned every time this function is called from same EmailAccount object.
This enables reusabilty of connection for better performance."""
return self._smtp_server_instance
@functools.cached_property
def _smtp_server_instance(self):
config = self.sendmail_config()
return SMTPServer(**config)
def remove_unpicklable_values(self, state):
super().remove_unpicklable_values(state)
state.pop("_smtp_server_instance", None)
def handle_incoming_connect_error(self, description):
if test_internet():
if self.get_failed_attempts_count() > 2:

View file

@ -31,6 +31,7 @@ from frappe.utils import (
sbool,
split_emails,
)
from frappe.utils.deprecations import deprecated
from frappe.utils.verified_command import get_signed_params
@ -213,6 +214,7 @@ class EmailQueue(Document):
@task(queue="short")
@deprecated
def send_mail(email_queue_name, smtp_server_instance: SMTPServer = None):
"""This is equivalent to EmailQueue.send.

View file

@ -3,6 +3,7 @@
import frappe
from frappe import _, msgprint
from frappe.email.smtp import SMTPServer
from frappe.utils import cint, cstr, get_url, now_datetime
from frappe.utils.data import getdate
from frappe.utils.verified_command import get_signed_params, verify_request
@ -130,8 +131,11 @@ def return_unsubscribed_page(email, doctype, name):
def flush():
"""flush email queue, every time: called from scheduler"""
from frappe.email.doctype.email_queue.email_queue import send_mail
"""flush email queue, every time: called from scheduler.
This should not be called outside of background jobs.
"""
from frappe.email.doctype.email_queue.email_queue import EmailQueue
# To avoid running jobs inside unit tests
if frappe.are_emails_muted():
@ -144,19 +148,33 @@ def flush():
if not email_queue_batch:
return
opened_connections = set()
failed_email_queues = []
for row in email_queue_batch:
try:
send_mail(email_queue_name=row.name)
email_queue: EmailQueue = frappe.get_doc("Email Queue", row.name)
smtp_server_instance = email_queue.get_email_account().get_smtp_server()
opened_connections.add(smtp_server_instance)
email_queue.send(smtp_server_instance=smtp_server_instance)
except Exception:
frappe.get_doc("Email Queue", row.name).log_error()
failed_email_queues.append(row.name)
if (
len(failed_email_queues) / len(email_queue_batch) > EMAIL_QUEUE_BATCH_FAILURE_THRESHOLD_PERCENT
and len(failed_email_queues) > EMAIL_QUEUE_BATCH_FAILURE_THRESHOLD_COUNT
):
_close_connections(opened_connections)
frappe.throw(_("Email Queue flushing aborted due to too many failures."))
_close_connections(opened_connections)
def _close_connections(smtp_connections: set[SMTPServer]):
for conn in smtp_connections:
conn.quit()
def get_queue():
batch_size = cint(frappe.conf.email_queue_batch_size) or 500

View file

@ -61,6 +61,10 @@ class SMTPServer:
@property
def session(self):
"""Get SMTP session.
We make best effort to revive connection if it's disconnected by checking the connection
health before returning it to user."""
if self.is_session_active():
return self._session