From ead0cb58706b9991dfd90c92fc573e0056a80dfc Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Mon, 17 Mar 2025 19:18:00 +0100 Subject: [PATCH] refactor: Google Calendar * Simplify logical flows based on flow of data & transactions * Reduce indents * Remove redundant blocks * Make code less brittle in redirect_uri generation * Add typing wherever valuable --- .../google_calendar/google_calendar.py | 229 +++++++++--------- 1 file changed, 110 insertions(+), 119 deletions(-) diff --git a/frappe/integrations/doctype/google_calendar/google_calendar.py b/frappe/integrations/doctype/google_calendar/google_calendar.py index 9a28314ee3..cb2c146af3 100644 --- a/frappe/integrations/doctype/google_calendar/google_calendar.py +++ b/frappe/integrations/doctype/google_calendar/google_calendar.py @@ -2,8 +2,9 @@ # License: MIT. See LICENSE -from urllib.parse import quote +from contextlib import suppress from datetime import date, datetime, timedelta +from math import ceil from typing import TypedDict from zoneinfo import ZoneInfo @@ -14,7 +15,7 @@ from googleapiclient.discovery import build from googleapiclient.errors import HttpError import frappe -from frappe import _ +from frappe import _, _lt from frappe.integrations.google_oauth import GoogleOAuth from frappe.model.document import Document from frappe.utils import ( @@ -72,6 +73,9 @@ framework_days = { } +allow_google_calendar_label = frappe.bold(_lt("Allow Google Calendar Access")) + + class GoogleCalendar(Document): # begin: auto-generated types # This code is auto-generated. Do not modify anything in this block. @@ -107,8 +111,9 @@ class GoogleCalendar(Document): google_settings = self.validate() if not self.refresh_token: - button_label = frappe.bold(_("Allow Google Calendar Access")) - raise frappe.ValidationError(_("Click on {0} to generate Refresh Token.").format(button_label)) + raise frappe.ValidationError( + _("Click on {0} to generate Refresh Token.").format(allow_google_calendar_label) + ) data = { "client_id": google_settings.client_id, @@ -121,69 +126,64 @@ class GoogleCalendar(Document): try: r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json() except requests.exceptions.HTTPError: - button_label = frappe.bold(_("Allow Google Calendar Access")) frappe.throw( _( "Something went wrong during the token generation. Click on {0} to generate a new one." - ).format(button_label) + ).format(allow_google_calendar_label) ) return r.get("access_token") @frappe.whitelist() -def authorize_access(g_calendar, reauthorize=None): +def authorize_access(g_calendar: str, reauthorize: bool = False): """ If no Authorization code get it from Google and then request for Refresh Token. Google Calendar Name is set to flags to set_value after Authorization Code is obtained. """ - google_settings = frappe.get_doc("Google Settings") google_calendar = frappe.get_doc("Google Calendar", g_calendar) google_calendar.check_permission("write") google_settings = frappe.get_cached_doc("Google Settings") redirect_uri = ( - get_request_site_address(True) - + "?cmd=frappe.integrations.doctype.google_calendar.google_calendar.google_callback" + f"{get_request_site_address(full_address=True)}" + f"?cmd={google_callback.__module__}.{google_callback.__qualname__}" ) if not google_calendar.authorization_code or reauthorize: frappe.cache.hset("google_calendar", "google_calendar", google_calendar.name) return get_authentication_url(client_id=google_settings.client_id, redirect_uri=redirect_uri) - else: - try: - data = { - "code": google_calendar.get_password(fieldname="authorization_code", raise_exception=False), - "client_id": google_settings.client_id, - "client_secret": google_settings.get_password( - fieldname="client_secret", raise_exception=False - ), - "redirect_uri": redirect_uri, - "grant_type": "authorization_code", - } - r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json() - if "refresh_token" in r: - frappe.db.set_value( - "Google Calendar", google_calendar.name, "refresh_token", r.get("refresh_token") - ) - frappe.db.commit() + data = { + "code": google_calendar.get_password(fieldname="authorization_code", raise_exception=False), + "client_id": google_settings.client_id, + "client_secret": google_settings.get_password(fieldname="client_secret", raise_exception=False), + "redirect_uri": redirect_uri, + "grant_type": "authorization_code", + } - frappe.local.response["type"] = "redirect" - frappe.local.response["location"] = "/app/Form/{}/{}".format( - quote("Google Calendar"), quote(google_calendar.name) - ) + try: + r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json() + except Exception as e: + frappe.throw(e) - frappe.msgprint(_("Google Calendar has been configured.")) - except Exception as e: - frappe.throw(e) + if "refresh_token" in r: + frappe.db.set_value("Google Calendar", google_calendar.name, "refresh_token", r["refresh_token"]) + frappe.db.commit() + + frappe.local.response["type"] = "redirect" + frappe.local.response["location"] = google_calendar.get_url() + + frappe.msgprint(_("Google Calendar has been configured."), indicator="green") def get_authentication_url(client_id=None, redirect_uri=None): return { - "url": "https://accounts.google.com/o/oauth2/v2/auth?access_type=offline&response_type=code&prompt=consent&client_id={}&include_granted_scopes=true&scope={}&redirect_uri={}".format( - client_id, SCOPES, redirect_uri + "url": ( + "https://accounts.google.com/o/oauth2/v2/auth?" + f"access_type=offline&response_type=code&prompt=consent&client_id={client_id}" + f"&include_granted_scopes=true&scope={SCOPES}&redirect_uri={redirect_uri}" ) } @@ -218,52 +218,53 @@ def get_google_calendar_object(g_calendar): google_settings = frappe.get_cached_doc("Google Settings") account: GoogleCalendar = frappe.get_doc("Google Calendar", g_calendar) - credentials_dict = { - "token": account.get_access_token(), - "refresh_token": account.get_password(fieldname="refresh_token", raise_exception=False), - "token_uri": GoogleOAuth.OAUTH_URL, - "client_id": google_settings.client_id, - "client_secret": google_settings.get_password(fieldname="client_secret", raise_exception=False), - "scopes": [SCOPES], - } - - credentials = google.oauth2.credentials.Credentials(**credentials_dict) + credentials = google.oauth2.credentials.Credentials( + token=account.get_access_token(), + refresh_token=account.get_password(fieldname="refresh_token", raise_exception=False), + token_uri=GoogleOAuth.OAUTH_URL, + client_id=google_settings.client_id, + client_secret=google_settings.get_password(fieldname="client_secret", raise_exception=False), + scopes=[SCOPES], + ) google_calendar = build( serviceName="calendar", version="v3", credentials=credentials, static_discovery=False ) - check_google_calendar(account, google_calendar) + check_google_calendar(account.reload(), google_calendar) - account.load_from_db() - return google_calendar, account + return google_calendar, account.reload() -def check_google_calendar(account, google_calendar): +def check_google_calendar(account: GoogleCalendar, google_calendar): """ Checks if Google Calendar is present with the specified name. If not, creates one. """ - account.load_from_db() - try: - if account.google_calendar_id: - google_calendar.calendars().get(calendarId=account.google_calendar_id).execute() - else: - # If no Calendar ID create a new Calendar - calendar = { - "summary": account.calendar_name, - "timeZone": frappe.get_system_settings("time_zone"), - } - created_calendar = google_calendar.calendars().insert(body=calendar).execute() - frappe.db.set_value( - "Google Calendar", account.name, "google_calendar_id", created_calendar.get("id") + if account.google_calendar_id: + try: + return google_calendar.calendars().get(calendarId=account.google_calendar_id).execute() + except HttpError as err: + frappe.throw( + _("Google Calendar - Could not find Calendar for {0}, error code {1}.").format( + account.name, err.resp.status + ) ) - frappe.db.commit() + + # If no Calendar ID create a new Calendar + calendar = { + "summary": account.calendar_name, + "timeZone": frappe.get_system_settings("time_zone"), + } + try: + created_calendar = google_calendar.calendars().insert(body=calendar).execute() except HttpError as err: frappe.throw( _("Google Calendar - Could not create Calendar for {0}, error code {1}.").format( account.name, err.resp.status ) ) + account.db_set("google_calendar_id", created_calendar.get("id")) + frappe.db.commit() def sync_events_from_google_calendar(g_calendar, method=None): @@ -318,17 +319,15 @@ def sync_events_from_google_calendar(g_calendar, method=None): for idx, event in enumerate(results): frappe.publish_realtime( - "import_google_calendar", dict(progress=idx + 1, total=len(results)), user=frappe.session.user + "import_google_calendar", {"progress": idx + 1, "total": len(results)}, user=frappe.session.user ) # If Google Calendar Event if confirmed, then create an Event if event.get("status") == "confirmed": recurrence = None if event.get("recurrence"): - try: + with suppress(IndexError): recurrence = event.get("recurrence")[0] - except IndexError: - pass if not frappe.db.exists("Event", {"google_calendar_event_id": event.get("id")}): insert_event_to_calendar(account, event, recurrence) @@ -359,8 +358,6 @@ def sync_events_from_google_calendar(g_calendar, method=None): "content": " - Event deleted from Google Calendar.", } ).insert(ignore_permissions=True) - else: - pass if not results: return _("No Google Calendar Event to sync.") @@ -583,28 +580,23 @@ def delete_event_from_google_calendar(doc, method=None): ) -def google_calendar_to_repeat_on(start, end, recurrence=None): +def parse_google_calendar_date(dt): + if dt.get("date"): + return get_datetime(dt.get("date")) + return parser.parse(dt.get("dateTime")).astimezone(ZoneInfo(get_system_timezone())).replace(tzinfo=None) + + +def google_calendar_to_repeat_on(*, start, end, recurrence=None): """ recurrence is in the form ['RRULE:FREQ=WEEKLY;BYDAY=MO,TU,TH'] has the frequency and then the days on which the event recurs Both have been mapped in a dict for easier mapping. """ + repeat_on = { - "starts_on": ( - get_datetime(start.get("date")) - if start.get("date") - else parser.parse(start.get("dateTime")) - .astimezone(ZoneInfo(get_system_timezone())) - .replace(tzinfo=None) - ), - "ends_on": ( - get_datetime(end.get("date")) - if end.get("date") - else parser.parse(end.get("dateTime")) - .astimezone(ZoneInfo(get_system_timezone())) - .replace(tzinfo=None) - ), + "starts_on": parse_google_calendar_date(start), + "ends_on": parse_google_calendar_date(end), "all_day": 1 if start.get("date") else 0, "repeat_this_event": 1 if recurrence else 0, "repeat_on": None, @@ -619,44 +611,45 @@ def google_calendar_to_repeat_on(start, end, recurrence=None): } # recurrence rule "RRULE:FREQ=WEEKLY;BYDAY=MO,TU,TH" - if recurrence: - # google_calendar_frequency = RRULE:FREQ=WEEKLY, byday = BYDAY=MO,TU,TH, until = 20191028 - google_calendar_frequency, until, byday = get_recurrence_parameters(recurrence) - repeat_on["repeat_on"] = google_calendar_frequencies.get(google_calendar_frequency) + if not recurrence: + return repeat_on - if repeat_on["repeat_on"] == "Daily": - repeat_on["ends_on"] = None - repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None + # google_calendar_frequency = RRULE:FREQ=WEEKLY, byday = BYDAY=MO,TU,TH, until = 20191028 + google_calendar_frequency, until, byday = get_recurrence_parameters(recurrence) + repeat_on["repeat_on"] = google_calendar_frequencies.get(google_calendar_frequency) - if byday and repeat_on["repeat_on"] == "Weekly": - repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None - byday = byday.split("=")[1].split(",") - for repeat_day in byday: - repeat_on[google_calendar_days[repeat_day]] = 1 + if repeat_on["repeat_on"] == "Daily": + repeat_on["ends_on"] = None + repeat_on["repeat_till"] = until - if byday and repeat_on["repeat_on"] == "Monthly": - byday = byday.split("=")[1] - repeat_day_week_number, repeat_day_name = None, None + if byday and repeat_on["repeat_on"] == "Weekly": + repeat_on["repeat_till"] = until + for repeat_day in byday: + repeat_on[google_calendar_days[repeat_day]] = 1 - for num in ["-2", "-1", "1", "2", "3", "4", "5"]: - if num in byday: - repeat_day_week_number = num - break + if byday and repeat_on["repeat_on"] == "Monthly": + byday = byday.split("=")[1] + repeat_day_week_number, repeat_day_name = None, None - for day in ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]: - if day in byday: - repeat_day_name = google_calendar_days.get(day) - break + for num in ["-2", "-1", "1", "2", "3", "4", "5"]: + if num in byday: + repeat_day_week_number = num + break - # Only Set starts_on for the event to repeat monthly - start_date = parse_google_calendar_recurrence_rule(int(repeat_day_week_number), repeat_day_name) - repeat_on["starts_on"] = start_date - repeat_on["ends_on"] = add_to_date(start_date, minutes=5) - repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None + for day in ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]: + if day in byday: + repeat_day_name = google_calendar_days.get(day) + break - if repeat_on["repeat_till"] == "Yearly": - repeat_on["ends_on"] = None - repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None + # Only Set starts_on for the event to repeat monthly + start_date = parse_google_calendar_recurrence_rule(int(repeat_day_week_number), repeat_day_name) + repeat_on["starts_on"] = start_date + repeat_on["ends_on"] = add_to_date(start_date, minutes=5) + repeat_on["repeat_till"] = until + + if repeat_on["repeat_till"] == "Yearly": + repeat_on["ends_on"] = None + repeat_on["repeat_till"] = until return repeat_on @@ -730,13 +723,11 @@ def repeat_on_to_google_calendar_recurrence_rule(doc): return [recurrence] -def get_week_number(dt): +def get_week_number(dt: date): """Return the week number of the month for the specified date. https://stackoverflow.com/questions/3806473/python-week-number-of-the-month/16804556 """ - from math import ceil - first_day = dt.replace(day=1) dom = dt.day