Merge pull request #21546 from frappe/reliable_prepared_reports

refactor: improve prepared report reporting
This commit is contained in:
Ankush Menat 2023-07-01 18:57:47 +05:30 committed by GitHub
commit 05d0fdcaeb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 137 additions and 31 deletions

View file

@ -65,6 +65,13 @@ context("Realtime updates", () => {
cy.get("@callback").should("be.called");
});
});
it("Progress bar", { scrollBehavior: false }, () => {
const title = "RealTime Progress";
cy.call("frappe.tests.ui_test_helpers.publish_progress", { title }).then(() => {
cy.contains(title).should("be.visible");
});
});
});
function publish_realtime(args) {

View file

@ -25,7 +25,8 @@
"fieldtype": "Data",
"label": "Report Name",
"read_only": 1,
"reqd": 1
"reqd": 1,
"search_index": 1
},
{
"default": "Queued",
@ -35,8 +36,9 @@
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Status",
"options": "Error\nQueued\nCompleted",
"read_only": 1
"options": "Error\nQueued\nCompleted\nStarted",
"read_only": 1,
"search_index": 1
},
{
"fieldname": "column_break_4",
@ -104,7 +106,7 @@
],
"in_create": 1,
"links": [],
"modified": "2023-05-19 15:41:03.428589",
"modified": "2023-07-01 18:29:12.700239",
"modified_by": "Administrator",
"module": "Core",
"name": "Prepared Report",

View file

@ -3,6 +3,7 @@
import json
from contextlib import suppress
from typing import Any
from rq import get_current_job
@ -12,9 +13,13 @@ from frappe.desk.form.load import get_attachments
from frappe.desk.query_report import generate_report_result
from frappe.model.document import Document
from frappe.monitor import add_data_to_monitor
from frappe.utils import gzip_compress, gzip_decompress
from frappe.utils import add_to_date, gzip_compress, gzip_decompress, now
from frappe.utils.background_jobs import enqueue
# If prepared report runs for longer than this time it's automatically considered as failed
FAILURE_THRESHOLD = 60 * 60
REPORT_TIMEOUT = 25 * 60
class PreparedReport(Document):
@property
@ -38,12 +43,21 @@ class PreparedReport(Document):
def before_insert(self):
self.status = "Queued"
def on_trash(self):
# If job is running then send stop signal.
if self.status != "Started":
return
with suppress(Exception):
job = frappe.get_doc("RQ Job", self.job_id)
job.stop_job()
def after_insert(self):
enqueue(
generate_report,
queue="long",
prepared_report=self.name,
timeout=1500,
timeout=REPORT_TIMEOUT,
enqueue_after_commit=True,
)
@ -58,7 +72,7 @@ class PreparedReport(Document):
def generate_report(prepared_report):
update_job_id(prepared_report, get_current_job().id)
update_job_id(prepared_report)
instance = frappe.get_doc("Prepared Report", prepared_report)
report = frappe.get_doc("Report", instance.report_name)
@ -95,8 +109,18 @@ def generate_report(prepared_report):
)
def update_job_id(prepared_report, job_id):
frappe.db.set_value("Prepared Report", prepared_report, "job_id", job_id, update_modified=False)
def update_job_id(prepared_report):
job = get_current_job()
frappe.db.set_value(
"Prepared Report",
prepared_report,
{
"job_id": job and job.id,
"status": "Started",
},
)
frappe.db.commit()
@ -132,7 +156,7 @@ def get_reports_in_queued_state(report_name, filters):
filters={
"report_name": report_name,
"filters": process_filters_for_prepared_report(filters),
"status": "Queued",
"status": ("in", ("Queued", "Started")),
"owner": frappe.session.user,
},
)
@ -151,6 +175,21 @@ def get_completed_prepared_report(filters, user, report_name):
)
def expire_stalled_report():
frappe.db.set_value(
"Prepared Report",
{
"status": "Started",
"modified": ("<", add_to_date(now(), seconds=-FAILURE_THRESHOLD, as_datetime=True)),
},
{
"status": "Failed",
"error_message": frappe._("Report timed out."),
},
update_modified=False,
)
@frappe.whitelist()
def delete_prepared_reports(reports):
reports = frappe.parse_json(reports)

View file

