Merge pull request #23414 from ankush/bulk_actions_bg

perf: run bulk actions using background jobs
This commit is contained in:
Ankush Menat 2023-11-24 18:33:38 +05:30 committed by GitHub
commit e8fe3b2166
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 17 deletions

View file

@ -46,8 +46,29 @@ class BulkUpdate(Document):
@frappe.whitelist()
def submit_cancel_or_update_docs(doctype, docnames, action="submit", data=None):
docnames = frappe.parse_json(docnames)
if isinstance(docnames, str):
docnames = frappe.parse_json(docnames)
if len(docnames) < 20:
return _bulk_action(doctype, docnames, action, data)
elif len(docnames) <= 500:
frappe.msgprint(_("Bulk operation is enqueued in background."), alert=True)
frappe.enqueue(
_bulk_action,
doctype=doctype,
docnames=docnames,
action=action,
data=data,
queue="short",
timeout=1000,
)
else:
frappe.throw(
_("Bulk operations only support up to 500 documents."), title=_("Too Many Documents")
)
def _bulk_action(doctype, docnames, action, data):
if data:
data = frappe.parse_json(data)
@ -85,5 +106,4 @@ def submit_cancel_or_update_docs(doctype, docnames, action="submit", data=None):
def show_progress(docnames, message, i, description):
n = len(docnames)
if n >= 10:
frappe.publish_progress(float(i) * 100 / n, title=message, description=description)
frappe.publish_progress(float(i) * 100 / n, title=message, description=description)

View file

@ -0,0 +1,48 @@
# Copyright (c) 2023, Frappe Technologies and Contributors
# See LICENSE
import time
import frappe
from frappe.core.doctype.doctype.test_doctype import new_doctype
from frappe.desk.doctype.bulk_update.bulk_update import submit_cancel_or_update_docs
from frappe.tests.utils import FrappeTestCase, timeout
class TestBulkUpdate(FrappeTestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
cls.doctype = new_doctype(is_submittable=1, custom=1).insert().name
frappe.db.commit()
for _ in range(50):
frappe.new_doc(cls.doctype, some_fieldname=frappe.mock("name")).insert()
@timeout()
def wait_for_assertion(self, assertion):
"""Wait till an assertion becomes True"""
while True:
if assertion():
break
time.sleep(0.2)
def test_bulk_submit_in_background(self):
unsubmitted = frappe.get_all(self.doctype, {"docstatus": 0}, limit=5, pluck="name")
failed = submit_cancel_or_update_docs(self.doctype, unsubmitted, action="submit")
self.assertEqual(failed, [])
def check_docstatus(docs, status):
frappe.db.rollback()
matching_docs = frappe.get_all(
self.doctype, {"docstatus": status, "name": ("in", docs)}, pluck="name"
)
return set(matching_docs) == set(docs)
unsubmitted = frappe.get_all(self.doctype, {"docstatus": 0}, limit=20, pluck="name")
submit_cancel_or_update_docs(self.doctype, unsubmitted, action="submit")
self.wait_for_assertion(lambda: check_docstatus(unsubmitted, 1))
submitted = frappe.get_all(self.doctype, {"docstatus": 1}, limit=20, pluck="name")
submit_cancel_or_update_docs(self.doctype, submitted, action="cancel")
self.wait_for_assertion(lambda: check_docstatus(submitted, 2))

View file

@ -1,6 +1,7 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import json
from collections import defaultdict
from typing import TYPE_CHECKING, Union
import frappe
@ -233,17 +234,30 @@ def get_workflow_field_value(workflow_name, field):
@frappe.whitelist()
def bulk_workflow_approval(docnames, doctype, action):
from collections import defaultdict
docnames = json.loads(docnames)
if len(docnames) < 20:
_bulk_workflow_action(docnames, doctype, action)
elif len(docnames) <= 500:
frappe.msgprint(_("Bulk {0} is enqueued in background.").format(action), alert=True)
frappe.enqueue(
_bulk_workflow_action,
docnames=docnames,
doctype=doctype,
action=action,
queue="short",
timeout=1000,
)
else:
frappe.throw(_("Bulk approval only support up to 500 documents."), title=_("Too Many Documents"))
def _bulk_workflow_action(docnames, doctype, action):
# dictionaries for logging
failed_transactions = defaultdict(list)
successful_transactions = defaultdict(list)
# WARN: message log is cleared
print("Clearing frappe.message_log...")
frappe.clear_messages()
docnames = json.loads(docnames)
for (idx, docname) in enumerate(docnames, 1):
message_dict = {}
try:
@ -308,7 +322,9 @@ def print_workflow_log(messages, title, doctype, indicator):
html = f"<div>{doc}</div>"
msg += html
frappe.msgprint(msg, title=_("Workflow Status"), indicator=indicator, is_minimizable=True)
frappe.msgprint(
msg, title=_("Workflow Status"), indicator=indicator, is_minimizable=True, realtime=True
)
@frappe.whitelist()

View file

@ -1,8 +0,0 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from frappe.tests.utils import FrappeTestCase
class TestBot(FrappeTestCase):
pass