diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index 26bb7ab280..6fe6ab1dcd 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -1,5 +1,5 @@ blank_issues_enabled: false contact_links: - name: Community Forum - url: https://discuss.erpnext.com/ + url: https://discuss.frappe.io/c/framework/5 about: For general QnA, discussions and community help. diff --git a/frappe/core/doctype/user/user.py b/frappe/core/doctype/user/user.py index 91de1d29be..7ba35f5181 100644 --- a/frappe/core/doctype/user/user.py +++ b/frappe/core/doctype/user/user.py @@ -582,7 +582,7 @@ class User(Document): if len(email_accounts) != len(set(email_accounts)): frappe.throw(_("Email Account added multiple times")) - def get_social_login_userid(self, provider): + def get_social_login_userid(self, provider: str): try: for p in self.social_logins: if p.provider == provider: diff --git a/frappe/integrations/oauth2_logins.py b/frappe/integrations/oauth2_logins.py index b668211c2b..942ad2b51b 100644 --- a/frappe/integrations/oauth2_logins.py +++ b/frappe/integrations/oauth2_logins.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import json @@ -9,42 +9,42 @@ from frappe.utils.oauth import login_via_oauth2, login_via_oauth2_id_token @frappe.whitelist(allow_guest=True) -def login_via_google(code, state): +def login_via_google(code: str, state: str): login_via_oauth2("google", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_github(code, state): +def login_via_github(code: str, state: str): login_via_oauth2("github", code, state) @frappe.whitelist(allow_guest=True) -def login_via_facebook(code, state): +def login_via_facebook(code: str, state: str): login_via_oauth2("facebook", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_frappe(code, state): +def login_via_frappe(code: str, state: str): login_via_oauth2("frappe", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_office365(code, state): +def login_via_office365(code: str, state: str): login_via_oauth2_id_token("office_365", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_salesforce(code, state): +def login_via_salesforce(code: str, state: str): login_via_oauth2("salesforce", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_fairlogin(code, state): +def login_via_fairlogin(code: str, state: str): login_via_oauth2("fairlogin", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def custom(code, state): +def custom(code: str, state: str): """ Callback for processing code and state for user added providers diff --git a/frappe/utils/oauth.py b/frappe/utils/oauth.py index 7034394fcf..d07011afd1 100644 --- a/frappe/utils/oauth.py +++ b/frappe/utils/oauth.py @@ -1,8 +1,9 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import base64 import json +from typing import TYPE_CHECKING, Callable import jwt @@ -11,12 +12,15 @@ import frappe.utils from frappe import _ from frappe.utils.password import get_decrypted_password +if TYPE_CHECKING: + from frappe.core.doctype.user.user import User + class SignupDisabledError(frappe.PermissionError): - pass + ... -def get_oauth2_providers(): +def get_oauth2_providers() -> dict[str, dict]: out = {} providers = frappe.get_all("Social Login Key", fields=["*"]) for provider in providers: @@ -43,25 +47,19 @@ def get_oauth2_providers(): return out -def get_oauth_keys(provider): +def get_oauth_keys(provider: str) -> dict[str, str]: """get client_id and client_secret from database or conf""" - # try conf - keys = frappe.conf.get(f"{provider}_login") - - if not keys: - # try database - client_id, client_secret = frappe.get_value( - "Social Login Key", provider, ["client_id", "client_secret"] - ) - client_secret = get_decrypted_password("Social Login Key", provider, "client_secret") - keys = {"client_id": client_id, "client_secret": client_secret} - return keys - else: + if keys := frappe.conf.get(f"{provider}_login"): return {"client_id": keys["client_id"], "client_secret": keys["client_secret"]} + return { + "client_id": frappe.db.get_value("Social Login Key", provider, "client_id"), + "client_secret": get_decrypted_password("Social Login Key", provider, "client_secret"), + } -def get_oauth2_authorize_url(provider, redirect_to): + +def get_oauth2_authorize_url(provider: str, redirect_to: str) -> str: flow = get_oauth2_flow(provider) state = { @@ -84,7 +82,7 @@ def get_oauth2_authorize_url(provider, redirect_to): return flow.get_authorize_url(**data) -def get_oauth2_flow(provider): +def get_oauth2_flow(provider: str): from rauth import OAuth2Service # get client_id and client_secret @@ -99,33 +97,35 @@ def get_oauth2_flow(provider): return OAuth2Service(**params) -def get_redirect_uri(provider): +def get_redirect_uri(provider: str) -> str: keys = frappe.conf.get(f"{provider}_login") if keys and keys.get("redirect_uri"): # this should be a fully qualified redirect uri return keys["redirect_uri"] - else: - oauth2_providers = get_oauth2_providers() + oauth2_providers = get_oauth2_providers() + redirect_uri = oauth2_providers[provider]["redirect_uri"] - redirect_uri = oauth2_providers[provider]["redirect_uri"] - - # this uses the site's url + the relative redirect uri - return frappe.utils.get_url(redirect_uri) + # this uses the site's url + the relative redirect uri + return frappe.utils.get_url(redirect_uri) -def login_via_oauth2(provider, code, state, decoder=None): +def login_via_oauth2(provider: str, code: str, state: str, decoder: Callable | None = None): info = get_info_via_oauth(provider, code, decoder) login_oauth_user(info, provider=provider, state=state) -def login_via_oauth2_id_token(provider, code, state, decoder=None): +def login_via_oauth2_id_token( + provider: str, code: str, state: str, decoder: Callable | None = None +): info = get_info_via_oauth(provider, code, decoder, id_token=True) login_oauth_user(info, provider=provider, state=state) -def get_info_via_oauth(provider, code, decoder=None, id_token=False): +def get_info_via_oauth( + provider: str, code: str, decoder: Callable | None = None, id_token: bool = False +): flow = get_oauth2_flow(provider) oauth2_providers = get_oauth2_providers() @@ -144,14 +144,12 @@ def get_info_via_oauth(provider, code, decoder=None, id_token=False): if id_token: parsed_access = json.loads(session.access_token_response.text) - token = parsed_access["id_token"] - info = jwt.decode(token, flow.client_secret, options={"verify_signature": False}) + else: api_endpoint = oauth2_providers[provider].get("api_endpoint") api_endpoint_args = oauth2_providers[provider].get("api_endpoint_args") - info = session.get(api_endpoint, params=api_endpoint_args).json() if provider == "github" and not info.get("email"): @@ -166,22 +164,12 @@ def get_info_via_oauth(provider, code, decoder=None, id_token=False): def login_oauth_user( - data=None, provider=None, state=None, email_id=None, key=None, generate_login_token=False + data: dict | str, + *, + provider: str | None = None, + state: dict | str, + generate_login_token: bool = False, ): - # NOTE: This could lead to security issue as the signed in user can type any email address in complete_signup - # if email_id and key: - # data = json.loads(frappe.db.get_temp(key)) - # # What if data is missing because of an invalid key - # data["email"] = email_id - # - # elif not (data.get("email") and get_first_name(data)) and not frappe.db.exists("User", data.get("email")): - # # ask for user email - # key = frappe.db.set_temp(json.dumps(data)) - # frappe.db.commit() - # frappe.local.response["type"] = "redirect" - # frappe.local.response["location"] = "/complete_signup?key=" + key - # return - # json.loads data and state if isinstance(data, str): data = json.loads(data) @@ -237,110 +225,98 @@ def login_oauth_user( ) -def update_oauth_user(user, data, provider): - if isinstance(data.get("location"), dict): - data["location"] = data.get("location").get("name") - - save = False - - if not frappe.db.exists("User", user): - - # is signup disabled? - if frappe.utils.cint(frappe.db.get_single_value("Website Settings", "disable_signup")): +def get_user_record(user: str, data: dict) -> "User": + try: + return frappe.get_doc("User", user) + except frappe.DoesNotExistError: + if frappe.get_website_settings("disable_signup"): raise SignupDisabledError - save = True - user = frappe.new_doc("User") + user: "User" = frappe.new_doc("User") - gender = data.get("gender", "").title() - - if gender and not frappe.db.exists("Gender", gender): - doc = frappe.new_doc("Gender", {"gender": gender}) - doc.insert(ignore_permissions=True) - - user.update( - { - "doctype": "User", - "first_name": get_first_name(data), - "last_name": get_last_name(data), - "email": get_email(data), - "gender": gender, - "enabled": 1, - "new_password": frappe.generate_hash(), - "location": data.get("location"), - "user_type": "Website User", - "user_image": data.get("picture") or data.get("avatar_url"), - } + if gender := data.get("gender", "").title(): + frappe.get_doc({"doctype": "Gender", "gender": gender}).insert( + ignore_permissions=True, ignore_if_duplicate=True ) - else: - user = frappe.get_doc("User", user) - if not user.enabled: - frappe.respond_as_web_page(_("Not Allowed"), _("User {0} is disabled").format(user.email)) - return False + user.update( + { + "doctype": "User", + "first_name": get_first_name(data), + "last_name": get_last_name(data), + "email": get_email(data), + "gender": gender, + "enabled": 1, + "new_password": frappe.generate_hash(), + "location": data.get("location"), + "user_type": "Website User", + "user_image": data.get("picture") or data.get("avatar_url"), + } + ) - if provider == "facebook" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid=data["id"], username=data.get("username")) - user.update({"user_image": "https://graph.facebook.com/{id}/picture".format(id=data["id"])}) + return user - elif provider == "google" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid=data["id"]) - elif provider == "github" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid=data["id"], username=data.get("login")) +def update_oauth_user(user: str, data: dict, provider: str): + if isinstance(data.get("location"), dict): + data["location"] = data["location"].get("name") - elif provider == "frappe" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid=data["sub"]) + user: "User" = get_user_record(user, data) + update_user_record = user.is_new() - elif provider == "office_365" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid=data["sub"]) + if not user.enabled: + frappe.respond_as_web_page(_("Not Allowed"), _("User {0} is disabled").format(user.email)) + return False - elif provider == "salesforce" and not user.get_social_login_userid(provider): - save = True - user.set_social_login_userid(provider, userid="/".join(data["sub"].split("/")[-2:])) + if not user.get_social_login_userid(provider): + update_user_record = True + match provider: + case "facebook": + user.set_social_login_userid(provider, userid=data["id"], username=data.get("username")) + user.update({"user_image": f"https://graph.facebook.com/{data['id']}/picture"}) + case "google": + user.set_social_login_userid(provider, userid=data["id"]) + case "github": + user.set_social_login_userid(provider, userid=data["id"], username=data.get("login")) + case "frappe" | "office_365": + user.set_social_login_userid(provider, userid=data["sub"]) + case "salesforce": + user.set_social_login_userid(provider, userid="/".join(data["sub"].split("/")[-2:])) + case _: + user_id_property = ( + frappe.db.get_value("Social Login Key", provider, "user_id_property") or "sub" + ) + user.set_social_login_userid(provider, userid=data[user_id_property]) - elif not user.get_social_login_userid(provider): - save = True - user_id_property = frappe.db.get_value("Social Login Key", provider, "user_id_property") or "sub" - user.set_social_login_userid(provider, userid=data[user_id_property]) - - if save: + if update_user_record: user.flags.ignore_permissions = True user.flags.no_welcome_mail = True - # set default signup role as per Portal Settings - default_role = frappe.db.get_single_value("Portal Settings", "default_role") - if default_role: + if default_role := frappe.db.get_single_value("Portal Settings", "default_role"): user.add_roles(default_role) user.save() -def get_first_name(data): +def get_first_name(data: dict) -> str: return data.get("first_name") or data.get("given_name") or data.get("name") -def get_last_name(data): +def get_last_name(data: dict) -> str: return data.get("last_name") or data.get("family_name") -def get_email(data): +def get_email(data: dict) -> str: return data.get("email") or data.get("upn") or data.get("unique_name") -def redirect_post_login(desk_user, redirect_to=None, provider=None): - # redirect! +def redirect_post_login( + desk_user: bool, redirect_to: str | None = None, provider: str | None = None +): frappe.local.response["type"] = "redirect" if not redirect_to: - # the #desktop is added to prevent a facebook redirect bug desk_uri = "/app/workspace" if provider == "facebook" else "/app" - redirect_to = desk_uri if desk_user else "/me" - redirect_to = frappe.utils.get_url(redirect_to) + redirect_to = frappe.utils.get_url(desk_uri if desk_user else "/me") frappe.local.response["location"] = redirect_to diff --git a/frappe/www/login.py b/frappe/www/login.py index ce65390f3c..347c0fe8d3 100644 --- a/frappe/www/login.py +++ b/frappe/www/login.py @@ -109,32 +109,32 @@ def get_context(context): @frappe.whitelist(allow_guest=True) -def login_via_google(code, state): +def login_via_google(code: str, state: str): login_via_oauth2("google", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_github(code, state): +def login_via_github(code: str, state: str): login_via_oauth2("github", code, state) @frappe.whitelist(allow_guest=True) -def login_via_facebook(code, state): +def login_via_facebook(code: str, state: str): login_via_oauth2("facebook", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_frappe(code, state): +def login_via_frappe(code: str, state: str): login_via_oauth2("frappe", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_office365(code, state): +def login_via_office365(code: str, state: str): login_via_oauth2_id_token("office_365", code, state, decoder=decoder_compat) @frappe.whitelist(allow_guest=True) -def login_via_token(login_token): +def login_via_token(login_token: str): sid = frappe.cache().get_value(f"login_token:{login_token}", expires=True) if not sid: frappe.respond_as_web_page(_("Invalid Request"), _("Invalid Login Token"), http_status_code=417)