[fix] with frappe.init_site, worker and schedule commands, cleanup and fixes to rq code

This commit is contained in:
Anand Doshi 2016-04-12 20:27:46 +05:30
parent a2ad631754
commit a48dd50381
11 changed files with 233 additions and 222 deletions

View file

@ -176,6 +176,27 @@ def get_site_config(sites_path=None, site_path=None):
return _dict(config)
def get_conf(site=None):
	"""Return the site config for the active context, initializing one if needed."""
	if hasattr(local, 'conf'):
		return local.conf

	# no active context: initialize one temporarily
	# (if no site, this loads common_site_config.json)
	with init_site(site):
		return local.conf
class init_site:
	"""Context manager that initializes a site on entry and tears it down on exit.

	With site=None the empty site ('') is initialized, which loads only
	common_site_config.json.
	"""

	def __init__(self, site=None):
		self.site = site or ''

	def __enter__(self):
		init(self.site)
		# expose the werkzeug local so callers can read local.conf etc.
		return local

	def __exit__(self, type, value, traceback):
		destroy()
def destroy():
"""Closes connection and releases werkzeug local."""
if db:
@ -191,7 +212,6 @@ def cache():
if not redis_server:
from frappe.utils.redis_wrapper import RedisWrapper
redis_server = RedisWrapper.from_url(conf.get('redis_cache')
or conf.get("cache_redis_server")
or "redis://localhost:11311")
return redis_server

View file

@ -166,7 +166,6 @@ def get_redis_server():
if not redis_server:
from redis import Redis
redis_server = Redis.from_url(conf.get("redis_socketio")
or conf.get("async_redis_server")
or "redis://localhost:12311")
return redis_server

View file

