refactor: bench class inauguration (#28158)
* feat: Add bench layout classes and configuration handler
Bench layout: (`frappe.bench`)
- Layout by env variable, e.g. FRAPPE_BENCH_PATH, FRAPPE_SITES_PATH, etc
- Detecting modules and apps by the presence of a sentinel .frappe file
- Site is scoped by frappe.local.site_name (thread safe)
Config handler: (`frappe.bench.sites{,.site}.config`)
- Optional config registry for better discovery; warning if not specced
- Env variable overload with `FRAPPE_` prefix
* chore: type frappe.config
* chore: type frappe.bencher
* chore: py310 compat
This commit is contained in:
parent
bebc75e635
commit
3d71f594d8
7 changed files with 657 additions and 2 deletions
|
|
@ -51,6 +51,8 @@ from frappe.query_builder.utils import (
|
|||
from frappe.utils.caching import request_cache
|
||||
from frappe.utils.data import cint, cstr, sbool
|
||||
|
||||
from .bencher import Bench
|
||||
|
||||
# Local application imports
|
||||
from .exceptions import *
|
||||
from .types import Filters, FilterSignature, FilterTuple, _dict
|
||||
|
|
@ -81,6 +83,7 @@ if TYPE_CHECKING: # pragma: no cover
|
|||
|
||||
controllers: dict[str, "Document"] = {}
|
||||
local = Local()
|
||||
bench = Bench()
|
||||
cache: Optional["RedisWrapper"] = None
|
||||
STANDARD_USERS = ("Guest", "Administrator")
|
||||
|
||||
|
|
@ -245,6 +248,7 @@ def init(site: str, sites_path: str = ".", new_site: bool = False, force: bool =
|
|||
local.test_objects = defaultdict(list)
|
||||
|
||||
local.site = site
|
||||
local.site_name = site # implicitly scopes bench
|
||||
local.sites_path = sites_path
|
||||
local.site_path = os.path.join(sites_path, site)
|
||||
local.all_apps = None
|
||||
|
|
|
|||
418
frappe/bencher.py
Normal file
418
frappe/bencher.py
Normal file
|
|
@ -0,0 +1,418 @@
|
|||
import json
|
||||
import os
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import Any, Final, cast
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from frappe.config import ConfigHandler, ConfigType
|
||||
|
||||
# used to implement legacy code paths to keep the main path clean
|
||||
_current_bench: "Bench"
|
||||
|
||||
|
||||
class BenchNotScopedError(NotImplementedError):
|
||||
pass
|
||||
|
||||
|
||||
class BenchSiteNotLoadedError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class PathLike:
|
||||
path: Path
|
||||
|
||||
def __fspath__(self) -> str:
|
||||
return str(self.path)
|
||||
|
||||
|
||||
class Serializable:
|
||||
def __json__(self) -> dict[str, Any]: # type: ignore[no-any-explicit]
|
||||
return {k: v for k, v in self.__dict__.items()} # type: ignore[no-any-expr]
|
||||
|
||||
|
||||
class BenchPathResolver(PathLike, Serializable):
|
||||
def __init__(self, path: str | Path | None = None, parent_path: Path | None = None) -> None:
|
||||
path = (
|
||||
path
|
||||
or os.environ.get(f"FRAPPE_{self.__class__.__name__.upper()}_PATH")
|
||||
or (parent_path.joinpath(self.__class__.__name__.lower()) if parent_path else None)
|
||||
)
|
||||
|
||||
if not path and isinstance(self, Bench):
|
||||
path = (
|
||||
# TODO: legacy, remove
|
||||
os.environ.get("FRAPPE_BENCH_ROOT")
|
||||
or
|
||||
# TODO: unsave relative reference, remove:
|
||||
Path(__file__).resolve().parents[3] # bench folder in standard layout
|
||||
# TODO: when above line removed, enable:
|
||||
# or (Path("~/frappe-bench").expanduser())
|
||||
)
|
||||
|
||||
if path is None:
|
||||
raise ValueError(f"Unable to determine path for {self.__class__.__name__}")
|
||||
|
||||
self.path = Path(path)
|
||||
|
||||
|
||||
class FrappeComponent:
|
||||
python_path: Path
|
||||
name: str
|
||||
app: "Apps.App"
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return (
|
||||
self._is_frappe_component()
|
||||
or (isinstance(self, Apps.App) and self._is_app_installed())
|
||||
or (isinstance(self, Apps.App.Module) and self._is_module_registered())
|
||||
)
|
||||
|
||||
def _is_frappe_component(self) -> bool:
|
||||
return self.python_path.is_dir() and self.python_path.joinpath(".frappe").exists()
|
||||
|
||||
def _is_app_installed(self) -> bool:
|
||||
global _current_bench
|
||||
res = self.name in _current_bench.sites.path.joinpath("apps.txt").read_text()
|
||||
if res:
|
||||
from frappe.deprecation_dumpster import deprecation_warning
|
||||
|
||||
deprecation_warning(
|
||||
"2024-10-18",
|
||||
"yet unknown",
|
||||
f"Instead of adding {self.name} to sites/apps.txt, drop an empty file at {self.python_path}/.frappe",
|
||||
)
|
||||
return res
|
||||
return False
|
||||
|
||||
def _is_module_registered(self) -> bool:
|
||||
res = self.name.title() in self.app.python_path.joinpath("modules.txt").read_text()
|
||||
if res:
|
||||
from frappe.deprecation_dumpster import deprecation_warning
|
||||
|
||||
deprecation_warning(
|
||||
"2024-10-18",
|
||||
"yet unknown",
|
||||
f"Instead of adding {self.name.title()} to {self.app.python_path}/modules.txt, drop an empty file at {self.python_path}/.frappe",
|
||||
)
|
||||
return res
|
||||
return False
|
||||
|
||||
|
||||
class Apps(BenchPathResolver):
|
||||
__apps: dict[str, "Apps.App"]
|
||||
|
||||
class App(FrappeComponent, PathLike, Serializable):
|
||||
__modules: dict[str, "Apps.App.Module"]
|
||||
|
||||
class Module(FrappeComponent, PathLike, Serializable):
|
||||
def __init__(self, *, name: str, path: Path, app: "Apps.App") -> None:
|
||||
self.app = app
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.python_path = self.path
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__()
|
||||
+ ("\n * Path: " + str(self.path))
|
||||
+ ("\n * Python Path: " + str(self.python_path))
|
||||
)
|
||||
|
||||
@override
|
||||
def __json__(self) -> dict[str, Any]: # type: ignore[no-any-explicit,no-any-decorated]
|
||||
excluded = (
|
||||
"app", # prevent circular deps
|
||||
)
|
||||
return {k: v for k, v in self.__dict__.items() if k not in excluded} # type: ignore[no-any-expr]
|
||||
|
||||
def __init__(self, *, name: str, path: Path):
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.python_path = self.path.joinpath(self.name)
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__()
|
||||
+ ("\n * Path: " + str(self.path))
|
||||
+ ("\n * Python Path: " + str(self.python_path))
|
||||
+ ("\n * Modules:\n\t" + ("\n\t".join([str(module) for module in self.modules]) or "n/a"))
|
||||
)
|
||||
|
||||
@override
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
@override
|
||||
def __json__(self) -> dict[str, Module]:
|
||||
return self.modules
|
||||
|
||||
@property
|
||||
def modules(self) -> dict[str, Module]:
|
||||
if not hasattr(self, "__modules"):
|
||||
self.__modules = {
|
||||
d.name: module
|
||||
for d in self.python_path.iterdir()
|
||||
if d.is_dir() and (module := self.Module(name=d.name, path=d, app=self))
|
||||
}
|
||||
return self.__modules
|
||||
|
||||
def __iter__(self) -> Iterator[Module]:
|
||||
return iter(self.modules.values())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.modules)
|
||||
|
||||
def __getitem__(self, key: str) -> Module:
|
||||
return self.modules[key]
|
||||
|
||||
def __init__(self, path: str | Path | None = None, parent_path: Path | None = None) -> None:
|
||||
super().__init__(path, parent_path)
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__()
|
||||
+ ("\n * Apps Path: " + str(self.path))
|
||||
+ ("\n * Loaded Apps:\n\t" + ("\n\t".join([str(app) for app in self.apps]) or "n/a"))
|
||||
)
|
||||
|
||||
@override
|
||||
def __json__(self) -> dict[str, App]:
|
||||
return self.apps
|
||||
|
||||
@property
|
||||
def apps(self) -> dict[str, App]:
|
||||
if not hasattr(self, "__apps"):
|
||||
self.__apps = {
|
||||
d.name: app
|
||||
for d in self.path.iterdir()
|
||||
if d.is_dir() and (app := self.App(name=d.name, path=d))
|
||||
}
|
||||
return self.__apps
|
||||
|
||||
def __iter__(self) -> Iterator[App]:
|
||||
return iter(self.apps.values())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.apps)
|
||||
|
||||
def __getitem__(self, key: str) -> App:
|
||||
return self.apps[key]
|
||||
|
||||
|
||||
class Logs(BenchPathResolver):
|
||||
def __init__(self, *, path: str | Path | None = None, parent_path: Path | None = None):
|
||||
super().__init__(path, parent_path)
|
||||
|
||||
def get_log_file(self, log_type: str) -> Path:
|
||||
return self.path.joinpath(f"{log_type}.log")
|
||||
|
||||
|
||||
class Run(BenchPathResolver):
|
||||
def __init__(self, *, path: str | Path | None = None, parent_path: Path | None = None):
|
||||
super().__init__(
|
||||
path
|
||||
# config is the legacy naming of this folder; a misnomer
|
||||
or (parent_path.joinpath("config") if parent_path else None),
|
||||
parent_path,
|
||||
)
|
||||
|
||||
|
||||
class Sites(BenchPathResolver, ConfigHandler):
|
||||
SCOPE_ALL_SITES: Final = "__scope-all-sites__"
|
||||
__sites: dict[str, "Sites.Site"]
|
||||
|
||||
class Site(ConfigHandler, PathLike, Serializable):
|
||||
_combined_config: ConfigType
|
||||
|
||||
def __init__(self, *, bench: "Bench", name: str, path: str | Path):
|
||||
self.name = name
|
||||
self.path = Path(path)
|
||||
self.bench: Bench = bench
|
||||
ConfigHandler.__init__(self, self.path.joinpath("site_config.json"))
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__() + ("\n * Site Path: " + str(self.path)) + ("\n * Site Name: " + self.name)
|
||||
)
|
||||
|
||||
@override
|
||||
def __eq__(self, o: Any) -> bool: # type: ignore[no-any-explicit,no-any-decorated]
|
||||
return self.name == str(o) # type: ignore[no-any-expr]
|
||||
|
||||
@override
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.name)
|
||||
|
||||
@override
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
@override
|
||||
def __json__(self) -> dict[str, Any]: # type: ignore[no-any-explicit,no-any-decorated]
|
||||
excluded = (
|
||||
"bench", # prevent circular deps
|
||||
"_ConfigHandler__config", # holds file contents
|
||||
"_config_stale", # never stale after accessored
|
||||
"_config",
|
||||
)
|
||||
naming = {"_combined_config": "config"}
|
||||
self.config # ensure config is loaded
|
||||
return {naming.get(k, k): v for k, v in self.__dict__.items() if k not in excluded} # type: ignore[no-any-expr]
|
||||
|
||||
@property
|
||||
@override
|
||||
def config(self) -> ConfigType:
|
||||
if not hasattr(self, "_combined_config") or self._config_stale or self.bench.sites._config_stale:
|
||||
site_config = super().config.copy()
|
||||
config = self.bench.sites.config.copy()
|
||||
config.update(site_config)
|
||||
self._combined_config = ConfigType(**config)
|
||||
return self._combined_config
|
||||
|
||||
def __init__(self, *, bench: "Bench", path: str | Path | None = None, parent_path: Path | None = None):
|
||||
BenchPathResolver.__init__(self, path, parent_path)
|
||||
ConfigHandler.__init__(self, self.path.joinpath("common_site_config.json"))
|
||||
self.bench = bench
|
||||
# security & thread-safety: site_name is stored in thread-local storage
|
||||
self.site_name = os.environ.get("FRAPPE_SITE") or cast(str | None, self.config.get("default_site"))
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__()
|
||||
+ ("\n * Sites Path: " + str(self.path))
|
||||
+ ("\n * Scoped Site: " + (self.site_name or "n/a"))
|
||||
+ ("\n * Loaded Sites:\n\t" + ("\n\t".join([str(site) for site in self]) or "n/a"))
|
||||
)
|
||||
|
||||
@override
|
||||
def __json__(self) -> dict[str, Any]: # type: ignore[no-any-explicit,no-any-decorated]
|
||||
excluded = (
|
||||
"bench", # prevent circular deps
|
||||
"_ConfigHandler__config", # holds file contents
|
||||
"_config_stale", # never stale after accessored
|
||||
)
|
||||
naming = {"_config": "config", "_Sites__sites": "_sites"}
|
||||
self._sites # ensure sites are loaded
|
||||
return {naming.get(k, k): v for k, v in self.__dict__.items() if k not in excluded} # type: ignore[no-any-expr]
|
||||
|
||||
@property
|
||||
def site_name(self) -> str | None:
|
||||
import frappe
|
||||
|
||||
return cast(str | None, getattr(frappe.local, "site_name", None))
|
||||
|
||||
@site_name.setter
|
||||
def site_name(self, value):
|
||||
import frappe
|
||||
|
||||
frappe.local.site_name = value
|
||||
|
||||
def add_site(self, site_name: str) -> None:
|
||||
site_path = self.path.joinpath(site_name)
|
||||
for dir_path in [
|
||||
site_path.joinpath("public", "files"),
|
||||
site_path.joinpath("private", "backups"),
|
||||
site_path.joinpath("private", "files"),
|
||||
site_path.joinpath("locks"),
|
||||
site_path.joinpath("logs"),
|
||||
]:
|
||||
dir_path.mkdir(parents=True, exist_ok=True)
|
||||
self.__sites[site_name] = self.Site(bench=self.bench, name=site_name, path=site_path)
|
||||
|
||||
def remove_site(self, site_name: str) -> None:
|
||||
if site_name in self.__sites:
|
||||
del self.__sites[site_name]
|
||||
# site_path = self.path.joinpath(site_name)
|
||||
# Note: This doesn't actually delete the site directory, just removes it from the sites dict
|
||||
# Actual deletion should be handled separately with proper safeguards
|
||||
|
||||
def scope(self, site_name: str | None = None) -> "Sites.Site":
|
||||
if site_name is None:
|
||||
return self.site
|
||||
|
||||
del self.__sites # trigger re-scan after scope
|
||||
self.site_name = site_name
|
||||
|
||||
if self.site_name != self.SCOPE_ALL_SITES:
|
||||
return self.site
|
||||
|
||||
raise BenchNotScopedError("Cannot scope to ALL_SITES")
|
||||
|
||||
@property
|
||||
def scoped(self) -> bool:
|
||||
return bool(self.site_name) and self.site_name != self.SCOPE_ALL_SITES
|
||||
|
||||
@property
|
||||
def site(self) -> Site:
|
||||
# security & thread-safety: site_name is stored in thread-local storage
|
||||
if not self.site_name or self.site_name == self.SCOPE_ALL_SITES:
|
||||
raise BenchNotScopedError("Sites was not scoped to a single site, yet.")
|
||||
return self[self.site_name]
|
||||
|
||||
@property
|
||||
def _sites(self) -> dict[str, Site]:
|
||||
if not hasattr(self, "__sites"):
|
||||
self.__sites = {}
|
||||
|
||||
def _process(path: Path):
|
||||
if path.is_dir() and path.joinpath("site_config.json").exists():
|
||||
self.__sites[path.name] = self.Site(bench=self.bench, name=path.name, path=path)
|
||||
|
||||
# security & thread-safety: site_name is stored in thread-local storage
|
||||
if self.site_name and self.site_name != self.SCOPE_ALL_SITES:
|
||||
_process(self.path.joinpath(self.site_name))
|
||||
elif self.site_name == self.SCOPE_ALL_SITES:
|
||||
for site_path in self.path.iterdir():
|
||||
_process(site_path)
|
||||
return self.__sites
|
||||
|
||||
def __iter__(self) -> Iterator[Site]:
|
||||
# security & thread-safety: site_name is stored in thread-local storage
|
||||
if self.site_name == self.SCOPE_ALL_SITES:
|
||||
return iter(self._sites.values())
|
||||
elif self.site_name:
|
||||
return iter([self[self.site_name]])
|
||||
raise BenchNotScopedError("Sites was not scoped, yet.")
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._sites)
|
||||
|
||||
def __getitem__(self, key: str) -> Site:
|
||||
try:
|
||||
return self._sites[key]
|
||||
except KeyError:
|
||||
raise BenchSiteNotLoadedError(f"Site '{key}' was not loaded")
|
||||
|
||||
|
||||
class Bench(BenchPathResolver):
|
||||
def __init__(self, path: str | Path | None = None):
|
||||
super().__init__(path)
|
||||
self.logs = Logs(parent_path=self.path)
|
||||
self.run = Run(parent_path=self.path)
|
||||
self.sites = Sites(bench=self, parent_path=self.path)
|
||||
global _current_bench
|
||||
_current_bench = self
|
||||
self.apps = Apps(parent_path=self.path)
|
||||
|
||||
def scope(self, site_name: str) -> Sites.Site:
|
||||
return self.sites.scope(site_name)
|
||||
|
||||
@property
|
||||
def scoped(self) -> bool:
|
||||
return self.sites.scoped
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
super().__repr__()
|
||||
+ ("\n * Bench Path: " + str(self.path))
|
||||
+ ("\n\n" + str(self.apps))
|
||||
+ ("\n\n" + str(self.sites))
|
||||
)
|
||||
206
frappe/config.py
Normal file
206
frappe/config.py
Normal file
|
|
@ -0,0 +1,206 @@
|
|||
import importlib
|
||||
import json
|
||||
import os
|
||||
import pprint
|
||||
import re
|
||||
import traceback
|
||||
import warnings
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, TypeAlias, TypedDict, cast
|
||||
|
||||
from filelock import FileLock, Timeout
|
||||
from typing_extensions import NotRequired, override
|
||||
|
||||
|
||||
class FrappeUnregisteredConfigOptionWarning(Warning):
|
||||
pass
|
||||
|
||||
|
||||
ConfigValue: TypeAlias = str | bool | int | float | list["ConfigValue"] | dict[str, "ConfigValue"]
|
||||
ConfigCallable: TypeAlias = Callable[["ConfigType"], ConfigValue | None]
|
||||
|
||||
|
||||
class ConfigType(dict[str, ConfigValue]):
|
||||
"""A dictionary subclass that provides attribute-style access to configuration options.
|
||||
|
||||
Warns when accessing unregistered configuration options.
|
||||
"""
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
return pprint.pformat(dict(self), indent=2, width=80, sort_dicts=False)
|
||||
|
||||
def __getattr__(self, name: str) -> ConfigValue | None:
|
||||
if name not in registry.options:
|
||||
# filter out noise in ipython console
|
||||
if not name.startswith("_ipython") and name != "_repr_mimebundle_":
|
||||
warnings.warn(
|
||||
f"Accessing unregistered configuration option: {name}",
|
||||
FrappeUnregisteredConfigOptionWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
if name not in self and (option := registry.options.get(name)) and (default := option["default"]):
|
||||
if callable(default):
|
||||
return default(self)
|
||||
return default
|
||||
elif name not in self:
|
||||
return None
|
||||
return self[name]
|
||||
|
||||
|
||||
class ConfigRegistryOption(TypedDict):
|
||||
docstring: str
|
||||
default: NotRequired[ConfigCallable | ConfigValue | None]
|
||||
|
||||
|
||||
class ConfigRegistry:
|
||||
"""Registry for configuration options with their documentation and default values."""
|
||||
|
||||
def __init__(self):
|
||||
self.options: dict[str, ConfigRegistryOption] = {}
|
||||
|
||||
def register(self, option: str, docstring: str, default: ConfigCallable | ConfigValue | None):
|
||||
self.options[option] = {"docstring": docstring, "default": default}
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
if not self.options:
|
||||
return "ConfigRegistry(No options registered)"
|
||||
|
||||
# Find the maximum lengths for formatting
|
||||
max_option_length = max(len(option) for option in self.options)
|
||||
max_default_length = max(len(self._format_default(opt["default"])) for opt in self.options.values())
|
||||
|
||||
# Create the header
|
||||
header = f"{'Option':<{max_option_length}} | {'Default':<{max_default_length}} | Description"
|
||||
separator = f"{'-' * max_option_length}-+-{'-' * max_default_length}-+{'-' * 20}"
|
||||
|
||||
# Create the table rows
|
||||
rows = []
|
||||
for option, details in self.options.items():
|
||||
default = self._format_default(details["default"])
|
||||
docstring = details["docstring"].replace("\n", " ") # Remove any newlines in docstring
|
||||
row = f"{option:<{max_option_length}} | {default:<{max_default_length}} | {docstring}"
|
||||
rows.append(row)
|
||||
|
||||
# Combine all parts
|
||||
table = "\n".join([header, separator, *rows])
|
||||
return f"ConfigRegistry:\n{table}"
|
||||
|
||||
def _format_default(self, default: ConfigCallable | ConfigValue | None) -> str:
|
||||
if callable(default):
|
||||
return "<dynamic>"
|
||||
return str(default)
|
||||
|
||||
|
||||
registry = ConfigRegistry()
|
||||
|
||||
|
||||
def register(option: str, docstring: str, default: ConfigCallable | ConfigValue | None):
|
||||
"""Register a new configuration option with documentation and default value.
|
||||
|
||||
Args:
|
||||
option: Name of the configuration option
|
||||
docstring: Documentation describing the option
|
||||
default: Default value for the option
|
||||
"""
|
||||
registry.register(option, docstring, default)
|
||||
|
||||
|
||||
# Global default config
|
||||
register("redis_queue", "Redis URL for queue management", "redis://127.0.0.1:11311")
|
||||
register("redis_cache", "Redis URL for caching", "redis://127.0.0.1:13311")
|
||||
register("db_type", "Database type (mariadb or postgres)", "mariadb")
|
||||
register("db_host", "Database host address", "127.0.0.1")
|
||||
register("db_port", "Database port number", lambda c: 5432 if c.db_type == "postgres" else 3306)
|
||||
register("db_user", "Database user name", lambda c: c.db_name)
|
||||
register("db_name", "Database name", lambda c: c.db_user)
|
||||
register("db_socket", "Unix socket file path for database connection (optional)", None)
|
||||
|
||||
|
||||
class ConfigHandler:
|
||||
"""Handles loading, storing and updating configuration values from files and environment.
|
||||
|
||||
Supports hot reloading of configuration upon tainting.
|
||||
"""
|
||||
|
||||
__config: dict[str, ConfigValue | None]
|
||||
_config: ConfigType
|
||||
_config_stale: bool
|
||||
|
||||
def __init__(self, config_path: str | Path):
|
||||
self.config_path = Path(config_path)
|
||||
self._config_stale = True
|
||||
|
||||
def taint(self):
|
||||
"Mark configuration as stale to trigger reload"
|
||||
self._config_stale = True
|
||||
|
||||
@property
|
||||
def config(self) -> ConfigType:
|
||||
"Get current configuration, reloading if stale"
|
||||
if not hasattr(self, "_config") or self._config_stale:
|
||||
if self.config_path.exists():
|
||||
self.__config = json.load(self.config_path.open())
|
||||
self._config = ConfigType(**self.__config)
|
||||
self._update_from_env()
|
||||
self._apply_extra_config()
|
||||
# TODO: enable in-memory caching only once we have identified a mechanism to hot-reload on external config changes
|
||||
# self._config_stale = False
|
||||
return self._config
|
||||
|
||||
def update_config(self, updates: dict[str, ConfigValue | None]):
|
||||
"""Update configuration with new values and save to config file.
|
||||
|
||||
Args:
|
||||
updates: Dictionary of configuration updates to apply
|
||||
|
||||
Raises:
|
||||
Timeout: If unable to acquire file lock for saving
|
||||
"""
|
||||
self.__config.update(updates)
|
||||
try:
|
||||
with FileLock(f"{self.config_path}.lock", timeout=5):
|
||||
from frappe.utils.response import json_handler
|
||||
|
||||
json.dump(
|
||||
self.__config,
|
||||
self.config_path.open("w"),
|
||||
indent=2,
|
||||
default=json_handler, # type: ignore[no-any-expr]
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
except Timeout as e:
|
||||
from frappe.utils.error import log_error
|
||||
|
||||
log_error(f"Filelock: Failed to aquire {self.config_path}.lock") # type: ignore[no-untyped-call]
|
||||
raise e
|
||||
self._config_stale = True
|
||||
|
||||
def _update_from_env(self):
|
||||
"Update config values from environment variables"
|
||||
assert isinstance(self._config, ConfigType) # will never be None by now
|
||||
for key in self._config.keys():
|
||||
# Convert camelCase or kebab-case to SNAKE_CASE
|
||||
env_key = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", key)
|
||||
env_key = env_key.replace("-", "_")
|
||||
env_key = f"FRAPPE_{env_key.upper()}"
|
||||
if env_value := os.environ.get(env_key):
|
||||
self._config[key] = env_value
|
||||
|
||||
def _apply_extra_config(self):
|
||||
"Apply additional configuration from external modules"
|
||||
# TODO: maybe motion to deprecate https://github.com/frappe/frappe/pull/24706#issuecomment-2471209484
|
||||
assert isinstance(self._config, ConfigType) # will never be None by now
|
||||
if extra_config := cast(str | list[str], self._config.get("extra_config")):
|
||||
if isinstance(extra_config, str):
|
||||
extra_config = [extra_config]
|
||||
for hook in extra_config:
|
||||
try:
|
||||
module, method = hook.rsplit(".", 1)
|
||||
self._config.update(getattr(importlib.import_module(module), method)()) # type: ignore[no-any-expr]
|
||||
except Exception:
|
||||
print(f"Config hook {hook} failed")
|
||||
traceback.print_exc()
|
||||
|
|
@ -1005,3 +1005,17 @@ def get_number_format_info(format: str) -> tuple[str, str, int]:
|
|||
from frappe.utils.number_format import NUMBER_FORMAT_MAP
|
||||
|
||||
return NUMBER_FORMAT_MAP.get(format) or (".", ",", 2)
|
||||
|
||||
|
||||
@deprecated(
|
||||
"modules.txt",
|
||||
"2024-11-12",
|
||||
"yet unknown",
|
||||
"""It has been added for compatibility in addition to the new .frappe sentinel file inside the module. This is for your info: you don't have to do anything.
|
||||
""",
|
||||
)
|
||||
def boilerplate_modules_txt(dest, app_name, app_title):
|
||||
import frappe
|
||||
|
||||
with open(os.path.join(dest, app_name, app_name, "modules.txt"), "w") as f:
|
||||
f.write(frappe.as_unicode(app_title))
|
||||
|
|
|
|||
|
|
@ -4,6 +4,11 @@
|
|||
# BEWARE don't put anything in this file except exceptions
|
||||
from werkzeug.exceptions import NotFound
|
||||
|
||||
from .bencher import (
|
||||
BenchNotScopedError,
|
||||
BenchSiteNotLoadedError,
|
||||
)
|
||||
|
||||
|
||||
class SiteNotSpecifiedError(Exception):
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
|
|
|||
|
|
@ -166,8 +166,14 @@ def _create_app_boilerplate(dest, hooks, no_git=False):
|
|||
with open(os.path.join(dest, hooks.app_name, "license.txt"), "w") as f:
|
||||
f.write(frappe.as_unicode(license_body))
|
||||
|
||||
with open(os.path.join(dest, hooks.app_name, hooks.app_name, "modules.txt"), "w") as f:
|
||||
f.write(frappe.as_unicode(hooks.app_title))
|
||||
with open(
|
||||
os.path.join(dest, hooks.app_name, hooks.app_name, frappe.scrub(hooks.app_title), ".frappe"), "w"
|
||||
) as f:
|
||||
f.write("")
|
||||
|
||||
from frappe.deprecation_dumpster import boilerplate_modules_txt
|
||||
|
||||
boilerplate_modules_txt(dest, hooks.app_name, hooks.app_title)
|
||||
|
||||
# These values could contain quotes and can break string declarations
|
||||
# So escaping them before setting variables in setup.py and hooks.py
|
||||
|
|
|
|||
|
|
@ -226,6 +226,8 @@ files = [
|
|||
"frappe/types/docref.py",
|
||||
"frappe/types/frappedict.py",
|
||||
"frappe/types/filter.py",
|
||||
"frappe/bencher.py",
|
||||
"frappe/config.py",
|
||||
]
|
||||
exclude = [
|
||||
# permanent excludes
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue