diff --git a/frappe/migrate.py b/frappe/migrate.py index 9d64e8a842..e819cc174a 100644 --- a/frappe/migrate.py +++ b/frappe/migrate.py @@ -5,6 +5,8 @@ import contextlib import functools import json import os +import threading +import time from textwrap import dedent import frappe @@ -91,6 +93,7 @@ class SiteMigration: """Run operations that should be run post schema updation processes This should be executed irrespective of outcome """ + self.db_monitor.stop() frappe.translate.clear_cache() clear_website_cache() clear_notifications() @@ -236,6 +239,8 @@ class SiteMigration: frappe.init(site) frappe.connect() + self.db_monitor = DBQueryProgressMonitor() + if not self.required_services_running(): raise SystemExit(1) @@ -248,3 +253,65 @@ class SiteMigration: finally: self.tearDown() frappe.destroy() + + +class DBQueryProgressMonitor(threading.Thread): + POLL_DURATION = 10 + + def __init__(self) -> None: + super().__init__() + self.site = frappe.local.site + self.daemon = True + self._running = threading.Event() + if frappe.db.db_type == "mariadb": + self.conn_id = frappe.db.sql("select connection_id()")[0][0] + self.start() + + def run(self): + if self._running.is_set(): + return + self._running.set() + + frappe.init(self.site) + frappe.connect() + + while self._running.is_set(): + time.sleep(self.POLL_DURATION) + queries = frappe.db.sql( + "SELECT * FROM information_schema.PROCESSLIST WHERE ID = %s", + self.conn_id, + as_dict=True, + ) + + if not queries: + continue + + query = frappe._dict(queries[0]) + time_taken = query.TIME + if not time_taken or time_taken < 5: + continue + + msg = [] + command = query.COMMAND or "" + msg.append(f"Command: {command}") + msg.append(f"Time: {time_taken}s") + msg.append(f"State: {query.STATE or 'N/A'}") + if query.PROGRESS: + msg.append(f"Progress: {query.PROGRESS}%") + + if command and command == "Query": + sql_query = query.INFO or "" + sql_query = sql_query.replace("\r", "").replace("\n", " ").replace("\t", " ") + if len(sql_query) > 100: + sql_query = sql_query[:40] + " ... " + sql_query[-20:] + msg.append(f"Query: {sql_query}") + + msg = "\r" + " | ".join(msg) + if self._running.is_set(): + print(msg, end="", flush=True) + + frappe.destroy() + + def stop(self): + print("") # Clear current line + self._running.clear()