Ugh, currently gunicorn is overriding our signal handler and we don't care about gunicorn's SIGUSR1 handler.
191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
"""Frappe-specific optimizations and hooks that run at startup or around fork().

Warning: This entire file is private, as indicated by the `_` prefix in its filename.
"""
|
|
|
|
import faulthandler
|
|
import gc
|
|
import glob
|
|
import io
|
|
import os
|
|
import re
|
|
import signal
|
|
import sys
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
|
|
import psutil
|
|
|
|
|
|
def optimize_all():
    """Enable every optimization in this module from one entry point.

    Notes:
    - This function is always executed as soon as `import frappe` finishes.
    - Any deferred work must be deferred through the `os` module's fork hooks.
    - Each optimization respects its own environment-variable configuration.
    - Fork hooks cannot be unregistered, so only register them when they make sense.
    """
    for enable in (
        optimize_regex_cache,
        optimize_gc_parameters,
        optimize_gc_for_copy_on_write,
        optimize_for_gil_contention,
    ):
        enable()
|
|
|
|
|
|
def optimize_gc_parameters():
    """Make the generational GC less aggressive, unless disabled via ``FRAPPE_TUNE_GC``.

    Generation-0 collection is triggered after a fixed number of allocations (CPython's
    historical defaults are 700, 10, 10). A single Frappe query can easily allocate more
    objects than that, so the thresholds are raised: GC runs less often, which speeds up
    everything else.
    """
    from frappe.utils import sbool

    tuning_enabled = bool(sbool(os.environ.get("FRAPPE_TUNE_GC", True)))
    if tuning_enabled:
        gen0, gen1, gen2 = gc.get_threshold()
        # 10x fewer gen-0 collections; older generations are promoted half as often.
        gc.set_threshold(gen0 * 10, gen1 * 2, gen2 * 2)
|
|
|
|
|
|
def optimize_regex_cache():
    """Register ``re.purge`` to run right before every fork.

    This drops the cache's references to patterns that were pre-compiled and stored in
    global scope. After the fork, the cache is left for dynamically generated regexes.
    """
    purge_before_fork = re.purge
    os.register_at_fork(before=purge_before_fork)
|
|
|
|
|
|
def register_fault_handler():
    """Dump Python tracebacks to the real stderr when the process receives SIGUSR1.

    Some libraries monkey-patch ``sys.stderr``, so the interpreter's original
    ``sys.__stderr__`` is used. Registration is skipped unless it is a
    ``TextIOWrapper`` backed by an actual file descriptor.
    """
    real_stderr = sys.__stderr__
    if not isinstance(real_stderr, io.TextIOWrapper):
        return
    faulthandler.register(signal.SIGUSR1, file=real_stderr)
|
|
|
|
|
|
def optimize_gc_for_copy_on_write():
    """Register ``freeze_gc`` as a before-fork hook, unless disabled via ``FRAPPE_TUNE_GC``."""
    from frappe.utils import sbool

    if bool(sbool(os.environ.get("FRAPPE_TUNE_GC", True))):
        os.register_at_fork(before=freeze_gc)
|
|
|
|
|
|
# Pseudo worker counter used for round-robin CPU pinning. It is seeded with this
# process's PID (optimize_for_gil_contention resets it to 0 for deterministic pinning).
worker_num = os.getpid()

# Becomes True once freeze_gc() has run, so later forks skip the collect/freeze work.
_gc_frozen = False
|
|
|
|
|
|
def freeze_gc():
    """Collect garbage, then move every tracked object into the GC's permanent generation.

    Gunicorn and RQ both fork to spawn workers. Ideally a fork shares most of its memory
    through copy-on-write as long as nobody writes to it. Python's generational GC is not
    CoW-friendly, though: it mutates the ``PyGC_Head`` of every tracked object even when
    user code does not. Objects moved into the permanent generation by ``gc.freeze()`` are
    ignored by later collections, so their headers stay untouched.

    This runs at most once per process. RQ workers fork constantly, and repeating the work
    on every fork brings no benefit.

    See https://github.com/frappe/frappe/issues/18927 for details.
    """
    global _gc_frozen

    if not _gc_frozen:
        gc.collect()
        gc.freeze()
        _gc_frozen = True
