Merge pull request #21304 from phot0n/fix-expiry-for-email-queue

feat: bulk retry for email queue & remove infinite* retry for certain smtp exceptions
This commit is contained in:
Ritwik Puri 2023-06-12 15:08:44 +05:30 committed by GitHub
commit fbc25d206e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 57 additions and 78 deletions

View file

@ -18,7 +18,7 @@ class TestScheduledJobType(FrappeTestCase):
self.assertEqual(all_job.frequency, "All")
daily_job = frappe.get_doc(
"Scheduled Job Type", dict(method="frappe.email.queue.set_expiry_for_email_queue")
"Scheduled Job Type", dict(method="frappe.desk.notifications.clear_notifications")
)
self.assertEqual(daily_job.frequency, "Daily")
@ -37,7 +37,7 @@ class TestScheduledJobType(FrappeTestCase):
def test_daily_job(self):
job = frappe.get_doc(
"Scheduled Job Type", dict(method="frappe.email.queue.set_expiry_for_email_queue")
"Scheduled Job Type", dict(method="frappe.desk.notifications.clear_notifications")
)
job.db_set("last_execution", "2019-01-01 00:00:00")
self.assertTrue(job.is_event_due(get_datetime("2019-01-02 00:00:06")))

View file

@ -17,13 +17,13 @@ frappe.ui.form.on("Email Queue", {
});
});
} else if (frm.doc.status == "Error") {
let button = frm.add_custom_button("Retry Sending", function () {
frm.add_custom_button("Retry Sending", function () {
frm.call({
method: "retry_sending",
doc: frm.doc,
args: {
name: frm.doc.name,
},
btn: button,
callback: function () {
frm.reload_doc();
},

View file

@ -59,7 +59,7 @@
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Status",
"options": "Not Sent\nSending\nSent\nPartially Sent\nError\nExpired"
"options": "Not Sent\nSending\nSent\nPartially Sent\nError"
},
{
"depends_on": "eval:doc.error",
@ -154,7 +154,7 @@
"idx": 1,
"in_create": 1,
"links": [],
"modified": "2023-06-08 15:31:52.789186",
"modified": "2023-06-09 14:31:52.789186",
"modified_by": "Administrator",
"module": "Email",
"name": "Email Queue",

View file

@ -3,14 +3,11 @@
import json
import quopri
import smtplib
import traceback
from contextlib import suppress
from email.parser import Parser
from email.policy import SMTPUTF8
from rq.timeouts import JobTimeoutException
import frappe
from frappe import _, safe_encode, task
from frappe.core.utils import html2text
@ -28,6 +25,7 @@ from frappe.utils import (
get_hook_method,
get_string_between,
get_url,
now,
nowdate,
sbool,
split_emails,
@ -177,6 +175,12 @@ class EmailQueue(Document):
.where(email_recipient.modified < (Now() - Interval(days=days)))
).run()
@frappe.whitelist()
def retry_sending(self):
if self.status == "Error":
self.status = "Not Sent"
self.save(ignore_permissions=True)
@task(queue="short")
def send_mail(email_queue_name, smtp_server_instance: SMTPServer = None):
@ -212,25 +216,11 @@ class SendMailContext:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
exceptions = [
smtplib.SMTPServerDisconnected,
smtplib.SMTPAuthenticationError,
smtplib.SMTPConnectError,
smtplib.SMTPHeloError,
JobTimeoutException,
]
trace = "".join(traceback.format_tb(exc_tb)) if exc_tb else None
if not self.retain_smtp_session:
self.smtp_server.quit()
if exc_type in exceptions:
update_fields = {
"status": "Partially Sent" if self.sent_to_atleast_one_recipient else "Not Sent",
"error": trace,
}
elif exc_type:
update_fields = {"error": trace}
if exc_type:
update_fields = {"error": "".join(traceback.format_tb(exc_tb))}
if self.queue_doc.retry < get_email_retry_limit():
update_fields.update(
{
@ -359,16 +349,26 @@ class SendMailContext:
@frappe.whitelist()
def retry_sending(name):
doc = frappe.get_doc("Email Queue", name)
doc.check_permission()
def bulk_retry(queues):
frappe.only_for("System Manager")
if doc and doc.status == "Error":
doc.status = "Not Sent"
for d in doc.recipients:
if d.status != "Sent":
d.status = "Not Sent"
doc.save(ignore_permissions=True)
if isinstance(queues, str):
queues = json.loads(queues)
if not queues:
return
frappe.msgprint(
_("Updating Email Queue Statuses. The emails will be picked up in the next scheduled run."),
_("Processing..."),
)
email_queue = frappe.qb.DocType("Email Queue")
frappe.qb.update(email_queue).set(email_queue.status, "Not Sent").set(
email_queue.modified, now()
).set(email_queue.modified_by, frappe.session.user).where(
email_queue.name.isin(queues) & email_queue.status == "Error"
).run()
@frappe.whitelist()

View file

@ -9,7 +9,10 @@ frappe.listview_settings["Email Queue"] = {
};
return [__(doc.status), colour[doc.status], "status,=," + doc.status];
},
refresh: show_toggle_sending_button,
refresh: function (listview) {
show_toggle_sending_button(listview);
add_bulk_retry_button_to_actions(listview);
},
onload: function (list_view) {
frappe.require("logtypes.bundle.js", () => {
frappe.utils.logtypes.show_log_retention_message(list_view.doctype);
@ -39,3 +42,21 @@ function show_toggle_sending_button(list_view) {
show_toggle_sending_button(list_view);
});
}
function add_bulk_retry_button_to_actions(list_view) {
if (!has_common(frappe.user_roles, ["Administrator", "System Manager"])) return;
list_view.page.add_actions_menu_item(__("Retry Sending"), () => {
frappe.call({
method: "frappe.email.doctype.email_queue.email_queue.bulk_retry",
args: {
queues: list_view.get_checked_items(true),
},
callback: (r) => {
if (!r.exc) {
list_view.refresh();
}
},
});
});
}

View file

@ -179,19 +179,3 @@ def get_queue():
{"now": now_datetime()},
as_dict=True,
)
def set_expiry_for_email_queue():
"""Mark emails as expire that has not sent for 7 days.
Called daily via scheduler.
"""
frappe.db.sql(
"""
UPDATE `tabEmail Queue`
SET `status`='Expired'
WHERE `modified` < (NOW() - INTERVAL '7' DAY)
AND `status`='Not Sent'
AND (`send_after` IS NULL OR `send_after` < %(now)s)""",
{"now": now_datetime()},
)

View file

@ -223,7 +223,6 @@ scheduler_events = {
"frappe.website.doctype.personal_data_deletion_request.personal_data_deletion_request.process_data_deletion_request",
],
"daily": [
"frappe.email.queue.set_expiry_for_email_queue",
"frappe.desk.notifications.clear_notifications",
"frappe.desk.doctype.event.event.send_event_digest",
"frappe.sessions.clear_expired_sessions",

View file

@ -205,31 +205,6 @@ class TestEmail(FrappeTestCase):
self.assertTrue(verify_request())
break
def test_expired(self):
self.test_email_queue()
frappe.db.sql("UPDATE `tabEmail Queue` SET `modified`=(NOW() - INTERVAL '8' day)")
from frappe.email.queue import set_expiry_for_email_queue
set_expiry_for_email_queue()
email_queue = frappe.db.sql(
"""select name from `tabEmail Queue` where status='Expired'""", as_dict=1
)
self.assertEqual(len(email_queue), 1)
queue_recipients = [
r.recipient
for r in frappe.db.sql(
"""select recipient from `tabEmail Queue Recipient`
where parent = %s""",
email_queue[0].name,
as_dict=1,
)
]
self.assertTrue("test@example.com" in queue_recipients)
self.assertTrue("test1@example.com" in queue_recipients)
self.assertEqual(len(queue_recipients), 2)
def test_sender(self):
def _patched_assertion(email_account, assertion):
with patch.object(QueueBuilder, "get_outgoing_email_account", return_value=email_account):

View file

@ -42,7 +42,7 @@ class TestScheduler(TestCase):
enqueued_jobs = enqueue_events(site=frappe.local.site)
self.assertIn("frappe.email.queue.set_expiry_for_email_queue", enqueued_jobs)
self.assertIn("frappe.desk.notifications.clear_notifications", enqueued_jobs)
self.assertIn("frappe.utils.change_log.check_for_update", enqueued_jobs)
self.assertIn(
"frappe.email.doctype.auto_email_report.auto_email_report.send_monthly",