diff --git a/frappe/email/bulk.py b/frappe/email/bulk.py index eaf1cb062a..5d8d411f86 100644 --- a/frappe/email/bulk.py +++ b/frappe/email/bulk.py @@ -12,6 +12,7 @@ from frappe.utils.verified_command import get_signed_params, verify_request from html2text import html2text from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr from rq.timeouts import JobTimeoutException +from frappe.utils.scheduler import log class BulkLimitCrossedError(frappe.ValidationError): pass @@ -137,10 +138,6 @@ def add(email, sender, subject, formatted, text_content=None, e.insert(ignore_permissions=True) def check_bulk_limit(recipients): - # get count of mails sent this month - this_month = frappe.db.sql("""select count(name) from `tabBulk Email` where - status='Sent' and MONTH(creation)=MONTH(CURDATE())""")[0][0] - # if using settings from site_config.json, check bulk limit # No limit for own email settings smtp_server = SMTPServer() @@ -149,6 +146,10 @@ def check_bulk_limit(recipients): and getattr(smtp_server.email_account, "from_site_config", False) or frappe.flags.in_test): + # get count of mails sent this month + this_month = frappe.db.sql("""select count(name) from `tabBulk Email` where + status='Sent' and MONTH(creation)=MONTH(CURDATE())""")[0][0] + monthly_bulk_mail_limit = frappe.conf.get('monthly_bulk_mail_limit') or 500 if (this_month + len(recipients)) > monthly_bulk_mail_limit: @@ -239,43 +240,33 @@ def return_unsubscribed_page(email, doctype, name): def flush(from_test=False): """flush email queue, every time: called from scheduler""" - smtpserver = SMTPServer() - - auto_commit = not from_test - # additional check check_bulk_limit([]) + auto_commit = not from_test if frappe.are_emails_muted(): msgprint(_("Emails are muted")) from_test = True frappe.db.sql("""update `tabBulk Email` set status='Expired' - where datediff(curdate(), creation) > 3 and status='Not Sent'""", auto_commit=auto_commit) + where datediff(curdate(), creation) > 7 and status='Not Sent'""", auto_commit=auto_commit) - def _get_email(priority): - use_index='' - if priority: - use_index = 'use index (priority)' - out = frappe.db.sql("""select * from `tabBulk Email` {use_index} where - status='Not Sent' and send_after < %s and priority = %s - order by creation asc limit 1 for update""".format(use_index=use_index), - (now_datetime(), priority), as_dict=1) - return out and out[0] or None + smtpserver = SMTPServer() for i in xrange(500): - # indexing on multiple keys is slow, so query twice - # get high priority emails - email = _get_email(1) - if not email: - # get low priority emails - email = _get_email(0) + # don't use for update here, as it leads deadlocks + email = frappe.db.sql('''select * from `tabBulk Email` + where status='Not Sent' and (send_after is null or send_after < %(now)s) + order by priority desc, creation asc + limit 1''', { 'now': now_datetime() }, as_dict=True) - if not email: - # done, no more emails to fly + if email: + email = email[0] + else: break send_one(email, smtpserver, auto_commit) + # NOTE: removing commit here because we pass auto_commit # finally: # frappe.db.commit() @@ -283,8 +274,11 @@ def flush(from_test=False): def send_one(email, smtpserver=None, auto_commit=True, now=False): '''Send bulk email with given smtpserver''' - if not smtpserver: - smtpserver = SMTPServer() + status = frappe.db.sql('''select status from `tabBulk Email` where name=%s for update''', email.name)[0][0] + if status != 'Not Sent': + # rollback to release lock and return + frappe.db.rollback() + return frappe.db.sql("""update `tabBulk Email` set status='Sending', modified=%s where name=%s""", (now_datetime(), email.name), auto_commit=auto_commit) @@ -294,6 +288,7 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False): try: if auto_commit: + if not smtpserver: smtpserver = SMTPServer() smtpserver.setup_email_account(email.reference_doctype) smtpserver.sess.sendmail(email.sender, email.recipient, encode(email.message)) @@ -320,6 +315,8 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False): return except Exception, e: + frappe.db.rollback() + frappe.db.sql("""update `tabBulk Email` set status='Error', error=%s where name=%s""", (unicode(e), email.name), auto_commit=auto_commit) @@ -329,6 +326,10 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False): if now: raise e + else: + # log to scheduler log + log('frappe.email.bulk.flush', unicode(e)) + def clear_outbox(): """Remove mails older than 31 days in Outbox. Called daily via scheduler.""" frappe.db.sql("""delete from `tabBulk Email` where diff --git a/frappe/email/doctype/bulk_email/bulk_email.json b/frappe/email/doctype/bulk_email/bulk_email.json index c7b7747307..a9a07b5694 100644 --- a/frappe/email/doctype/bulk_email/bulk_email.json +++ b/frappe/email/doctype/bulk_email/bulk_email.json @@ -107,7 +107,7 @@ "read_only": 0, "report_hide": 0, "reqd": 0, - "search_index": 1, + "search_index": 0, "set_only_once": 0, "unique": 0 }, @@ -231,7 +231,7 @@ "read_only": 1, "report_hide": 0, "reqd": 0, - "search_index": 1, + "search_index": 0, "set_only_once": 0, "unique": 0 }, @@ -257,7 +257,7 @@ "read_only": 1, "report_hide": 0, "reqd": 0, - "search_index": 1, + "search_index": 0, "set_only_once": 0, "unique": 0 } @@ -272,7 +272,7 @@ "issingle": 0, "istable": 0, "max_attachments": 0, - "modified": "2016-05-25 12:27:26.045643", + "modified": "2016-05-26 06:00:18.596285", "modified_by": "Administrator", "module": "Email", "name": "Bulk Email", diff --git a/frappe/email/doctype/bulk_email/bulk_email.py b/frappe/email/doctype/bulk_email/bulk_email.py index af9d76debf..991a78482b 100644 --- a/frappe/email/doctype/bulk_email/bulk_email.py +++ b/frappe/email/doctype/bulk_email/bulk_email.py @@ -9,12 +9,7 @@ from frappe.email.bulk import send_one from frappe.utils import now_datetime class BulkEmail(Document): - def validate(self): - if self.priority not in (0, 1): - frappe.throw(frappe._('Bulk Email priority must be 0 or 1')) - - if not self.send_after: - self.send_after = now_datetime() + pass @frappe.whitelist() def retry_sending(name): @@ -30,4 +25,4 @@ def send_now(name): def on_doctype_update(): """Add index in `tabCommunication` for `(reference_doctype, reference_name)`""" - frappe.db.add_index("Bulk Email", ["creation"]) + frappe.db.add_index('Bulk Email', ('status', 'send_after', 'priority', 'creation'), 'index_bulk_flush')