refactor: Google Calendar

* Simplify logical flows based on flow of data & transactions
 * Reduce indents
 * Remove redundant blocks
 * Make code less brittle in redirect_uri generation
 * Add typing wherever valuable
This commit is contained in:
Gavin D'souza 2025-03-17 19:18:00 +01:00
parent e9691a1b08
commit ead0cb5870
No known key found for this signature in database
GPG key ID: 3A7BF4D4340DE6F7

View file

@ -2,8 +2,9 @@
# License: MIT. See LICENSE
from urllib.parse import quote
from contextlib import suppress
from datetime import date, datetime, timedelta
from math import ceil
from typing import TypedDict
from zoneinfo import ZoneInfo
@ -14,7 +15,7 @@ from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import frappe
from frappe import _
from frappe import _, _lt
from frappe.integrations.google_oauth import GoogleOAuth
from frappe.model.document import Document
from frappe.utils import (
@ -72,6 +73,9 @@ framework_days = {
}
allow_google_calendar_label = frappe.bold(_lt("Allow Google Calendar Access"))
class GoogleCalendar(Document):
# begin: auto-generated types
# This code is auto-generated. Do not modify anything in this block.
@ -107,8 +111,9 @@ class GoogleCalendar(Document):
google_settings = self.validate()
if not self.refresh_token:
button_label = frappe.bold(_("Allow Google Calendar Access"))
raise frappe.ValidationError(_("Click on {0} to generate Refresh Token.").format(button_label))
raise frappe.ValidationError(
_("Click on {0} to generate Refresh Token.").format(allow_google_calendar_label)
)
data = {
"client_id": google_settings.client_id,
@ -121,69 +126,64 @@ class GoogleCalendar(Document):
try:
r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json()
except requests.exceptions.HTTPError:
button_label = frappe.bold(_("Allow Google Calendar Access"))
frappe.throw(
_(
"Something went wrong during the token generation. Click on {0} to generate a new one."
).format(button_label)
).format(allow_google_calendar_label)
)
return r.get("access_token")
@frappe.whitelist()
def authorize_access(g_calendar, reauthorize=None):
def authorize_access(g_calendar: str, reauthorize: bool = False):
"""
If no Authorization code get it from Google and then request for Refresh Token.
Google Calendar Name is set to flags to set_value after Authorization Code is obtained.
"""
google_settings = frappe.get_doc("Google Settings")
google_calendar = frappe.get_doc("Google Calendar", g_calendar)
google_calendar.check_permission("write")
google_settings = frappe.get_cached_doc("Google Settings")
redirect_uri = (
get_request_site_address(True)
+ "?cmd=frappe.integrations.doctype.google_calendar.google_calendar.google_callback"
f"{get_request_site_address(full_address=True)}"
f"?cmd={google_callback.__module__}.{google_callback.__qualname__}"
)
if not google_calendar.authorization_code or reauthorize:
frappe.cache.hset("google_calendar", "google_calendar", google_calendar.name)
return get_authentication_url(client_id=google_settings.client_id, redirect_uri=redirect_uri)
else:
try:
data = {
"code": google_calendar.get_password(fieldname="authorization_code", raise_exception=False),
"client_id": google_settings.client_id,
"client_secret": google_settings.get_password(
fieldname="client_secret", raise_exception=False
),
"redirect_uri": redirect_uri,
"grant_type": "authorization_code",
}
r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json()
if "refresh_token" in r:
frappe.db.set_value(
"Google Calendar", google_calendar.name, "refresh_token", r.get("refresh_token")
)
frappe.db.commit()
data = {
"code": google_calendar.get_password(fieldname="authorization_code", raise_exception=False),
"client_id": google_settings.client_id,
"client_secret": google_settings.get_password(fieldname="client_secret", raise_exception=False),
"redirect_uri": redirect_uri,
"grant_type": "authorization_code",
}
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = "/app/Form/{}/{}".format(
quote("Google Calendar"), quote(google_calendar.name)
)
try:
r = requests.post(GoogleOAuth.OAUTH_URL, data=data).json()
except Exception as e:
frappe.throw(e)
frappe.msgprint(_("Google Calendar has been configured."))
except Exception as e:
frappe.throw(e)
if "refresh_token" in r:
frappe.db.set_value("Google Calendar", google_calendar.name, "refresh_token", r["refresh_token"])
frappe.db.commit()
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = google_calendar.get_url()
frappe.msgprint(_("Google Calendar has been configured."), indicator="green")
def get_authentication_url(client_id=None, redirect_uri=None):
return {
"url": "https://accounts.google.com/o/oauth2/v2/auth?access_type=offline&response_type=code&prompt=consent&client_id={}&include_granted_scopes=true&scope={}&redirect_uri={}".format(
client_id, SCOPES, redirect_uri
"url": (
"https://accounts.google.com/o/oauth2/v2/auth?"
f"access_type=offline&response_type=code&prompt=consent&client_id={client_id}"
f"&include_granted_scopes=true&scope={SCOPES}&redirect_uri={redirect_uri}"
)
}
@ -218,52 +218,53 @@ def get_google_calendar_object(g_calendar):
google_settings = frappe.get_cached_doc("Google Settings")
account: GoogleCalendar = frappe.get_doc("Google Calendar", g_calendar)
credentials_dict = {
"token": account.get_access_token(),
"refresh_token": account.get_password(fieldname="refresh_token", raise_exception=False),
"token_uri": GoogleOAuth.OAUTH_URL,
"client_id": google_settings.client_id,
"client_secret": google_settings.get_password(fieldname="client_secret", raise_exception=False),
"scopes": [SCOPES],
}
credentials = google.oauth2.credentials.Credentials(**credentials_dict)
credentials = google.oauth2.credentials.Credentials(
token=account.get_access_token(),
refresh_token=account.get_password(fieldname="refresh_token", raise_exception=False),
token_uri=GoogleOAuth.OAUTH_URL,
client_id=google_settings.client_id,
client_secret=google_settings.get_password(fieldname="client_secret", raise_exception=False),
scopes=[SCOPES],
)
google_calendar = build(
serviceName="calendar", version="v3", credentials=credentials, static_discovery=False
)
check_google_calendar(account, google_calendar)
check_google_calendar(account.reload(), google_calendar)
account.load_from_db()
return google_calendar, account
return google_calendar, account.reload()
def check_google_calendar(account, google_calendar):
def check_google_calendar(account: GoogleCalendar, google_calendar):
"""
Checks if Google Calendar is present with the specified name.
If not, creates one.
"""
account.load_from_db()
try:
if account.google_calendar_id:
google_calendar.calendars().get(calendarId=account.google_calendar_id).execute()
else:
# If no Calendar ID create a new Calendar
calendar = {
"summary": account.calendar_name,
"timeZone": frappe.get_system_settings("time_zone"),
}
created_calendar = google_calendar.calendars().insert(body=calendar).execute()
frappe.db.set_value(
"Google Calendar", account.name, "google_calendar_id", created_calendar.get("id")
if account.google_calendar_id:
try:
return google_calendar.calendars().get(calendarId=account.google_calendar_id).execute()
except HttpError as err:
frappe.throw(
_("Google Calendar - Could not find Calendar for {0}, error code {1}.").format(
account.name, err.resp.status
)
)
frappe.db.commit()
# If no Calendar ID create a new Calendar
calendar = {
"summary": account.calendar_name,
"timeZone": frappe.get_system_settings("time_zone"),
}
try:
created_calendar = google_calendar.calendars().insert(body=calendar).execute()
except HttpError as err:
frappe.throw(
_("Google Calendar - Could not create Calendar for {0}, error code {1}.").format(
account.name, err.resp.status
)
)
account.db_set("google_calendar_id", created_calendar.get("id"))
frappe.db.commit()
def sync_events_from_google_calendar(g_calendar, method=None):
@ -318,17 +319,15 @@ def sync_events_from_google_calendar(g_calendar, method=None):
for idx, event in enumerate(results):
frappe.publish_realtime(
"import_google_calendar", dict(progress=idx + 1, total=len(results)), user=frappe.session.user
"import_google_calendar", {"progress": idx + 1, "total": len(results)}, user=frappe.session.user
)
# If Google Calendar Event if confirmed, then create an Event
if event.get("status") == "confirmed":
recurrence = None
if event.get("recurrence"):
try:
with suppress(IndexError):
recurrence = event.get("recurrence")[0]
except IndexError:
pass
if not frappe.db.exists("Event", {"google_calendar_event_id": event.get("id")}):
insert_event_to_calendar(account, event, recurrence)
@ -359,8 +358,6 @@ def sync_events_from_google_calendar(g_calendar, method=None):
"content": " - Event deleted from Google Calendar.",
}
).insert(ignore_permissions=True)
else:
pass
if not results:
return _("No Google Calendar Event to sync.")
@ -583,28 +580,23 @@ def delete_event_from_google_calendar(doc, method=None):
)
def google_calendar_to_repeat_on(start, end, recurrence=None):
def parse_google_calendar_date(dt):
if dt.get("date"):
return get_datetime(dt.get("date"))
return parser.parse(dt.get("dateTime")).astimezone(ZoneInfo(get_system_timezone())).replace(tzinfo=None)
def google_calendar_to_repeat_on(*, start, end, recurrence=None):
"""
recurrence is in the form ['RRULE:FREQ=WEEKLY;BYDAY=MO,TU,TH']
has the frequency and then the days on which the event recurs
Both have been mapped in a dict for easier mapping.
"""
repeat_on = {
"starts_on": (
get_datetime(start.get("date"))
if start.get("date")
else parser.parse(start.get("dateTime"))
.astimezone(ZoneInfo(get_system_timezone()))
.replace(tzinfo=None)
),
"ends_on": (
get_datetime(end.get("date"))
if end.get("date")
else parser.parse(end.get("dateTime"))
.astimezone(ZoneInfo(get_system_timezone()))
.replace(tzinfo=None)
),
"starts_on": parse_google_calendar_date(start),
"ends_on": parse_google_calendar_date(end),
"all_day": 1 if start.get("date") else 0,
"repeat_this_event": 1 if recurrence else 0,
"repeat_on": None,
@ -619,44 +611,45 @@ def google_calendar_to_repeat_on(start, end, recurrence=None):
}
# recurrence rule "RRULE:FREQ=WEEKLY;BYDAY=MO,TU,TH"
if recurrence:
# google_calendar_frequency = RRULE:FREQ=WEEKLY, byday = BYDAY=MO,TU,TH, until = 20191028
google_calendar_frequency, until, byday = get_recurrence_parameters(recurrence)
repeat_on["repeat_on"] = google_calendar_frequencies.get(google_calendar_frequency)
if not recurrence:
return repeat_on
if repeat_on["repeat_on"] == "Daily":
repeat_on["ends_on"] = None
repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None
# google_calendar_frequency = RRULE:FREQ=WEEKLY, byday = BYDAY=MO,TU,TH, until = 20191028
google_calendar_frequency, until, byday = get_recurrence_parameters(recurrence)
repeat_on["repeat_on"] = google_calendar_frequencies.get(google_calendar_frequency)
if byday and repeat_on["repeat_on"] == "Weekly":
repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None
byday = byday.split("=")[1].split(",")
for repeat_day in byday:
repeat_on[google_calendar_days[repeat_day]] = 1
if repeat_on["repeat_on"] == "Daily":
repeat_on["ends_on"] = None
repeat_on["repeat_till"] = until
if byday and repeat_on["repeat_on"] == "Monthly":
byday = byday.split("=")[1]
repeat_day_week_number, repeat_day_name = None, None
if byday and repeat_on["repeat_on"] == "Weekly":
repeat_on["repeat_till"] = until
for repeat_day in byday:
repeat_on[google_calendar_days[repeat_day]] = 1
for num in ["-2", "-1", "1", "2", "3", "4", "5"]:
if num in byday:
repeat_day_week_number = num
break
if byday and repeat_on["repeat_on"] == "Monthly":
byday = byday.split("=")[1]
repeat_day_week_number, repeat_day_name = None, None
for day in ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]:
if day in byday:
repeat_day_name = google_calendar_days.get(day)
break
for num in ["-2", "-1", "1", "2", "3", "4", "5"]:
if num in byday:
repeat_day_week_number = num
break
# Only Set starts_on for the event to repeat monthly
start_date = parse_google_calendar_recurrence_rule(int(repeat_day_week_number), repeat_day_name)
repeat_on["starts_on"] = start_date
repeat_on["ends_on"] = add_to_date(start_date, minutes=5)
repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None
for day in ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]:
if day in byday:
repeat_day_name = google_calendar_days.get(day)
break
if repeat_on["repeat_till"] == "Yearly":
repeat_on["ends_on"] = None
repeat_on["repeat_till"] = datetime.strptime(until, "%Y%m%d") if until else None
# Only Set starts_on for the event to repeat monthly
start_date = parse_google_calendar_recurrence_rule(int(repeat_day_week_number), repeat_day_name)
repeat_on["starts_on"] = start_date
repeat_on["ends_on"] = add_to_date(start_date, minutes=5)
repeat_on["repeat_till"] = until
if repeat_on["repeat_till"] == "Yearly":
repeat_on["ends_on"] = None
repeat_on["repeat_till"] = until
return repeat_on
@ -730,13 +723,11 @@ def repeat_on_to_google_calendar_recurrence_rule(doc):
return [recurrence]
def get_week_number(dt):
def get_week_number(dt: date):
"""Return the week number of the month for the specified date.
https://stackoverflow.com/questions/3806473/python-week-number-of-the-month/16804556
"""
from math import ceil
first_day = dt.replace(day=1)
dom = dt.day