refactor: use an alternate key for handling expiry

This allows for less changes to update() + allows impersonated sessions to not end later

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang 2025-01-24 17:57:21 +05:30
parent 15065a93e3
commit a9c1c49fff
No known key found for this signature in database
GPG key ID: 9DCC61E211BF645F
3 changed files with 29 additions and 19 deletions

View file

@ -163,12 +163,12 @@ class LoginManager:
frappe.form_dict.pop("pwd", None)
self.post_login()
def post_login(self, duration: str | None = None, audit_user: str | None = None):
def post_login(self, session_end: str | None = None, audit_user: str | None = None):
self.run_trigger("on_login")
validate_ip_address(self.user)
self.validate_hour()
self.get_user_info()
self.make_session(duration=duration, audit_user=audit_user)
self.make_session(session_end=session_end, audit_user=audit_user)
self.setup_boot_cache()
self.set_user_info()
@ -216,14 +216,16 @@ class LoginManager:
def clear_preferred_language(self):
frappe.local.cookie_manager.delete_cookie("preferred_language")
def make_session(self, resume: bool = False, duration: str | None = None, audit_user: str | None = None):
def make_session(
self, resume: bool = False, session_end: str | None = None, audit_user: str | None = None
):
# start session
frappe.local.session_obj = Session(
user=self.user,
resume=resume,
full_name=self.full_name,
user_type=self.user_type,
duration=duration,
session_end=session_end,
audit_user=audit_user,
)
@ -344,14 +346,14 @@ class LoginManager:
"""login as guest"""
self.login_as("Guest")
def login_as(self, user: str, duration: str | None = None, audit_user: str | None = None):
def login_as(self, user: str, session_end: str | None = None, audit_user: str | None = None):
self.user = user
self.post_login(duration, audit_user)
self.post_login(session_end, audit_user)
def impersonate(self, user):
current_user = frappe.session.user
session_data = frappe.local.session_obj.data.data
self.login_as(user, duration=session_data.duration, audit_user=session_data.audit_user)
self.login_as(user, session_end=session_data.session_end, audit_user=session_data.audit_user)
# Flag this session as impersonated session, so other code can log this.
frappe.local.session_obj.set_impersonated(current_user)

View file

@ -1175,14 +1175,18 @@ def publish_realtime(context: CliCtxObj, event, message, room, user, doctype, do
@click.command("browse")
@click.argument("site", required=False)
@click.option("--user", required=False, help="Login as user")
@click.option("--duration", required=False, help="Session duration (in hh:mm:ss format)")
@click.option(
"--session-end",
required=False,
help="Session end (in ISO8601 format and timezone-aware - 2025-01-24T12:26:29.200853+00:00)",
)
@click.option("--user-for-audit", required=False, help="The user to mention in audit trail")
@pass_context
def browse(
context: CliCtxObj,
site,
user: str | None = None,
duration: str | None = None,
session_end: str | None = None,
user_for_audit: str | None = None,
):
"""Opens the site on web browser"""
@ -1210,7 +1214,7 @@ def browse(
frappe.utils.set_request(path="/")
frappe.local.cookie_manager = CookieManager()
frappe.local.login_manager = LoginManager()
frappe.local.login_manager.login_as(user, duration, user_for_audit)
frappe.local.login_manager.login_as(user, session_end, user_for_audit)
sid = f"/app?sid={frappe.session.sid}"
else:
click.echo("Please enable developer mode to login as a user")

View file

@ -8,6 +8,7 @@ permission, homepage, default variables, system defaults etc
"""
import json
from datetime import datetime, timezone
from urllib.parse import unquote
import redis
@ -211,7 +212,7 @@ class Session:
resume: bool = False,
full_name: str | None = None,
user_type: str | None = None,
duration: str | None = None,
session_end: str | None = None,
audit_user: str | None = None,
):
self.sid = cstr(
@ -233,7 +234,7 @@ class Session:
else:
if self.user:
self.validate_user()
self.start(duration, audit_user)
self.start(session_end, audit_user)
def validate_user(self):
if not frappe.get_cached_value("User", self.user, "enabled"):
@ -242,7 +243,7 @@ class Session:
frappe.ValidationError,
)
def start(self, duration: str | None = None, audit_user: str | None = None):
def start(self, session_end: str | None = None, audit_user: str | None = None):
"""start a new session"""
# generate sid
if self.user == "Guest":
@ -254,8 +255,9 @@ class Session:
self.sid = self.data.sid = sid
self.data.data.user = self.user
self.data.data.session_ip = frappe.local.request_ip
if duration:
self.data.data.fixed_duration = True
if session_end:
self.data.data.session_end = session_end
if audit_user:
self.data.data.audit_user = audit_user
@ -264,7 +266,7 @@ class Session:
self.data.data.update(
{
"last_updated": frappe.utils.now(),
"session_expiry": duration or get_expiry_period(),
"session_expiry": get_expiry_period(),
"full_name": self.full_name,
"user_type": self.user_type,
}
@ -361,7 +363,10 @@ class Session:
)
expiry = get_expiry_in_seconds(session_data.get("session_expiry"))
if self.time_diff > expiry:
if self.time_diff > expiry or (
(session_end := session_data.get("session_end"))
and datetime.now(tz=timezone.utc) > datetime.fromisoformat(session_end)
):
self._delete_session()
data = None
@ -411,8 +416,7 @@ class Session:
if (
force or (time_diff is None) or (time_diff > 600) or self._update_in_cache
) and not frappe.flags.read_only:
if not self.data.data.fixed_duration:
self.data.data.last_updated = now
self.data.data.last_updated = now
self.data.data.lang = str(frappe.lang)
Sessions = frappe.qb.DocType("Sessions")