feat: Show progress of long migration queries (#33660)

Often large databases take several minutes to alter a table, this gives
zero feedback right now and makes it hard to guess where the problem
is if things eventually fail. Was the query slow? Was it making adequate
progress? after how much time it failed?

All of these questions will now be easier to answer. This PR spawns a
new thread everytime we migrate database and...
1. Every 10 seconds check if any command has been running for >10
   seconds from current DB connection.
2. Print details about ongoing command like state, progress, time take
   and query.

The progress detail is often misleading but it's better than nothing.
This commit is contained in:
Ankush Menat 2025-08-17 13:26:29 +05:30 committed by GitHub
parent f7733e78f7
commit 54c12b594d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -5,6 +5,8 @@ import contextlib
import functools
import json
import os
import threading
import time
from textwrap import dedent
import frappe
@ -91,6 +93,7 @@ class SiteMigration:
"""Run operations that should be run post schema updation processes
This should be executed irrespective of outcome
"""
self.db_monitor.stop()
frappe.translate.clear_cache()
clear_website_cache()
clear_notifications()
@ -236,6 +239,8 @@ class SiteMigration:
frappe.init(site)
frappe.connect()
self.db_monitor = DBQueryProgressMonitor()
if not self.required_services_running():
raise SystemExit(1)
@ -248,3 +253,65 @@ class SiteMigration:
finally:
self.tearDown()
frappe.destroy()
class DBQueryProgressMonitor(threading.Thread):
POLL_DURATION = 10
def __init__(self) -> None:
super().__init__()
self.site = frappe.local.site
self.daemon = True
self._running = threading.Event()
if frappe.db.db_type == "mariadb":
self.conn_id = frappe.db.sql("select connection_id()")[0][0]
self.start()
def run(self):
if self._running.is_set():
return
self._running.set()
frappe.init(self.site)
frappe.connect()
while self._running.is_set():
time.sleep(self.POLL_DURATION)
queries = frappe.db.sql(
"SELECT * FROM information_schema.PROCESSLIST WHERE ID = %s",
self.conn_id,
as_dict=True,
)
if not queries:
continue
query = frappe._dict(queries[0])
time_taken = query.TIME
if not time_taken or time_taken < 5:
continue
msg = []
command = query.COMMAND or ""
msg.append(f"Command: {command}")
msg.append(f"Time: {time_taken}s")
msg.append(f"State: {query.STATE or 'N/A'}")
if query.PROGRESS:
msg.append(f"Progress: {query.PROGRESS}%")
if command and command == "Query":
sql_query = query.INFO or ""
sql_query = sql_query.replace("\r", "").replace("\n", " ").replace("\t", " ")
if len(sql_query) > 100:
sql_query = sql_query[:40] + " ... " + sql_query[-20:]
msg.append(f"Query: {sql_query}")
msg = "\r" + " | ".join(msg)
if self._running.is_set():
print(msg, end="", flush=True)
frappe.destroy()
def stop(self):
print("") # Clear current line
self._running.clear()