seitime-frappe/frappe/_optimizations.py
Ankush Menat fe63af5449
refactor: make optimizations.py private entirely (#28872)
Avoids having to prefix everything with `_`.
2024-12-23 09:56:56 +00:00

190 lines
6.1 KiB
Python

"""This file houses all Frappe specific optimizations and hooks that run on startup or during fork.
Warning: This entire file is private as indicated by `_` prefix in filename.
"""
import faulthandler
import gc
import glob
import io
import os
import re
import signal
import sys
from functools import lru_cache
from pathlib import Path
import psutil
def optimize_all():
"""Single entry point to enable all optimizations at right time automatically."""
# Note:
# - This function is ALWAYS executed as soon as `import frappe` ends.
# - Any deferred work should be deferred using os module's fork hooks.
# - Respect configurations using environement variables.
# - fork hooks can not be unregistered, so care should be taken to execute them only when they
# make sense.
optimize_regex_cache()
optimize_gc_parameters()
optimize_gc_for_copy_on_write()
optimize_for_gil_contention()
register_fault_handler()
os.register_at_fork(after_in_child=register_fault_handler)
def optimize_gc_parameters():
from frappe.utils import sbool
if not bool(sbool(os.environ.get("FRAPPE_TUNE_GC", True))):
return
# generational GC gets triggered after certain allocs (g0) which is 700 by default.
# This number is quite small for frappe where a single query can potentially create 700+
# objects easily.
# Bump this number higher, this will make GC less aggressive but that improves performance of
# everything else.
g0, g1, g2 = gc.get_threshold() # defaults are 700, 10, 10.
gc.set_threshold(g0 * 10, g1 * 2, g2 * 2)
def optimize_regex_cache():
# Remove references to pattern that are pre-compiled and loaded to global scopes.
# Leave that cache for dynamically generated regex.
os.register_at_fork(before=re.purge)
def register_fault_handler():
# Some libraries monkey patch stderr, we need actual fd
if isinstance(sys.__stderr__, io.TextIOWrapper):
faulthandler.register(signal.SIGUSR1, file=sys.__stderr__)
def optimize_gc_for_copy_on_write():
from frappe.utils import sbool
if not bool(sbool(os.environ.get("FRAPPE_TUNE_GC", True))):
return
os.register_at_fork(before=freeze_gc)
_gc_frozen = False
worker_num = os.getpid()
def freeze_gc():
global _gc_frozen
if _gc_frozen:
return
# Both Gunicorn and RQ use forking to spawn workers. In an ideal world, the fork should be sharing
# most of the memory if there are no writes made to data because of Copy on Write, however,
# python's GC is not CoW friendly and writes to data even if user-code doesn't. Specifically, the
# generational GC which stores and mutates every python object: `PyGC_Head`
#
# Calling gc.freeze() moves all the objects imported so far into permanant generation and hence
# doesn't mutate `PyGC_Head`
#
# Refer to issue for more info: https://github.com/frappe/frappe/issues/18927
gc.collect()
gc.freeze()
# RQ workers constantly fork, there' no benefit in doing this in that case.
_gc_frozen = True
def optimize_for_gil_contention():
if not os.environ.get("FRAPPE_PERF_PIN_WORKERS"):
return
if os.environ.get("FRAPPE_PERF_PIN_WORKERS_DETERMINISTIC"):
# Ensure same pinning order every time.
# This is only useful for benchmarking, DO NOT enable this in production.
global worker_num
worker_num = 0
if not psutil.LINUX:
# No need to support Mac, this optimization is only useful on _real_ servers.
return
# Populate the cache to avoid recomputing this in future.
_ = parse_thread_siblings()
os.register_at_fork(after_in_parent=increment_worker_count, after_in_child=pin_web_worker_to_one_core)
def increment_worker_count():
# Not all forked workers will have incrementing numbers.
# This psuedo-counter ensures a deterministic round-robbin style assignment of workers.
global worker_num
worker_num += 1
def assign_core(
pid: int,
physical_cpu_count: int,
logical_cpu_count: int,
current_affinity: list[int],
thread_siblings: list[tuple[int, ...]],
) -> int | None:
if len(current_affinity) == 1: # Already set
return current_affinity[0]
if sorted(current_affinity) == list(range(physical_cpu_count)) and (
physical_cpu_count == logical_cpu_count
):
# There is no SMT, we can just pick a real core in round-robbin fashion
# This assumption can be wrong if some logical cores are disabled in weird manner, though if
# you do that you probably know what you're doing.
physical_core = current_affinity[pid % physical_cpu_count]
return physical_core
# If there is SMT then we need to be careful not to co-schedule. This can be a problem when you
# have 2 workers but both are running on same physical core so expected parallelism is ~1.3x
# instead of 2x.
# This assignment is best understood with an example.
# E.g. If there are 4 cores and 8 HW threads then two most common thread sibling patterns are:
# A: 0-1, 2-3, 4-5, 6-7
# B: 0-4, 1-5, 2-6, 3-7
# The ideal rounding robbin assignment for both is 1 from each real core first and then assign
# other threads. Which would translate to:
# #: 0, 1, 2, 3, 4, 5, 6, 7
# A: 0, 2, 4, 6, 1, 3, 5, 7 (repeated)
# B: 0, 1, 2, 3, 4, 5, 6, 7 (repeated)
if not thread_siblings:
return
thread_bucket = thread_siblings[pid % len(thread_siblings)]
logical_core = thread_bucket[(pid % (len(thread_siblings) * len(thread_bucket))) // len(thread_siblings)]
return logical_core
def pin_web_worker_to_one_core():
"""Try to assign current process to one core."""
core = assign_core(
pid=worker_num,
physical_cpu_count=psutil.cpu_count(logical=False),
logical_cpu_count=psutil.cpu_count(logical=True),
current_affinity=sorted(os.sched_getaffinity(0)),
thread_siblings=parse_thread_siblings(),
)
if core is not None:
psutil.Process().cpu_affinity([core])
@lru_cache(maxsize=1)
def parse_thread_siblings() -> list[tuple[int, int]] | None:
try:
threads_list = set()
siblings_pattern = "/sys/devices/system/cpu/cpu[0-9]*/topology/core_cpus_list"
for path in glob.glob(siblings_pattern):
threads_list.add(tuple(int(cpu) for cpu in Path(path).read_text().replace("-", ",").split(",")))
return sorted(threads_list)
except Exception as e:
import frappe
logger = frappe.logger(with_more_info=True)
logger.error(f"failed to parse thread siblings: {e}")