seitime-frappe/frappe/tests/test_oauth20.py
Gavin D'souza 3446026555 chore: Update header: license.txt => LICENSE
The license.txt file has been replaced with LICENSE for quite a while
now. IANAL, but it didn't seem accurate to say "hey, check out license.txt
although there's no such file". Apart from this, there were
inconsistencies in the headers altogether; this change brings
consistency.
2021-09-03 12:02:59 +05:30

348 lines
9.6 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import unittest
from urllib.parse import parse_qs, urljoin, urlparse
import jwt
import requests
import frappe
from frappe.integrations.oauth2 import encode_params
from frappe.test_runner import make_test_records
class TestOAuth20(unittest.TestCase):
    """Integration tests for the OAuth 2.0 / OpenID Connect endpoints.

    Each test sends real HTTP requests (via ``requests``) to the site whose
    URL is returned by ``frappe.utils.get_url()``, so a running server is
    needed for them to pass.
    """

    def setUp(self):
        """Create the test records and prepare the "frappe" Social Login Key."""
        make_test_records("OAuth Client")
        make_test_records("User")

        self.client_id = frappe.get_all("OAuth Client", fields=["*"])[0].get("client_id")
        self.form_header = {"content-type": "application/x-www-form-urlencoded"}
        self.scope = "all openid"
        self.redirect_uri = "http://localhost"

        # Set Frappe server URL required for id_token generation
        try:
            frappe_login_key = frappe.get_doc("Social Login Key", "frappe")
        except frappe.DoesNotExistError:
            frappe_login_key = frappe.new_doc("Social Login Key")

        frappe_login_key.get_social_login_provider("Frappe", initialize=True)
        frappe_login_key.base_url = frappe.utils.get_url()
        frappe_login_key.enable_social_login = 0
        frappe_login_key.save()
        frappe.db.commit()

    def _get_redirect_destination(self, session, response_type, **extra_params):
        """Call the authorize endpoint and return the URL it redirected to.

        The redirect is captured from the ``ConnectionError`` raised while
        ``requests`` follows it (presumably because nothing is listening on
        ``self.redirect_uri``). If no such error is raised the test fails
        right here with an explicit message instead of a later ``TypeError``.

        ``extra_params`` are appended, in order, after the standard
        authorize parameters.
        """
        params = {
            "client_id": self.client_id,
            "scope": self.scope,
            "response_type": response_type,
            "redirect_uri": self.redirect_uri,
        }
        params.update(extra_params)

        redirect_destination = None
        try:
            session.get(
                get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
                params=encode_params(params),
            )
        except requests.exceptions.ConnectionError as ex:
            redirect_destination = ex.request.url

        self.assertIsNotNone(
            redirect_destination,
            "authorize request did not end in a failed redirect; no redirect URL captured",
        )
        return redirect_destination

    def _extract_auth_code(self, redirect_destination):
        """Return the ``code`` query parameter of the redirect URL."""
        query = parse_qs(urlparse(redirect_destination).query)
        self.assertIn("code", query, "redirect URL carries no authorization code")
        return query["code"][0]

    def _fetch_bearer_token(self, auth_code, **extra_data):
        """Exchange ``auth_code`` at the token endpoint and return the parsed JSON.

        ``extra_data`` (e.g. ``scope``, ``code_verifier``) is appended, in
        order, after the standard authorization-code parameters.
        """
        data = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": self.redirect_uri,
            "client_id": self.client_id,
        }
        data.update(extra_data)

        token_response = requests.post(
            get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
            headers=self.form_header,
            data=encode_params(data),
        )
        return token_response.json()

    def test_invalid_login(self):
        """The OpenID profile endpoint must not answer 200 without a token."""
        self.assertFalse(check_valid_openid_response())

    def test_login_using_authorization_code(self):
        """Authorization-code grant returns a complete, usable bearer token."""
        update_client_for_auth_code_grant(self.client_id)

        session = requests.Session()
        login(session)

        # Go to Authorize url and get authorization code from redirected URL
        redirect_destination = self._get_redirect_destination(session, "code")
        auth_code = self._extract_auth_code(redirect_destination)

        # Request for bearer token
        bearer_token = self._fetch_bearer_token(auth_code, scope=self.scope)

        self.assertTrue(bearer_token.get("access_token"))
        self.assertTrue(bearer_token.get("expires_in"))
        self.assertTrue(bearer_token.get("id_token"))
        self.assertTrue(bearer_token.get("refresh_token"))
        self.assertTrue(bearer_token.get("scope"))
        self.assertEqual(bearer_token.get("token_type"), "Bearer")
        self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))

    def test_login_using_authorization_code_with_pkce(self):
        """Authorization-code grant with a PKCE challenge/verifier pair."""
        update_client_for_auth_code_grant(self.client_id)

        session = requests.Session()
        login(session)

        # Go to Authorize url and get authorization code from redirected URL
        # NOTE(review): the challenge is presumably the S256 digest of the
        # "420" verifier sent below - confirm before changing either value.
        redirect_destination = self._get_redirect_destination(
            session,
            "code",
            code_challenge_method="S256",
            code_challenge="21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0",
        )
        auth_code = self._extract_auth_code(redirect_destination)

        # Request for bearer token
        bearer_token = self._fetch_bearer_token(
            auth_code,
            scope=self.scope,
            code_verifier="420",
        )

        self.assertTrue(bearer_token.get("access_token"))
        self.assertTrue(bearer_token.get("id_token"))

    def test_revoke_token(self):
        """A revoked access token is rejected by the OpenID profile endpoint."""
        update_client_for_auth_code_grant(self.client_id)

        session = requests.Session()
        login(session)

        # Go to Authorize url and get authorization code from redirected URL
        redirect_destination = self._get_redirect_destination(session, "code")
        auth_code = self._extract_auth_code(redirect_destination)

        # Request for bearer token (this test sends no scope)
        bearer_token = self._fetch_bearer_token(auth_code)

        # Revoke Token
        revoke_token_response = requests.post(
            get_full_url("/api/method/frappe.integrations.oauth2.revoke_token"),
            headers=self.form_header,
            data={"token": bearer_token.get("access_token")},
        )
        self.assertEqual(revoke_token_response.status_code, 200)

        # Check revoked token
        self.assertFalse(check_valid_openid_response(bearer_token.get("access_token")))

    def test_resource_owner_password_credentials_grant(self):
        """Password grant yields an access token accepted by the profile endpoint."""
        update_client_for_auth_code_grant(self.client_id)

        # Request for bearer token
        token_response = requests.post(
            get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
            headers=self.form_header,
            data=encode_params({
                "grant_type": "password",
                "username": "test@example.com",
                "password": "Eastern_43A1W",
                "client_id": self.client_id,
                "scope": self.scope,
            }),
        )

        # Parse bearer token json
        bearer_token = token_response.json()

        # Check token for valid response
        self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))

    def test_login_using_implicit_token(self):
        """Implicit grant returns the token in the redirect URL fragment."""
        oauth_client = frappe.get_doc("OAuth Client", self.client_id)
        oauth_client.grant_type = "Implicit"
        oauth_client.response_type = "Token"
        oauth_client.save()
        frappe.db.commit()

        session = requests.Session()
        login(session)

        # Go to Authorize url
        redirect_destination = self._get_redirect_destination(session, "token")

        # The token details are read from the fragment, not the query string
        response_dict = parse_qs(urlparse(redirect_destination).fragment)

        self.assertTrue(response_dict.get("access_token"))
        self.assertTrue(response_dict.get("expires_in"))
        self.assertTrue(response_dict.get("scope"))
        self.assertTrue(response_dict.get("token_type"))
        self.assertTrue(check_valid_openid_response(response_dict.get("access_token")[0]))

    def test_openid_code_id_token(self):
        """The nonce sent to authorize is echoed in the decoded id_token."""
        client = update_client_for_auth_code_grant(self.client_id)

        session = requests.Session()
        login(session)

        nonce = frappe.generate_hash()

        # Go to Authorize url and get authorization code from redirected URL
        redirect_destination = self._get_redirect_destination(session, "code", nonce=nonce)
        auth_code = self._extract_auth_code(redirect_destination)

        # Request for bearer token
        bearer_token = self._fetch_bearer_token(auth_code, scope=self.scope)

        # Decode the id_token with the client secret as the HS256 key
        payload = jwt.decode(
            bearer_token.get("id_token"),
            audience=client.client_id,
            key=client.client_secret,
            algorithms=["HS256"],
        )

        self.assertEqual(payload.get("nonce"), nonce)
