refactor: Cleaned authentication logic

Auth flow is changed to use login attempt tracker.
This commit is contained in:
leela 2021-02-21 12:18:43 +05:30
parent 49317ce045
commit 6e5e0890f3
4 changed files with 59 additions and 70 deletions

View file

@ -207,23 +207,44 @@ class LoginManager:
if frappe.session.user != "Guest":
clear_sessions(frappe.session.user, keep_current=True)
def authenticate(self, user=None, pwd=None):
def authenticate(self, user: str = None, pwd: str = None):
from frappe.core.doctype.user.user import User
if not (user and pwd):
user, pwd = frappe.form_dict.get('usr'), frappe.form_dict.get('pwd')
if not (user and pwd):
self.fail(_('Incomplete login details'), user=user)
if cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_mobile_number")):
user = frappe.db.get_value("User", filters={"mobile_no": user}, fieldname="name") or user
# Ignore password check if tmp_id is set, 2FA takes care of authentication.
validate_password = not bool(frappe.form_dict.get('tmp_id'))
user = User.find_by_credentials(user, pwd, validate_password=validate_password)
if cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_user_name")):
user = frappe.db.get_value("User", filters={"username": user}, fieldname="name") or user
if not user:
self.fail('Invalid login credentials')
self.check_if_enabled(user)
if not frappe.form_dict.get('tmp_id'):
self.user = self.check_password(user, pwd)
sys_settings = frappe.get_doc("System Settings")
track_login_attempts = (sys_settings.allow_consecutive_login_attempts >0)
tracker_kwargs = {}
if track_login_attempts:
tracker_kwargs['lock_interval'] = sys_settings.allow_login_after_fail
tracker_kwargs['max_consecutive_login_attempts'] = sys_settings.allow_consecutive_login_attempts
tracker = LoginAttemptTracker(user.name, **tracker_kwargs)
if track_login_attempts and not tracker.is_user_allowed():
frappe.throw(_("Your account has been locked and will resume after {0} seconds")
.format(sys_settings.allow_login_after_fail), frappe.SecurityException)
if not user.is_authenticated:
tracker.add_failure_attempt()
self.fail('Invalid login credentials', user=user.name)
elif not (user.name == 'Administrator' or user.enabled):
tracker.add_failure_attempt()
self.fail('User disabled or missing', user=user.name)
else:
self.user = user
tracker.add_success_attempt()
self.user = user.name
def force_user_to_reset_password(self):
if not self.user:
@ -245,23 +266,12 @@ class LoginManager:
if last_pwd_reset_days > reset_pwd_after_days:
return True
def check_if_enabled(self, user):
"""raise exception if user not enabled"""
doc = frappe.get_doc("System Settings")
if cint(doc.allow_consecutive_login_attempts) > 0:
check_consecutive_login_attempts(user, doc)
if user=='Administrator': return
if not cint(frappe.db.get_value('User', user, 'enabled')):
self.fail('User disabled or missing', user=user)
def check_password(self, user, pwd):
"""check password"""
try:
# returns user in correct case
return check_password(user, pwd)
except frappe.AuthenticationError:
self.update_invalid_login(user)
self.fail('Incorrect password', user=user)
def fail(self, message, user=None):
@ -272,15 +282,6 @@ class LoginManager:
frappe.db.commit()
raise frappe.AuthenticationError
def update_invalid_login(self, user):
last_login_tried = get_last_tried_login_data(user)
failed_count = 0
if last_login_tried > get_datetime():
failed_count = get_login_failed_count(user)
frappe.cache().hset('login_failed_count', user, failed_count + 1)
def run_trigger(self, event='on_login'):
for method in frappe.get_hooks().get(event, []):
frappe.call(frappe.get_attr(method), login_manager=self)
@ -383,38 +384,6 @@ def clear_cookies():
frappe.session.sid = ""
frappe.local.cookie_manager.delete_cookie(["full_name", "user_id", "sid", "user_image", "system_user"])
def get_last_tried_login_data(user, get_last_login=False):
locked_account_time = frappe.cache().hget('locked_account_time', user)
if get_last_login and locked_account_time:
return locked_account_time
last_login_tried = frappe.cache().hget('last_login_tried', user)
if not last_login_tried or last_login_tried < get_datetime():
last_login_tried = get_datetime() + datetime.timedelta(seconds=60)
frappe.cache().hset('last_login_tried', user, last_login_tried)
return last_login_tried
def get_login_failed_count(user):
return cint(frappe.cache().hget('login_failed_count', user)) or 0
def check_consecutive_login_attempts(user, doc):
login_failed_count = get_login_failed_count(user)
last_login_tried = (get_last_tried_login_data(user, True)
+ datetime.timedelta(seconds=doc.allow_login_after_fail))
if login_failed_count >= cint(doc.allow_consecutive_login_attempts):
locked_account_time = frappe.cache().hget('locked_account_time', user)
if not locked_account_time:
frappe.cache().hset('locked_account_time', user, get_datetime())
if last_login_tried > get_datetime():
frappe.throw(_("Your account has been locked and will resume after {0} seconds")
.format(doc.allow_login_after_fail), frappe.SecurityException)
else:
delete_login_failed_cache(user)
def validate_ip_address(user):
"""check if IP Address is valid"""
user = frappe.get_cached_doc("User", user) if not frappe.flags.in_test else frappe.get_doc("User", user)

View file

@ -6,7 +6,7 @@ import frappe
from frappe.model.document import Document
from frappe.utils import cint, flt, has_gravatar, escape_html, format_datetime, now_datetime, get_formatted_email, today
from frappe import throw, msgprint, _
from frappe.utils.password import update_password as _update_password
from frappe.utils.password import update_password as _update_password, check_password
from frappe.desk.notifications import clear_notifications
from frappe.desk.doctype.notification_settings.notification_settings import create_notification_settings
from frappe.utils.user import get_system_managers
@ -527,6 +527,27 @@ class User(Document):
return [i.strip() for i in self.restrict_ip.split(",")]
@classmethod
def find_by_credentials(cls, user_name: str, password: str, validate_password: bool = True):
"""Find the user by credentials.
"""
login_with_mobile = cint(frappe.db.get_value("System Settings", "System Settings", "allow_login_using_mobile_number"))
filter = {"mobile_no": user_name} if login_with_mobile else {"name": user_name}
user = frappe.db.get_value("User", filters=filter, fieldname=['name', 'enabled'], as_dict=True) or {}
if not user:
return
user['is_authenticated'] = True
if validate_password:
try:
check_password(user_name, password)
except frappe.AuthenticationError:
user['is_authenticated'] = False
return user
@frappe.whitelist()
def get_timezones():
import pytz

View file

@ -3,10 +3,8 @@
from __future__ import unicode_literals
import time
import frappe, unittest
import unittest
from frappe.auth import LoginAttemptTracker
from werkzeug.wrappers import Response
from frappe.app import process_response
class TestLoginAttemptTracker(unittest.TestCase):
def test_account_lock(self):

View file

@ -550,13 +550,13 @@ def flt(s, precision=None):
return num
def cint(s):
def cint(s, default=0):
"""Convert to integer
:param s: Number in string or other numeric format.
:returns: Converted number in python integer type.
Returns 0 if input can not be converted to integer.
Returns default if input can not be converted to integer.
Examples:
>>> cint("100")
@ -565,9 +565,10 @@ def cint(s):
0
"""
try: num = int(float(s))
except: num = 0
return num
try:
return int(float(s))
except Exception:
return default
def floor(s):
"""