seitime-frappe/frappe/integrations/google_oauth.py

182 lines
5.3 KiB
Python

import json
from typing import Dict, Union
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from requests import get, post
import frappe
from frappe.utils import get_request_site_address
CALLBACK_METHOD = "/api/method/frappe.integrations.google_oauth.callback"
_SCOPES = {
"mail": ("https://mail.google.com/"),
"contacts": ("https://www.googleapis.com/auth/contacts"),
"drive": ("https://www.googleapis.com/auth/drive"),
"indexing": ("https://www.googleapis.com/auth/indexing"),
}
_SERVICES = {
"contacts": ("people", "v1"),
"drive": ("drive", "v3"),
"indexing": ("indexing", "v3"),
}
class GoogleAuthenticationError(Exception):
pass
class GoogleOAuth:
OAUTH_URL = "https://oauth2.googleapis.com/token"
def __init__(self, domain: str, validate: bool = True):
self.google_settings = frappe.get_single("Google Settings")
self.domain = domain.lower()
self.scopes = (
" ".join(_SCOPES[self.domain])
if isinstance(_SCOPES[self.domain], (list, tuple))
else _SCOPES[self.domain]
)
if validate:
self.validate_google_settings()
def validate_google_settings(self):
if not self.google_settings.enable:
frappe.throw(frappe._("Please enable Google Settings before continuing."))
if not (self.google_settings.client_id and self.google_settings.client_secret):
frappe.throw(frappe._("Please update Google Settings before continuing."))
def authorize(self, oauth_code: str) -> Dict[str, Union[str, int]]:
"""Returns a dict with access and refresh token.
:param oauth_code: code got back from google upon successful auhtorization
:param site_address: side address from which the request is being made
"""
data = {
"code": oauth_code,
"client_id": self.google_settings.client_id,
"client_secret": self.google_settings.get_password(
fieldname="client_secret", raise_exception=False
),
"grant_type": "authorization_code",
"scope": self.scopes,
"redirect_uri": get_request_site_address(True) + CALLBACK_METHOD,
}
return handle_response(
post(self.OAUTH_URL, data=data).json(),
"Google Oauth Authorization Error",
"Something went wrong during the authorization.",
)
def refresh_access_token(self, refresh_token: str) -> Dict[str, Union[str, int]]:
"""Refreshes google access token using refresh token"""
data = {
"client_id": self.google_settings.client_id,
"client_secret": self.google_settings.get_password(
fieldname="client_secret", raise_exception=False
),
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"scope": self.scopes,
}
return handle_response(
post(self.OAUTH_URL, data=data).json(),
"Google Oauth Access Token Refresh Error",
"Something went wrong during the access token generation.",
raise_err=True,
)
def get_authentication_url(self, state: Dict[str, str]) -> Dict[str, str]:
"""Returns google authentication url.
:param site_address: side address from which the request is being made (for redirect back to site)
:param state: [optional] dict of values which you need on callback (for calling methods, redirection back to the form, doc name, etc)
"""
state = json.dumps(state)
callback_url = get_request_site_address(True) + CALLBACK_METHOD
return {
"url": "https://accounts.google.com/o/oauth2/v2/auth?"
+ "access_type=offline&response_type=code&prompt=consent&include_granted_scopes=true&"
+ "client_id={0}&scope={1}&redirect_uri={2}&state={3}".format(
self.google_settings.client_id, self.scopes, callback_url, state
)
}
def get_google_service_object(self, access_token: str, refresh_token: str):
"""Returns google service object"""
credentials_dict = {
"token": access_token,
"refresh_token": refresh_token,
"token_uri": self.OAUTH_URL,
"client_id": self.google_settings.client_id,
"client_secret": self.google_settings.get_password(
fieldname="client_secret", raise_exception=False
),
"scopes": self.scopes,
}
return build(
serviceName=_SERVICES[self.domain][0],
version=_SERVICES[self.domain][1],
credentials=Credentials(**credentials_dict),
static_discovery=False,
)
def handle_response(
response: Dict[str, Union[str, int]],
error_title: str,
error_message: str,
raise_err: bool = False,
):
if "error" in response:
frappe.log_error(
frappe._(error_title), frappe._(response.get("error_description", error_message))
)
if raise_err:
frappe.throw(frappe._(error_title), GoogleAuthenticationError, frappe._(error_message))
return {}
return response
def is_valid_access_token(access_token: str) -> bool:
response = get(
"https://oauth2.googleapis.com/tokeninfo", params={"access_token": access_token}
).json()
if "error" in response:
return False
return True
@frappe.whitelist(methods=["GET"])
def callback(state: str, code: str = None, error: str = None) -> None:
"""Common callback for google integrations.
Invokes functions using `frappe.get_attr` and also adds required (keyworded) arguments
along with committing and redirecting us back to frappe site."""
state = json.loads(state)
redirect = state.pop("redirect", "/app")
if not error:
state.update({"code": code})
frappe.get_attr(state.pop("method"))(**state)
# GET request, hence using commit to persist changes
frappe.db.commit()
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = redirect