diff --git a/cypress/integration/socket_updates.js b/cypress/integration/socket_updates.js index 38d13a500e..9f8c55b938 100644 --- a/cypress/integration/socket_updates.js +++ b/cypress/integration/socket_updates.js @@ -65,6 +65,13 @@ context("Realtime updates", () => { cy.get("@callback").should("be.called"); }); }); + + it("Progress bar", { scrollBehavior: false }, () => { + const title = "RealTime Progress"; + cy.call("frappe.tests.ui_test_helpers.publish_progress", { title }).then(() => { + cy.contains(title).should("be.visible"); + }); + }); }); function publish_realtime(args) { diff --git a/frappe/core/doctype/prepared_report/prepared_report.json b/frappe/core/doctype/prepared_report/prepared_report.json index fb3809e481..b64b91c4ef 100644 --- a/frappe/core/doctype/prepared_report/prepared_report.json +++ b/frappe/core/doctype/prepared_report/prepared_report.json @@ -25,7 +25,8 @@ "fieldtype": "Data", "label": "Report Name", "read_only": 1, - "reqd": 1 + "reqd": 1, + "search_index": 1 }, { "default": "Queued", @@ -35,8 +36,9 @@ "in_list_view": 1, "in_standard_filter": 1, "label": "Status", - "options": "Error\nQueued\nCompleted", - "read_only": 1 + "options": "Error\nQueued\nCompleted\nStarted", + "read_only": 1, + "search_index": 1 }, { "fieldname": "column_break_4", @@ -104,7 +106,7 @@ ], "in_create": 1, "links": [], - "modified": "2023-05-19 15:41:03.428589", + "modified": "2023-07-01 18:29:12.700239", "modified_by": "Administrator", "module": "Core", "name": "Prepared Report", diff --git a/frappe/core/doctype/prepared_report/prepared_report.py b/frappe/core/doctype/prepared_report/prepared_report.py index ed7f4711aa..30efa8eb91 100644 --- a/frappe/core/doctype/prepared_report/prepared_report.py +++ b/frappe/core/doctype/prepared_report/prepared_report.py @@ -3,6 +3,7 @@ import json +from contextlib import suppress from typing import Any from rq import get_current_job @@ -12,9 +13,13 @@ from frappe.desk.form.load import get_attachments from frappe.desk.query_report import generate_report_result from frappe.model.document import Document from frappe.monitor import add_data_to_monitor -from frappe.utils import gzip_compress, gzip_decompress +from frappe.utils import add_to_date, gzip_compress, gzip_decompress, now from frappe.utils.background_jobs import enqueue +# If prepared report runs for longer than this time it's automatically considered as failed +FAILURE_THRESHOLD = 60 * 60 +REPORT_TIMEOUT = 25 * 60 + class PreparedReport(Document): @property @@ -38,12 +43,21 @@ class PreparedReport(Document): def before_insert(self): self.status = "Queued" + def on_trash(self): + # If job is running then send stop signal. + if self.status != "Started": + return + + with suppress(Exception): + job = frappe.get_doc("RQ Job", self.job_id) + job.stop_job() + def after_insert(self): enqueue( generate_report, queue="long", prepared_report=self.name, - timeout=1500, + timeout=REPORT_TIMEOUT, enqueue_after_commit=True, ) @@ -58,7 +72,7 @@ class PreparedReport(Document): def generate_report(prepared_report): - update_job_id(prepared_report, get_current_job().id) + update_job_id(prepared_report) instance = frappe.get_doc("Prepared Report", prepared_report) report = frappe.get_doc("Report", instance.report_name) @@ -95,8 +109,18 @@ def generate_report(prepared_report): ) -def update_job_id(prepared_report, job_id): - frappe.db.set_value("Prepared Report", prepared_report, "job_id", job_id, update_modified=False) +def update_job_id(prepared_report): + job = get_current_job() + + frappe.db.set_value( + "Prepared Report", + prepared_report, + { + "job_id": job and job.id, + "status": "Started", + }, + ) + frappe.db.commit() @@ -132,7 +156,7 @@ def get_reports_in_queued_state(report_name, filters): filters={ "report_name": report_name, "filters": process_filters_for_prepared_report(filters), - "status": "Queued", + "status": ("in", ("Queued", "Started")), "owner": frappe.session.user, }, ) @@ -151,6 +175,21 @@ def get_completed_prepared_report(filters, user, report_name): ) +def expire_stalled_report(): + frappe.db.set_value( + "Prepared Report", + { + "status": "Started", + "modified": ("<", add_to_date(now(), seconds=-FAILURE_THRESHOLD, as_datetime=True)), + }, + { + "status": "Failed", + "error_message": frappe._("Report timed out."), + }, + update_modified=False, + ) + + @frappe.whitelist() def delete_prepared_reports(reports): reports = frappe.parse_json(reports) diff --git a/frappe/core/doctype/prepared_report/test_prepared_report.py b/frappe/core/doctype/prepared_report/test_prepared_report.py index a864ea73f8..dbd1294cbb 100644 --- a/frappe/core/doctype/prepared_report/test_prepared_report.py +++ b/frappe/core/doctype/prepared_report/test_prepared_report.py @@ -2,10 +2,13 @@ # License: MIT. See LICENSE import json import time +from contextlib import contextmanager import frappe from frappe.desk.query_report import generate_report_result, get_report_doc -from frappe.tests.utils import FrappeTestCase +from frappe.query_builder.utils import db_type_is +from frappe.tests.test_query_builder import run_only_if +from frappe.tests.utils import FrappeTestCase, timeout class TestPreparedReport(FrappeTestCase): @@ -16,11 +19,21 @@ class TestPreparedReport(FrappeTestCase): frappe.db.commit() - def create_prepared_report(self, commit=False): + @timeout(seconds=20) + def wait_for_status(self, report, status): + frappe.db.commit() # Flush changes first + while True: + frappe.db.rollback() # read new data + report.reload() + if report.status == status: + break + time.sleep(0.5) + + def create_prepared_report(self, report=None, commit=True): doc = frappe.get_doc( { "doctype": "Prepared Report", - "report_name": "Database Storage Usage By Tables", + "report_name": report or "Database Storage Usage By Tables", } ).insert() @@ -30,24 +43,50 @@ class TestPreparedReport(FrappeTestCase): return doc def test_queueing(self): - doc_ = self.create_prepared_report() - self.assertEqual("Queued", doc_.status) - self.assertTrue(doc_.queued_at) + doc = self.create_prepared_report() + self.assertEqual("Queued", doc.status) + self.assertTrue(doc.queued_at) - frappe.db.commit() - time.sleep(5) + self.wait_for_status(doc, "Completed") - doc_ = frappe.get_last_doc("Prepared Report") - self.assertEqual("Completed", doc_.status) - self.assertTrue(doc_.job_id) - self.assertTrue(doc_.report_end_time) + doc = frappe.get_last_doc("Prepared Report") + self.assertTrue(doc.job_id) + self.assertTrue(doc.report_end_time) def test_prepared_data(self): - doc_ = self.create_prepared_report(commit=True) - time.sleep(5) + doc = self.create_prepared_report() + self.wait_for_status(doc, "Completed") - prepared_data = json.loads(doc_.get_prepared_data().decode("utf-8")) + prepared_data = json.loads(doc.get_prepared_data().decode("utf-8")) generated_data = generate_report_result(get_report_doc("Database Storage Usage By Tables")) self.assertEqual(len(prepared_data["columns"]), len(generated_data["columns"])) self.assertEqual(len(prepared_data["result"]), len(generated_data["result"])) self.assertEqual(len(prepared_data), len(generated_data)) + + @run_only_if(db_type_is.MARIADB) + def test_start_status_and_kill_jobs(self): + with test_report(report_type="Query Report", query="select sleep(10)") as report: + doc = self.create_prepared_report(report.name) + self.wait_for_status(doc, "Started") + job_id = doc.job_id + + doc.delete() + time.sleep(1) + job = frappe.get_doc("RQ Job", job_id) + self.assertEqual(job.status, "stopped") + + +@contextmanager +def test_report(**args): + try: + report = frappe.new_doc("Report") + report.update(args) + if not report.report_name: + report.report_name = frappe.generate_hash() + if not report.ref_doctype: + report.ref_doctype = "ToDo" + report.insert() + frappe.db.commit() + yield report + finally: + report.delete() diff --git a/frappe/core/doctype/report/report.py b/frappe/core/doctype/report/report.py index dba82bada5..8bd22b173e 100644 --- a/frappe/core/doctype/report/report.py +++ b/frappe/core/doctype/report/report.py @@ -49,6 +49,10 @@ class Report(Document): def on_update(self): self.export_doc() + def before_export(self): + self.letterhead = None + self.prepared_report = 0 + def on_trash(self): if ( self.is_standard == "Yes" @@ -121,7 +125,7 @@ class Report(Document): def execute_script_report(self, filters): # save the timestamp to automatically set to prepared - threshold = 30 + threshold = 15 res = [] start_time = datetime.datetime.now() diff --git a/frappe/core/report/database_storage_usage_by_tables/database_storage_usage_by_tables.json b/frappe/core/report/database_storage_usage_by_tables/database_storage_usage_by_tables.json index 20deb78ad6..773cb7771f 100644 --- a/frappe/core/report/database_storage_usage_by_tables/database_storage_usage_by_tables.json +++ b/frappe/core/report/database_storage_usage_by_tables/database_storage_usage_by_tables.json @@ -9,7 +9,6 @@ "filters": [], "idx": 0, "is_standard": "Yes", - "letter_head": "abc", "modified": "2022-10-19 02:59:00.365307", "modified_by": "Administrator", "module": "Core", @@ -25,4 +24,4 @@ "role": "System Manager" } ] -} \ No newline at end of file +} diff --git a/frappe/hooks.py b/frappe/hooks.py index 85a28feb39..fe430918d0 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -196,9 +196,9 @@ scheduler_events = { "frappe.email.doctype.email_account.email_account.pull", ], # Hourly but offset by 30 minutes - # "30 * * * *": [ - # - # ], + "30 * * * *": [ + "frappe.core.doctype.prepared_report.prepared_report.expire_stalled_report", + ], # Daily but offset by 45 minutes "45 0 * * *": [ "frappe.core.doctype.log_settings.log_settings.run_log_clean_up", diff --git a/frappe/tests/ui_test_helpers.py b/frappe/tests/ui_test_helpers.py index fd0fdbdc5b..8920872069 100644 --- a/frappe/tests/ui_test_helpers.py +++ b/frappe/tests/ui_test_helpers.py @@ -642,3 +642,19 @@ def publish_realtime( docname=docname, task_id=task_id, ) + + +@whitelist_for_tests +def publish_progress(duration=3, title=None, doctype=None, docname=None): + # This should consider session user and only show it to current user. + frappe.enqueue(slow_task, duration=duration, title=title, doctype=doctype, docname=docname) + + +def slow_task(duration, title, doctype, docname): + import time + + steps = 10 + + for i in range(steps + 1): + frappe.publish_progress(i * 10, title=title, doctype=doctype, docname=docname) + time.sleep(int(duration) / steps)