def check_valid_openid_response(access_token=None):
    """Tell whether the OpenID profile endpoint answers with HTTP 200.

    A truthy ``access_token`` is sent as a ``Bearer`` Authorization header;
    otherwise the request is made without any extra headers.
    """
    # Use token in header
    auth_headers = {"Authorization": "Bearer " + access_token} if access_token else {}

    # check openid for email test@example.com
    profile_url = get_full_url("/api/method/frappe.integrations.oauth2.openid_profile")
    profile_response = requests.get(profile_url, headers=auth_headers)

    return profile_response.status_code == 200
def login(session):
    """POST the test user's credentials to the login endpoint using ``session``.

    The response is not inspected; nothing is returned.
    """
    login_url = get_full_url("/api/method/login")
    credentials = {
        "usr": "test@example.com",
        "pwd": "Eastern_43A1W",
    }
    session.post(login_url, data=credentials)
def get_full_url(endpoint):
    """Join ``endpoint`` onto the site URL given by ``frappe.utils.get_url()``.

    For example '/endpoint' becomes 'http://127.0.0.1:8000/endpoint' when the
    site URL is 'http://127.0.0.1:8000'.
    """
    site_url = frappe.utils.get_url()
    return urljoin(site_url, endpoint)
def update_client_for_auth_code_grant(client_id):
    """Switch the given OAuth Client to the authorization-code grant.

    Sets ``grant_type`` to "Authorization Code" and ``response_type`` to
    "Code", saves the document, commits the database transaction and
    returns the saved document.
    """
    oauth_client = frappe.get_doc("OAuth Client", client_id)

    auth_code_settings = (
        ("grant_type", "Authorization Code"),
        ("response_type", "Code"),
    )
    for fieldname, value in auth_code_settings:
        setattr(oauth_client, fieldname, value)

    oauth_client.save()
    frappe.db.commit()
    return oauth_client