# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import datetime import time import requests from werkzeug.test import EnvironBuilder from werkzeug.wrappers import Request import frappe from frappe.auth import LoginAttemptTracker from frappe.frappeclient import AuthError, FrappeClient from frappe.sessions import Session, get_expired_sessions, get_expiry_in_seconds from frappe.tests import IntegrationTestCase, UnitTestCase from frappe.tests.test_api import FrappeAPITestCase from frappe.utils import get_datetime, get_site_url, now from frappe.utils.data import add_to_date from frappe.www.login import _generate_temporary_login_link def add_user(email, password, username=None, mobile_no=None): first_name = email.split("@", 1)[0] user = frappe.get_doc( doctype="User", email=email, first_name=first_name, username=username, mobile_no=mobile_no ).insert() user.new_password = password user.simultaneous_sessions = 1 user.add_roles("System Manager") frappe.db.commit() class TestAuth(IntegrationTestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.HOST_NAME = frappe.get_site_config().host_name or get_site_url(frappe.local.site) cls.test_user_email = "test_auth@test.com" cls.test_user_name = "test_auth_user" cls.test_user_mobile = "+911234567890" cls.test_user_password = "pwd_012" cls.tearDownClass() add_user( email=cls.test_user_email, password=cls.test_user_password, username=cls.test_user_name, mobile_no=cls.test_user_mobile, ) @classmethod def tearDownClass(cls): frappe.delete_doc("User", cls.test_user_email, force=True) frappe.local.request_ip = None frappe.form_dict.email = None frappe.local.response["http_status_code"] = None def set_system_settings(self, k, v): frappe.db.set_single_value("System Settings", k, v) frappe.clear_cache() frappe.db.commit() def test_allow_login_using_mobile(self): self.set_system_settings("allow_login_using_mobile_number", 1) self.set_system_settings("allow_login_using_user_name", 0) # Login by both email and mobile should work FrappeClient(self.HOST_NAME, self.test_user_mobile, self.test_user_password) FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) # login by username should fail with self.assertRaises(AuthError): FrappeClient(self.HOST_NAME, self.test_user_name, self.test_user_password) def test_allow_login_using_only_email(self): self.set_system_settings("allow_login_using_mobile_number", 0) self.set_system_settings("allow_login_using_user_name", 0) # Login by mobile number should fail with self.assertRaises(AuthError): FrappeClient(self.HOST_NAME, self.test_user_mobile, self.test_user_password) # login by username should fail with self.assertRaises(AuthError): FrappeClient(self.HOST_NAME, self.test_user_name, self.test_user_password) # Login by email should work FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) def test_allow_login_using_username(self): self.set_system_settings("allow_login_using_mobile_number", 0) self.set_system_settings("allow_login_using_user_name", 1) # Mobile login should fail with self.assertRaises(AuthError): FrappeClient(self.HOST_NAME, self.test_user_mobile, self.test_user_password) # Both email and username logins should work FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) FrappeClient(self.HOST_NAME, self.test_user_name, self.test_user_password) def test_allow_login_using_username_and_mobile(self): self.set_system_settings("allow_login_using_mobile_number", 1) self.set_system_settings("allow_login_using_user_name", 1) # Both email and username and mobile logins should work FrappeClient(self.HOST_NAME, self.test_user_mobile, self.test_user_password) FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) FrappeClient(self.HOST_NAME, self.test_user_name, self.test_user_password) def test_deny_multiple_login(self): self.set_system_settings("deny_multiple_sessions", 1) self.addCleanup(self.set_system_settings, "deny_multiple_sessions", 0) first_login = FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) first_login.get_list("ToDo") second_login = FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) second_login.get_list("ToDo") with self.assertRaises(Exception): first_login.get_list("ToDo") third_login = FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) with self.assertRaises(Exception): first_login.get_list("ToDo") with self.assertRaises(Exception): second_login.get_list("ToDo") third_login.get_list("ToDo") def test_disable_user_pass_login(self): FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password).get_list("ToDo") self.set_system_settings("disable_user_pass_login", 1) self.addCleanup(self.set_system_settings, "disable_user_pass_login", 0) with self.assertRaises(Exception): FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password).get_list("ToDo") def test_login_with_email_link(self): user = self.test_user_email # Logs in res = requests.get(_generate_temporary_login_link(user, 10)) self.assertEqual(res.status_code, 200) self.assertTrue(res.cookies.get("sid")) self.assertNotEqual(res.cookies.get("sid"), "Guest") # Random incorrect URL res = requests.get(_generate_temporary_login_link(user, 10) + "aa") self.assertEqual(res.cookies.get("sid"), "Guest") # POST doesn't work res = requests.post(_generate_temporary_login_link(user, 10)) self.assertEqual(res.status_code, 403) # Rate limiting for _ in range(6): res = requests.get(_generate_temporary_login_link(user, 10)) if res.status_code == 429: break else: self.fail("Rate limting not working") def test_correct_cookie_expiry_set(self): client = FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password) expiry_time = next(x for x in client.session.cookies if x.name == "sid").expires current_time = datetime.datetime.now(tz=datetime.UTC).timestamp() self.assertAlmostEqual(get_expiry_in_seconds(), expiry_time - current_time, delta=60 * 60) class TestAllowedReferrer(UnitTestCase): def test_is_allowed_referrer(self): def create_request(headers): builder = EnvironBuilder(headers=headers) env = builder.get_environ() return Request(env) # Test with valid referrer frappe.cache.set_value("allowed_referrers", ["https://example.com"]) frappe.local.request = create_request({"Referer": "https://example.com/some/path"}) http_request = frappe.auth.HTTPRequest() self.assertTrue(http_request.is_allowed_referrer()) # Test with invalid referrer frappe.local.request = create_request({"Referer": "https://malicious.com"}) http_request = frappe.auth.HTTPRequest() self.assertFalse(http_request.is_allowed_referrer()) # Test with valid origin frappe.local.request = create_request({"Origin": "https://example.com"}) http_request = frappe.auth.HTTPRequest() self.assertTrue(http_request.is_allowed_referrer()) # Test with invalid origin frappe.local.request = create_request({"Origin": "https://malicious.com"}) http_request = frappe.auth.HTTPRequest() self.assertFalse(http_request.is_allowed_referrer()) # Clean up frappe.cache.delete_value("allowed_referrers") frappe.local.request = None class TestLoginAttemptTracker(IntegrationTestCase): def test_account_lock(self): """Make sure that account locks after `n consecutive failures""" tracker = LoginAttemptTracker("tester", max_consecutive_login_attempts=3, lock_interval=60) # Clear the cache by setting attempt as success tracker.add_success_attempt() tracker.add_failure_attempt() self.assertTrue(tracker.is_user_allowed()) tracker.add_failure_attempt() self.assertTrue(tracker.is_user_allowed()) tracker.add_failure_attempt() self.assertTrue(tracker.is_user_allowed()) tracker.add_failure_attempt() self.assertFalse(tracker.is_user_allowed()) def test_account_unlock(self): """Make sure that locked account gets unlocked after lock_interval of time.""" lock_interval = 2 # In sec tracker = LoginAttemptTracker("tester", max_consecutive_login_attempts=1, lock_interval=lock_interval) # Clear the cache by setting attempt as success tracker.add_success_attempt() tracker.add_failure_attempt() self.assertTrue(tracker.is_user_allowed()) tracker.add_failure_attempt() self.assertFalse(tracker.is_user_allowed()) # Sleep for lock_interval of time, so that next request con unlock the user access. time.sleep(lock_interval) tracker.add_failure_attempt() self.assertTrue(tracker.is_user_allowed()) class TestSessionExpirty(FrappeAPITestCase): def test_session_expires(self): sid = self.sid # triggers login for test case login s: Session = frappe.local.session_obj expiry_in = get_expiry_in_seconds() session_created = now() # Try with 1% increments of times, it should always work for step in range(0, 100, 1): seconds_elapsed = expiry_in * step / 100 time_now = add_to_date(session_created, seconds=seconds_elapsed, as_string=True) with self.freeze_time(time_now): data = s.get_session_data_from_db() self.assertEqual(data.user, "Administrator") # 1% higher should immediately expire time_of_expiry = add_to_date(session_created, seconds=expiry_in * 1.01, as_string=True) with self.freeze_time(time_of_expiry): self.assertIn(sid, get_expired_sessions()) self.assertFalse(s.get_session_data_from_db())