seitime-frappe/frappe/integrations/google_oauth.py
phot0n 07a577af86 feat: google oauth for google emails
* used unique constraint on email_id in Email Account Doctype
2022-07-13 12:05:46 +05:30

165 lines
4.5 KiB
Python

import json
from typing import Dict, Union
from urllib.parse import quote, urlencode

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from requests import get, post

import frappe
# Path of the whitelisted `callback` function in this module; it is appended to
# the site address to form the OAuth redirect URI.
CALLBACK_METHOD = "/api/method/frappe.integrations.google_oauth.callback"

# OAuth scope(s) requested per integration domain. A value may be a single
# scope string or a list/tuple of scope strings; GoogleOAuth.__init__ joins
# sequences with spaces and uses plain strings as-is.
_SCOPES = {
    "mail": "https://mail.google.com/",
    "contacts": "https://www.googleapis.com/auth/contacts",
    "drive": "https://www.googleapis.com/auth/drive",
    "indexing": "https://www.googleapis.com/auth/indexing",
}

# (service name, API version) pairs handed to googleapiclient's `build`.
# There is no entry for "mail", so GoogleOAuth.get_google_service_object
# raises KeyError for that domain.
_SERVICES = {
    "contacts": ("people", "v1"),
    "drive": ("drive", "v3"),
    "indexing": ("indexing", "v3"),
}
class GoogleAuthenticationError(Exception):
    """Exception type for Google authentication failures.

    Carries no extra state or behavior beyond ``Exception``.
    """
class GoogleOAuth:
    """Wrapper around Google's OAuth 2.0 endpoints for one integration domain.

    The domain selects the scopes from ``_SCOPES`` and, for
    ``get_google_service_object``, the API from ``_SERVICES``. Client
    credentials are read from the "Google Settings" single document.
    """

    OAUTH_URL = "https://oauth2.googleapis.com/token"

    def __init__(self, domain: str, validate: bool = True):
        """Load Google Settings and resolve the scopes for ``domain``.

        Args:
            domain: Key of ``_SCOPES``; lower-cased before the lookup.
            validate: When true, run ``validate_google_settings`` right away.

        Raises:
            KeyError: If the lower-cased ``domain`` is not a key of ``_SCOPES``.
        """
        self.google_settings = frappe.get_single("Google Settings")
        self.domain = domain.lower()

        # Scopes are kept as one space-separated string, which is the form the
        # OAuth "scope" request parameter uses.
        domain_scopes = _SCOPES[self.domain]
        if isinstance(domain_scopes, (list, tuple)):
            self.scopes = " ".join(domain_scopes)
        else:
            self.scopes = domain_scopes

        if validate:
            self.validate_google_settings()

    def validate_google_settings(self):
        """Throw (via ``frappe.throw``) unless Google Settings is enabled and
        has both a client id and a client secret."""
        if not self.google_settings.enable:
            frappe.throw(frappe._("Please enable Google Settings before continuing."))

        if not (self.google_settings.client_id and self.google_settings.client_secret):
            frappe.throw(frappe._("Please update Google Settings before continuing."))

    def _get_client_secret(self):
        """Return the stored client secret, fetched with ``raise_exception=False``."""
        return self.google_settings.get_password(
            fieldname="client_secret", raise_exception=False
        )

    def authorize(self, oauth_code: str, site_address: str) -> Dict[str, Union[str, int]]:
        """Exchange an authorization code for tokens at ``OAUTH_URL``.

        Args:
            oauth_code: The ``code`` value received on the OAuth callback.
            site_address: Site base URL; ``CALLBACK_METHOD`` is appended to form
                the redirect URI sent with the request.

        Returns:
            The decoded JSON response, or ``{}`` when it contains an "error"
            key (the error is logged, not raised).
        """
        data = {
            "code": oauth_code,
            "client_id": self.google_settings.client_id,
            "client_secret": self._get_client_secret(),
            "grant_type": "authorization_code",
            "scope": self.scopes,
            "redirect_uri": site_address + CALLBACK_METHOD,
        }

        return handle_response(
            post(self.OAUTH_URL, data=data).json(),
            "Google Oauth Authorization Error",
            "Something went wrong during the authorization.",
        )

    def refresh_access_token(self, refresh_token: str) -> Dict[str, Union[str, int]]:
        """Request a new access token using ``refresh_token``.

        Returns:
            The decoded JSON response. Unlike ``authorize``, an "error" in the
            response is raised through ``handle_response(..., raise_err=True)``.
        """
        data = {
            "client_id": self.google_settings.client_id,
            "client_secret": self._get_client_secret(),
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
            "scope": self.scopes,
        }

        return handle_response(
            post(self.OAUTH_URL, data=data).json(),
            "Google Oauth Access Token Refresh Error",
            "Something went wrong during the access token generation.",
            raise_err=True,
        )

    @staticmethod
    def _compose_authentication_url(client_id, scopes, redirect_uri, state) -> str:
        """Build the Google consent-screen URL with a percent-encoded query.

        ``state`` is serialized with ``json.dumps``. Every value is encoded so
        that characters such as ``&``, ``#``, ``"`` and spaces inside the state,
        scopes or redirect URI cannot break the query string.
        """
        query = {
            "access_type": "offline",
            "response_type": "code",
            "prompt": "consent",
            "include_granted_scopes": "true",
            "client_id": client_id,
            "scope": scopes,
            "redirect_uri": redirect_uri,
            "state": json.dumps(state),
        }
        # quote_via=quote encodes spaces as %20 rather than "+".
        return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(
            query, quote_via=quote
        )

    def get_authentication_url(
        self, site_address: str, state: Dict[str, str] = None
    ) -> Dict[str, str]:
        """Return authentication url with the client id and redirect uri.

        Args:
            site_address: Site base URL; ``CALLBACK_METHOD`` is appended to form
                the redirect URI.
            state: JSON-serializable value passed through as the ``state``
                query parameter.

        Returns:
            A dict with the single key "url".
        """
        return {
            "url": self._compose_authentication_url(
                self.google_settings.client_id,
                self.scopes,
                site_address + CALLBACK_METHOD,
                state,
            )
        }

    def get_google_service_object(self, access_token: str, refresh_token: str):
        """Build a googleapiclient service object for this domain.

        Raises:
            KeyError: If the domain has no entry in ``_SERVICES``.
        """
        service_name, service_version = _SERVICES[self.domain]

        credentials = Credentials(
            token=access_token,
            refresh_token=refresh_token,
            token_uri=self.OAUTH_URL,
            client_id=self.google_settings.client_id,
            client_secret=self._get_client_secret(),
            # google-auth documents `scopes` as a sequence of scope strings;
            # the space-joined string would be treated as a sequence of
            # single characters, so hand it the individual scopes.
            scopes=self.scopes.split(),
        )

        return build(
            serviceName=service_name,
            version=service_version,
            credentials=credentials,
            static_discovery=False,
        )