@ -97,6 +97,8 @@ def _new_site(db_name, site, mariadb_root_username=None, mariadb_root_password=N
_install_app(app, verbose=verbose, set_as_patched=not source_sql)
frappe.utils.scheduler.toggle_scheduler(enable_scheduler)
frappe.db.commit()
scheduler_status = "disabled" if frappe.utils.scheduler.is_scheduler_disabled() else "enabled"
print "*** Scheduler is", scheduler_status, "***"
frappe.destroy()
@ -1012,11 +1014,16 @@ def get_version():
if hasattr(module, "__version__"):
print "{0} {1}".format(m, module.__version__)
@click.command('schedule')
def start_scheduler():
	# bench command: runs the scheduler loop (frappe.utils.scheduler.start_scheduler)
	from frappe.utils.scheduler import start_scheduler
	start_scheduler()
@click.command('start-worker')
def start_rq_worker():
	# bench command: spawns background worker processes for all queues
	from frappe.utils.background_jobs import start_all_workers
	start_all_workers()
@click.command('worker')
@click.option('--queue', type=str)
def start_worker(queue):
	# bench command: runs one rq worker; --queue restricts it to a single queue
	from frappe.utils.background_jobs import start_worker
	start_worker(queue)
commands = [
new_site,
@ -1074,5 +1081,6 @@ commands = [
set_config,
get_version,
new_language,
start_rq_worker,
start_worker,
start_scheduler,
]

View file

@ -13,6 +13,7 @@ import frappe.email.smtp
import MySQLdb
import time
from frappe import _
from frappe.utils.background_jobs import enqueue
@frappe.whitelist()
def make(doctype=None, name=None, content=None, subject=None, sent_or_received = "Sent",
@ -107,8 +108,8 @@ def notify(doc, print_html=None, print_format=None, attachments=None,
recipients=recipients, cc=cc)
else:
check_bulk_limit(list(set(doc.sent_email_addresses)))
from frappe.utils.background_jobs import enqueue
enqueue(sendmail, queue="short", timeout=300, event="email", communication_name=doc.name,
enqueue(sendmail, queue="default", timeout=300, event="sendmail",
communication_name=doc.name,
print_html=print_html, print_format=print_format, attachments=attachments,
recipients=recipients, cc=cc, lang=frappe.local.lang, session=frappe.local.session)
@ -392,4 +393,4 @@ def sendmail(communication_name, print_html=None, print_format=None, attachments
"lang": lang
}))
frappe.get_logger(__name__).error(traceback)
raise
raise

View file

@ -18,8 +18,9 @@ from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
from frappe.desk.form import assign_to
from frappe.utils.user import get_system_managers
from frappe.utils.background_jobs import enqueue
from frappe.utils.background_jobs import enqueue, get_jobs
from frappe.core.doctype.communication.email import set_incoming_outgoing_accounts
from frappe.utils.scheduler import log
class SentEmailInInbox(Exception): pass
@ -184,11 +185,14 @@ class EmailAccount(Document):
except Exception:
frappe.db.rollback()
log('email_account.receive')
exceptions.append(frappe.get_traceback())
else:
frappe.db.commit()
attachments = [d.file_name for d in communication._attachments]
# TODO fix bug where it sends emails to 'Administrator' during testing
communication.notify(attachments=attachments, fetched_from_email_account=True)
if exceptions:
@ -371,15 +375,6 @@ def get_append_to(doctype=None, txt=None, searchfield=None, start=None, page_len
if not txt: txt = ""
return [[d] for d in frappe.get_hooks("email_append_to") if txt in d]
def pull(now=False):
	"""Will be called via scheduler, pull emails from all enabled Email accounts."""
	import frappe.tasks
	for email_account in frappe.get_list("Email Account", filters={"enable_incoming": 1}):
		if now:
			# pull synchronously in this process
			pull_from_email_account(frappe.local.site, email_account.name)
		else:
			# queue the pull to run in a background worker
			enqueue(pull_from_email_account, frappe.local.site, email_account.name)
def notify_unreplied():
"""Sends email notifications if there are unreplied Communications
and `notify_if_unreplied` is set as true."""
@ -407,14 +402,23 @@ def notify_unreplied():
# update flag
comm.db_set("unread_notification_sent", 1)
def pull(now=False):
	"""Will be called via scheduler, pull emails from all enabled Email accounts.

	:param now: if True, pull synchronously in this process instead of enqueuing
	"""
	# jobs already waiting in this site's queues, keyed by job_name
	queued_jobs = get_jobs(site=frappe.local.site, key='job_name')[frappe.local.site]
	for email_account in frappe.get_list("Email Account", filters={"enable_incoming": 1}):
		if now:
			pull_from_email_account(email_account.name)

		else:
			# job_name is used to prevent duplicates in queue
			job_name = 'pull_from_email_account|{0}'.format(email_account.name)
			if job_name not in queued_jobs:
				enqueue(pull_from_email_account, 'short', event='all', job_name=job_name,
					email_account=email_account.name)
def pull_from_email_account(email_account):
	'''Runs within a worker process'''
	# email_account is the Email Account name; load the doc and receive mail
	account = frappe.get_doc("Email Account", email_account)
	account.receive()

View file

@ -1,85 +0,0 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import unicode_literals
import frappe
from frappe.handler import execute_cmd
from frappe.async import set_task_status, END_LINE, get_std_streams
import frappe.utils.response
import sys
def is_site_in_maintenance_mode(queue, prefix):
	# check if site is in maintenance mode
	# the site name is the queue name with the common prefix stripped
	site = queue.replace(prefix, "")
	try:
		frappe.init(site=site)
		if not frappe.local.conf.db_name or frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler:
			# don't add site if in maintenance mode
			return True
	except frappe.IncorrectSitePath:
		# unknown or invalid site path: treat it like maintenance mode
		return True
	finally:
		frappe.destroy()

	return False
def pull_from_email_account(site, email_account):
	# worker task: initialize + connect to the site, receive mails for one
	# Email Account, commit, and always tear the connection down
	try:
		frappe.init(site=site)
		frappe.connect(site=site)
		email_account = frappe.get_doc("Email Account", email_account)
		email_account.receive()
		frappe.db.commit()
	finally:
		frappe.destroy()
def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_std=False):
	'''Run cmd for a site/user as a bound async task, tracking its status.

	When hijack_std is set, stdout/stderr are redirected per task id so output
	can be streamed back; the originals are restored on exit.
	Returns the response dict produced by the command.'''
	ret = {}
	frappe.init(site)
	frappe.connect()

	frappe.local.task_id = self.request.id

	if hijack_std:
		original_stdout, original_stderr = sys.stdout, sys.stderr
		sys.stdout, sys.stderr = get_std_streams(self.request.id)
		frappe.local.stdout, frappe.local.stderr = sys.stdout, sys.stderr

	try:
		set_task_status(self.request.id, "Running")
		frappe.db.commit()
		frappe.set_user(user)
		# sleep(60)
		frappe.local.form_dict = frappe._dict(form_dict)
		execute_cmd(cmd, from_async=True)
		ret = frappe.local.response
	except Exception, e:
		frappe.db.rollback()
		ret = frappe.local.response
		# report the exception's http status code if it defines one
		http_status_code = getattr(e, "http_status_code", 500)
		ret['status_code'] = http_status_code
		frappe.errprint(frappe.get_traceback())
		frappe.utils.response.make_logs()
		set_task_status(self.request.id, "Error", response=ret)
		frappe.get_logger(__name__).error('Exception in running {}: {}'.format(cmd, ret['exc']))
	else:
		set_task_status(self.request.id, "Success", response=ret)
		if not frappe.flags.in_test:
			frappe.db.commit()
	finally:
		if not frappe.flags.in_test:
			frappe.destroy()
		if hijack_std:
			# write the end marker, then restore the real streams
			sys.stdout.write('\n' + END_LINE)
			sys.stderr.write('\n' + END_LINE)
			sys.stdout.close()
			sys.stderr.close()
			sys.stdout, sys.stderr = original_stdout, original_stderr

	return ret

View file

@ -2,11 +2,12 @@ from __future__ import unicode_literals
from unittest import TestCase
from frappe.utils.scheduler import enqueue_applicable_events
from frappe.utils.background_jobs import enqueue
from frappe.utils import now_datetime
from dateutil.relativedelta import relativedelta
import frappe
import json
import json, time
class TestScheduler(TestCase):
@ -44,6 +45,20 @@ class TestScheduler(TestCase):
self.assertTrue("all" in frappe.flags.ran_schedulers)
self.assertTrue("hourly" in frappe.flags.ran_schedulers)
def test_job_timeout(self):
job = enqueue(test_timeout, timeout=10)
count = 5
while count > 0:
count -= 1
time.sleep(5)
if job.get_status()=='failed':
break
self.assertTrue(job.is_failed)
	def tearDown(self):
		# reset the record of schedulers that ran during this test
		frappe.flags.ran_schedulers = []
def test_timeout():
	'''This function needs to be pickleable'''
	# sleeps well past the 10s job timeout used by test_job_timeout
	time.sleep(100)

View file

@ -380,11 +380,19 @@ def is_markdown(text):
def get_sites(sites_path=None):
	'''Return the site folder names under sites_path.

	A directory qualifies as a site when it is a real directory (not a
	symlink) and contains a site_config.json file.

	:param sites_path: defaults to frappe.local.sites_path, falling back to "."
	'''
	import os

	if not sites_path:
		sites_path = getattr(frappe.local, 'sites_path', None) or '.'

	sites = []
	for site in os.listdir(sites_path):
		path = os.path.join(sites_path, site)
		if (os.path.isdir(path)
			and not os.path.islink(path)
			and os.path.exists(os.path.join(path, 'site_config.json'))):
			# is a dir and has site_config.json
			sites.append(site)

	return sites
def get_request_session(max_retries=3):
from requests.packages.urllib3.util import Retry

View file

@ -1,29 +1,48 @@
from __future__ import unicode_literals, print_function
from redis import Redis
from __future__ import unicode_literals
import redis
from rq import Connection, Queue, Worker
from frappe.utils import cstr
from collections import defaultdict
from urlparse import urlparse
import multiprocessing
import frappe
#For worker
import os, socket
logger = frappe.get_logger(__name__)
def enqueue(method, queue, timeout, event, **kwargs):
"""queue should be either long, high or short timeout should be set accoridngly"""
default_timeout = 300
queue_timeout = {
'long': 1500,
'default': 300,
'short': 300
}
def enqueue(method, queue='default', timeout=300, event=None,
async=True, job_name=None, **kwargs):
'''
Enqueue method to be executed using a background worker
:param method: method string or method object
:param queue: should be either long, default or short
:param timeout: should be set according to the functions
:param event: this is passed to enable clearing of jobs from queues
:param async: if async=False, the method is executed immediately, else via a worker
:param job_name: can be used to name an enqueue call, which can be used to prevent duplicate calls
:param kwargs: keyword arguments to be passed to the method
'''
q = get_queue(queue)
q.enqueue_call(execute_job, timeout=timeout,
if not timeout:
timeout = queue_timeout.get(queue) or 300
return q.enqueue_call(execute_job, timeout=timeout,
kwargs={
"site": frappe.local.site,
"method": method,
"event": event,
"job_name": job_name or cstr(method),
"kwargs":kwargs
})
def execute_job(site, method, event, kwargs):
def execute_job(site, method, event, job_name, kwargs):
'''Executes job in a worker, performs commit/rollback and logs if there is any error'''
from frappe.utils.scheduler import log
frappe.connect(site)
@ -38,78 +57,86 @@ def execute_job(site, method, event, kwargs):
except:
frappe.db.rollback()
log(method_name)
raise
else:
frappe.db.commit()
finally:
frappe.destroy()
def start_worker(queue=None):
	'''Wrapper to start rq worker. Connects to redis and monitors these queues.'''
	with frappe.init_site():
		# empty init is required to get redis_queue from common_site_config.json
		redis_connection = get_redis_conn()

	with Connection(redis_connection):
		queues = get_queue_list(queue)
		# blocks here, processing jobs until the process is terminated
		Worker(queues, name=get_worker_name(queue)).work()
def get_worker_name(queue):
	'''When limiting worker to a specific queue, also append queue name to default worker name.

	Returns None when no queue is given (rq then uses its default name).'''
	name = None

	if queue:
		# hostname.pid is the default worker name
		name = '{hostname}.{pid}.{queue}'.format(
			hostname=socket.gethostname(),
			pid=os.getpid(),
			queue=queue)

	return name
def get_jobs(site=None, queue=None, key='method'):
	'''Gets jobs per queue or per site or both.

	:param site: when given, only jobs for that site are returned
	:param queue: when given, only that queue is scanned
	:param key: which job kwarg to collect (e.g. 'method' or 'job_name')
	:returns: dict of site -> list of job `key` values
	'''
	jobs_per_site = defaultdict(list)
	for queue in get_queue_list(queue):
		q = get_queue(queue)
		for job in q.jobs:
			if site is None:
				# get jobs for all sites
				jobs_per_site[job.kwargs['site']].append(job.kwargs[key])

			elif job.kwargs['site'] == site:
				# get jobs only for given site
				jobs_per_site[site].append(job.kwargs[key])

	return jobs_per_site
def get_queue_list(queue_list=None):
	'''Defines possible queues. Also wraps a given queue in a list after validating.

	:param queue_list: a queue name or list of queue names; None means all queues
	'''
	default_queue_list = queue_timeout.keys()
	if queue_list:
		if isinstance(queue_list, basestring):
			# a single queue name was passed; wrap it
			queue_list = [queue_list]

		for queue in queue_list:
			validate_queue(queue, default_queue_list)

		return queue_list

	else:
		return default_queue_list
def get_queue(queue):
	'''Return an rq Queue for the given (validated) queue name, bound to redis.'''
	validate_queue(queue)
	connection = get_redis_conn()
	return Queue(queue, connection=connection)
def validate_queue(queue, default_queue_list=None):
	'''Throw (via frappe.throw) when queue is not a recognized queue name.'''
	allowed = default_queue_list or queue_timeout.keys()
	if queue not in allowed:
		frappe.throw("Queue should be one of {0}".format(', '.join(allowed)))
def get_redis_conn():
	'''Return a redis connection built from redis_queue in the site config.

	Requires frappe.init to have been called first so frappe.local.conf exists;
	raises if redis_queue is missing from common_site_config.json.'''
	if not hasattr(frappe.local, 'conf'):
		raise Exception('You need to call frappe.init')

	elif not frappe.local.conf.redis_queue:
		raise Exception('redis_queue missing in common_site_config.json')

	return redis.from_url(frappe.local.conf.redis_queue)
#For Workers
def start_all_workers():
	# spawn 3 worker processes, each running start_worker
	for i in range(3):
		try:
			p = multiprocessing.Process(target=start_worker)
			p.start()
		except KeyboardInterrupt:
			print("Caught KeyboardInterrupt, terminating workers")

View file

@ -17,21 +17,66 @@ import time
import frappe.utils
from frappe.utils import get_sites
from datetime import datetime
from background_jobs import enqueue, get_jobs
from background_jobs import enqueue, get_jobs, queue_timeout
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
def enqueue_events(queued_jobs, site):
if is_scheduler_disabled():
return
def start_scheduler():
	'''Run enqueue_events_for_all_sites every 2 minutes (default).
	Specify scheduler_interval in seconds in common_site_config.json'''
	interval = frappe.get_conf().scheduler_interval or 120
	schedule.every(interval).seconds.do(enqueue_events_for_all_sites)

	# blocks forever, ticking the schedule once per second
	while True:
		schedule.run_pending()
		time.sleep(1)
def enqueue_events_for_all_sites():
	'''Loop through sites and enqueue events that are not already queued'''
	with frappe.init_site():
		# empty (common) init: read currently queued jobs and the site list
		jobs_per_site = get_jobs()
		sites = get_sites()

	for site in sites:
		try:
			enqueue_events_for_site(site=site, queued_jobs=jobs_per_site[site])
		except:
			# it should try to enqueue other sites
			print frappe.get_traceback()
def enqueue_events_for_site(site, queued_jobs):
	# enqueue due scheduler events for one site; always tears the site down
	try:
		frappe.init(site=site)
		# skip sites in maintenance mode
		if frappe.local.conf.maintenance_mode:
			return

		frappe.connect()
		if is_scheduler_disabled():
			return

		enqueue_events(site=site, queued_jobs=queued_jobs)

		# TODO this print call is a tempfix till logging is fixed!
		print 'Queued events for site {0}'.format(site)
		frappe.get_logger(__name__).debug('Queued events for site {0}'.format(site))
	except:
		frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site))
		raise
	finally:
		frappe.destroy()
def enqueue_events(site, queued_jobs):
nowtime = frappe.utils.now_datetime()
last = frappe.db.get_value('System Settings', 'System Settings', 'scheduler_last_event')
# set scheduler last event
frappe.db.begin()
frappe.db.set_value('System Settings', 'System Settings', 'scheduler_last_event', nowtime.strftime(DATETIME_FORMAT), update_modified=False)
frappe.db.set_value('System Settings', 'System Settings',
'scheduler_last_event', nowtime.strftime(DATETIME_FORMAT),
update_modified=False)
frappe.db.commit()
out = []
@ -83,13 +128,10 @@ def enqueue_applicable_events(nowtime, last, queued_jobs, site):
return out
def trigger(site, event, queued_jobs, now=False):
"""trigger method in startup.schedule_handler"""
if event.endswith("long"):
queue = 'long'
timeout = 1500
else:
queue = 'default'
timeout = 300
"""trigger method in hooks.scheduler_events"""
queue = 'long' if event.endswith('_long') else 'short'
timeout = queue_timeout[queue]
for handler in frappe.get_hooks("scheduler_events").get(event, []):
if not now:
if handler not in queued_jobs:
@ -124,11 +166,14 @@ def get_enabled_scheduler_events():
enabled_events = frappe.db.get_global("enabled_scheduler_events")
if enabled_events:
return json.loads(enabled_events)
return ["all", "hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long"]
return ["all", "hourly", "hourly_long", "daily", "daily_long",
"weekly", "weekly_long", "monthly", "monthly_long"]
def is_scheduler_disabled():
	'''Return True when the scheduler is switched off, either by the
	disable_scheduler config flag or by System Settings.'''
	if frappe.conf.disable_scheduler:
		return True

	enabled = frappe.utils.cint(frappe.db.get_single_value("System Settings", "enable_scheduler"))
	return not enabled
def toggle_scheduler(enable):
@ -170,7 +215,7 @@ def get_error_report(from_date=None, to_date=None, limit=10):
def scheduler_task(site, event, handler, now=False):
traceback = ""
'''This is a wrapper function that runs a hooks.scheduler_events method'''
frappe.get_logger(__name__).info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
try:
if not now:
@ -182,42 +227,10 @@ def scheduler_task(site, event, handler, now=False):
except Exception:
frappe.db.rollback()
traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
frappe.get_logger(__name__).warn(traceback)
frappe.get_logger(__name__).error(traceback)
raise
else:
frappe.db.commit()
frappe.get_logger(__name__).info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
def enqueue_scheduler_events():
	# empty init: collect queued jobs and the site list, then release it
	frappe.init('')
	jobs_per_site = get_jobs()
	sites = get_sites()
	frappe.destroy()

	for site in sites:
		enqueue_events_for_site(site=site, queued_jobs=jobs_per_site[site])
def enqueue_events_for_site(site, queued_jobs):
	# enqueue due scheduler events for one site; always tears the site down
	try:
		frappe.init(site=site)
		# skip sites in maintenance mode or with the scheduler disabled in conf
		if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler:
			return

		frappe.connect()
		enqueue_events(queued_jobs=queued_jobs, site=site)
	except:
		frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site))
		raise
	finally:
		frappe.destroy()
def start_scheduler():
	# run enqueue_scheduler_events every 5 seconds
	schedule.every(5).seconds.do(enqueue_scheduler_events)

	# blocks forever, ticking the schedule once per second
	while True:
		schedule.run_pending()
		time.sleep(1)

View file

@ -33,3 +33,4 @@ bleach-whitelist
Pillow
beautifulsoup4
rq
schedule