diff --git a/frappe/auth.py b/frappe/auth.py index 3238beaa64..946a8c52d5 100644 --- a/frappe/auth.py +++ b/frappe/auth.py @@ -207,23 +207,44 @@ class LoginManager: if frappe.session.user != "Guest": clear_sessions(frappe.session.user, keep_current=True) - def authenticate(self, user=None, pwd=None): + def authenticate(self, user: str = None, pwd: str = None): + from frappe.core.doctype.user.user import User + if not (user and pwd): user, pwd = frappe.form_dict.get('usr'), frappe.form_dict.get('pwd') if not (user and pwd): self.fail(_('Incomplete login details'), user=user) - if cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_mobile_number")): - user = frappe.db.get_value("User", filters={"mobile_no": user}, fieldname="name") or user + # Ignore password check if tmp_id is set, 2FA takes care of authentication. + validate_password = not bool(frappe.form_dict.get('tmp_id')) + user = User.find_by_credentials(user, pwd, validate_password=validate_password) - if cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_user_name")): - user = frappe.db.get_value("User", filters={"username": user}, fieldname="name") or user + if not user: + self.fail('Invalid login credentials') - self.check_if_enabled(user) - if not frappe.form_dict.get('tmp_id'): - self.user = self.check_password(user, pwd) + sys_settings = frappe.get_doc("System Settings") + track_login_attempts = (sys_settings.allow_consecutive_login_attempts >0) + + tracker_kwargs = {} + if track_login_attempts: + tracker_kwargs['lock_interval'] = sys_settings.allow_login_after_fail + tracker_kwargs['max_consecutive_login_attempts'] = sys_settings.allow_consecutive_login_attempts + + tracker = LoginAttemptTracker(user.name, **tracker_kwargs) + + if track_login_attempts and not tracker.is_user_allowed(): + frappe.throw(_("Your account has been locked and will resume after {0} seconds") + .format(sys_settings.allow_login_after_fail), frappe.SecurityException) + + if not user.is_authenticated: + tracker.add_failure_attempt() + self.fail('Invalid login credentials', user=user.name) + elif not (user.name == 'Administrator' or user.enabled): + tracker.add_failure_attempt() + self.fail('User disabled or missing', user=user.name) else: - self.user = user + tracker.add_success_attempt() + self.user = user.name def force_user_to_reset_password(self): if not self.user: @@ -245,23 +266,12 @@ class LoginManager: if last_pwd_reset_days > reset_pwd_after_days: return True - def check_if_enabled(self, user): - """raise exception if user not enabled""" - doc = frappe.get_doc("System Settings") - if cint(doc.allow_consecutive_login_attempts) > 0: - check_consecutive_login_attempts(user, doc) - - if user=='Administrator': return - if not cint(frappe.db.get_value('User', user, 'enabled')): - self.fail('User disabled or missing', user=user) - def check_password(self, user, pwd): """check password""" try: # returns user in correct case return check_password(user, pwd) except frappe.AuthenticationError: - self.update_invalid_login(user) self.fail('Incorrect password', user=user) def fail(self, message, user=None): @@ -272,15 +282,6 @@ class LoginManager: frappe.db.commit() raise frappe.AuthenticationError - def update_invalid_login(self, user): - last_login_tried = get_last_tried_login_data(user) - - failed_count = 0 - if last_login_tried > get_datetime(): - failed_count = get_login_failed_count(user) - - frappe.cache().hset('login_failed_count', user, failed_count + 1) - def run_trigger(self, event='on_login'): for method in frappe.get_hooks().get(event, []): frappe.call(frappe.get_attr(method), login_manager=self) @@ -383,38 +384,6 @@ def clear_cookies(): frappe.session.sid = "" frappe.local.cookie_manager.delete_cookie(["full_name", "user_id", "sid", "user_image", "system_user"]) -def get_last_tried_login_data(user, get_last_login=False): - locked_account_time = frappe.cache().hget('locked_account_time', user) - if get_last_login and locked_account_time: - return locked_account_time - - last_login_tried = frappe.cache().hget('last_login_tried', user) - if not last_login_tried or last_login_tried < get_datetime(): - last_login_tried = get_datetime() + datetime.timedelta(seconds=60) - - frappe.cache().hset('last_login_tried', user, last_login_tried) - - return last_login_tried - -def get_login_failed_count(user): - return cint(frappe.cache().hget('login_failed_count', user)) or 0 - -def check_consecutive_login_attempts(user, doc): - login_failed_count = get_login_failed_count(user) - last_login_tried = (get_last_tried_login_data(user, True) - + datetime.timedelta(seconds=doc.allow_login_after_fail)) - - if login_failed_count >= cint(doc.allow_consecutive_login_attempts): - locked_account_time = frappe.cache().hget('locked_account_time', user) - if not locked_account_time: - frappe.cache().hset('locked_account_time', user, get_datetime()) - - if last_login_tried > get_datetime(): - frappe.throw(_("Your account has been locked and will resume after {0} seconds") - .format(doc.allow_login_after_fail), frappe.SecurityException) - else: - delete_login_failed_cache(user) - def validate_ip_address(user): """check if IP Address is valid""" user = frappe.get_cached_doc("User", user) if not frappe.flags.in_test else frappe.get_doc("User", user) diff --git a/frappe/core/doctype/user/user.py b/frappe/core/doctype/user/user.py index 142cc1ee26..3f19a6ef7b 100644 --- a/frappe/core/doctype/user/user.py +++ b/frappe/core/doctype/user/user.py @@ -6,7 +6,7 @@ import frappe from frappe.model.document import Document from frappe.utils import cint, flt, has_gravatar, escape_html, format_datetime, now_datetime, get_formatted_email, today from frappe import throw, msgprint, _ -from frappe.utils.password import update_password as _update_password +from frappe.utils.password import update_password as _update_password, check_password from frappe.desk.notifications import clear_notifications from frappe.desk.doctype.notification_settings.notification_settings import create_notification_settings from frappe.utils.user import get_system_managers @@ -527,6 +527,27 @@ class User(Document): return [i.strip() for i in self.restrict_ip.split(",")] + @classmethod + def find_by_credentials(cls, user_name: str, password: str, validate_password: bool = True): + """Find the user by credentials. + """ + login_with_mobile = cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_mobile_number")) + filter = {"mobile_no": user_name} if login_with_mobile else {"name": user_name} + + user = frappe.db.get_value("User", filters=filter, fieldname=['name', 'enabled'], as_dict=True) or {} + if not user: + return + + user['is_authenticated'] = True + if validate_password: + try: + check_password(user_name, password) + except frappe.AuthenticationError: + user['is_authenticated'] = False + + return user + + @frappe.whitelist() def get_timezones(): import pytz diff --git a/frappe/tests/test_auth.py b/frappe/tests/test_auth.py index ace486dae2..c0f61b4863 100644 --- a/frappe/tests/test_auth.py +++ b/frappe/tests/test_auth.py @@ -3,10 +3,8 @@ from __future__ import unicode_literals import time -import frappe, unittest +import unittest from frappe.auth import LoginAttemptTracker -from werkzeug.wrappers import Response -from frappe.app import process_response class TestLoginAttemptTracker(unittest.TestCase): def test_account_lock(self): diff --git a/frappe/utils/data.py b/frappe/utils/data.py index 21fa609b80..3ec1776846 100644 --- a/frappe/utils/data.py +++ b/frappe/utils/data.py @@ -550,13 +550,13 @@ def flt(s, precision=None): return num -def cint(s): +def cint(s, default=0): """Convert to integer :param s: Number in string or other numeric format. :returns: Converted number in python integer type. - Returns 0 if input can not be converted to integer. + Returns default if input can not be converted to integer. Examples: >>> cint("100") @@ -565,9 +565,10 @@ def cint(s): 0 """ - try: num = int(float(s)) - except: num = 0 - return num + try: + return int(float(s)) + except Exception: + return default def floor(s): """