def handle_response(
    response: Dict[str, Union[str, int]],
    error_title: str,
    error_message: str,
    raise_err: bool = False,
):
    """Pass a decoded Google response through, logging/raising on errors.

    Args:
        response: Decoded JSON body returned by Google.
        error_title: Title used for the error log entry and the thrown error.
        error_message: Fallback text for the log entry when the response has no
            "error_description", and the message shown when throwing.
        raise_err: When true, an error response is raised via ``frappe.throw``
            after being logged.

    Returns:
        ``response`` itself when it has no "error" key; otherwise ``{}``
        (reached only when ``raise_err`` is false).
    """
    if "error" not in response:
        return response

    frappe.log_error(
        frappe._(error_title), frappe._(response.get("error_description", error_message))
    )

    if raise_err:
        # frappe.throw's second positional parameter is the exception class,
        # not a message, so the title has to be passed by keyword. The default
        # exception type is kept.
        frappe.throw(frappe._(error_message), title=frappe._(error_title))

    return {}
def is_valid_access_token(access_token: str) -> bool:
    """Ask Google's tokeninfo endpoint about ``access_token``.

    Returns:
        False when the decoded JSON response contains an "error" key,
        True otherwise.
    """
    token_info = get(
        "https://oauth2.googleapis.com/tokeninfo",
        params={"access_token": access_token},
    ).json()

    return "error" not in token_info
@frappe.whitelist(methods=["GET"])
def callback(state: str, code: str = None, error: str = None) -> None:
    """Shared OAuth callback: dispatch to the method named in ``state``, then redirect.

    Args:
        state: JSON object string. Its "redirect" key (default "/app") is the
            redirect target and its "method" key is the dotted path of the
            function to invoke; all remaining keys are passed to that function
            as keyword arguments.
        code: Authorization code; forwarded to the invoked function as ``code``.
        error: When truthy, the function named in ``state`` is not invoked.
    """
    # NOTE(review): both the dotted method path and the redirect target are
    # taken from the request-supplied `state` -- confirm that callers cannot
    # use this to invoke arbitrary functions or redirect to arbitrary URLs.
    params = json.loads(state)
    location = params.pop("redirect", "/app")

    if not error:
        params["code"] = code
        handler = frappe.get_attr(params.pop("method"))
        handler(**params)

    # GET request, hence using commit to persist changes
    frappe.db.commit()

    frappe.local.response["type"] = "redirect"
    frappe.local.response["location"] = location