perf(gthread): Pin web workers to a single core (#28854)

Python's multithreaded model is _inefficient_ because of the Global
Interpreter Lock (GIL). Only one thread of a process can run at any
given time. Thus the only valid use cases for threads in Python are:

1. Hiding I/O latency by switching to a different thread.
2. Using compiled extensions that release the GIL for long enough to do
   meaningful work in other threads.
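
You can see point 1 fail for pure-Python work with a quick experiment
(illustration only, not part of this patch): a CPU-bound loop gets no
faster with two threads, because only the GIL holder makes progress.

```python
import time
from threading import Thread

def spin(n: int = 20_000_000) -> None:
	# Pure-Python CPU-bound loop; holds the GIL the whole time.
	while n:
		n -= 1

# One thread doing all the work...
start = time.perf_counter()
spin()
print(f"1 thread:  {time.perf_counter() - start:.2f}s")

# ...vs two threads splitting the same work. Under the GIL this is no
# faster, and often slower due to lock handoffs and context switches.
threads = [Thread(target=spin, args=(10_000_000,)) for _ in range(2)]
start = time.perf_counter()
for t in threads:
	t.start()
for t in threads:
	t.join()
print(f"2 threads: {time.perf_counter() - start:.2f}s")
```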

Neither of these is as frequent as you'd imagine, and a gthread worker
with multiple threads often just ends up contending on the lock, wasting
useful CPU cycles doing nothing. Pinning each worker process to a core
nearly eliminates this contention waste. The waste can be 5-10% and goes
up sharply with more threads.

E.g. FC typically has a maxed-out config of 24 workers, which allows
"accepting" and working on 24 requests at a time. But that doesn't mean
24 requests are on CPU at any given time; that would require 24 physical
cores.

Why do this?

1. Context switching between threads is faster than switching between
   processes: fewer cache misses, fewer TLB misses, etc.
2. The model is simple:
    True parallelism = count(cores) = count(processes).
    Expected concurrency = count(processes) * count(threads).
3. This is far simpler to reason about than something like an async
   executor model.
4. The ability to queue more requests than we can handle is already
   implemented by `listen(2)` and `accept(2)` in the kernel (see the
   sketch after this list). There is no real benefit in accepting 1000
   requests if you can only work on 20 of them at a time. This is
   because we do a lot of "work" in requests; it's not just issuing an
   external request and waiting for it.
5. We can achieve practically the same concurrency as 24 workers with 4
   processes x 6 threads. That's a lot of memory saved to run other
   useful things.
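
To illustrate point 4: the kernel-side queue is just the `listen(2)`
backlog that every socket server sets up. A toy version (hypothetical,
not gunicorn's actual socket code):

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 8000))
# The kernel queues up to ~128 established connections while all
# workers are busy; no userspace "accept everything" loop needed.
sock.listen(128)

while True:
	conn, _addr = sock.accept()  # take the next queued connection
	conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
	conn.close()
```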

Caveats:
- This kind of pinning can potentially make the Linux scheduler less
  efficient. I don't think it's going to be a big problem because there
  are plenty of other things to run, which a core can steal from another
  core if it doesn't have enough work.
- Load balancing in a single-server multi-bench setup. I *think* by the
  nature of how `accept(2)` works, load balancing will still happen
  pretty much automatically. If a certain core is overloaded, other
  cores will naturally reach `accept(2)` more frequently and take the
  load off of it. This is worth validating in practice by creating
  skewed affinities (see the sketch after this list).
- This code is not NUMA-aware. None of our machines have multiple NUMA
  nodes, so I am ignoring it. Don't use this if you have a NUMA setup.
- If new CPUs are hotplugged or existing ones are disabled, pinning can
  be inefficient (worse than the current behavior) until the worker
  auto-restarts (which happens after N requests in the FC setup).
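
For the load-balancing caveat above, the experiment could look like
this (hypothetical PIDs; the `psutil` calls are the same ones the patch
uses):

```python
import psutil

# Deliberately skew affinities: two workers on core 0, one on core 1,
# then load-test and compare per-core utilization.
for pid, core in [(1001, 0), (1002, 0), (1003, 1)]:  # made-up worker PIDs
	psutil.Process(pid).cpu_affinity([core])

# Sample per-core utilization during the load test:
print(psutil.cpu_percent(interval=1, percpu=True))
```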

Ideal solution: write a userspace scheduler that implements
"soft affinity" using Linux's new eBPF-based sched_ext feature. That's
too much extra work, but I'll consider it at some point.

closes https://github.com/frappe/caffeine/issues/13

frappe/optimizations.py
@@ -1,10 +1,15 @@
import faulthandler
import gc
import glob
import io
import os
import re
import signal
import sys
from functools import lru_cache
from pathlib import Path

import psutil


def optimize_all():
@@ -19,6 +24,7 @@ def optimize_all():
	_optimize_regex_cache()
	_optimize_gc_parameters()
	_optimize_gc_for_copy_on_write()
	_optimize_for_gil_contention()
	_register_fault_handler()
	os.register_at_fork(after_in_child=_register_fault_handler)
@@ -60,6 +66,7 @@ def _optimize_gc_for_copy_on_write():

_gc_frozen = False
_worker_num = os.getpid()


def _freeze_gc():
@@ -79,3 +86,98 @@ def _freeze_gc():
	gc.freeze()
	# RQ workers constantly fork, there's no benefit in doing this in that case.
	_gc_frozen = True

def _optimize_for_gil_contention():
	if not os.environ.get("FRAPPE_PERF_PIN_WORKERS"):
		return

	if os.environ.get("FRAPPE_PERF_PIN_WORKERS_DETERMINISTIC"):
		# Ensure the same pinning order every time.
		# This is only useful for benchmarking, DO NOT enable this in production.
		global _worker_num
		_worker_num = 0

	if not psutil.LINUX:
		# No need to support Mac, this optimization is only useful on _real_ servers.
		return

	os.register_at_fork(after_in_parent=_increment_worker_count, after_in_child=_pin_web_worker_to_one_core)

def _increment_worker_count():
	# Not all forked workers will have incrementing numbers.
	# This pseudo-counter ensures a deterministic round-robin style assignment of workers.
	global _worker_num
	_worker_num += 1

def _assign_core(
	pid: int,
	physical_cpu_count: int,
	logical_cpu_count: int,
	current_affinity: list[int],
	thread_siblings: list[tuple[int, ...]],
) -> int | None:
	if len(current_affinity) == 1:  # Already pinned
		return current_affinity[0]

	if sorted(current_affinity) == list(range(physical_cpu_count)) and (
		physical_cpu_count == logical_cpu_count
	):
		# There is no SMT, we can just pick a real core in round-robin fashion.
		# This assumption can be wrong if some logical cores are disabled in a weird manner,
		# though if you do that you probably know what you're doing.
		physical_core = current_affinity[pid % physical_cpu_count]
		return physical_core

	# If there is SMT then we need to be careful not to co-schedule. This can be a problem when you
	# have 2 workers but both are running on the same physical core, so expected parallelism is
	# ~1.3x instead of 2x.
	# This assignment is best understood with an example.
	# E.g. if there are 4 cores and 8 HW threads, the two most common thread sibling patterns are:
	# A: 0-1, 2-3, 4-5, 6-7
	# B: 0-4, 1-5, 2-6, 3-7
	# The ideal round-robin assignment for both is one thread from each real core first, then the
	# remaining threads. Which would translate to:
	# #: 0, 1, 2, 3, 4, 5, 6, 7
	# A: 0, 2, 4, 6, 1, 3, 5, 7 (repeated)
	# B: 0, 1, 2, 3, 4, 5, 6, 7 (repeated)
	if not thread_siblings:
		return
	# Pick the physical core round-robin, then pick which SMT sibling on it based on how many
	# full passes over all the cores have already been made.
	thread_bucket = thread_siblings[pid % len(thread_siblings)]
	logical_core = thread_bucket[(pid % (len(thread_siblings) * len(thread_bucket))) // len(thread_siblings)]
	return logical_core

def _pin_web_worker_to_one_core():
	"""Try to assign current process to one core."""
	core = _assign_core(
		pid=_worker_num,
		physical_cpu_count=psutil.cpu_count(logical=False),
		logical_cpu_count=psutil.cpu_count(logical=True),
		current_affinity=sorted(os.sched_getaffinity(0)),
		thread_siblings=_parse_thread_siblings(),
	)
	if core is not None:
		psutil.Process().cpu_affinity([core])

@lru_cache(maxsize=1)
def _parse_thread_siblings() -> list[tuple[int, ...]] | None:
	try:
		threads_list = set()
		siblings_pattern = "/sys/devices/system/cpu/cpu[0-9]*/topology/core_cpus_list"
		for path in glob.glob(siblings_pattern):
			threads_list.add(tuple(int(cpu) for cpu in Path(path).read_text().replace("-", ",").split(",")))
		return sorted(threads_list)
	except Exception:
		import frappe

		logger = frappe.logger(with_more_info=True)
		logger.exception("failed to parse thread siblings")

frappe/tests/test_perf.py
@@ -18,6 +18,7 @@ query. This test can be written like this.
"""

import gc
import itertools
import sys
import time
from unittest.mock import patch
@@ -232,6 +233,50 @@ class TestPerformance(IntegrationTestCase):
		self.assertIs(run, patched_run, "frappe.init should run one-time patching code just once")

	def test_cpu_allocation(self):
		from frappe.optimizations import _assign_core

		# Already allocated
		self.assertEqual(_assign_core(0, 4, 8, [0], []), 0)

		# All physical, pid same as core for 0-7
		siblings = [(i,) for i in range(8)]
		cores = list(range(8))
		for pid in cores:
			self.assertEqual(_assign_core(pid, len(cores), len(cores), cores, siblings), pid)

		# All physical, pid wraps around to a core for 8-15
		for pid in range(8, 16):
			self.assertEqual(_assign_core(pid, len(cores), len(cores), cores, siblings), pid % len(cores))

		default_affinity_16 = list(range(16))

		# "Linear" siblings = (0,1) (2,3) ...
		linear_siblings_16 = list(itertools.batched(range(16), 2))
		logical_cores = list(range(16))
		expected_assignments = [
			*(pair[0] for pair in linear_siblings_16),
			*(pair[1] for pair in linear_siblings_16),
		]
		for pid, expected_core in zip(logical_cores, expected_assignments, strict=True):
			core = _assign_core(
				pid, len(logical_cores) // 2, len(logical_cores), default_affinity_16, linear_siblings_16
			)
			self.assertEqual(core, expected_core)

		# "Block" siblings = (0,8) (1,9) ...
		block_siblings_16 = list(zip(range(8), range(8, 16), strict=True))
		for pid in logical_cores:
			core = _assign_core(
				pid, len(logical_cores) // 2, len(logical_cores), logical_cores, block_siblings_16
			)
			self.assertEqual(core, pid)

		# A few cores disabled
		enabled_cores = [0, 2, 4, 6]
		siblings = [(i,) for i in enabled_cores]
		core = _assign_core(0, 4, 4, enabled_cores, siblings)
		self.assertEqual(core, 0)
		core = _assign_core(1, 4, 4, enabled_cores, siblings)
		self.assertEqual(core, 2)


@run_only_if(db_type_is.MARIADB)
class TestOverheadCalls(FrappeAPITestCase):