diff --git a/frappe/__init__.py b/frappe/__init__.py index c506c8775e..12835b447c 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -176,6 +176,27 @@ def get_site_config(sites_path=None, site_path=None): return _dict(config) +def get_conf(site=None): + if hasattr(local, 'conf'): + return local.conf + + else: + # if no site, get from common_site_config.json + with init_site(site): + return local.conf + +class init_site: + def __init__(self, site=None): + '''If site==None, initialize it for empty site ('') to load common_site_config.json''' + self.site = site or '' + + def __enter__(self): + init(self.site) + return local + + def __exit__(self, type, value, traceback): + destroy() + def destroy(): """Closes connection and releases werkzeug local.""" if db: @@ -191,7 +212,6 @@ def cache(): if not redis_server: from frappe.utils.redis_wrapper import RedisWrapper redis_server = RedisWrapper.from_url(conf.get('redis_cache') - or conf.get("cache_redis_server") or "redis://localhost:11311") return redis_server diff --git a/frappe/async.py b/frappe/async.py index 68ddce4e08..5e7274b89e 100644 --- a/frappe/async.py +++ b/frappe/async.py @@ -166,7 +166,6 @@ def get_redis_server(): if not redis_server: from redis import Redis redis_server = Redis.from_url(conf.get("redis_socketio") - or conf.get("async_redis_server") or "redis://localhost:12311") return redis_server diff --git a/frappe/commands.py b/frappe/commands.py index e13163f30f..d408f694e7 100755 --- a/frappe/commands.py +++ b/frappe/commands.py @@ -97,6 +97,8 @@ def _new_site(db_name, site, mariadb_root_username=None, mariadb_root_password=N _install_app(app, verbose=verbose, set_as_patched=not source_sql) frappe.utils.scheduler.toggle_scheduler(enable_scheduler) + frappe.db.commit() + scheduler_status = "disabled" if frappe.utils.scheduler.is_scheduler_disabled() else "enabled" print "*** Scheduler is", scheduler_status, "***" frappe.destroy() @@ -1012,11 +1014,16 @@ def get_version(): if hasattr(module, 
"__version__"): print "{0} {1}".format(m, module.__version__) +@click.command('schedule') +def start_scheduler(): + from frappe.utils.scheduler import start_scheduler + start_scheduler() -@click.command('start-worker') -def start_rq_worker(): - from frappe.utils.background_jobs import start_all_workers - start_all_workers() +@click.command('worker') +@click.option('--queue', type=str) +def start_worker(queue): + from frappe.utils.background_jobs import start_worker + start_worker(queue) commands = [ new_site, @@ -1074,5 +1081,6 @@ commands = [ set_config, get_version, new_language, - start_rq_worker, + start_worker, + start_scheduler, ] diff --git a/frappe/core/doctype/communication/email.py b/frappe/core/doctype/communication/email.py index 4e6ecf6315..1ab67c950a 100755 --- a/frappe/core/doctype/communication/email.py +++ b/frappe/core/doctype/communication/email.py @@ -13,6 +13,7 @@ import frappe.email.smtp import MySQLdb import time from frappe import _ +from frappe.utils.background_jobs import enqueue @frappe.whitelist() def make(doctype=None, name=None, content=None, subject=None, sent_or_received = "Sent", @@ -107,8 +108,8 @@ def notify(doc, print_html=None, print_format=None, attachments=None, recipients=recipients, cc=cc) else: check_bulk_limit(list(set(doc.sent_email_addresses))) - from frappe.utils.background_jobs import enqueue - enqueue(sendmail, queue="short", timeout=300, event="email", communication_name=doc.name, + enqueue(sendmail, queue="default", timeout=300, event="sendmail", + communication_name=doc.name, print_html=print_html, print_format=print_format, attachments=attachments, recipients=recipients, cc=cc, lang=frappe.local.lang, session=frappe.local.session) @@ -392,4 +393,4 @@ def sendmail(communication_name, print_html=None, print_format=None, attachments "lang": lang })) frappe.get_logger(__name__).error(traceback) - raise \ No newline at end of file + raise diff --git a/frappe/email/doctype/email_account/email_account.py 
b/frappe/email/doctype/email_account/email_account.py index 313a4add55..ff203828df 100755 --- a/frappe/email/doctype/email_account/email_account.py +++ b/frappe/email/doctype/email_account/email_account.py @@ -18,8 +18,9 @@ from dateutil.relativedelta import relativedelta from datetime import datetime, timedelta from frappe.desk.form import assign_to from frappe.utils.user import get_system_managers -from frappe.utils.background_jobs import enqueue +from frappe.utils.background_jobs import enqueue, get_jobs from frappe.core.doctype.communication.email import set_incoming_outgoing_accounts +from frappe.utils.scheduler import log class SentEmailInInbox(Exception): pass @@ -184,11 +185,14 @@ class EmailAccount(Document): except Exception: frappe.db.rollback() + log('email_account.receive') exceptions.append(frappe.get_traceback()) else: frappe.db.commit() attachments = [d.file_name for d in communication._attachments] + + # TODO fix bug where it sends emails to 'Administrator' during testing communication.notify(attachments=attachments, fetched_from_email_account=True) if exceptions: @@ -371,15 +375,6 @@ def get_append_to(doctype=None, txt=None, searchfield=None, start=None, page_len if not txt: txt = "" return [[d] for d in frappe.get_hooks("email_append_to") if txt in d] -def pull(now=False): - """Will be called via scheduler, pull emails from all enabled Email accounts.""" - import frappe.tasks - for email_account in frappe.get_list("Email Account", filters={"enable_incoming": 1}): - if now: - pull_from_email_account(frappe.local.site, email_account.name) - else: - enqueue(pull_from_email_account, frappe.local.site, email_account.name) - def notify_unreplied(): """Sends email notifications if there are unreplied Communications and `notify_if_unreplied` is set as true.""" @@ -407,14 +402,23 @@ def notify_unreplied(): # update flag comm.db_set("unread_notification_sent", 1) +def pull(now=False): + """Will be called via scheduler, pull emails from all enabled Email
accounts.""" + queued_jobs = get_jobs(site=frappe.local.site, key='job_name')[frappe.local.site] + for email_account in frappe.get_list("Email Account", filters={"enable_incoming": 1}): + if now: + pull_from_email_account(email_account.name) -def pull_from_email_account(site, email_account): - try: - frappe.init(site=site) - frappe.connect(site=site) - email_account = frappe.get_doc("Email Account", email_account) - email_account.receive() - frappe.db.commit() - finally: - frappe.destroy() + else: + # job_name is used to prevent duplicates in queue + job_name = 'pull_from_email_account|{0}'.format(email_account.name) + + if job_name not in queued_jobs: + enqueue(pull_from_email_account, 'short', event='all', job_name=job_name, + email_account=email_account.name) + +def pull_from_email_account(email_account): + '''Runs within a worker process''' + email_account = frappe.get_doc("Email Account", email_account) + email_account.receive() diff --git a/frappe/tasks.py b/frappe/tasks.py deleted file mode 100644 index 7208ffce31..0000000000 --- a/frappe/tasks.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors -# MIT License. 
See license.txt - -from __future__ import unicode_literals -import frappe -from frappe.handler import execute_cmd -from frappe.async import set_task_status, END_LINE, get_std_streams -import frappe.utils.response -import sys - - -def is_site_in_maintenance_mode(queue, prefix): - # check if site is in maintenance mode - site = queue.replace(prefix, "") - try: - frappe.init(site=site) - if not frappe.local.conf.db_name or frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler: - # don't add site if in maintenance mode - return True - - except frappe.IncorrectSitePath: - return True - - finally: - frappe.destroy() - - return False - -def pull_from_email_account(site, email_account): - try: - frappe.init(site=site) - frappe.connect(site=site) - email_account = frappe.get_doc("Email Account", email_account) - email_account.receive() - frappe.db.commit() - finally: - frappe.destroy() - -def run_async_task(self, site=None, user=None, cmd=None, form_dict=None, hijack_std=False): - ret = {} - frappe.init(site) - frappe.connect() - - frappe.local.task_id = self.request.id - - if hijack_std: - original_stdout, original_stderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = get_std_streams(self.request.id) - frappe.local.stdout, frappe.local.stderr = sys.stdout, sys.stderr - - try: - set_task_status(self.request.id, "Running") - frappe.db.commit() - frappe.set_user(user) - # sleep(60) - frappe.local.form_dict = frappe._dict(form_dict) - execute_cmd(cmd, from_async=True) - ret = frappe.local.response - - except Exception, e: - frappe.db.rollback() - ret = frappe.local.response - http_status_code = getattr(e, "http_status_code", 500) - ret['status_code'] = http_status_code - frappe.errprint(frappe.get_traceback()) - frappe.utils.response.make_logs() - set_task_status(self.request.id, "Error", response=ret) - frappe.get_logger(__name__).error('Exception in running {}: {}'.format(cmd, ret['exc'])) - else: - set_task_status(self.request.id, "Success", response=ret) 
- if not frappe.flags.in_test: - frappe.db.commit() - finally: - if not frappe.flags.in_test: - frappe.destroy() - if hijack_std: - sys.stdout.write('\n' + END_LINE) - sys.stderr.write('\n' + END_LINE) - sys.stdout.close() - sys.stderr.close() - - sys.stdout, sys.stderr = original_stdout, original_stderr - - return ret - diff --git a/frappe/tests/test_scheduler.py b/frappe/tests/test_scheduler.py index 8913836544..627a4b75c2 100644 --- a/frappe/tests/test_scheduler.py +++ b/frappe/tests/test_scheduler.py @@ -2,11 +2,12 @@ from __future__ import unicode_literals from unittest import TestCase from frappe.utils.scheduler import enqueue_applicable_events +from frappe.utils.background_jobs import enqueue from frappe.utils import now_datetime from dateutil.relativedelta import relativedelta import frappe -import json +import json, time class TestScheduler(TestCase): @@ -44,6 +45,20 @@ class TestScheduler(TestCase): self.assertTrue("all" in frappe.flags.ran_schedulers) self.assertTrue("hourly" in frappe.flags.ran_schedulers) + def test_job_timeout(self): + job = enqueue(test_timeout, timeout=10) + count = 5 + while count > 0: + count -= 1 + time.sleep(5) + if job.get_status()=='failed': + break + + self.assertTrue(job.is_failed) + def tearDown(self): frappe.flags.ran_schedulers = [] +def test_timeout(): + '''This function needs to be pickleable''' + time.sleep(100) diff --git a/frappe/utils/__init__.py b/frappe/utils/__init__.py index 401ec1f025..25bc2f7b8e 100644 --- a/frappe/utils/__init__.py +++ b/frappe/utils/__init__.py @@ -380,11 +380,19 @@ def is_markdown(text): def get_sites(sites_path=None): import os if not sites_path: - sites_path = '.' - return [site for site in os.listdir(sites_path) - if os.path.isdir(os.path.join(sites_path, site)) - and not site in ('assets',)] + sites_path = getattr(frappe.local, 'sites_path', None) or '.' 
+ sites = [] + for site in os.listdir(sites_path): + path = os.path.join(sites_path, site) + + if (os.path.isdir(path) + and not os.path.islink(path) + and os.path.exists(os.path.join(path, 'site_config.json'))): + # is a dir and has site_config.json + sites.append(site) + + return sites def get_request_session(max_retries=3): from requests.packages.urllib3.util import Retry diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 24641162b8..80c78fc49d 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,29 +1,48 @@ -from __future__ import unicode_literals, print_function -from redis import Redis +from __future__ import unicode_literals +import redis from rq import Connection, Queue, Worker from frappe.utils import cstr from collections import defaultdict -from urlparse import urlparse -import multiprocessing import frappe - -#For worker - +import os, socket logger = frappe.get_logger(__name__) -def enqueue(method, queue, timeout, event, **kwargs): - """queue should be either long, high or short timeout should be set accoridngly""" +default_timeout = 300 +queue_timeout = { + 'long': 1500, + 'default': 300, + 'short': 300 +} + +def enqueue(method, queue='default', timeout=300, event=None, + async=True, job_name=None, **kwargs): + ''' + Enqueue method to be executed using a background worker + + :param method: method string or method object + :param queue: should be either long, default or short + :param timeout: should be set according to the functions + :param event: this is passed to enable clearing of jobs from queues + :param async: if async=False, the method is executed immediately, else via a worker + :param job_name: can be used to name an enqueue call, which can be used to prevent duplicate calls + :param kwargs: keyword arguments to be passed to the method + ''' q = get_queue(queue) - q.enqueue_call(execute_job, timeout=timeout, + if not timeout: + timeout = queue_timeout.get(queue) or 300 + + 
return q.enqueue_call(execute_job, timeout=timeout, kwargs={ "site": frappe.local.site, "method": method, "event": event, + "job_name": job_name or cstr(method), "kwargs":kwargs }) -def execute_job(site, method, event, kwargs): +def execute_job(site, method, event, job_name, kwargs): + '''Executes job in a worker, performs commit/rollback and logs if there is any error''' from frappe.utils.scheduler import log frappe.connect(site) @@ -38,78 +57,86 @@ def execute_job(site, method, event, kwargs): except: frappe.db.rollback() log(method_name) + raise else: frappe.db.commit() finally: frappe.destroy() +def start_worker(queue=None): + '''Wrapper to start rq worker. Connects to redis and monitors these queues.''' + with frappe.init_site(): + # empty init is required to get redis_queue from common_site_config.json + redis_connection = get_redis_conn() + + with Connection(redis_connection): + queues = get_queue_list(queue) + Worker(queues, name=get_worker_name(queue)).work() + +def get_worker_name(queue): + '''When limiting worker to a specific queue, also append queue name to default worker name''' + name = None -def get_queue_list(queue=None): - queue_list = ['long', 'default', 'short'] if queue: - if queue not in queue_list: - frappe.throw("Queue should be one of {0}".format(', '.join(queue_list))) - else: - return [queue] - else: - return queue_list + # hostname.pid is the default worker name + name = '{hostname}.{pid}.{queue}'.format( + hostname=socket.gethostname(), + pid=os.getpid(), + queue=queue) + return name -def get_jobs(site=None, queue=None): +def get_jobs(site=None, queue=None, key='method'): + '''Gets jobs per queue or per site or both''' jobs_per_site = defaultdict(list) for queue in get_queue_list(queue): q = get_queue(queue) for job in q.jobs: if site is None: - jobs_per_site[job.kwargs['site']].append(job.kwargs['method']) - + # get jobs for all sites + jobs_per_site[job.kwargs['site']].append(job.kwargs[key]) + elif job.kwargs['site'] == site: - 
jobs_per_site[site].append(job.kwargs['method']) + # get jobs only for given site + jobs_per_site[site].append(job.kwargs[key]) + return jobs_per_site +def get_queue_list(queue_list=None): + '''Defines possible queues. Also wraps a given queue in a list after validating.''' + default_queue_list = queue_timeout.keys() + if queue_list: + if isinstance(queue_list, basestring): + queue_list = [queue_list] -def start_worker(): - with Connection(get_redis_conn()): - qs = ['short', 'default', 'long'] - Worker(qs).work() + for queue in queue_list: + validate_queue(queue, default_queue_list) + return queue_list + + else: + return default_queue_list def get_queue(queue): + '''Returns a Queue object tied to a redis connection''' + validate_queue(queue) + return Queue(queue, connection=get_redis_conn()) +def validate_queue(queue, default_queue_list=None): + if not default_queue_list: + default_queue_list = queue_timeout.keys() + + if queue not in default_queue_list: + frappe.throw("Queue should be one of {0}".format(', '.join(default_queue_list))) def get_redis_conn(): - hostname, port = get_redis_config() - return Redis(hostname, port) - - -def get_redis_config(): - #TEMPORARY MEGA HACK FOR TESTING - initialized = False if not hasattr(frappe.local, 'conf'): - frappe.init('') + raise Exception('You need to call frappe.init') - conf = frappe.local.conf - - if initialized: - frappe.destroy() - - try: - url = urlparse(conf.redis_queue).hostname - port = urlparse(conf.redis_queue).port - return url, port - except: + elif not frappe.local.conf.redis_queue: raise Exception('redis_queue missing in common_site_config.json') + return redis.from_url(frappe.local.conf.redis_queue) - -#For Workers - -def start_all_workers(): - for i in range(3): - try: - p = multiprocessing.Process(target=start_worker) - p.start() - except KeyboardInterrupt: - print("Caught KeyboardInterrupt, terminating workers") diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 
4bfda876f2..299ef49806 100755 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -17,21 +17,66 @@ import time import frappe.utils from frappe.utils import get_sites from datetime import datetime -from background_jobs import enqueue, get_jobs - +from background_jobs import enqueue, get_jobs, queue_timeout DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' -def enqueue_events(queued_jobs, site): - if is_scheduler_disabled(): - return +def start_scheduler(): + '''Run enqueue_events_for_all_sites every 2 minutes (default). + Specify scheduler_interval in seconds in common_site_config.json''' + interval = frappe.get_conf().scheduler_interval or 120 + schedule.every(interval).seconds.do(enqueue_events_for_all_sites) + + while True: + schedule.run_pending() + time.sleep(1) + +def enqueue_events_for_all_sites(): + '''Loop through sites and enqueue events that are not already queued''' + with frappe.init_site(): + jobs_per_site = get_jobs() + sites = get_sites() + + for site in sites: + try: + enqueue_events_for_site(site=site, queued_jobs=jobs_per_site[site]) + except: + # it should try to enqueue other sites + print frappe.get_traceback() + +def enqueue_events_for_site(site, queued_jobs): + try: + frappe.init(site=site) + if frappe.local.conf.maintenance_mode: + return + + frappe.connect() + if is_scheduler_disabled(): + return + + enqueue_events(site=site, queued_jobs=queued_jobs) + + # TODO this print call is a tempfix till logging is fixed! 
+ print 'Queued events for site {0}'.format(site) + frappe.get_logger(__name__).debug('Queued events for site {0}'.format(site)) + + except: + frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site)) + raise + + finally: + frappe.destroy() + +def enqueue_events(site, queued_jobs): nowtime = frappe.utils.now_datetime() last = frappe.db.get_value('System Settings', 'System Settings', 'scheduler_last_event') # set scheduler last event frappe.db.begin() - frappe.db.set_value('System Settings', 'System Settings', 'scheduler_last_event', nowtime.strftime(DATETIME_FORMAT), update_modified=False) + frappe.db.set_value('System Settings', 'System Settings', + 'scheduler_last_event', nowtime.strftime(DATETIME_FORMAT), + update_modified=False) frappe.db.commit() out = [] @@ -83,13 +128,10 @@ def enqueue_applicable_events(nowtime, last, queued_jobs, site): return out def trigger(site, event, queued_jobs, now=False): - """trigger method in startup.schedule_handler""" - if event.endswith("long"): - queue = 'long' - timeout = 1500 - else: - queue = 'default' - timeout = 300 + """trigger method in hooks.scheduler_events""" + queue = 'long' if event.endswith('_long') else 'short' + timeout = queue_timeout[queue] + for handler in frappe.get_hooks("scheduler_events").get(event, []): if not now: if handler not in queued_jobs: @@ -124,11 +166,14 @@ def get_enabled_scheduler_events(): enabled_events = frappe.db.get_global("enabled_scheduler_events") if enabled_events: return json.loads(enabled_events) - return ["all", "hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long"] + + return ["all", "hourly", "hourly_long", "daily", "daily_long", + "weekly", "weekly_long", "monthly", "monthly_long"] def is_scheduler_disabled(): if frappe.conf.disable_scheduler: return True + return not frappe.utils.cint(frappe.db.get_single_value("System Settings", "enable_scheduler")) def toggle_scheduler(enable): @@ -170,7 +215,7 @@ 
def get_error_report(from_date=None, to_date=None, limit=10): def scheduler_task(site, event, handler, now=False): - traceback = "" + '''This is a wrapper function that runs a hooks.scheduler_events method''' frappe.get_logger(__name__).info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) try: if not now: @@ -182,42 +227,10 @@ def scheduler_task(site, event, handler, now=False): except Exception: frappe.db.rollback() traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler)) - frappe.get_logger(__name__).warn(traceback) + frappe.get_logger(__name__).error(traceback) raise else: frappe.db.commit() frappe.get_logger(__name__).info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) - - -def enqueue_scheduler_events(): - frappe.init('') - - jobs_per_site = get_jobs() - sites = get_sites() - - frappe.destroy() - - for site in sites: - enqueue_events_for_site(site=site, queued_jobs=jobs_per_site[site]) - -def enqueue_events_for_site(site, queued_jobs): - try: - frappe.init(site=site) - if frappe.local.conf.maintenance_mode or frappe.conf.disable_scheduler: - return - frappe.connect() - enqueue_events(queued_jobs=queued_jobs, site=site) - except: - frappe.get_logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site)) - raise - finally: - frappe.destroy() - -def start_scheduler(): - schedule.every(5).seconds.do(enqueue_scheduler_events) - while True: - schedule.run_pending() - time.sleep(1) - diff --git a/requirements.txt b/requirements.txt index a85289ac8e..3933b10f0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,3 +33,4 @@ bleach-whitelist Pillow beautifulsoup4 rq +schedule