refactor: improve test, rate limiting
This commit is contained in:
parent
a6db8ab5bc
commit
a43a0149eb
3 changed files with 44 additions and 30 deletions
|
|
@ -2,12 +2,14 @@
|
|||
# License: MIT. See LICENSE
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
import frappe
|
||||
import frappe.utils
|
||||
from frappe.auth import LoginAttemptTracker
|
||||
from frappe.frappeclient import AuthError, FrappeClient
|
||||
from frappe.tests.utils import FrappeTestCase
|
||||
from frappe.www.login import login_via_key, send_login_link
|
||||
from frappe.www.login import _generate_temporary_login_link
|
||||
|
||||
|
||||
def add_user(email, password, username=None, mobile_no=None):
|
||||
|
|
@ -128,16 +130,30 @@ class TestAuth(FrappeTestCase):
|
|||
FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password).get_list("ToDo")
|
||||
|
||||
def test_login_with_email_link(self):
|
||||
frappe.form_dict.email = self.test_user_email
|
||||
frappe.local.request_ip = "127.0.0.1"
|
||||
|
||||
# test with valid key
|
||||
key = send_login_link()
|
||||
self.assertEqual(login_via_key(key), self.test_user_email)
|
||||
user = self.test_user_email
|
||||
|
||||
# test with invalid key
|
||||
login_via_key("invalid_key")
|
||||
self.assertEqual(frappe.local.response["http_status_code"], 403)
|
||||
# Logs in
|
||||
res = requests.get(_generate_temporary_login_link(user, 10))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertTrue(res.cookies.get("sid"))
|
||||
self.assertNotEqual(res.cookies.get("sid"), "Guest")
|
||||
|
||||
# Random incorrect URL
|
||||
res = requests.get(_generate_temporary_login_link(user, 10) + "aa")
|
||||
self.assertEqual(res.cookies.get("sid"), "Guest")
|
||||
|
||||
# POST doesn't work
|
||||
res = requests.post(_generate_temporary_login_link(user, 10))
|
||||
self.assertEqual(res.status_code, 403)
|
||||
|
||||
# Rate limiting
|
||||
for _ in range(6):
|
||||
res = requests.get(_generate_temporary_login_link(user, 10))
|
||||
if res.status_code == 417:
|
||||
break
|
||||
else:
|
||||
self.fail("Rate limting not working")
|
||||
|
||||
|
||||
class TestLoginAttemptTracker(FrappeTestCase):
|
||||
|
|
|
|||
|
|
@ -13,9 +13,6 @@ from frappe.utils import cint
|
|||
|
||||
|
||||
class TestRateLimiter(FrappeTestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_apply_with_limit(self):
|
||||
frappe.conf.rate_limit = {"window": 86400, "limit": 1}
|
||||
frappe.rate_limiter.apply()
|
||||
|
|
|
|||
|
|
@ -149,21 +149,11 @@ def login_via_token(login_token):
|
|||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
@rate_limit(key="email", limit=5, seconds=60 * 60)
|
||||
@rate_limit(limit=5, seconds=60 * 60)
|
||||
def send_login_link(email: str):
|
||||
if not frappe.db.exists("User", email):
|
||||
frappe.throw(
|
||||
_("User with email address {0} does not exist").format(email), frappe.DoesNotExistError
|
||||
)
|
||||
|
||||
key = frappe.generate_hash("Login Link", 20)
|
||||
minutes = frappe.get_system_settings("login_with_email_link_expiry") or 10
|
||||
frappe.cache().set_value(f"one_time_login_key:{key}", email, expires_in_sec=minutes * 60)
|
||||
|
||||
link = get_url(f"/api/method/frappe.www.login.login_via_key?key={key}")
|
||||
|
||||
if frappe.flags.in_test:
|
||||
return key
|
||||
expiry = frappe.get_system_settings("login_with_email_link_expiry") or 10
|
||||
link = _generate_temporary_login_link(email, expiry)
|
||||
|
||||
app_name = (
|
||||
frappe.get_website_settings("app_name") or frappe.get_system_settings("app_name") or _("Frappe")
|
||||
|
|
@ -175,12 +165,26 @@ def send_login_link(email: str):
|
|||
subject=subject,
|
||||
recipients=email,
|
||||
template="login_with_email_link",
|
||||
args={"link": link, "minutes": minutes, "app_name": app_name},
|
||||
args={"link": link, "minutes": expiry, "app_name": app_name},
|
||||
now=True,
|
||||
)
|
||||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
def _generate_temporary_login_link(email: str, expiry: int):
|
||||
assert isinstance(email, str)
|
||||
|
||||
if not frappe.db.exists("User", email):
|
||||
frappe.throw(
|
||||
_("User with email address {0} does not exist").format(email), frappe.DoesNotExistError
|
||||
)
|
||||
key = frappe.generate_hash()
|
||||
frappe.cache().set_value(f"one_time_login_key:{key}", email, expires_in_sec=expiry * 60)
|
||||
|
||||
return get_url(f"/api/method/frappe.www.login.login_via_key?key={key}")
|
||||
|
||||
|
||||
@frappe.whitelist(allow_guest=True, methods=["GET"])
|
||||
@rate_limit(limit=5, seconds=60 * 60)
|
||||
def login_via_key(key: str):
|
||||
cache_key = f"one_time_login_key:{key}"
|
||||
email = frappe.cache().get_value(cache_key)
|
||||
|
|
@ -188,9 +192,6 @@ def login_via_key(key: str):
|
|||
if email:
|
||||
frappe.cache().delete_value(cache_key)
|
||||
|
||||
if frappe.flags.in_test:
|
||||
return email
|
||||
|
||||
frappe.local.login_manager.login_as(email)
|
||||
|
||||
redirect_post_login(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue