diff --git a/frappe/__init__.py b/frappe/__init__.py index 9d3e52800e..a585a5d241 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -641,6 +641,8 @@ def format_value(value, df, doc=None, currency=None): def get_print_format(doctype, name, print_format=None, style=None, as_pdf=False): from frappe.website.render import build_page + from frappe.utils.pdf import get_pdf + local.form_dict.doctype = doctype local.form_dict.name = name local.form_dict.format = print_format @@ -649,15 +651,25 @@ def get_print_format(doctype, name, print_format=None, style=None, as_pdf=False) html = build_page("print") if as_pdf: - print_settings = db.get_singles_dict("Print Settings") - if int(print_settings.send_print_as_pdf or 0): - from utils.pdf import get_pdf - return get_pdf(html, {"page-size": print_settings.pdf_page_size}) - else: - return html + return get_pdf(html) else: return html +def attach_print(doctype, name, file_name): + from frappe.utils import scrub_urls + + print_settings = db.get_singles_dict("Print Settings") + if int(print_settings.send_print_as_pdf or 0): + return { + "fname": file_name + ".pdf", + "fcontent": get_print_format(doctype, name, as_pdf=True) + } + else: + return { + "fname": file_name + ".html", + "fcontent": scrub_urls(get_print_format(doctype, name)).encode("utf-8") + } + logging_setup_complete = False def get_logger(module=None): from frappe.setup_logging import setup_logging diff --git a/frappe/__version__.py b/frappe/__version__.py index cc72154c1b..ea674c5c54 100644 --- a/frappe/__version__.py +++ b/frappe/__version__.py @@ -1 +1 @@ -__version__ = "4.7.1" +__version__ = "4.8.0" diff --git a/frappe/cli.py b/frappe/cli.py index b4cc5813f2..dabb160b16 100755 --- a/frappe/cli.py +++ b/frappe/cli.py @@ -7,9 +7,10 @@ from __future__ import unicode_literals import os import subprocess import frappe +import json from frappe.utils import cint -site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images'] +site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images', 'doctor', 'dump_queue_status', 'purge_all_tasks'] def get_site(parsed_args): if not parsed_args.get("site") and os.path.exists(os.path.join(parsed_args["sites_path"], "currentsite.txt")): @@ -273,6 +274,14 @@ def setup_utilities(parser): parser.add_argument("--import_doc", nargs=1, metavar="PATH", help="""Import (insert/update) doclist. If the argument is a directory, all files ending with .json are imported""") + # doctor + parser.add_argument("--doctor", default=False, action="store_true", + help="Print diagnostic information for task queues") + parser.add_argument("--purge_all_tasks", default=False, action="store_true", + help="Purge any pending periodic tasks of 'all' event. Doesn't purge hourly, daily and weekly") + parser.add_argument("--dump_queue_status", default=False, action="store_true", + help="Dump detailed diagnostic infomation for task queues in JSON format") + def setup_translation(parser): parser.add_argument("--build_message_files", default=False, action="store_true", help="Build message files for translation.") @@ -949,68 +958,20 @@ def update_site_config(site_config, verbose=False): frappe.destroy() @cmd -def bump(repo, bump_type): +def doctor(): + from frappe.utils.doctor import doctor as _doctor + return _doctor() - import json - assert repo in ['lib', 'app'] - assert bump_type in ['minor', 'major', 'patch'] - - def validate(repo_path): - import git - repo = git.Repo(repo_path) - if repo.active_branch != 'master': - raise Exception, "Current branch not master in {}".format(repo_path) - - def bump_version(version, version_type): - import semantic_version - v = semantic_version.Version(version) - if version_type == 'minor': - v.minor += 1 - elif version_type == 'major': - v.major += 1 - elif version_type == 'patch': - v.patch += 1 - return unicode(v) - - def add_tag(repo_path, version): - import git - repo = git.Repo(repo_path) - repo.index.add(['config.json']) - repo.index.commit('bumped to version {}'.format(version)) - repo.create_tag('v' + version, repo.head) - - def update_framework_requirement(version): - with open('app/config.json') as f: - config = json.load(f) - config['requires_framework_version'] = '==' + version - with open('app/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - - validate('lib/') - validate('app/') - - if repo == 'app': - with open('app/config.json') as f: - config = json.load(f) - new_version = bump_version(config['app_version'], bump_type) - config['app_version'] = new_version - with open('app/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - add_tag('app/', new_version) - - elif repo == 'lib': - with open('lib/config.json') as f: - config = json.load(f) - new_version = bump_version(config['framework_version'], bump_type) - config['framework_version'] = new_version - with open('lib/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - add_tag('lib/', new_version) - - update_framework_requirement(new_version) - - bump('app', bump_type) +@cmd +def purge_all_tasks(): + from frappe.utils.doctor import purge_pending_tasks + count = purge_pending_tasks() + print "Purged {} tasks".format(count) +@cmd +def dump_queue_status(): + from frappe.utils.doctor import dump_queue_status as _dump_queue_status + print json.dumps(_dump_queue_status(), indent=1) if __name__=="__main__": out = main() diff --git a/frappe/core/doctype/communication/communication.py b/frappe/core/doctype/communication/communication.py index af98c0f076..f18c89a5b8 100644 --- a/frappe/core/doctype/communication/communication.py +++ b/frappe/core/doctype/communication/communication.py @@ -25,8 +25,9 @@ class Communication(Document): def update_parent(self): """update status of parent Lead or Contact based on who is replying""" - parent_doc = self.get_parent_doc() - parent_doc.run_method("on_communication") + if self.parenttype and self.parent: + parent_doc = self.get_parent_doc() + parent_doc.run_method("on_communication") def on_update(self): self.update_parent() diff --git a/frappe/hooks.py b/frappe/hooks.py index 190a34a1f2..f080427d1c 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -3,7 +3,7 @@ app_title = "Frappe Framework" app_publisher = "Web Notes Technologies Pvt. Ltd." app_description = "Full Stack Web Application Framework in Python" app_icon = "assets/frappe/images/frappe.svg" -app_version = "4.7.1" +app_version = "4.8.0" app_color = "#3498db" app_email = "support@frappe.io" diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py new file mode 100644 index 0000000000..1f2e00de3c --- /dev/null +++ b/frappe/utils/doctor.py @@ -0,0 +1,122 @@ +import json, base64, os +import frappe.cli +from frappe.celery_app import get_celery +from frappe.utils.file_lock import check_lock, LockTimeoutError +from collections import Counter +from operator import itemgetter + +def get_redis_conn(): + "Returns the redis connection that celery would use" + app = get_celery() + with app.connection() as conn: + r = conn.default_channel.client + return r + +def get_queues(): + "Returns the name of queues where frappe enqueues tasks as per the configuration" + queues = ["celery"] + if frappe.conf.celery_queue_per_site: + for site in frappe.cli.get_sites(): + queues.append(site) + queues.append('longjobs@' + site) + return queues + +def get_task_body(taskstr): + return json.loads(base64.decodestring(json.loads(taskstr)['body'])) + +def purge_pending_tasks(event='all'): + """ + Purge tasks of the event event type. Passing 'all' will not purge all + events but of the all event type, ie. the ones that are enqueued every five + mintues and would any leave daily, hourly and weekly tasks + """ + r = get_redis_conn() + event_tasks = frappe.get_hooks()['scheduler_events'][event] + count = 0 + + for queue in get_queues(): + for taskstr in r.lrange(queue, 0 , -1): + taskbody = get_task_body(taskstr) + kwargs = taskbody.get('kwargs') + if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: + r.lrem(queue, taskstr) + count += 1 + return count + +def get_pending_task_count(): + "Get count of pending tasks" + r = get_redis_conn() + pending = 0 + for queue in get_queues(): + pending += r.llen(queue) + return pending + +def get_timedout_locks(): + "Get list of stale locks from all sites" + old_locks=[] + for site in frappe.cli.get_sites(): + locksdir = os.path.join(site, 'locks') + for lock in os.listdir(locksdir): + lock_path = os.path.join(locksdir, lock) + try: + check_lock(lock_path) + except LockTimeoutError: + old_locks.append(lock_path) + return old_locks + +def check_if_workers_online(): + app = get_celery() + if app.control.inspect().ping(): + return True + return False + +def dump_queue_status(): + """ + Dumps pending events and tasks per queue + """ + ret = [] + r = get_redis_conn() + for queue in get_queues(): + queue_details = { + 'queue': queue, + 'len': r.llen(queue), + } + queue_details.update(get_task_count_for_queue(queue)) + ret.append(queue_details) + + ret = sorted(ret, key=itemgetter('len'), reverse=True) + ret.insert(0, { + 'total': get_pending_task_count() + }) + return ret + +def get_task_count_for_queue(queue): + """ + For a given queue, returns the count of every pending task and aggregate of + events pending + """ + r = get_redis_conn() + tasks = [get_task_body(taskstr) for taskstr in r.lrange(queue, 0, -1)] + task_names = [task['task'] for task in tasks] + task_counts = Counter(task_names) + event_counts = Counter(task['kwargs'].get('event') for task in tasks) + return { + 'task_counts': task_counts, + 'event_counts': event_counts + } + +def doctor(): + """ + Prints diagnostic information for the scheduler + """ + flag = False + workers_online = check_if_workers_online() + pending_tasks = get_pending_task_count() + locks = get_timedout_locks() + print "Workers online:", workers_online + print "Pending tasks", pending_tasks + print "Timed out locks:" + print "\n".join(locks) + if (not workers_online) or (pending_tasks > 4000) or locks: + return 1 + return True diff --git a/setup.py b/setup.py index bf3e070c0e..fcec2f4173 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages import os -version = "4.7.1" +version = "4.8.0" with open("requirements.txt", "r") as f: install_requires = f.readlines()