|
|
|
|
|
|
def optimize_for_gil_contention():
    """Pin each forked gunicorn web worker to a single CPU core.

    This is opt-in via ``FRAPPE_PERF_PIN_WORKERS``. It only applies when ``sys.argv[0]``
    mentions gunicorn and the host runs Linux. In that case it registers fork hooks that
    advance the worker counter in the parent and pin the child.
    """
    global worker_num

    if not os.environ.get("FRAPPE_PERF_PIN_WORKERS") or "gunicorn" not in str(sys.argv[0]):
        return

    if os.environ.get("FRAPPE_PERF_PIN_WORKERS_DETERMINISTIC"):
        # Same pinning order on every run. Only useful for benchmarking;
        # DO NOT enable this in production.
        worker_num = 0

    if not psutil.LINUX:
        # Mac is intentionally unsupported: this only pays off on real servers.
        return

    # Warm the cache now so forked children don't re-read sysfs.
    parse_thread_siblings()
    os.register_at_fork(
        after_in_parent=increment_worker_count,
        after_in_child=pin_web_worker_to_one_core,
    )
|
|
|
|
|
|
def increment_worker_count():
    """Advance the module-level pseudo worker counter by one.

    Forked workers don't necessarily get consecutive PIDs. This counter gives a
    deterministic, round-robin style order for core assignment instead.
    """
    global worker_num
    worker_num = worker_num + 1
|
|
|
|
|
|
def assign_core(
|
|
pid: int,
|
|
physical_cpu_count: int,
|
|
logical_cpu_count: int,
|
|
current_affinity: list[int],
|
|
thread_siblings: list[tuple[int, ...]],
|
|
) -> int | None:
|
|
if len(current_affinity) == 1: # Already set
|
|
return current_affinity[0]
|
|
|
|
if sorted(current_affinity) == list(range(physical_cpu_count)) and (
|
|
physical_cpu_count == logical_cpu_count
|
|
):
|
|
# There is no SMT, we can just pick a real core in round-robbin fashion
|
|
# This assumption can be wrong if some logical cores are disabled in weird manner, though if
|
|
# you do that you probably know what you're doing.
|
|
physical_core = current_affinity[pid % physical_cpu_count]
|
|
return physical_core
|
|
|
|
# If there is SMT then we need to be careful not to co-schedule. This can be a problem when you
|
|
# have 2 workers but both are running on same physical core so expected parallelism is ~1.3x
|
|
# instead of 2x.
|
|
|
|
# This assignment is best understood with an example.
|
|
# E.g. If there are 4 cores and 8 HW threads then two most common thread sibling patterns are:
|
|
# A: 0-1, 2-3, 4-5, 6-7
|
|
# B: 0-4, 1-5, 2-6, 3-7
|
|
# The ideal rounding robbin assignment for both is 1 from each real core first and then assign
|
|
# other threads. Which would translate to:
|
|
# #: 0, 1, 2, 3, 4, 5, 6, 7
|
|
# A: 0, 2, 4, 6, 1, 3, 5, 7 (repeated)
|
|
# B: 0, 1, 2, 3, 4, 5, 6, 7 (repeated)
|
|
|
|
if not thread_siblings:
|
|
return
|
|
thread_bucket = thread_siblings[pid % len(thread_siblings)]
|
|
logical_core = thread_bucket[(pid % (len(thread_siblings) * len(thread_bucket))) // len(thread_siblings)]
|
|
return logical_core
|
|
|
|
|
|
def pin_web_worker_to_one_core():
    """Try to restrict the current process to the single core chosen by ``assign_core``."""
    worker_index = worker_num
    physical = psutil.cpu_count(logical=False)
    logical = psutil.cpu_count(logical=True)
    affinity = sorted(os.sched_getaffinity(0))
    siblings = parse_thread_siblings()

    target_core = assign_core(
        pid=worker_index,
        physical_cpu_count=physical,
        logical_cpu_count=logical,
        current_affinity=affinity,
        thread_siblings=siblings,
    )
    if target_core is None:
        return
    psutil.Process().cpu_affinity([target_core])
|
|
|
|
|
|
@lru_cache(maxsize=1)
|
|
def parse_thread_siblings() -> list[tuple[int, int]] | None:
|
|
try:
|
|
threads_list = set()
|
|
|
|
siblings_pattern = "/sys/devices/system/cpu/cpu[0-9]*/topology/core_cpus_list"
|
|
for path in glob.glob(siblings_pattern):
|
|
threads_list.add(tuple(int(cpu) for cpu in Path(path).read_text().replace("-", ",").split(",")))
|
|
return sorted(threads_list)
|
|
except Exception as e:
|
|
import frappe
|
|
|
|
logger = frappe.logger(with_more_info=True)
|
|
logger.error(f"failed to parse thread siblings: {e}")
|