refactor: container utils and misc tests
This commit is contained in:
parent
ad4cb710f2
commit
3fa4ec1bd6
2 changed files with 103 additions and 31 deletions
|
|
@ -22,15 +22,22 @@ from frappe.model.document import Document
|
|||
from frappe.tests.utils import FrappeTestCase
|
||||
from frappe.utils import (
|
||||
ceil,
|
||||
dict_to_str,
|
||||
evaluate_filters,
|
||||
execute_in_shell,
|
||||
floor,
|
||||
format_timedelta,
|
||||
get_bench_path,
|
||||
get_file_timestamp,
|
||||
get_gravatar,
|
||||
get_site_info,
|
||||
get_sites,
|
||||
get_url,
|
||||
money_in_words,
|
||||
parse_timedelta,
|
||||
random_string,
|
||||
remove_blanks,
|
||||
safe_json_loads,
|
||||
scrub_urls,
|
||||
validate_email_address,
|
||||
validate_name,
|
||||
|
|
@ -39,13 +46,18 @@ from frappe.utils import (
|
|||
)
|
||||
from frappe.utils.data import (
|
||||
add_to_date,
|
||||
add_years,
|
||||
cast,
|
||||
cstr,
|
||||
duration_to_seconds,
|
||||
get_first_day_of_week,
|
||||
get_time,
|
||||
get_timedelta,
|
||||
get_timespan_date_range,
|
||||
getdate,
|
||||
now_datetime,
|
||||
nowtime,
|
||||
to_timedelta,
|
||||
validate_python_code,
|
||||
)
|
||||
from frappe.utils.dateutils import get_dates_from_timegrain
|
||||
|
|
@ -325,7 +337,9 @@ class TestValidationUtils(unittest.TestCase):
|
|||
self.assertFalse(validate_email_address("someone"))
|
||||
self.assertFalse(validate_email_address("someone@----.com"))
|
||||
|
||||
self.assertFalse(validate_email_address("test@example.com test2@example.com,undisclosed-recipient"))
|
||||
self.assertFalse(
|
||||
validate_email_address("test@example.com test2@example.com,undisclosed-recipient")
|
||||
)
|
||||
|
||||
# Invalid with throw
|
||||
self.assertRaises(
|
||||
|
|
@ -502,6 +516,24 @@ class TestDateUtils(unittest.TestCase):
|
|||
self.assertIsInstance(get_timedelta(str(timedelta_input)), timedelta)
|
||||
self.assertIsInstance(get_timedelta(str(time_input)), timedelta)
|
||||
|
||||
def test_to_timedelta(self):
|
||||
self.assertEqual(to_timedelta("00:00:01"), timedelta(seconds=1))
|
||||
self.assertEqual(to_timedelta("10:00:01"), timedelta(seconds=1, hours=10))
|
||||
self.assertEqual(to_timedelta(time(hour=2)), timedelta(hours=2))
|
||||
|
||||
def test_add_date_utils(self):
|
||||
self.assertEqual(add_years(datetime(2020, 1, 1), 1), datetime(2021, 1, 1))
|
||||
|
||||
def test_duration_to_sec(self):
|
||||
self.assertEqual(duration_to_seconds("3h 34m 45s"), 12885)
|
||||
self.assertEqual(duration_to_seconds("1h"), 3600)
|
||||
self.assertEqual(duration_to_seconds("110m"), 110 * 60)
|
||||
self.assertEqual(duration_to_seconds("110m"), 110 * 60)
|
||||
|
||||
|
||||
def test_get_timespan_date_range(self):
|
||||
get_timespan_date_range()
|
||||
|
||||
def test_date_from_timegrain(self):
|
||||
start_date = getdate("2021-01-01")
|
||||
|
||||
|
|
@ -773,3 +805,38 @@ class TestIdenticon(FrappeTestCase):
|
|||
identicon_bs64 = identicon.base64()
|
||||
self.assertIsInstance(identicon_bs64, str)
|
||||
self.assertTrue(identicon_bs64.startswith("data:image/png;base64,"))
|
||||
|
||||
|
||||
class TestContainerUtils(FrappeTestCase):
|
||||
def test_dict_to_str(self):
|
||||
self.assertEqual(dict_to_str({"a": "b"}), "a=b")
|
||||
|
||||
def test_remove_blanks(self):
|
||||
a = {"asd": "", "b": None, "c": "d"}
|
||||
remove_blanks(a)
|
||||
self.assertEqual(len(a), 1)
|
||||
self.assertEqual(a["c"], "d")
|
||||
|
||||
|
||||
class TestMiscUtils(FrappeTestCase):
|
||||
def test_get_file_timestamp(self):
|
||||
self.assertIsInstance(get_file_timestamp(__file__), str)
|
||||
|
||||
def test_execute_in_shell(self):
|
||||
err, out = execute_in_shell("ls")
|
||||
self.assertIn("apps", cstr(out))
|
||||
|
||||
def test_get_all_sites(self):
|
||||
self.assertIn(frappe.local.site, get_sites())
|
||||
|
||||
def test_get_site_info(self):
|
||||
info = get_site_info()
|
||||
|
||||
installed_apps = [app["app_name"] for app in info["installed_apps"]]
|
||||
self.assertIn("frappe", installed_apps)
|
||||
self.assertGreaterEqual(len(info["users"]), 1)
|
||||
|
||||
def test_safe_json_load(self):
|
||||
self.assertEqual(safe_json_loads("{}"), {})
|
||||
self.assertEqual(safe_json_loads("{ /}"), "{ /}")
|
||||
self.assertEqual(safe_json_loads("12"), 12) # this is a quirk
|
||||
|
|
|
|||
|
|
@ -9,12 +9,19 @@ import os
|
|||
import re
|
||||
import sys
|
||||
import traceback
|
||||
from collections.abc import Generator, Iterable, MutableMapping, MutableSequence, Sequence
|
||||
from collections.abc import (
|
||||
Container,
|
||||
Generator,
|
||||
Iterable,
|
||||
MutableMapping,
|
||||
MutableSequence,
|
||||
Sequence,
|
||||
)
|
||||
from email.header import decode_header, make_header
|
||||
from email.utils import formataddr, parseaddr
|
||||
from gzip import GzipFile
|
||||
from typing import Any, Literal
|
||||
from urllib.parse import quote, urlparse
|
||||
from typing import Literal
|
||||
|
||||
from redis.exceptions import ConnectionError
|
||||
from traceback_with_variables import iter_exc_lines
|
||||
|
|
@ -218,7 +225,11 @@ def split_emails(txt):
|
|||
return email_list
|
||||
|
||||
|
||||
def validate_url(txt, throw=False, valid_schemes=None):
|
||||
def validate_url(
|
||||
txt: str,
|
||||
throw: bool = False,
|
||||
valid_schemes: str | Container[str] | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Checks whether `txt` has a valid URL string
|
||||
|
||||
|
|
@ -244,7 +255,7 @@ def validate_url(txt, throw=False, valid_schemes=None):
|
|||
return is_valid
|
||||
|
||||
|
||||
def random_string(length):
|
||||
def random_string(length: int) -> str:
|
||||
"""generate a random string"""
|
||||
import string
|
||||
from random import choice
|
||||
|
|
@ -252,7 +263,7 @@ def random_string(length):
|
|||
return "".join(choice(string.ascii_letters + string.digits) for i in range(length))
|
||||
|
||||
|
||||
def has_gravatar(email):
|
||||
def has_gravatar(email: str) -> str:
|
||||
"""Returns gravatar url if user has set an avatar at gravatar.com"""
|
||||
import requests
|
||||
|
||||
|
|
@ -272,12 +283,12 @@ def has_gravatar(email):
|
|||
return ""
|
||||
|
||||
|
||||
def get_gravatar_url(email, default: Literal["mm", "404"]="mm"):
|
||||
def get_gravatar_url(email: str, default: Literal["mm", "404"] = "mm") -> str:
|
||||
hexdigest = hashlib.md5(frappe.as_unicode(email).encode("utf-8")).hexdigest()
|
||||
return f"https://secure.gravatar.com/avatar/{hexdigest}?d={default}&s=200"
|
||||
|
||||
|
||||
def get_gravatar(email):
|
||||
def get_gravatar(email: str) -> str:
|
||||
from frappe.utils.identicon import Identicon
|
||||
|
||||
return has_gravatar(email) or Identicon(email).base64()
|
||||
|
|
@ -307,7 +318,7 @@ def log(event, details):
|
|||
frappe.logger(event).info(details)
|
||||
|
||||
|
||||
def dict_to_str(args, sep="&"):
|
||||
def dict_to_str(args: dict[str, Any], sep: str = "&") -> str:
|
||||
"""
|
||||
Converts a dictionary to URL
|
||||
"""
|
||||
|
|
@ -343,18 +354,13 @@ def set_default(key, val):
|
|||
return frappe.db.set_default(key, val)
|
||||
|
||||
|
||||
def remove_blanks(d):
|
||||
def remove_blanks(d: dict) -> dict:
|
||||
"""
|
||||
Returns d with empty ('' or None) values stripped
|
||||
Returns d with empty ('' or None) values stripped. Mutates inplace.
|
||||
"""
|
||||
empty_keys = []
|
||||
for key in d:
|
||||
if d[key] == "" or d[key] is None:
|
||||
# del d[key] raises runtime exception, using a workaround
|
||||
empty_keys.append(key)
|
||||
for key in empty_keys:
|
||||
del d[key]
|
||||
|
||||
for k, v in tuple(d.items()):
|
||||
if v == "" or v == None:
|
||||
del d[k]
|
||||
return d
|
||||
|
||||
|
||||
|
|
@ -414,21 +420,20 @@ def execute_in_shell(cmd, verbose=0, low_priority=False):
|
|||
import tempfile
|
||||
from subprocess import Popen
|
||||
|
||||
with tempfile.TemporaryFile() as stdout:
|
||||
with tempfile.TemporaryFile() as stderr:
|
||||
kwargs = {"shell": True, "stdout": stdout, "stderr": stderr}
|
||||
with (tempfile.TemporaryFile() as stdout, tempfile.TemporaryFile() as stderr):
|
||||
kwargs = {"shell": True, "stdout": stdout, "stderr": stderr}
|
||||
|
||||
if low_priority:
|
||||
kwargs["preexec_fn"] = lambda: os.nice(10)
|
||||
if low_priority:
|
||||
kwargs["preexec_fn"] = lambda: os.nice(10)
|
||||
|
||||
p = Popen(cmd, **kwargs)
|
||||
p.wait()
|
||||
p = Popen(cmd, **kwargs)
|
||||
p.wait()
|
||||
|
||||
stdout.seek(0)
|
||||
out = stdout.read()
|
||||
stdout.seek(0)
|
||||
out = stdout.read()
|
||||
|
||||
stderr.seek(0)
|
||||
err = stderr.read()
|
||||
stderr.seek(0)
|
||||
err = stderr.read()
|
||||
|
||||
if verbose:
|
||||
if err:
|
||||
|
|
@ -560,7 +565,7 @@ def update_progress_bar(txt, i, l, absolute=False):
|
|||
sys.stdout.flush()
|
||||
return
|
||||
|
||||
if not getattr(frappe.local, "request", None) or is_cli():
|
||||
if not getattr(frappe.local, "request", None) or is_cli(): # pragma: no cover
|
||||
lt = len(txt)
|
||||
try:
|
||||
col = 40 if os.get_terminal_size().columns > 80 else 20
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue