[minor] email queue system more optimized
This commit is contained in:
parent
2bd27c0844
commit
561b2490c4
3 changed files with 29 additions and 13 deletions
|
|
@ -37,7 +37,7 @@ class Newsletter(Document):
|
|||
self.validate_send()
|
||||
|
||||
# using default queue with a longer timeout as this isn't a scheduled task
|
||||
enqueue(send_newsletter, queue='default', timeout=1500, event='send_newsletter', newsletter=self.name)
|
||||
enqueue(send_newsletter, queue='default', timeout=3000, event='send_newsletter', newsletter=self.name)
|
||||
|
||||
else:
|
||||
self.queue_all()
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ def add(email, sender, subject, formatted, text_content=None,
|
|||
e.reference_name = reference_name
|
||||
e.communication = communication
|
||||
e.send_after = send_after
|
||||
e.insert(ignore_permissions=True)
|
||||
e.db_insert()
|
||||
|
||||
def check_email_limit(recipients):
|
||||
# if using settings from site_config.json, check email limit
|
||||
|
|
@ -151,6 +151,9 @@ def check_email_limit(recipients):
|
|||
|
||||
monthly_email_limit = frappe.conf.get('limits', {}).get('emails') or 500
|
||||
|
||||
if frappe.flags.in_test:
|
||||
monthly_email_limit = 500
|
||||
|
||||
if (this_month + len(recipients)) > monthly_email_limit:
|
||||
throw(_("Cannot send this email. You have crossed the sending limit of {0} emails for this month.").format(monthly_email_limit),
|
||||
EmailLimitCrossedError)
|
||||
|
|
@ -256,16 +259,12 @@ def flush(from_test=False):
|
|||
|
||||
smtpserver = SMTPServer()
|
||||
|
||||
for i in xrange(500):
|
||||
# don't use for update here, as it leads deadlocks
|
||||
email = frappe.db.sql('''select * from `tabEmail Queue`
|
||||
where status='Not Sent' and (send_after is null or send_after < %(now)s)
|
||||
order by priority desc, creation asc
|
||||
limit 1''', { 'now': now_datetime() }, as_dict=True)
|
||||
make_cache_queue()
|
||||
|
||||
if email:
|
||||
email = email[0]
|
||||
else:
|
||||
for i in xrange(500):
|
||||
email = frappe.cache().lpop('cache_email_queue')
|
||||
|
||||
if not email:
|
||||
break
|
||||
|
||||
send_one(email, smtpserver, auto_commit)
|
||||
|
|
@ -273,12 +272,23 @@ def flush(from_test=False):
|
|||
# NOTE: removing commit here because we pass auto_commit
|
||||
# finally:
|
||||
# frappe.db.commit()
|
||||
def make_cache_queue():
|
||||
'''cache values in queue before sendign'''
|
||||
cache = frappe.cache()
|
||||
cache.delete_value('cache_email_queue')
|
||||
|
||||
for l in frappe.db.sql('''select name from `tabEmail Queue`
|
||||
where status='Not Sent' and (send_after is null or send_after < %(now)s)
|
||||
order by priority desc, creation asc
|
||||
limit 500''', { 'now': now_datetime() }):
|
||||
cache.lpush('cache_email_queue', l[0])
|
||||
|
||||
def send_one(email, smtpserver=None, auto_commit=True, now=False):
|
||||
'''Send Email Queue with given smtpserver'''
|
||||
|
||||
status = frappe.db.sql('''select status from `tabEmail Queue` where name=%s for update''', email.name)[0][0]
|
||||
if status != 'Not Sent':
|
||||
email = frappe.db.sql('''select name, status, communication, message, sender, recipient
|
||||
from `tabEmail Queue` where name=%s for update''', email, as_dict=True)[0]
|
||||
if email.status != 'Not Sent':
|
||||
# rollback to release lock and return
|
||||
frappe.db.rollback()
|
||||
return
|
||||
|
|
|
|||
|
|
@ -117,6 +117,12 @@ class RedisWrapper(redis.Redis):
|
|||
if key in frappe.local.cache:
|
||||
del frappe.local.cache[key]
|
||||
|
||||
def lpush(self, key, value):
|
||||
super(redis.Redis, self).lpush(self.make_key(key), value)
|
||||
|
||||
def lpop(self, key):
|
||||
return super(redis.Redis, self).lpop(self.make_key(key))
|
||||
|
||||
def hset(self, name, key, value):
|
||||
if not name in frappe.local.cache:
|
||||
frappe.local.cache[name] = {}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue