diff --git a/frappe/optimizations.py b/frappe/optimizations.py index 85f4381cea..0b89a83d0f 100644 --- a/frappe/optimizations.py +++ b/frappe/optimizations.py @@ -1,10 +1,15 @@ import faulthandler import gc +import glob import io import os import re import signal import sys +from functools import lru_cache +from pathlib import Path + +import psutil def optimize_all(): @@ -19,6 +24,7 @@ def optimize_all(): _optimize_regex_cache() _optimize_gc_parameters() _optimize_gc_for_copy_on_write() + _optimize_for_gil_contention() _register_fault_handler() os.register_at_fork(after_in_child=_register_fault_handler) @@ -60,6 +66,7 @@ def _optimize_gc_for_copy_on_write(): _gc_frozen = False +_worker_num = os.getpid() def _freeze_gc(): @@ -79,3 +86,98 @@ def _freeze_gc(): gc.freeze() # RQ workers constantly fork, there' no benefit in doing this in that case. _gc_frozen = True + + +def _optimize_for_gil_contention(): + if not os.environ.get("FRAPPE_PERF_PIN_WORKERS"): + return + + if os.environ.get("FRAPPE_PERF_PIN_WORKERS_DETERMINISTIC"): + # Ensure same pinning order every time. + # This is only useful for benchmarking, DO NOT enable this in production. + global _worker_num + _worker_num = 0 + + if not psutil.LINUX: + # No need to support Mac, this optimization is only useful on _real_ servers. + return + + os.register_at_fork(after_in_parent=_increment_worker_count, after_in_child=_pin_web_worker_to_one_core) + + +def _increment_worker_count(): + # Not all forked workers will have incrementing numbers. + # This psuedo-counter ensures a deterministic round-robbin style assignment of workers. + global _worker_num + + _worker_num += 1 + + +def _assign_core( + pid: int, + physical_cpu_count: int, + logical_cpu_count: int, + current_affinity: list[int], + thread_siblings: list[tuple[int, ...]], +) -> int | None: + if len(current_affinity) == 1: # Already set + return current_affinity[0] + + if sorted(current_affinity) == list(range(physical_cpu_count)) and ( + physical_cpu_count == logical_cpu_count + ): + # There is no SMT, we can just pick a real core in round-robbin fashion + # This assumption can be wrong if some logical cores are disabled in weird manner, though if + # you do that you probably know what you're doing. + physical_core = current_affinity[pid % physical_cpu_count] + return physical_core + + # If there is SMT then we need to be careful not to co-schedule. This can be a problem when you + # have 2 workers but both are running on same physical core so expected parallelism is ~1.3x + # instead of 2x. + + # This assignment is best understood with an example. + # E.g. If there are 4 cores and 8 HW threads then two most common thread sibling patterns are: + # A: 0-1, 2-3, 4-5, 6-7 + # B: 0-4, 1-5, 2-6, 3-7 + # The ideal rounding robbin assignment for both is 1 from each real core first and then assign + # other threads. Which would translate to: + # #: 0, 1, 2, 3, 4, 5, 6, 7 + # A: 0, 2, 4, 6, 1, 3, 5, 7 (repeated) + # B: 0, 1, 2, 3, 4, 5, 6, 7 (repeated) + + if not thread_siblings: + return + thread_bucket = thread_siblings[pid % len(thread_siblings)] + logical_core = thread_bucket[(pid % (len(thread_siblings) * len(thread_bucket))) // len(thread_siblings)] + return logical_core + + +def _pin_web_worker_to_one_core(): + """Try to assign current process to one core.""" + + core = _assign_core( + pid=_worker_num, + physical_cpu_count=psutil.cpu_count(logical=False), + logical_cpu_count=psutil.cpu_count(logical=True), + current_affinity=sorted(os.sched_getaffinity(0)), + thread_siblings=_parse_thread_siblings(), + ) + if core is not None: + psutil.Process().cpu_affinity([core]) + + +@lru_cache(maxsize=1) +def _parse_thread_siblings() -> list[tuple[int, int]] | None: + try: + threads_list = set() + + siblings_pattern = "/sys/devices/system/cpu/cpu[0-9]*/topology/core_cpus_list" + for path in glob.glob(siblings_pattern): + threads_list.add(tuple(int(cpu) for cpu in Path(path).read_text().replace("-", ",").split(","))) + return sorted(threads_list) + except Exception as e: + import frappe + + logger = frappe.logger(with_more_info=True) + logger.error("failed to parse thread siblings", e) diff --git a/frappe/tests/test_perf.py b/frappe/tests/test_perf.py index ecc511abc7..ba879f0733 100644 --- a/frappe/tests/test_perf.py +++ b/frappe/tests/test_perf.py @@ -18,6 +18,7 @@ query. This test can be written like this. """ import gc +import itertools import sys import time from unittest.mock import patch @@ -232,6 +233,50 @@ class TestPerformance(IntegrationTestCase): self.assertIs(run, patched_run, "frappe.init should run one-time patching code just once") + def test_cpu_allocation(self): + from frappe.optimizations import _assign_core + + # Already allocated + self.assertEqual(_assign_core(0, 4, 8, [0], []), 0) + + # All physical, pid same as core for 0-7 + siblings = [(i,) for i in range(8)] + cores = list(range(8)) + for pid in cores: + self.assertEqual(_assign_core(pid, len(cores), len(cores), cores, siblings), pid) + + # All physical, pid wraps for core for 8-15 + for pid in range(8, 16): + self.assertEqual(_assign_core(pid, len(cores), len(cores), cores, siblings), pid % len(cores)) + + default_affinity_16 = list(range(16)) + # "linear" siblings = (0,1) (2,3) ... + linear_siblings_16 = list(itertools.batched(range(16), 2)) + logical_cores = list(range(16)) + expected_assignments = [*(l[0] for l in linear_siblings_16), *(l[1] for l in linear_siblings_16)] + for pid, expected_core in zip(logical_cores, expected_assignments, strict=True): + core = _assign_core( + pid, len(logical_cores) // 2, len(logical_cores), default_affinity_16, linear_siblings_16 + ) + self.assertEqual(core, expected_core) + + # "Block" siblings = (0,4) (1,5) ... + block_siblings_16 = list(zip(range(8), range(8, 16), strict=True)) + for pid in logical_cores: + core = _assign_core( + pid, len(logical_cores) // 2, len(logical_cores), logical_cores, block_siblings_16 + ) + self.assertEqual(core, pid) + + # Few cores disabled + enabled_cores = [0, 2, 4, 6] + affinity = [(i,) for i in enabled_cores] + core = _assign_core(0, 4, 4, enabled_cores, affinity) + self.assertEqual(core, 0) + + core = _assign_core(1, 4, 4, enabled_cores, affinity) + self.assertEqual(core, 2) + @run_only_if(db_type_is.MARIADB) class TestOverheadCalls(FrappeAPITestCase):