test: fix/refactor oauth tests

This commit is contained in:
barredterra 2020-09-25 16:43:24 +02:00
parent 3275cc4c5a
commit 8dfea8a3e3

View file

@ -4,21 +4,26 @@ from __future__ import unicode_literals
import unittest, frappe, requests, time
from frappe.test_runner import make_test_records
from six.moves.urllib.parse import urlparse, parse_qs
from six.moves.urllib.parse import urlparse, parse_qs, urljoin
class TestOAuth20(unittest.TestCase):
def setUp(self):
make_test_records("OAuth Client")
make_test_records("User")
self.client_id = frappe.get_all("OAuth Client", fields=["*"])[0].get("client_id")
self.form_header = {"content-type": "application/x-www-form-urlencoded"}
self.scope = "all"
self.redirect_uri = "http://localhost"
# Set Frappe server URL reqired for id_token generation
try:
frappe_login_key = frappe.get_doc("Social Login Key", "frappe")
except frappe.DoesNotExistError:
frappe_login_key = frappe.new_doc("Social Login Key")
frappe_login_key.get_social_login_provider("Frappe", initialize=True)
frappe_login_key.base_url = "http://localhost:8000"
frappe_login_key.base_url = frappe.utils.get_url()
frappe_login_key.enable_social_login = 0
frappe_login_key.save()
frappe.db.commit()
@ -34,45 +39,46 @@ class TestOAuth20(unittest.TestCase):
frappe.db.commit()
session = requests.Session()
# Login
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=code&redirect_uri=http%3A%2F%2Flocalhost"
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params={
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri
}
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
# Get authorization code from redirected URL
auth_code = urlparse(redirect_destination).query.split("=")[1]
payload = "grant_type=authorization_code&code="
payload += auth_code
payload += "&redirect_uri=http%3A%2F%2Flocalhost&client_id="
payload += self.client_id
headers = {'content-type':'application/x-www-form-urlencoded'}
query = parse_qs(urlparse(redirect_destination).query)
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post( frappe.get_site_config().host_name +
"/api/method/frappe.integrations.oauth2.get_token", data=payload, headers=headers)
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
headers=self.form_header,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id
}
)
# Parse bearer token json
bearer_token = token_response.json()
self.assertTrue(bearer_token.get("access_token"))
self.assertTrue(bearer_token.get("expires_in"))
self.assertTrue(bearer_token.get("id_token"))
# self.assertTrue(bearer_token.get("id_token"))
self.assertTrue(bearer_token.get("refresh_token"))
self.assertTrue(bearer_token.get("scope"))
self.assertTrue(bearer_token.get("token_type") == "Bearer")
@ -86,45 +92,49 @@ class TestOAuth20(unittest.TestCase):
frappe.db.commit()
session = requests.Session()
# Login
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=code&redirect_uri=http%3A%2F%2Flocalhost"
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params={
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri
}
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
# Get authorization code from redirected URL
auth_code = urlparse(redirect_destination).query.split("=")[1]
payload = "grant_type=authorization_code&code="
payload += auth_code
payload += "&redirect_uri=http%3A%2F%2Flocalhost&client_id="
payload += self.client_id
headers = {'content-type':'application/x-www-form-urlencoded'}
query = parse_qs(urlparse(redirect_destination).query)
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post( frappe.get_site_config().host_name +
"/api/method/frappe.integrations.oauth2.get_token", data=payload, headers=headers)
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
headers=self.form_header,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id
}
)
# Parse bearer token json
bearer_token = token_response.json()
# Revoke Token
revoke_token_response = requests.post(frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.revoke_token",
data="token=" + bearer_token.get("access_token"), headers=headers)
revoke_token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.revoke_token"),
headers=self.form_header,
data={"token": bearer_token.get("access_token")}
)
self.assertTrue(revoke_token_response.status_code == 200)
@ -138,18 +148,18 @@ class TestOAuth20(unittest.TestCase):
client.save()
frappe.db.commit()
# Set payload
payload = "grant_type=password"
payload += "&username=test@example.com"
payload += "&password=Eastern_43A1W"
payload += "&client_id=" + self.client_id
payload += "&scope=openid%20all"
headers = {'content-type':'application/x-www-form-urlencoded'}
# Request for bearer token
token_response = requests.post( frappe.get_site_config().host_name +
"/api/method/frappe.integrations.oauth2.get_token", data=payload, headers=headers)
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
headers=self.form_header,
data={
"grant_type": "password",
"username": "test@example.com",
"password": "Eastern_43A1W",
"client_id": self.client_id,
"scope": self.scope
}
)
# Parse bearer token json
bearer_token = token_response.json()
@ -158,7 +168,6 @@ class TestOAuth20(unittest.TestCase):
self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))
def test_login_using_implicit_token(self):
oauth_client = frappe.get_doc("OAuth Client", self.client_id)
oauth_client.grant_type = "Implicit"
oauth_client.response_type = "Token"
@ -166,43 +175,58 @@ class TestOAuth20(unittest.TestCase):
frappe.db.commit()
session = requests.Session()
# Login
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=token&redirect_uri=http%3A%2F%2Flocalhost"
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params={
"client_id": self.client_id,
"scope": self.scope,
"response_type": "token",
"redirect_uri": self.redirect_uri
}
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
response_url = dict(parse_qs(urlparse(redirect_destination).fragment))
response_dict = parse_qs(urlparse(redirect_destination).fragment)
self.assertTrue(response_dict.get("access_token"))
self.assertTrue(response_dict.get("expires_in"))
self.assertTrue(response_dict.get("scope"))
self.assertTrue(response_dict.get("token_type"))
self.assertTrue(check_valid_openid_response(response_dict.get("access_token")[0]))
self.assertTrue(response_url.get("access_token"))
self.assertTrue(response_url.get("expires_in"))
self.assertTrue(response_url.get("scope"))
self.assertTrue(response_url.get("token_type"))
self.assertTrue(check_valid_openid_response(response_url.get("access_token")[0]))
def check_valid_openid_response(access_token=None):
# Returns True for valid response
"""Return True for valid response."""
# Use token in header
headers = {}
if access_token:
headers["Authorization"] = 'Bearer ' + access_token
headers["Authorization"] = "Bearer " + access_token
# check openid for email test@example.com
openid_response = requests.get(frappe.get_site_config().host_name +
"/api/method/frappe.integrations.oauth2.openid_profile", headers=headers)
openid_response = requests.get(
get_full_url("/api/method/frappe.integrations.oauth2.openid_profile"),
headers=headers
)
return True if openid_response.status_code == 200 else False
def get_full_url(endpoint):
return urljoin(frappe.utils.get_url(), endpoint)
def login(session):
session.post(
frappe.utils.get_url() + "/api/method/login",
data={
"usr": "test@example.com",
"pwd": "Eastern_43A1W"
}
)