From 54c12b594d25592f52aec36d4367d6271d0f0a98 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Sun, 17 Aug 2025 13:26:29 +0530 Subject: [PATCH] feat: Show progress of long migration queries (#33660) Often large databases take several minutes to alter a table, this gives zero feedback right now and makes it hard to guess where the problem is if things eventually fail. Was the query slow? Was it making adequate progress? after how much time it failed? All of these questions will now be easier to answer. This PR spawns a new thread everytime we migrate database and... 1. Every 10 seconds check if any command has been running for >10 seconds from current DB connection. 2. Print details about ongoing command like state, progress, time take and query. The progress detail is often misleading but it's better than nothing. --- frappe/migrate.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/frappe/migrate.py b/frappe/migrate.py index 9d64e8a842..e819cc174a 100644 --- a/frappe/migrate.py +++ b/frappe/migrate.py @@ -5,6 +5,8 @@ import contextlib import functools import json import os +import threading +import time from textwrap import dedent import frappe @@ -91,6 +93,7 @@ class SiteMigration: """Run operations that should be run post schema updation processes This should be executed irrespective of outcome """ + self.db_monitor.stop() frappe.translate.clear_cache() clear_website_cache() clear_notifications() @@ -236,6 +239,8 @@ class SiteMigration: frappe.init(site) frappe.connect() + self.db_monitor = DBQueryProgressMonitor() + if not self.required_services_running(): raise SystemExit(1) @@ -248,3 +253,65 @@ class SiteMigration: finally: self.tearDown() frappe.destroy() + + +class DBQueryProgressMonitor(threading.Thread): + POLL_DURATION = 10 + + def __init__(self) -> None: + super().__init__() + self.site = frappe.local.site + self.daemon = True + self._running = threading.Event() + if frappe.db.db_type == "mariadb": + self.conn_id = frappe.db.sql("select connection_id()")[0][0] + self.start() + + def run(self): + if self._running.is_set(): + return + self._running.set() + + frappe.init(self.site) + frappe.connect() + + while self._running.is_set(): + time.sleep(self.POLL_DURATION) + queries = frappe.db.sql( + "SELECT * FROM information_schema.PROCESSLIST WHERE ID = %s", + self.conn_id, + as_dict=True, + ) + + if not queries: + continue + + query = frappe._dict(queries[0]) + time_taken = query.TIME + if not time_taken or time_taken < 5: + continue + + msg = [] + command = query.COMMAND or "" + msg.append(f"Command: {command}") + msg.append(f"Time: {time_taken}s") + msg.append(f"State: {query.STATE or 'N/A'}") + if query.PROGRESS: + msg.append(f"Progress: {query.PROGRESS}%") + + if command and command == "Query": + sql_query = query.INFO or "" + sql_query = sql_query.replace("\r", "").replace("\n", " ").replace("\t", " ") + if len(sql_query) > 100: + sql_query = sql_query[:40] + " ... " + sql_query[-20:] + msg.append(f"Query: {sql_query}") + + msg = "\r" + " | ".join(msg) + if self._running.is_set(): + print(msg, end="", flush=True) + + frappe.destroy() + + def stop(self): + print("") # Clear current line + self._running.clear()