diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index 6af3a2403e..5d453e3568 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -240,14 +240,14 @@ def start_worker_pool(queue, quiet=False, num_workers=2, burst=False): @click.option("--site", help="site name") @pass_context def ready_for_migration(context, site=None): - from frappe.utils.doctor import get_pending_jobs + from frappe.utils.doctor import any_job_pending if not site: site = get_site(context) try: frappe.init(site=site) - pending_jobs = get_pending_jobs(site=site) + pending_jobs = any_job_pending(site=site) if pending_jobs: print(f"NOT READY for migration: site {site} has pending background jobs") diff --git a/frappe/core/doctype/docshare/test_docshare.py b/frappe/core/doctype/docshare/test_docshare.py index e080b0d4ff..125e829d9b 100644 --- a/frappe/core/doctype/docshare/test_docshare.py +++ b/frappe/core/doctype/docshare/test_docshare.py @@ -33,13 +33,28 @@ class TestDocShare(FrappeTestCase): def test_doc_permission(self): frappe.set_user(self.user) + self.assertFalse(self.event.has_permission()) frappe.set_user("Administrator") frappe.share.add("Event", self.event.name, self.user) frappe.set_user(self.user) - self.assertTrue(self.event.has_permission()) + # PERF: All share permission check should happen with maximum 1 query. + with self.assertRowsRead(1): + self.assertTrue(self.event.has_permission()) + + second_event = frappe.get_doc( + { + "doctype": "Event", + "subject": "test share event 2", + "starts_on": "2015-01-01 10:00:00", + "event_type": "Private", + } + ).insert() + frappe.share.add("Event", second_event.name, self.user) + with self.assertRowsRead(1): + self.assertTrue(self.event.has_permission()) def test_share_permission(self): frappe.share.add("Event", self.event.name, self.user, write=1, share=1) diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index 391ccd8dfd..659c81e5c4 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -63,23 +63,15 @@ class RQJob(Document): order_desc = "desc" in args.get("order_by", "") - matched_job_ids = RQJob.get_matching_job_ids(args) + matched_job_ids = RQJob.get_matching_job_ids(args)[start : start + page_length] - jobs = [] - for job_ids in create_batch(matched_job_ids, 100): - jobs.extend( - serialize_job(job) - for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()) - if job and for_current_site(job) - ) - if len(jobs) > start + page_length: - # we have fetched enough. This is inefficient but because of site filtering TINA - break + conn = get_redis_conn() + jobs = [serialize_job(job) for job in Job.fetch_many(job_ids=matched_job_ids, connection=conn)] - return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length] + return sorted(jobs, key=lambda j: j.modified, reverse=order_desc) @staticmethod - def get_matching_job_ids(args): + def get_matching_job_ids(args) -> list[str]: filters = make_filter_dict(args.get("filters")) queues = _eval_filters(filters.get("queue"), QUEUES) @@ -92,7 +84,7 @@ class RQJob(Document): for status in statuses: matched_job_ids.extend(fetch_job_ids(queue, status)) - return matched_job_ids + return filter_current_site_jobs(matched_job_ids) @check_permissions def delete(self): @@ -107,8 +99,7 @@ class RQJob(Document): @staticmethod def get_count(args) -> int: - # Can not be implemented efficiently due to site filtering hence ignored. - return 0 + return len(RQJob.get_matching_job_ids(args)) # None of these methods apply to virtual job doctype, overriden for sanity. @staticmethod @@ -155,6 +146,12 @@ def for_current_site(job: Job) -> bool: return job.kwargs.get("site") == frappe.local.site +def filter_current_site_jobs(job_ids: list[str]) -> list[str]: + site = frappe.local.site + + return [j for j in job_ids if j.startswith(site)] + + def _eval_filters(filter, values: list[str]) -> list[str]: if filter: operator, operand = filter @@ -186,10 +183,13 @@ def remove_failed_jobs(): frappe.only_for("System Manager") for queue in get_queues(): fail_registry = queue.failed_job_registry - for job_ids in create_batch(fail_registry.get_job_ids(), 100): - for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()): - if job and for_current_site(job): - fail_registry.remove(job, delete_job=True) + failed_jobs = filter_current_site_jobs(fail_registry.get_job_ids()) + + # Delete in batches to avoid loading too many things in memory + conn = get_redis_conn() + for job_ids in create_batch(failed_jobs, 100): + for job in Job.fetch_many(job_ids=job_ids, connection=conn): + job and fail_registry.remove(job, delete_job=True) def get_all_queued_jobs(): diff --git a/frappe/core/doctype/rq_job/rq_job_list.js b/frappe/core/doctype/rq_job/rq_job_list.js index aa05b411ba..7d140d668f 100644 --- a/frappe/core/doctype/rq_job/rq_job_list.js +++ b/frappe/core/doctype/rq_job/rq_job_list.js @@ -15,7 +15,6 @@ frappe.listview_settings["RQ Job"] = { ); if (listview.list_view_settings) { - listview.list_view_settings.disable_count = 1; listview.list_view_settings.disable_sidebar_stats = 1; } @@ -57,6 +56,6 @@ frappe.listview_settings["RQ Job"] = { } listview.refresh(); - }, 5000); + }, 15000); }, }; diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index 6512902fb3..f5d5f89ed4 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -108,13 +108,27 @@ class TestRQJob(FrappeTestCase): self.assertIn("quitting", cstr(stderr)) @timeout(20) - def test_job_id_dedup(self): + def test_job_id_manual_dedup(self): job_id = "test_dedup" job = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id) self.assertTrue(is_job_enqueued(job_id)) self.check_status(job, "finished") self.assertFalse(is_job_enqueued(job_id)) + @timeout(20) + def test_auto_job_dedup(self): + job_id = "test_dedup" + job1 = frappe.enqueue(self.BG_JOB, sleep=2, job_id=job_id, deduplicate=True) + job2 = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id, deduplicate=True) + self.assertIsNone(job2) + self.check_status(job1, "finished") # wait + + # Failed jobs last longer, subsequent job should still pass with same ID. + job3 = frappe.enqueue(self.BG_JOB, fail=True, job_id=job_id, deduplicate=True) + self.check_status(job3, "failed") + job4 = frappe.enqueue(self.BG_JOB, sleep=1, job_id=job_id, deduplicate=True) + self.check_status(job4, "finished") + @timeout(20) def test_enqueue_after_commit(self): job_id = frappe.generate_hash() diff --git a/frappe/email/queue.py b/frappe/email/queue.py index 0df88ebd5c..500a126f72 100755 --- a/frappe/email/queue.py +++ b/frappe/email/queue.py @@ -132,7 +132,6 @@ def return_unsubscribed_page(email, doctype, name): def flush(from_test=False): """flush email queue, every time: called from scheduler""" from frappe.email.doctype.email_queue.email_queue import send_mail - from frappe.utils.background_jobs import get_jobs # To avoid running jobs inside unit tests if frappe.are_emails_muted(): @@ -142,24 +141,16 @@ def flush(from_test=False): if cint(frappe.db.get_default("suspend_email_queue")) == 1: return - try: - queued_jobs = set(get_jobs(site=frappe.local.site, key="job_name")[frappe.local.site]) - except Exception: - queued_jobs = set() - for row in get_queue(): try: - job_name = f"email_queue_sendmail_{row.name}" - if job_name not in queued_jobs: - frappe.enqueue( - method=send_mail, - email_queue_name=row.name, - now=from_test, - job_name=job_name, - queue="short", - ) - else: - frappe.logger().debug(f"Not queueing job {job_name} because it is in queue already") + frappe.enqueue( + method=send_mail, + email_queue_name=row.name, + now=from_test, + job_id=f"email_queue_sendmail_{row.name}", + queue="short", + dedupicate=True, + ) except Exception: frappe.get_doc("Email Queue", row.name).log_error() diff --git a/frappe/integrations/doctype/webhook/webhook.py b/frappe/integrations/doctype/webhook/webhook.py index 6fa24bfb67..5b8c653198 100644 --- a/frappe/integrations/doctype/webhook/webhook.py +++ b/frappe/integrations/doctype/webhook/webhook.py @@ -26,6 +26,7 @@ class Webhook(Document): self.validate_request_url() self.validate_request_body() self.validate_repeating_fields() + self.validate_secret() self.preview_document = None def on_update(self): @@ -74,6 +75,13 @@ class Webhook(Document): if len(webhook_data) != len(set(webhook_data)): frappe.throw(_("Same Field is entered more than once")) + def validate_secret(self): + if self.enable_security: + try: + self.get_password("webhook_secret", False).encode("utf8") + except Exception: + frappe.throw(_("Invalid Webhook Secret")) + @frappe.whitelist() def generate_preview(self): # This function doesn't need to do anything specific as virtual fields @@ -112,16 +120,21 @@ def get_context(doc): def enqueue_webhook(doc, webhook) -> None: - webhook: Webhook = frappe.get_doc("Webhook", webhook.get("name")) - headers = get_webhook_headers(doc, webhook) - data = get_webhook_data(doc, webhook) + try: + webhook: Webhook = frappe.get_doc("Webhook", webhook.get("name")) + headers = get_webhook_headers(doc, webhook) + data = get_webhook_data(doc, webhook) - if webhook.is_dynamic_url: - request_url = frappe.render_template(webhook.request_url, get_context(doc)) - else: - request_url = webhook.request_url + if webhook.is_dynamic_url: + request_url = frappe.render_template(webhook.request_url, get_context(doc)) + else: + request_url = webhook.request_url + + except Exception as e: + frappe.logger().debug({"enqueue_webhook_error": e}) + log_request(webhook.name, doc.name, request_url, headers, data) + return - r = None for i in range(3): try: r = requests.request( diff --git a/frappe/permissions.py b/frappe/permissions.py index 633d0e278d..e8ca0ecb3c 100644 --- a/frappe/permissions.py +++ b/frappe/permissions.py @@ -118,17 +118,24 @@ def has_permission( def false_if_not_shared(): if ptype in ("read", "write", "share", "submit", "email", "print"): - shared = frappe.share.get_shared( - doctype, user, ["read" if ptype in ("email", "print") else ptype] - ) + + rights = ["read" if ptype in ("email", "print") else ptype] if doc: doc_name = get_doc_name(doc) - if doc_name in shared: + shared = frappe.share.get_shared( + doctype, + user, + rights=rights, + filters=[["share_name", "=", doc_name]], + limit=1, + ) + + if shared: if ptype in ("read", "write", "share", "submit") or meta.permissions[0].get(ptype): return True - elif shared: + elif frappe.share.get_shared(doctype, user, rights=rights, limit=1): # if atleast one shared doc of that type, then return True # this is used in db_query to check if permission on DocType return True diff --git a/frappe/public/js/frappe/form/controls/autocomplete.js b/frappe/public/js/frappe/form/controls/autocomplete.js index 27bf75e807..7b91a2031c 100644 --- a/frappe/public/js/frappe/form/controls/autocomplete.js +++ b/frappe/public/js/frappe/form/controls/autocomplete.js @@ -174,6 +174,15 @@ frappe.ui.form.ControlAutocomplete = class ControlAutoComplete extends frappe.ui if (typeof options[0] === "string") { options = options.map((o) => ({ label: o, value: o })); } + + options = options.map((o) => { + if (typeof o !== "string") { + o.label = o.label.toString(); + o.value = o.value.toString(); + } + return o; + }); + return options; } diff --git a/frappe/public/js/frappe/list/list_view.js b/frappe/public/js/frappe/list/list_view.js index c0eef27b9d..1ca72a4e45 100644 --- a/frappe/public/js/frappe/list/list_view.js +++ b/frappe/public/js/frappe/list/list_view.js @@ -711,7 +711,7 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList { } get_column_html(col, doc) { - if (col.type === "Status") { + if (col.type === "Status" || col.df?.options == "Workflow State") { return `