[optimization] bulk flush index
This commit is contained in:
parent
fd46206abe
commit
f57ad0dae4
3 changed files with 35 additions and 39 deletions
|
|
@ -12,6 +12,7 @@ from frappe.utils.verified_command import get_signed_params, verify_request
|
|||
from html2text import html2text
|
||||
from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr
|
||||
from rq.timeouts import JobTimeoutException
|
||||
from frappe.utils.scheduler import log
|
||||
|
||||
class BulkLimitCrossedError(frappe.ValidationError): pass
|
||||
|
||||
|
|
@ -137,10 +138,6 @@ def add(email, sender, subject, formatted, text_content=None,
|
|||
e.insert(ignore_permissions=True)
|
||||
|
||||
def check_bulk_limit(recipients):
|
||||
# get count of mails sent this month
|
||||
this_month = frappe.db.sql("""select count(name) from `tabBulk Email` where
|
||||
status='Sent' and MONTH(creation)=MONTH(CURDATE())""")[0][0]
|
||||
|
||||
# if using settings from site_config.json, check bulk limit
|
||||
# No limit for own email settings
|
||||
smtp_server = SMTPServer()
|
||||
|
|
@ -149,6 +146,10 @@ def check_bulk_limit(recipients):
|
|||
and getattr(smtp_server.email_account, "from_site_config", False)
|
||||
or frappe.flags.in_test):
|
||||
|
||||
# get count of mails sent this month
|
||||
this_month = frappe.db.sql("""select count(name) from `tabBulk Email` where
|
||||
status='Sent' and MONTH(creation)=MONTH(CURDATE())""")[0][0]
|
||||
|
||||
monthly_bulk_mail_limit = frappe.conf.get('monthly_bulk_mail_limit') or 500
|
||||
|
||||
if (this_month + len(recipients)) > monthly_bulk_mail_limit:
|
||||
|
|
@ -239,43 +240,33 @@ def return_unsubscribed_page(email, doctype, name):
|
|||
|
||||
def flush(from_test=False):
|
||||
"""flush email queue, every time: called from scheduler"""
|
||||
smtpserver = SMTPServer()
|
||||
|
||||
auto_commit = not from_test
|
||||
|
||||
# additional check
|
||||
check_bulk_limit([])
|
||||
|
||||
auto_commit = not from_test
|
||||
if frappe.are_emails_muted():
|
||||
msgprint(_("Emails are muted"))
|
||||
from_test = True
|
||||
|
||||
frappe.db.sql("""update `tabBulk Email` set status='Expired'
|
||||
where datediff(curdate(), creation) > 3 and status='Not Sent'""", auto_commit=auto_commit)
|
||||
where datediff(curdate(), creation) > 7 and status='Not Sent'""", auto_commit=auto_commit)
|
||||
|
||||
def _get_email(priority):
|
||||
use_index=''
|
||||
if priority:
|
||||
use_index = 'use index (priority)'
|
||||
out = frappe.db.sql("""select * from `tabBulk Email` {use_index} where
|
||||
status='Not Sent' and send_after < %s and priority = %s
|
||||
order by creation asc limit 1 for update""".format(use_index=use_index),
|
||||
(now_datetime(), priority), as_dict=1)
|
||||
return out and out[0] or None
|
||||
smtpserver = SMTPServer()
|
||||
|
||||
for i in xrange(500):
|
||||
# indexing on multiple keys is slow, so query twice
|
||||
# get high priority emails
|
||||
email = _get_email(1)
|
||||
if not email:
|
||||
# get low priority emails
|
||||
email = _get_email(0)
|
||||
# don't use for update here, as it leads deadlocks
|
||||
email = frappe.db.sql('''select * from `tabBulk Email`
|
||||
where status='Not Sent' and (send_after is null or send_after < %(now)s)
|
||||
order by priority desc, creation asc
|
||||
limit 1''', { 'now': now_datetime() }, as_dict=True)
|
||||
|
||||
if not email:
|
||||
# done, no more emails to fly
|
||||
if email:
|
||||
email = email[0]
|
||||
else:
|
||||
break
|
||||
|
||||
send_one(email, smtpserver, auto_commit)
|
||||
|
||||
# NOTE: removing commit here because we pass auto_commit
|
||||
# finally:
|
||||
# frappe.db.commit()
|
||||
|
|
@ -283,8 +274,11 @@ def flush(from_test=False):
|
|||
def send_one(email, smtpserver=None, auto_commit=True, now=False):
|
||||
'''Send bulk email with given smtpserver'''
|
||||
|
||||
if not smtpserver:
|
||||
smtpserver = SMTPServer()
|
||||
status = frappe.db.sql('''select status from `tabBulk Email` where name=%s for update''', email.name)[0][0]
|
||||
if status != 'Not Sent':
|
||||
# rollback to release lock and return
|
||||
frappe.db.rollback()
|
||||
return
|
||||
|
||||
frappe.db.sql("""update `tabBulk Email` set status='Sending', modified=%s where name=%s""",
|
||||
(now_datetime(), email.name), auto_commit=auto_commit)
|
||||
|
|
@ -294,6 +288,7 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False):
|
|||
|
||||
try:
|
||||
if auto_commit:
|
||||
if not smtpserver: smtpserver = SMTPServer()
|
||||
smtpserver.setup_email_account(email.reference_doctype)
|
||||
smtpserver.sess.sendmail(email.sender, email.recipient, encode(email.message))
|
||||
|
||||
|
|
@ -320,6 +315,8 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False):
|
|||
return
|
||||
|
||||
except Exception, e:
|
||||
frappe.db.rollback()
|
||||
|
||||
frappe.db.sql("""update `tabBulk Email` set status='Error', error=%s
|
||||
where name=%s""", (unicode(e), email.name), auto_commit=auto_commit)
|
||||
|
||||
|
|
@ -329,6 +326,10 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False):
|
|||
if now:
|
||||
raise e
|
||||
|
||||
else:
|
||||
# log to scheduler log
|
||||
log('frappe.email.bulk.flush', unicode(e))
|
||||
|
||||
def clear_outbox():
|
||||
"""Remove mails older than 31 days in Outbox. Called daily via scheduler."""
|
||||
frappe.db.sql("""delete from `tabBulk Email` where
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@
|
|||
"read_only": 0,
|
||||
"report_hide": 0,
|
||||
"reqd": 0,
|
||||
"search_index": 1,
|
||||
"search_index": 0,
|
||||
"set_only_once": 0,
|
||||
"unique": 0
|
||||
},
|
||||
|
|
@ -231,7 +231,7 @@
|
|||
"read_only": 1,
|
||||
"report_hide": 0,
|
||||
"reqd": 0,
|
||||
"search_index": 1,
|
||||
"search_index": 0,
|
||||
"set_only_once": 0,
|
||||
"unique": 0
|
||||
},
|
||||
|
|
@ -257,7 +257,7 @@
|
|||
"read_only": 1,
|
||||
"report_hide": 0,
|
||||
"reqd": 0,
|
||||
"search_index": 1,
|
||||
"search_index": 0,
|
||||
"set_only_once": 0,
|
||||
"unique": 0
|
||||
}
|
||||
|
|
@ -272,7 +272,7 @@
|
|||
"issingle": 0,
|
||||
"istable": 0,
|
||||
"max_attachments": 0,
|
||||
"modified": "2016-05-25 12:27:26.045643",
|
||||
"modified": "2016-05-26 06:00:18.596285",
|
||||
"modified_by": "Administrator",
|
||||
"module": "Email",
|
||||
"name": "Bulk Email",
|
||||
|
|
|
|||
|
|
@ -9,12 +9,7 @@ from frappe.email.bulk import send_one
|
|||
from frappe.utils import now_datetime
|
||||
|
||||
class BulkEmail(Document):
|
||||
def validate(self):
|
||||
if self.priority not in (0, 1):
|
||||
frappe.throw(frappe._('Bulk Email priority must be 0 or 1'))
|
||||
|
||||
if not self.send_after:
|
||||
self.send_after = now_datetime()
|
||||
pass
|
||||
|
||||
@frappe.whitelist()
|
||||
def retry_sending(name):
|
||||
|
|
@ -30,4 +25,4 @@ def send_now(name):
|
|||
|
||||
def on_doctype_update():
|
||||
"""Add index in `tabCommunication` for `(reference_doctype, reference_name)`"""
|
||||
frappe.db.add_index("Bulk Email", ["creation"])
|
||||
frappe.db.add_index('Bulk Email', ('status', 'send_after', 'priority', 'creation'), 'index_bulk_flush')
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue