From 6d0563e8ea9ec0c8fe908459bb3cf9cd76e385c1 Mon Sep 17 00:00:00 2001 From: Valmik Jangla Date: Thu, 17 Mar 2016 18:06:14 +0530 Subject: [PATCH] WIP code migration from Celery to RQ --- frappe/core/doctype/communication/email.py | 51 +++++- frappe/tasks.py | 181 +-------------------- frappe/utils/background_jobs.py | 52 ++++++ frappe/utils/scheduler.py | 86 ++++++++-- requirements.txt | 3 + 5 files changed, 180 insertions(+), 193 deletions(-) create mode 100644 frappe/utils/background_jobs.py diff --git a/frappe/core/doctype/communication/email.py b/frappe/core/doctype/communication/email.py index dbe8fdbc27..4272dd5b47 100644 --- a/frappe/core/doctype/communication/email.py +++ b/frappe/core/doctype/communication/email.py @@ -8,7 +8,10 @@ from email.utils import formataddr, parseaddr from frappe.utils import get_url, get_formatted_email, cint, validate_email_add, split_emails from frappe.utils.file_manager import get_file from frappe.email.bulk import check_bulk_limit +from frappe.utils.scheduler import log import frappe.email.smtp +import MySQLdb +import time from frappe import _ @frappe.whitelist() @@ -105,9 +108,8 @@ def notify(doc, print_html=None, print_format=None, attachments=None, recipients=recipients, cc=cc) else: check_bulk_limit(list(set(doc.sent_email_addresses))) - - from frappe.tasks import sendmail - sendmail.delay(frappe.local.site, doc.name, + from frappe.utils.background_jobs import enqueue + enqueue(sendmail, communication_name=doc.name, print_html=print_html, print_format=print_format, attachments=attachments, recipients=recipients, cc=cc, lang=frappe.local.lang, session=frappe.local.session) @@ -349,3 +351,46 @@ def get_attach_link(doc, print_format): "print_format": print_format, "key": doc.get_parent_doc().get_signature() }) + +def sendmail(communication_name, print_html=None, print_format=None, attachments=None, + recipients=None, cc=None, lang=None, session=None): + try: + + if lang: + frappe.local.lang = lang + + if session: + # hack to enable access to private files in PDF + session['data'] = frappe._dict(session['data']) + frappe.local.session.update(session) + + # upto 3 retries + for i in xrange(3): + try: + communication = frappe.get_doc("Communication", communication_name) + communication._notify(print_html=print_html, print_format=print_format, attachments=attachments, + recipients=recipients, cc=cc) + + except MySQLdb.OperationalError, e: + # deadlock, try again + if e.args[0]==1213: + frappe.db.rollback() + time.sleep(1) + continue + else: + raise + else: + break + + except: + traceback = log("frappe.core.doctype.communication.email.sendmail", frappe.as_json({ + "communication_name": communication_name, + "print_html": print_html, + "print_format": print_format, + "attachments": attachments, + "recipients": recipients, + "cc": cc, + "lang": lang + })) + frappe.get_logger(__name__).error(traceback) + raise \ No newline at end of file diff --git a/frappe/tasks.py b/frappe/tasks.py index c6b10cf17c..7208ffce31 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -3,86 +3,11 @@ from __future__ import unicode_literals import frappe -from frappe.utils.scheduler import enqueue_events -from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX, ASYNC_TASKS_PREFIX -from frappe.utils import get_sites -from frappe.utils.error import make_error_snapshot -from frappe.utils.file_lock import create_lock, delete_lock from frappe.handler import execute_cmd from frappe.async import set_task_status, END_LINE, get_std_streams -from frappe.utils.scheduler import log import frappe.utils.response import sys -import time -import json -import os -import MySQLdb -@celery_task() -def sync_queues(): - """notifies workers to monitor newly added sites""" - app = get_celery() - shortjob_workers, longjob_workers, async_tasks_workers = get_workers(app) - - if shortjob_workers: - for worker in shortjob_workers: - sync_worker(app, worker) - - if longjob_workers: - for worker in longjob_workers: - sync_worker(app, worker, prefix=LONGJOBS_PREFIX) - - if async_tasks_workers: - for worker in async_tasks_workers: - sync_worker(app, worker, prefix=ASYNC_TASKS_PREFIX) - -def get_workers(app): - longjob_workers = [] - shortjob_workers = [] - async_tasks_workers = [] - - active_queues = app.control.inspect().active_queues() - for worker in active_queues: - if worker.startswith(LONGJOBS_PREFIX): - longjob_workers.append(worker) - elif worker.startswith(ASYNC_TASKS_PREFIX): - async_tasks_workers.append(worker) - else: - shortjob_workers.append(worker) - - return shortjob_workers, longjob_workers, async_tasks_workers - -def sync_worker(app, worker, prefix=''): - active_queues = set(get_active_queues(app, worker)) - required_queues = set(get_required_queues(app, prefix=prefix)) - to_add = required_queues - active_queues - to_remove = active_queues - required_queues - - for queue in to_add: - if is_site_in_maintenance_mode(queue, prefix): - continue - - app.control.broadcast('add_consumer', arguments={ - 'queue': queue - }, reply=True, destination=[worker]) - - for queue in to_remove: - app.control.broadcast('cancel_consumer', arguments={ - 'queue': queue - }, reply=True, destination=[worker]) - -def get_active_queues(app, worker): - active_queues = app.control.inspect().active_queues() - if not (active_queues and active_queues.get(worker)): - return [] - return [queue['name'] for queue in active_queues[worker]] - -def get_required_queues(app, prefix=''): - ret = [] - for site in get_sites(): - ret.append('{}{}'.format(prefix, site)) - ret.append(app.conf['CELERY_DEFAULT_QUEUE']) - return ret def is_site_in_maintenance_mode(queue, prefix): # check if site is in maintenance mode @@ -101,56 +26,6 @@ def is_site_in_maintenance_mode(queue, prefix): return False -@celery_task() -def scheduler_task(site, event, handler, now=False): - traceback = "" - task_logger.info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) - try: - frappe.init(site=site) - if not create_lock(handler): - return - if not now: - frappe.connect(site=site) - frappe.flags.in_scheduler = True - frappe.get_attr(handler)() - - except Exception: - frappe.db.rollback() - traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler)) - task_logger.warn(traceback) - raise - - else: - frappe.db.commit() - - finally: - delete_lock(handler) - - if not now: - frappe.destroy() - - task_logger.info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) - -@celery_task() -def enqueue_scheduler_events(): - for site in get_sites(): - enqueue_events_for_site.delay(site=site) - -@celery_task() -def enqueue_events_for_site(site): - try: - frappe.init(site=site) - if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler: - return - frappe.connect(site=site) - enqueue_events(site) - except: - task_logger.error('Exception in Enqueue Events for Site {0}'.format(site)) - raise - finally: - frappe.destroy() - -@celery_task() def pull_from_email_account(site, email_account): try: frappe.init(site=site) @@ -161,7 +36,6 @@ def pull_from_email_account(site, email_account): finally: frappe.destroy() -@celery_task(bind=True) def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_std=False): ret = {} frappe.init(site) @@ -191,7 +65,7 @@ def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_ frappe.errprint(frappe.get_traceback()) frappe.utils.response.make_logs() set_task_status(self.request.id, "Error", response=ret) - task_logger.error('Exception in running {}: {}'.format(cmd, ret['exc'])) + frappe.get_logger(__name__).error('Exception in running {}: {}'.format(cmd, ret['exc'])) else: set_task_status(self.request.id, "Success", response=ret) if not frappe.flags.in_test: @@ -209,56 +83,3 @@ def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_ return ret - -@celery_task() -def sendmail(site, communication_name, print_html=None, print_format=None, attachments=None, - recipients=None, cc=None, lang=None, session=None): - try: - frappe.connect(site=site) - - if lang: - frappe.local.lang = lang - - if session: - # hack to enable access to private files in PDF - session['data'] = frappe._dict(session['data']) - frappe.local.session.update(session) - - # upto 3 retries - for i in xrange(3): - try: - communication = frappe.get_doc("Communication", communication_name) - communication._notify(print_html=print_html, print_format=print_format, attachments=attachments, - recipients=recipients, cc=cc) - - except MySQLdb.OperationalError, e: - # deadlock, try again - if e.args[0]==1213: - frappe.db.rollback() - time.sleep(1) - continue - else: - raise - else: - break - - except: - traceback = log("frappe.tasks.sendmail", frappe.as_json({ - "site": site, - "communication_name": communication_name, - "print_html": print_html, - "print_format": print_format, - "attachments": attachments, - "recipients": recipients, - "cc": cc, - "lang": lang - })) - task_logger.error(traceback) - raise - - else: - frappe.db.commit() - - finally: - frappe.destroy() - diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py new file mode 100644 index 0000000000..ac1f251f85 --- /dev/null +++ b/frappe/utils/background_jobs.py @@ -0,0 +1,52 @@ +from redis import Redis +from rq import Queue +import frappe +from collections import defaultdict +from frappe.utils import cstr + +logger = frappe.get_logger(__name__) + +def enqueue(method, queue, timeout, **kwargs): + """queue should be either low, high or default + timeout should be set accoridngly""" + q = Queue(queue, connection=Redis()) + q.enqueue(rq_task, args=(frappe.local.site, method, kwargs)) + +def get_jobs(): + job_dict = defaultdict(list) + queue_list = ['low', 'default', 'high'] + for queue in queue_list: + q = Queue(queue, connection=Redis()) + for job in q.jobs: + job_dict[job.args[0]].append(job.args[1]) + return job_dict + +def rq_task(site, method, kwargs): + from frappe.utils.scheduler import log + frappe.connect(site) + + if isinstance(method, basestring): + method_name = method + method = frappe.get_attr(method) + else: + method_name = cstr(method) + + try: + method(**kwargs) + except: + frappe.db.rollback() + log(method_name) + else: + frappe.db.commit() + finally: + frappe.destroy() + + +def new_todo(description): + doc = frappe.get_doc({ + "doctype": "ToDo", + "title": "My new project", + "description": description, + "status": "Open" + }) + doc.insert() \ No newline at end of file diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 5242f10612..b50bad8951 100644 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -13,8 +13,10 @@ from __future__ import unicode_literals import frappe import json import frappe.utils +from frappe.utils import get_sites from frappe.utils.file_lock import create_lock, check_lock, delete_lock from datetime import datetime +from background_jobs import enqueue, get_jobs DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' @@ -23,9 +25,9 @@ def enqueue_events(site): return # lock before queuing begins - lock = create_lock('scheduler') - if not lock: - return + # lock = create_lock('scheduler') + # if not lock: + # return nowtime = frappe.utils.now_datetime() last = frappe.db.get_value('System Settings', 'System Settings', 'scheduler_last_event') @@ -87,14 +89,28 @@ def enqueue_applicable_events(site, nowtime, last): def trigger(site, event, now=False): """trigger method in startup.schedule_handler""" - from frappe.tasks import scheduler_task - + if event.endswith("long"): + queue = 'low' + timeout = 25 + else: + queue = 'default' + timeout = 5 for handler in frappe.get_hooks("scheduler_events").get(event, []): - if not check_lock(handler): - if not now: - scheduler_task.delay(site=site, event=event, handler=handler) - else: - scheduler_task(site=site, event=event, handler=handler, now=True) + if job_dict.get(site): + if not handler in job_dict.get(site): + enqueue_task(handler, queue, timeout, now) + else: + enqueue_task(handler, queue, timeout, now) + + + +def enqueue_task(handler, queue, timeout, now): + if not now: + enqueue(handler, queue, timeout) + else: + scheduler_task(site=site, event=event, handler=handler, now=True) + + if frappe.flags.in_test: frappe.flags.ran_schedulers.append(event) @@ -167,3 +183,53 @@ def get_error_report(from_date=None, to_date=None, limit=10): else: return 0, "

Scheduler didn't encounter any problems.

" + +def scheduler_task(site, event, handler, now=False): + traceback = "" + frappe.get_logger(__name__).info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) + try: + frappe.init(site=site) + if not create_lock(handler): + return + if not now: + frappe.connect(site=site) + + frappe.flags.in_scheduler = True + frappe.get_attr(handler)() + + except Exception: + frappe.db.rollback() + traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler)) + frappe.get_logger(__name__).warn(traceback) + raise + + else: + frappe.db.commit() + + finally: + delete_lock(handler) + + if not now: + frappe.destroy() + + frappe.get_logger(__name__).info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) + + +def enqueue_scheduler_events(): + global job_dict + job_dict = get_jobs() + for site in get_sites(): + enqueue_events_for_site(site=site) + +def enqueue_events_for_site(site): + try: + frappe.init(site=site) + if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler: + return + frappe.connect(site=site) + enqueue_events(site) + except: + frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site)) + raise + finally: + frappe.destroy() diff --git a/requirements.txt b/requirements.txt index 8b74dcf845..5ce552f40c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,3 +32,6 @@ bleach bleach-whitelist Pillow beautifulsoup4 +rq +rq-dashboard +