@ -2,10 +2,13 @@
# License: MIT. See LICENSE
import json
import time
from contextlib import contextmanager
import frappe
from frappe.desk.query_report import generate_report_result, get_report_doc
from frappe.tests.utils import FrappeTestCase
from frappe.query_builder.utils import db_type_is
from frappe.tests.test_query_builder import run_only_if
from frappe.tests.utils import FrappeTestCase, timeout
class TestPreparedReport(FrappeTestCase):
@ -16,11 +19,21 @@ class TestPreparedReport(FrappeTestCase):
frappe.db.commit()
def create_prepared_report(self, commit=False):
@timeout(seconds=20)
def wait_for_status(self, report, status):
frappe.db.commit() # Flush changes first
while True:
frappe.db.rollback() # read new data
report.reload()
if report.status == status:
break
time.sleep(0.5)
def create_prepared_report(self, report=None, commit=True):
doc = frappe.get_doc(
{
"doctype": "Prepared Report",
"report_name": "Database Storage Usage By Tables",
"report_name": report or "Database Storage Usage By Tables",
}
).insert()
@ -30,24 +43,50 @@ class TestPreparedReport(FrappeTestCase):
return doc
def test_queueing(self):
doc_ = self.create_prepared_report()
self.assertEqual("Queued", doc_.status)
self.assertTrue(doc_.queued_at)
doc = self.create_prepared_report()
self.assertEqual("Queued", doc.status)
self.assertTrue(doc.queued_at)
frappe.db.commit()
time.sleep(5)
self.wait_for_status(doc, "Completed")
doc_ = frappe.get_last_doc("Prepared Report")
self.assertEqual("Completed", doc_.status)
self.assertTrue(doc_.job_id)
self.assertTrue(doc_.report_end_time)
doc = frappe.get_last_doc("Prepared Report")
self.assertTrue(doc.job_id)
self.assertTrue(doc.report_end_time)
def test_prepared_data(self):
doc_ = self.create_prepared_report(commit=True)
time.sleep(5)
doc = self.create_prepared_report()
self.wait_for_status(doc, "Completed")
prepared_data = json.loads(doc_.get_prepared_data().decode("utf-8"))
prepared_data = json.loads(doc.get_prepared_data().decode("utf-8"))
generated_data = generate_report_result(get_report_doc("Database Storage Usage By Tables"))
self.assertEqual(len(prepared_data["columns"]), len(generated_data["columns"]))
self.assertEqual(len(prepared_data["result"]), len(generated_data["result"]))
self.assertEqual(len(prepared_data), len(generated_data))
@run_only_if(db_type_is.MARIADB)
def test_start_status_and_kill_jobs(self):
with test_report(report_type="Query Report", query="select sleep(10)") as report:
doc = self.create_prepared_report(report.name)
self.wait_for_status(doc, "Started")
job_id = doc.job_id
doc.delete()
time.sleep(1)
job = frappe.get_doc("RQ Job", job_id)
self.assertEqual(job.status, "stopped")
@contextmanager
def test_report(**args):
try:
report = frappe.new_doc("Report")
report.update(args)
if not report.report_name:
report.report_name = frappe.generate_hash()
if not report.ref_doctype:
report.ref_doctype = "ToDo"
report.insert()
frappe.db.commit()
yield report
finally:
report.delete()

View file

@ -49,6 +49,10 @@ class Report(Document):
def on_update(self):
self.export_doc()
def before_export(self):
self.letterhead = None
self.prepared_report = 0
def on_trash(self):
if (
self.is_standard == "Yes"
@ -121,7 +125,7 @@ class Report(Document):
def execute_script_report(self, filters):
# save the timestamp to automatically set to prepared
threshold = 30
threshold = 15
res = []
start_time = datetime.datetime.now()

View file

@ -9,7 +9,6 @@
"filters": [],
"idx": 0,
"is_standard": "Yes",
"letter_head": "abc",
"modified": "2022-10-19 02:59:00.365307",
"modified_by": "Administrator",
"module": "Core",
@ -25,4 +24,4 @@
"role": "System Manager"
}
]
}
}

View file

@ -196,9 +196,9 @@ scheduler_events = {
"frappe.email.doctype.email_account.email_account.pull",
],
# Hourly but offset by 30 minutes
# "30 * * * *": [
#
# ],
"30 * * * *": [
"frappe.core.doctype.prepared_report.prepared_report.expire_stalled_report",
],
# Daily but offset by 45 minutes
"45 0 * * *": [
"frappe.core.doctype.log_settings.log_settings.run_log_clean_up",

View file

@ -642,3 +642,19 @@ def publish_realtime(
docname=docname,
task_id=task_id,
)
@whitelist_for_tests
def publish_progress(duration=3, title=None, doctype=None, docname=None):
# This should consider session user and only show it to current user.
frappe.enqueue(slow_task, duration=duration, title=title, doctype=doctype, docname=docname)
def slow_task(duration, title, doctype, docname):
import time
steps = 10
for i in range(steps + 1):
frappe.publish_progress(i * 10, title=title, doctype=doctype, docname=docname)
time.sleep(int(duration) / steps)