fix(tests): OAuth 2.0 tests

This commit is contained in:
Revant Nandgaonkar 2019-10-21 13:21:20 +05:30
parent 7c50b40c4b
commit 953d9b8488
3 changed files with 98 additions and 61 deletions

View file

@ -11,6 +11,6 @@
"redirect_uris": "http://localhost",
"response_type": "Code",
"scopes": "all openid",
"skip_authorization": 0
"skip_authorization": 1
}
]

View file

@ -12,7 +12,7 @@ import base64
class TestAPI(unittest.TestCase):
def test_insert_many(self):
server = FrappeClient(frappe.get_site_config().host_name, "Administrator", "admin", verify=False)
frappe.db.sql('delete from `tabNote` where title in ("Sing","a","song","of","sixpence")')
frappe.db.sql("delete from `tabNote` where title in ('Sing','a','song','of','sixpence')")
frappe.db.commit()
server.insert_many([
@ -31,7 +31,7 @@ class TestAPI(unittest.TestCase):
def test_create_doc(self):
server = FrappeClient(frappe.get_site_config().host_name, "Administrator", "admin", verify=False)
frappe.db.sql('delete from `tabNote` where title = "test_create"')
frappe.db.sql("delete from `tabNote` where title = 'test_create'")
frappe.db.commit()
server.insert({"doctype": "Note", "public": True, "title": "test_create"})
@ -46,7 +46,7 @@ class TestAPI(unittest.TestCase):
def test_get_doc(self):
server = FrappeClient(frappe.get_site_config().host_name, "Administrator", "admin", verify=False)
frappe.db.sql('delete from `tabNote` where title = "get_this"')
frappe.db.sql("delete from `tabNote` where title = 'get_this'")
frappe.db.commit()
server.insert_many([
@ -57,7 +57,7 @@ class TestAPI(unittest.TestCase):
def test_update_doc(self):
server = FrappeClient(frappe.get_site_config().host_name, "Administrator", "admin", verify=False)
frappe.db.sql('delete from `tabNote` where title = "sing"')
frappe.db.sql("delete from `tabNote` where title in ('Sing','sing')")
frappe.db.commit()
server.insert({"doctype":"Note", "public": True, "title": "Sing"})
@ -69,7 +69,7 @@ class TestAPI(unittest.TestCase):
def test_delete_doc(self):
server = FrappeClient(frappe.get_site_config().host_name, "Administrator", "admin", verify=False)
frappe.db.sql('delete from `tabNote` where title = "delete"')
frappe.db.sql("delete from `tabNote` where title = 'delete'")
frappe.db.commit()
server.insert_many([

View file

@ -4,12 +4,10 @@ from __future__ import unicode_literals
import unittest, frappe, requests, time
from frappe.test_runner import make_test_records
from frappe.utils.selenium_testdriver import TestDriver
from six.moves.urllib.parse import urlparse, parse_qs
class TestOAuth20(unittest.TestCase):
def setUp(self):
self.driver = TestDriver()
make_test_records("OAuth Client")
make_test_records("User")
self.client_id = frappe.get_all("OAuth Client", fields=["*"])[0].get("client_id")
@ -23,41 +21,40 @@ class TestOAuth20(unittest.TestCase):
frappe_login_key.base_url = "http://localhost:8000"
frappe_login_key.enable_social_login = 0
frappe_login_key.save()
frappe.db.commit()
def test_invalid_login(self):
self.assertFalse(check_valid_openid_response())
def test_login_using_authorization_code(self):
client = frappe.get_doc("OAuth Client", self.client_id)
client.grant_type = "Authorization Code"
client.response_type = "Code"
client.save()
frappe.db.commit()
# Go to Authorize url
self.driver.get(
"api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=code&redirect_uri=http%3A%2F%2Flocalhost"
)
time.sleep(2)
session = requests.Session()
# Login
username = self.driver.find("#login_email")[0]
username.send_keys("test@example.com")
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
password = self.driver.find("#login_password")[0]
password.send_keys("Eastern_43A1W")
redirect_destination = None
sign_in = self.driver.find(".btn-login")[0]
sign_in.submit()
time.sleep(2)
# Allow access to resource
allow = self.driver.find("#allow")[0]
allow.click()
time.sleep(2)
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=code&redirect_uri=http%3A%2F%2Flocalhost"
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
# Get authorization code from redirected URL
auth_code = urlparse(self.driver.driver.current_url).query.split("=")[1]
auth_code = urlparse(redirect_destination).query.split("=")[1]
payload = "grant_type=authorization_code&code="
payload += auth_code
@ -81,15 +78,66 @@ class TestOAuth20(unittest.TestCase):
self.assertTrue(bearer_token.get("token_type") == "Bearer")
self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))
def test_revoke_token(self):
client = frappe.get_doc("OAuth Client", self.client_id)
client.grant_type = "Authorization Code"
client.response_type = "Code"
client.save()
frappe.db.commit()
session = requests.Session()
# Login
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
redirect_destination = None
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=code&redirect_uri=http%3A%2F%2Flocalhost"
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
# Get authorization code from redirected URL
auth_code = urlparse(redirect_destination).query.split("=")[1]
payload = "grant_type=authorization_code&code="
payload += auth_code
payload += "&redirect_uri=http%3A%2F%2Flocalhost&client_id="
payload += self.client_id
headers = {'content-type':'application/x-www-form-urlencoded'}
# Request for bearer token
token_response = requests.post( frappe.get_site_config().host_name +
"/api/method/frappe.integrations.oauth2.get_token", data=payload, headers=headers)
# Parse bearer token json
bearer_token = token_response.json()
# Revoke Token
revoke_token_response = requests.post(frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.revoke_token",
data="token=" + bearer_token.get("access_token"))
data="token=" + bearer_token.get("access_token"), headers=headers)
self.assertTrue(revoke_token_response.status_code == 200)
# Check revoked token
self.assertFalse(check_valid_openid_response(bearer_token.get("access_token")))
def test_resource_owner_password_credentials_grant(self):
client = frappe.get_doc("OAuth Client", self.client_id)
client.grant_type = "Authorization Code"
client.response_type = "Code"
client.save()
frappe.db.commit()
# Set payload
payload = "grant_type=password"
payload += "&username=test@example.com"
@ -117,44 +165,33 @@ class TestOAuth20(unittest.TestCase):
oauth_client.save()
frappe.db.commit()
# Go to Authorize url
self.driver.get(
"api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=token&redirect_uri=http%3A%2F%2Flocalhost"
)
time.sleep(2)
session = requests.Session()
# Login
username = self.driver.find("#login_email")[0]
username.send_keys("test@example.com")
session.post(
frappe.get_site_config().host_name + "/api/method/login",
data={"usr":"test@example.com","pwd":"Eastern_43A1W"}
)
password = self.driver.find("#login_password")[0]
password.send_keys("Eastern_43A1W")
redirect_destination = None
sign_in = self.driver.find(".btn-login")[0]
sign_in.submit()
# Go to Authorize url
try:
session.get(
frappe.get_site_config().host_name + "/api/method/frappe.integrations.oauth2.authorize?client_id=" +
self.client_id +
"&scope=all%20openid&response_type=token&redirect_uri=http%3A%2F%2Flocalhost"
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
time.sleep(2)
# Allow access to resource
allow = self.driver.find("#allow")[0]
allow.click()
time.sleep(2)
# Get token from redirected URL
response_url = dict(parse_qs(urlparse(self.driver.driver.current_url).fragment))
response_url = dict(parse_qs(urlparse(redirect_destination).fragment))
self.assertTrue(response_url.get("access_token"))
self.assertTrue(response_url.get("expires_in"))
self.assertTrue(response_url.get("scope"))
self.assertTrue(response_url.get("token_type"))
self.assertTrue(check_valid_openid_response(response_url.get("access_token")))
def tearDown(self):
self.driver.close()
self.assertTrue(check_valid_openid_response(response_url.get("access_token")[0]))
def check_valid_openid_response(access_token=None):
# Returns True for valid response
@ -162,7 +199,7 @@ def check_valid_openid_response(access_token=None):
# Use token in header
headers = {}
if access_token:
headers["Authorization"] = 'Bearer' + access_token
headers["Authorization"] = 'Bearer ' + access_token
# check openid for email test@example.com
openid_response = requests.get(frappe.get_site_config().host_name +