WIP code migration from Celery to RQ
This commit is contained in:
parent
ae0afc73df
commit
6d0563e8ea
5 changed files with 180 additions and 193 deletions
|
|
@ -8,7 +8,10 @@ from email.utils import formataddr, parseaddr
|
|||
from frappe.utils import get_url, get_formatted_email, cint, validate_email_add, split_emails
|
||||
from frappe.utils.file_manager import get_file
|
||||
from frappe.email.bulk import check_bulk_limit
|
||||
from frappe.utils.scheduler import log
|
||||
import frappe.email.smtp
|
||||
import MySQLdb
|
||||
import time
|
||||
from frappe import _
|
||||
|
||||
@frappe.whitelist()
|
||||
|
|
@ -105,9 +108,8 @@ def notify(doc, print_html=None, print_format=None, attachments=None,
|
|||
recipients=recipients, cc=cc)
|
||||
else:
|
||||
check_bulk_limit(list(set(doc.sent_email_addresses)))
|
||||
|
||||
from frappe.tasks import sendmail
|
||||
sendmail.delay(frappe.local.site, doc.name,
|
||||
from frappe.utils.background_jobs import enqueue
|
||||
enqueue(sendmail, communication_name=doc.name,
|
||||
print_html=print_html, print_format=print_format, attachments=attachments,
|
||||
recipients=recipients, cc=cc, lang=frappe.local.lang, session=frappe.local.session)
|
||||
|
||||
|
|
@ -349,3 +351,46 @@ def get_attach_link(doc, print_format):
|
|||
"print_format": print_format,
|
||||
"key": doc.get_parent_doc().get_signature()
|
||||
})
|
||||
|
||||
def sendmail(communication_name, print_html=None, print_format=None, attachments=None,
|
||||
recipients=None, cc=None, lang=None, session=None):
|
||||
try:
|
||||
|
||||
if lang:
|
||||
frappe.local.lang = lang
|
||||
|
||||
if session:
|
||||
# hack to enable access to private files in PDF
|
||||
session['data'] = frappe._dict(session['data'])
|
||||
frappe.local.session.update(session)
|
||||
|
||||
# upto 3 retries
|
||||
for i in xrange(3):
|
||||
try:
|
||||
communication = frappe.get_doc("Communication", communication_name)
|
||||
communication._notify(print_html=print_html, print_format=print_format, attachments=attachments,
|
||||
recipients=recipients, cc=cc)
|
||||
|
||||
except MySQLdb.OperationalError, e:
|
||||
# deadlock, try again
|
||||
if e.args[0]==1213:
|
||||
frappe.db.rollback()
|
||||
time.sleep(1)
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
break
|
||||
|
||||
except:
|
||||
traceback = log("frappe.core.doctype.communication.email.sendmail", frappe.as_json({
|
||||
"communication_name": communication_name,
|
||||
"print_html": print_html,
|
||||
"print_format": print_format,
|
||||
"attachments": attachments,
|
||||
"recipients": recipients,
|
||||
"cc": cc,
|
||||
"lang": lang
|
||||
}))
|
||||
frappe.get_logger(__name__).error(traceback)
|
||||
raise
|
||||
181
frappe/tasks.py
181
frappe/tasks.py
|
|
@ -3,86 +3,11 @@
|
|||
|
||||
from __future__ import unicode_literals
|
||||
import frappe
|
||||
from frappe.utils.scheduler import enqueue_events
|
||||
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX, ASYNC_TASKS_PREFIX
|
||||
from frappe.utils import get_sites
|
||||
from frappe.utils.error import make_error_snapshot
|
||||
from frappe.utils.file_lock import create_lock, delete_lock
|
||||
from frappe.handler import execute_cmd
|
||||
from frappe.async import set_task_status, END_LINE, get_std_streams
|
||||
from frappe.utils.scheduler import log
|
||||
import frappe.utils.response
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import os
|
||||
import MySQLdb
|
||||
|
||||
@celery_task()
|
||||
def sync_queues():
|
||||
"""notifies workers to monitor newly added sites"""
|
||||
app = get_celery()
|
||||
shortjob_workers, longjob_workers, async_tasks_workers = get_workers(app)
|
||||
|
||||
if shortjob_workers:
|
||||
for worker in shortjob_workers:
|
||||
sync_worker(app, worker)
|
||||
|
||||
if longjob_workers:
|
||||
for worker in longjob_workers:
|
||||
sync_worker(app, worker, prefix=LONGJOBS_PREFIX)
|
||||
|
||||
if async_tasks_workers:
|
||||
for worker in async_tasks_workers:
|
||||
sync_worker(app, worker, prefix=ASYNC_TASKS_PREFIX)
|
||||
|
||||
def get_workers(app):
|
||||
longjob_workers = []
|
||||
shortjob_workers = []
|
||||
async_tasks_workers = []
|
||||
|
||||
active_queues = app.control.inspect().active_queues()
|
||||
for worker in active_queues:
|
||||
if worker.startswith(LONGJOBS_PREFIX):
|
||||
longjob_workers.append(worker)
|
||||
elif worker.startswith(ASYNC_TASKS_PREFIX):
|
||||
async_tasks_workers.append(worker)
|
||||
else:
|
||||
shortjob_workers.append(worker)
|
||||
|
||||
return shortjob_workers, longjob_workers, async_tasks_workers
|
||||
|
||||
def sync_worker(app, worker, prefix=''):
|
||||
active_queues = set(get_active_queues(app, worker))
|
||||
required_queues = set(get_required_queues(app, prefix=prefix))
|
||||
to_add = required_queues - active_queues
|
||||
to_remove = active_queues - required_queues
|
||||
|
||||
for queue in to_add:
|
||||
if is_site_in_maintenance_mode(queue, prefix):
|
||||
continue
|
||||
|
||||
app.control.broadcast('add_consumer', arguments={
|
||||
'queue': queue
|
||||
}, reply=True, destination=[worker])
|
||||
|
||||
for queue in to_remove:
|
||||
app.control.broadcast('cancel_consumer', arguments={
|
||||
'queue': queue
|
||||
}, reply=True, destination=[worker])
|
||||
|
||||
def get_active_queues(app, worker):
|
||||
active_queues = app.control.inspect().active_queues()
|
||||
if not (active_queues and active_queues.get(worker)):
|
||||
return []
|
||||
return [queue['name'] for queue in active_queues[worker]]
|
||||
|
||||
def get_required_queues(app, prefix=''):
|
||||
ret = []
|
||||
for site in get_sites():
|
||||
ret.append('{}{}'.format(prefix, site))
|
||||
ret.append(app.conf['CELERY_DEFAULT_QUEUE'])
|
||||
return ret
|
||||
|
||||
def is_site_in_maintenance_mode(queue, prefix):
|
||||
# check if site is in maintenance mode
|
||||
|
|
@ -101,56 +26,6 @@ def is_site_in_maintenance_mode(queue, prefix):
|
|||
|
||||
return False
|
||||
|
||||
@celery_task()
|
||||
def scheduler_task(site, event, handler, now=False):
|
||||
traceback = ""
|
||||
task_logger.info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
if not create_lock(handler):
|
||||
return
|
||||
if not now:
|
||||
frappe.connect(site=site)
|
||||
frappe.flags.in_scheduler = True
|
||||
frappe.get_attr(handler)()
|
||||
|
||||
except Exception:
|
||||
frappe.db.rollback()
|
||||
traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
|
||||
task_logger.warn(traceback)
|
||||
raise
|
||||
|
||||
else:
|
||||
frappe.db.commit()
|
||||
|
||||
finally:
|
||||
delete_lock(handler)
|
||||
|
||||
if not now:
|
||||
frappe.destroy()
|
||||
|
||||
task_logger.info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
||||
|
||||
@celery_task()
|
||||
def enqueue_scheduler_events():
|
||||
for site in get_sites():
|
||||
enqueue_events_for_site.delay(site=site)
|
||||
|
||||
@celery_task()
|
||||
def enqueue_events_for_site(site):
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler:
|
||||
return
|
||||
frappe.connect(site=site)
|
||||
enqueue_events(site)
|
||||
except:
|
||||
task_logger.error('Exception in Enqueue Events for Site {0}'.format(site))
|
||||
raise
|
||||
finally:
|
||||
frappe.destroy()
|
||||
|
||||
@celery_task()
|
||||
def pull_from_email_account(site, email_account):
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
|
|
@ -161,7 +36,6 @@ def pull_from_email_account(site, email_account):
|
|||
finally:
|
||||
frappe.destroy()
|
||||
|
||||
@celery_task(bind=True)
|
||||
def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_std=False):
|
||||
ret = {}
|
||||
frappe.init(site)
|
||||
|
|
@ -191,7 +65,7 @@ def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_
|
|||
frappe.errprint(frappe.get_traceback())
|
||||
frappe.utils.response.make_logs()
|
||||
set_task_status(self.request.id, "Error", response=ret)
|
||||
task_logger.error('Exception in running {}: {}'.format(cmd, ret['exc']))
|
||||
frappe.get_logger(__name__).error('Exception in running {}: {}'.format(cmd, ret['exc']))
|
||||
else:
|
||||
set_task_status(self.request.id, "Success", response=ret)
|
||||
if not frappe.flags.in_test:
|
||||
|
|
@ -209,56 +83,3 @@ def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_
|
|||
|
||||
return ret
|
||||
|
||||
|
||||
@celery_task()
|
||||
def sendmail(site, communication_name, print_html=None, print_format=None, attachments=None,
|
||||
recipients=None, cc=None, lang=None, session=None):
|
||||
try:
|
||||
frappe.connect(site=site)
|
||||
|
||||
if lang:
|
||||
frappe.local.lang = lang
|
||||
|
||||
if session:
|
||||
# hack to enable access to private files in PDF
|
||||
session['data'] = frappe._dict(session['data'])
|
||||
frappe.local.session.update(session)
|
||||
|
||||
# upto 3 retries
|
||||
for i in xrange(3):
|
||||
try:
|
||||
communication = frappe.get_doc("Communication", communication_name)
|
||||
communication._notify(print_html=print_html, print_format=print_format, attachments=attachments,
|
||||
recipients=recipients, cc=cc)
|
||||
|
||||
except MySQLdb.OperationalError, e:
|
||||
# deadlock, try again
|
||||
if e.args[0]==1213:
|
||||
frappe.db.rollback()
|
||||
time.sleep(1)
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
break
|
||||
|
||||
except:
|
||||
traceback = log("frappe.tasks.sendmail", frappe.as_json({
|
||||
"site": site,
|
||||
"communication_name": communication_name,
|
||||
"print_html": print_html,
|
||||
"print_format": print_format,
|
||||
"attachments": attachments,
|
||||
"recipients": recipients,
|
||||
"cc": cc,
|
||||
"lang": lang
|
||||
}))
|
||||
task_logger.error(traceback)
|
||||
raise
|
||||
|
||||
else:
|
||||
frappe.db.commit()
|
||||
|
||||
finally:
|
||||
frappe.destroy()
|
||||
|
||||
|
|
|
|||
52
frappe/utils/background_jobs.py
Normal file
52
frappe/utils/background_jobs.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
from redis import Redis
|
||||
from rq import Queue
|
||||
import frappe
|
||||
from collections import defaultdict
|
||||
from frappe.utils import cstr
|
||||
|
||||
logger = frappe.get_logger(__name__)
|
||||
|
||||
def enqueue(method, queue, timeout, **kwargs):
|
||||
"""queue should be either low, high or default
|
||||
timeout should be set accoridngly"""
|
||||
q = Queue(queue, connection=Redis())
|
||||
q.enqueue(rq_task, args=(frappe.local.site, method, kwargs))
|
||||
|
||||
def get_jobs():
|
||||
job_dict = defaultdict(list)
|
||||
queue_list = ['low', 'default', 'high']
|
||||
for queue in queue_list:
|
||||
q = Queue(queue, connection=Redis())
|
||||
for job in q.jobs:
|
||||
job_dict[job.args[0]].append(job.args[1])
|
||||
return job_dict
|
||||
|
||||
def rq_task(site, method, kwargs):
|
||||
from frappe.utils.scheduler import log
|
||||
frappe.connect(site)
|
||||
|
||||
if isinstance(method, basestring):
|
||||
method_name = method
|
||||
method = frappe.get_attr(method)
|
||||
else:
|
||||
method_name = cstr(method)
|
||||
|
||||
try:
|
||||
method(**kwargs)
|
||||
except:
|
||||
frappe.db.rollback()
|
||||
log(method_name)
|
||||
else:
|
||||
frappe.db.commit()
|
||||
finally:
|
||||
frappe.destroy()
|
||||
|
||||
|
||||
def new_todo(description):
|
||||
doc = frappe.get_doc({
|
||||
"doctype": "ToDo",
|
||||
"title": "My new project",
|
||||
"description": description,
|
||||
"status": "Open"
|
||||
})
|
||||
doc.insert()
|
||||
|
|
@ -13,8 +13,10 @@ from __future__ import unicode_literals
|
|||
import frappe
|
||||
import json
|
||||
import frappe.utils
|
||||
from frappe.utils import get_sites
|
||||
from frappe.utils.file_lock import create_lock, check_lock, delete_lock
|
||||
from datetime import datetime
|
||||
from background_jobs import enqueue, get_jobs
|
||||
|
||||
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
|
||||
|
||||
|
|
@ -23,9 +25,9 @@ def enqueue_events(site):
|
|||
return
|
||||
|
||||
# lock before queuing begins
|
||||
lock = create_lock('scheduler')
|
||||
if not lock:
|
||||
return
|
||||
# lock = create_lock('scheduler')
|
||||
# if not lock:
|
||||
# return
|
||||
|
||||
nowtime = frappe.utils.now_datetime()
|
||||
last = frappe.db.get_value('System Settings', 'System Settings', 'scheduler_last_event')
|
||||
|
|
@ -87,14 +89,28 @@ def enqueue_applicable_events(site, nowtime, last):
|
|||
|
||||
def trigger(site, event, now=False):
|
||||
"""trigger method in startup.schedule_handler"""
|
||||
from frappe.tasks import scheduler_task
|
||||
|
||||
if event.endswith("long"):
|
||||
queue = 'low'
|
||||
timeout = 25
|
||||
else:
|
||||
queue = 'default'
|
||||
timeout = 5
|
||||
for handler in frappe.get_hooks("scheduler_events").get(event, []):
|
||||
if not check_lock(handler):
|
||||
if not now:
|
||||
scheduler_task.delay(site=site, event=event, handler=handler)
|
||||
else:
|
||||
scheduler_task(site=site, event=event, handler=handler, now=True)
|
||||
if job_dict.get(site):
|
||||
if not handler in job_dict.get(site):
|
||||
enqueue_task(handler, queue, timeout, now)
|
||||
else:
|
||||
enqueue_task(handler, queue, timeout, now)
|
||||
|
||||
|
||||
|
||||
def enqueue_task(handler, queue, timeout, now):
|
||||
if not now:
|
||||
enqueue(handler, queue, timeout)
|
||||
else:
|
||||
scheduler_task(site=site, event=event, handler=handler, now=True)
|
||||
|
||||
|
||||
|
||||
if frappe.flags.in_test:
|
||||
frappe.flags.ran_schedulers.append(event)
|
||||
|
|
@ -167,3 +183,53 @@ def get_error_report(from_date=None, to_date=None, limit=10):
|
|||
else:
|
||||
return 0, "<p>Scheduler didn't encounter any problems.</p>"
|
||||
|
||||
|
||||
def scheduler_task(site, event, handler, now=False):
|
||||
traceback = ""
|
||||
frappe.get_logger(__name__).info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
if not create_lock(handler):
|
||||
return
|
||||
if not now:
|
||||
frappe.connect(site=site)
|
||||
|
||||
frappe.flags.in_scheduler = True
|
||||
frappe.get_attr(handler)()
|
||||
|
||||
except Exception:
|
||||
frappe.db.rollback()
|
||||
traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
|
||||
frappe.get_logger(__name__).warn(traceback)
|
||||
raise
|
||||
|
||||
else:
|
||||
frappe.db.commit()
|
||||
|
||||
finally:
|
||||
delete_lock(handler)
|
||||
|
||||
if not now:
|
||||
frappe.destroy()
|
||||
|
||||
frappe.get_logger(__name__).info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
||||
|
||||
|
||||
def enqueue_scheduler_events():
|
||||
global job_dict
|
||||
job_dict = get_jobs()
|
||||
for site in get_sites():
|
||||
enqueue_events_for_site(site=site)
|
||||
|
||||
def enqueue_events_for_site(site):
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler:
|
||||
return
|
||||
frappe.connect(site=site)
|
||||
enqueue_events(site)
|
||||
except:
|
||||
frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site))
|
||||
raise
|
||||
finally:
|
||||
frappe.destroy()
|
||||
|
|
|
|||
|
|
@ -32,3 +32,6 @@ bleach
|
|||
bleach-whitelist
|
||||
Pillow
|
||||
beautifulsoup4
|
||||
rq
|
||||
rq-dashboard
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue