diff --git a/frappe/commands.py b/frappe/commands.py index 4f5c316af6..6351029411 100644 --- a/frappe/commands.py +++ b/frappe/commands.py @@ -103,7 +103,7 @@ def _is_scheduler_enabled(): enable_scheduler = False try: frappe.connect() - enable_scheduler = cint(frappe.db.get_default("enable_scheduler")) + enable_scheduler = cint(frappe.db.get_single_value("System Settings", "enable_scheduler")) and True or False except: pass finally: diff --git a/frappe/core/doctype/communication/communication.py b/frappe/core/doctype/communication/communication.py index 6c3877698f..1115ce9007 100644 --- a/frappe/core/doctype/communication/communication.py +++ b/frappe/core/doctype/communication/communication.py @@ -72,6 +72,26 @@ class Communication(Document): ["email_id", "always_use_account_email_id_as_sender"], as_dict=True) or frappe._dict() def notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): + """Calls a delayed celery task 'sendmail' that enqueus email in Bulk Email queue + + :param print_html: Send given value as HTML attachment + :param print_format: Attach print format of parent document + :param attachments: A list of filenames that should be attached when sending this email + :param recipients: Email recipients + :param except_recipient: True when pulling email, the notification shouldn't go to the main recipient + + """ + if frappe.flags.in_test: + # for test cases, run synchronously + self._notify(print_html=print_html, print_format=print_format, attachments=attachments, + recipients=recipients, except_recipient=except_recipient) + else: + from frappe.tasks import sendmail + sendmail.delay(frappe.local.site, self.name, + print_html=print_html, print_format=print_format, attachments=attachments, + recipients=recipients, except_recipient=except_recipient) + + def _notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): self.prepare_to_notify(print_html, print_format, attachments) if not recipients: recipients = self.get_recipients(except_recipient=except_recipient) @@ -256,6 +276,10 @@ def make(doctype=None, name=None, content=None, subject=None, sent_or_received = }) comm.insert(ignore_permissions=True) + # needed for communication.notify which uses celery delay + # if not committed, delayed task doesn't find the communication + frappe.db.commit() + recipients = None if send_email: comm.send_me_a_copy = send_me_a_copy diff --git a/frappe/email/doctype/email_account/email_account.py b/frappe/email/doctype/email_account/email_account.py index 2ed930386a..349017f6ca 100644 --- a/frappe/email/doctype/email_account/email_account.py +++ b/frappe/email/doctype/email_account/email_account.py @@ -124,7 +124,7 @@ class EmailAccount(Document): exceptions = [] for raw in incoming_mails: try: - self.insert_communication(raw) + communication = self.insert_communication(raw) except Exception: frappe.db.rollback() @@ -132,6 +132,7 @@ class EmailAccount(Document): else: frappe.db.commit() + communication.notify(attachments=communication._attachments, except_recipient=True) if exceptions: raise Exception, frappe.as_json(exceptions) @@ -156,7 +157,7 @@ class EmailAccount(Document): communication.insert(ignore_permissions = 1) # save attachments - email.save_attachments_in_doc(communication) + communication._attachments = email.save_attachments_in_doc(communication) if self.enable_auto_reply and getattr(communication, "is_first", False): self.send_auto_reply(communication, email) @@ -164,7 +165,8 @@ class EmailAccount(Document): # notify all participants of this thread # convert content to HTML - by default text parts of replies are used. communication.content = markdown2.markdown(communication.content) - communication.notify(attachments=email.attachments, except_recipient = True) + + return communication def set_thread(self, communication, email): """Appends communication to parent based on thread ID. Will extract diff --git a/frappe/email/receive.py b/frappe/email/receive.py index d53453053a..3042c12d0e 100644 --- a/frappe/email/receive.py +++ b/frappe/email/receive.py @@ -296,10 +296,13 @@ class Email: def save_attachments_in_doc(self, doc): """Save email attachments in given document.""" from frappe.utils.file_manager import save_file, MaxFileSizeReachedError + saved_attachments = [] + for attachment in self.attachments: try: - save_file(attachment['fname'], attachment['fcontent'], + file_data = save_file(attachment['fname'], attachment['fcontent'], doc.doctype, doc.name) + saved_attachments.append(file_data.file_name) except MaxFileSizeReachedError: # WARNING: bypass max file size exception pass @@ -307,6 +310,8 @@ class Email: # same file attached twice?? pass + return saved_attachments + def get_thread_id(self): """Extract thread ID from `[]`""" import re diff --git a/frappe/tasks.py b/frappe/tasks.py index 60e5cfe7dd..bd9e49b593 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -7,6 +7,8 @@ from frappe.utils.scheduler import enqueue_events from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX from frappe.utils import get_sites from frappe.utils.file_lock import create_lock, delete_lock +import time +import MySQLdb @celery_task() def sync_queues(): @@ -122,3 +124,33 @@ def pull_from_email_account(site, email_account): frappe.db.commit() finally: frappe.destroy() + +@celery_task() +def sendmail(site, communication_name, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): + try: + frappe.connect(site=site) + + # upto 3 retries + for i in xrange(3): + try: + communication = frappe.get_doc("Communication", communication_name) + communication._notify(print_html=print_html, print_format=print_format, attachments=attachments, recipients=recipients, except_recipient=except_recipient) + except MySQLdb.OperationalError, e: + # deadlock, try again + if e.args[0]==1213: + frappe.db.rollback() + time.sleep(1) + continue + else: + raise + else: + break + + except: + frappe.db.rollback() + + else: + frappe.db.commit() + + finally: + frappe.destroy()