seitime-frappe/frappe/migrate.py

340 lines
9.9 KiB
Python

# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import contextlib
import functools
import json
import os
import threading
import time
from collections import defaultdict
from textwrap import dedent
import click
import frappe
import frappe.model.sync
import frappe.modules.patch_handler
import frappe.translate
from frappe.core.doctype.language.language import sync_languages
from frappe.core.doctype.navbar_settings.navbar_settings import sync_standard_items
from frappe.core.doctype.scheduled_job_type.scheduled_job_type import sync_jobs
from frappe.database.schema import add_column
from frappe.deferred_insert import save_to_db as flush_deferred_inserts
from frappe.desk.doctype.desktop_icon.desktop_icon import sync_desktop_icons
from frappe.desk.notifications import clear_notifications
from frappe.modules.patch_handler import PatchType
from frappe.modules.utils import sync_customizations
from frappe.search.website_search import build_index_for_all_routes
from frappe.utils.connections import check_connection
from frappe.utils.dashboard import sync_dashboards
from frappe.utils.data import cint, comma_and
from frappe.utils.fixtures import sync_fixtures
from frappe.website.utils import clear_website_cache
BENCH_START_MESSAGE = dedent(
"""
Cannot run bench migrate without the services running.
If you are running bench in development mode, make sure that bench is running:
$ bench start
Otherwise, check the server logs and ensure that all the required services are running.
"""
)
def atomic(method):
@functools.wraps(method)
def wrapper(*args, **kwargs):
try:
ret = method(*args, **kwargs)
frappe.db.commit()
return ret
except Exception as e:
# database itself can be gone while attempting rollback.
# We should preserve original exception in this case.
with contextlib.suppress(Exception):
frappe.db.rollback()
raise e
return wrapper
class SiteMigration:
"""Migrate all apps to the current version, will:
- run before migrate hooks
- run patches
- sync doctypes (schema)
- sync dashboards
- sync jobs
- sync fixtures
- sync customizations
- sync languages
- sync web pages (from /www)
- run after migrate hooks
"""
def __init__(
self, skip_failing: bool = False, skip_search_index: bool = False, skip_fixtures: bool = False
) -> None:
self.skip_failing = skip_failing
self.skip_search_index = skip_search_index
self.skip_fixtures = skip_fixtures
def setUp(self):
"""Complete setup required for site migration"""
frappe.flags.touched_tables = set()
self.touched_tables_file = frappe.get_site_path("touched_tables.json")
frappe.clear_cache()
if os.path.exists(self.touched_tables_file):
os.remove(self.touched_tables_file)
self.lower_lock_timeout()
with contextlib.suppress(Exception):
self.kill_idle_connections()
frappe.flags.in_migrate = True
def tearDown(self):
"""Run operations that should be run post schema updation processes
This should be executed irrespective of outcome
"""
self.db_monitor.stop()
frappe.translate.clear_cache()
clear_website_cache()
clear_notifications()
with open(self.touched_tables_file, "w") as f:
json.dump(list(frappe.flags.touched_tables), f, sort_keys=True, indent=4)
if not self.skip_search_index:
print(f"Queued rebuilding of search index for {frappe.local.site}")
frappe.enqueue(build_index_for_all_routes, queue="long")
frappe.publish_realtime("version-update")
frappe.flags.touched_tables.clear()
frappe.flags.in_migrate = False
@atomic
def pre_schema_updates(self):
"""Executes `before_migrate` hooks"""
overrides = defaultdict(list)
for app in frappe.get_installed_apps():
for fn in frappe.get_hooks("before_migrate", app_name=app):
frappe.get_attr(fn)()
for doctype in frappe.get_hooks("override_doctype_class", {}, app_name=app).keys():
overrides[doctype].append(app)
for doctype, app_names in overrides.items():
if len(app_names) > 1:
click.secho(
f"The controller for {doctype} is overridden by multiple apps: {comma_and(app_names, add_quotes=False)}.",
fg="yellow",
)
@atomic
def run_schema_updates(self):
"""Run patches as defined in patches.txt, sync schema changes as defined in the {doctype}.json files"""
frappe.modules.patch_handler.run_all(
skip_failing=self.skip_failing, patch_type=PatchType.pre_model_sync
)
frappe.model.sync.sync_all()
frappe.modules.patch_handler.run_all(
skip_failing=self.skip_failing, patch_type=PatchType.post_model_sync
)
@atomic
def post_schema_updates(self):
"""Execute pending migration tasks post patches execution & schema sync
This includes:
* Sync `Scheduled Job Type` and scheduler events defined in hooks
* Sync fixtures & custom scripts
* Sync in-Desk Module Dashboards
* Sync customizations: Custom Fields, Property Setters, Custom Permissions
* Sync Frappe's internal language master
* Flush deferred inserts made during maintenance mode.
* Sync Portal Menu Items
* Sync Installed Applications Version History
* Execute `after_migrate` hooks
"""
print("Syncing jobs...")
sync_jobs()
if not self.skip_fixtures:
print("Syncing fixtures...")
sync_fixtures()
else:
print("Skipping fixtures...")
sync_standard_items()
print("Syncing dashboards...")
sync_dashboards()
print("Syncing customizations...")
sync_customizations()
print("Syncing languages...")
sync_languages()
print("Flushing deferred inserts...")
flush_deferred_inserts()
print("Removing orphan doctypes...")
frappe.model.sync.remove_orphan_doctypes()
print("Syncing portal menu...")
frappe.get_single("Portal Settings").sync_menu()
print("Updating installed applications...")
frappe.get_single("Installed Applications").update_versions()
print("Syncing Desktop Icons...")
sync_desktop_icons()
print("Executing `after_migrate` hooks...")
for app in frappe.get_installed_apps():
for fn in frappe.get_hooks("after_migrate", app_name=app):
frappe.get_attr(fn)()
def required_services_running(self) -> bool:
"""Return True if all required services are running. Return False and print
instructions to stdout when required services are not available.
"""
service_status = check_connection(redis_services=["redis_cache"])
are_services_running = all(service_status.values())
if not are_services_running:
for service in service_status:
if not service_status.get(service, True):
print(f"Service {service} is not running.")
print(BENCH_START_MESSAGE)
return are_services_running
def lower_lock_timeout(self):
"""Lower timeout for table metadata locks, default is 1 day, reduce it to 5 minutes.
This is required to avoid indefinitely waiting for metadata lock.
"""
if frappe.db.db_type != "mariadb":
return
frappe.db.sql("set session lock_wait_timeout = %s", 5 * 60)
def kill_idle_connections(self, idle_limit=30):
"""Assuming migrate has highest priority, kill everything else.
If someone has connected to mariadb using DB console or ipython console and then acquired
certain locks we won't be able to migrate."""
if frappe.db.db_type != "mariadb":
return
processes = frappe.db.sql("show full processlist", as_dict=1)
connection_id = frappe.db.sql("select connection_id()")[0][0]
for process in processes:
sleeping = process.get("Command") == "Sleep"
user = str(process.get("User")).lower()
sleeping_since = cint(process.get("Time")) or 0
pid = process.get("Id")
if (
pid
and pid != connection_id
and process.db == frappe.conf.db_name
and sleeping
and sleeping_since > idle_limit
and user != "system user"
):
try:
frappe.db.sql(f"kill {pid}")
print(f"Killed inactive database connection with PID {pid}")
except Exception as e:
# We might not have permission to do this.
print(f"Failed to kill inactive database connection with PID {pid}: {e}")
def run(self, site: str):
"""Run Migrate operation on site specified. This method initializes
and destroys connections to the site database.
"""
from frappe.utils.synchronization import filelock
if site:
frappe.init(site)
frappe.connect()
self.db_monitor = DBQueryProgressMonitor()
if not self.required_services_running():
raise SystemExit(1)
with filelock("bench_migrate", timeout=1):
self.setUp()
try:
self.pre_schema_updates()
self.run_schema_updates()
self.post_schema_updates()
finally:
self.tearDown()
frappe.destroy()
class DBQueryProgressMonitor(threading.Thread):
POLL_DURATION = 10
def __init__(self) -> None:
super().__init__()
self.site = frappe.local.site
self.daemon = True
self._running = threading.Event()
if frappe.db.db_type == "mariadb":
self.conn_id = frappe.db.sql("select connection_id()")[0][0]
self.start()
def run(self):
if self._running.is_set():
return
self._running.set()
frappe.init(self.site)
frappe.connect()
while self._running.is_set():
time.sleep(self.POLL_DURATION)
queries = frappe.db.sql(
"SELECT * FROM information_schema.PROCESSLIST WHERE ID = %s",
self.conn_id,
as_dict=True,
)
if not queries:
continue
query = frappe._dict(queries[0])
time_taken = query.TIME
if not time_taken or time_taken < 5:
continue
msg = []
command = query.COMMAND or ""
msg.append(f"Command: {command}")
msg.append(f"Time: {time_taken}s")
msg.append(f"State: {query.STATE or 'N/A'}")
if query.PROGRESS:
msg.append(f"Progress: {query.PROGRESS}%")
if command and command == "Query":
sql_query = query.INFO or ""
sql_query = sql_query.replace("\r", "").replace("\n", " ").replace("\t", " ")
if len(sql_query) > 100:
sql_query = sql_query[:40] + " ... " + sql_query[-20:]
msg.append(f"Query: {sql_query}")
msg = "\r" + " | ".join(msg)
if self._running.is_set():
print(msg, end="", flush=True)
frappe.destroy()
def stop(self):
print("") # Clear current line
self._running.clear()