* fix(diff): add type hints to whitelisted methods * fix(global_search): add type hints to whitelisted methods * fix(custom_html_block): add type hints to whitelisted methods * fix(deleted_document): add type hints to whitelisted methods * fix(log_settings): add type hints to whitelisted methods * fix(role): add type hints to whitelisted methods * fix(user_type): add type hints to whitelisted methods * fix(rq_job): add type hints to whitelisted methods * fix(link_preview): add type hints to whitelisted methods * fix(email_account): add type hints to whitelisted methods * fix(web_form): add type hints to whitelisted methods * fix(web_page_view): add type hints to whitelisted methods * fix(csvutils): add type hints to whitelisted methods * fix(file_manager): add type hints to whitelisted methods * fix(email_body): add type hints to whitelisted methods * fix(email_queue): add type hints to whitelisted methods * fix(email_template): add type hints to whitelisted methods * fix(notification): add type hints to whitelisted methods * fix(email_group): add type hints to whitelisted methods * fix(inbox): add type hints to whitelisted methods * fix(recorder): add type hints to whitelisted methods * fix(sms_settings): add type hints to whitelisted methods * fix: tighten type hints * fix(data_import): add type hints to whitelisted methods * fix(user_permission): add type hints to whitelisted methods * fix(gantt): add type hints to whitelisted methods * fix(like): add type hints to whitelisted methods * fix(search): add type hints to whitelisted methods * fix(onboarding_step): add type hints to whitelisted methods * fix(system_console): add type hints to whitelisted methods * fix(workspace_sidebar): add type hints to whitelisted methods * fix(todo): add type hints to whitelisted methods * fix: correct type hints * fix(print_format): add type hints to whitelisted methods * fix(client): add type hints to whitelisted methods
662 lines
17 KiB
Python
Executable file
662 lines
17 KiB
Python
Executable file
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
from __future__ import annotations
|
|
|
|
import email.utils
|
|
import os
|
|
import re
|
|
from email import policy
|
|
from email.header import Header
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.utils import formataddr
|
|
from typing import TYPE_CHECKING
|
|
|
|
import frappe
|
|
from frappe import _
|
|
from frappe.email.doctype.email_account.email_account import EmailAccount
|
|
from frappe.utils import (
|
|
cint,
|
|
expand_relative_urls,
|
|
get_url,
|
|
markdown,
|
|
parse_addr,
|
|
random_string,
|
|
scrub_urls,
|
|
split_emails,
|
|
strip,
|
|
to_markdown,
|
|
validate_email_address,
|
|
)
|
|
from frappe.utils.pdf import get_pdf
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Literal
|
|
|
|
# Matches an `embed="..."` / `embed='...'` attribute and captures the quoted
# value (non-greedy) as group 1. Used below to find images that should be
# inlined into the HTML part.
EMBED_PATTERN = re.compile("""embed=["'](.*?)["']""")
|
|
|
|
|
|
def get_email(
    recipients,
    sender="",
    msg="",
    subject="[No Subject]",
    text_content=None,
    footer=None,
    print_html=None,
    formatted=None,
    attachments=None,
    content=None,
    reply_to=None,
    cc=None,
    bcc=None,
    email_account=None,
    expose_recipients=None,
    inline_images=None,
    header=None,
    x_priority: Literal[1, 3, 5] = 3,
):
    """Build and return an `EMail` object ready to be serialised.

    The resulting message is laid out as:
    - multipart/mixed
            - multipart/alternative
                    - text/plain
                    - multipart/related
                            - text/html
                            - inline image
                            - attachment

    `content` takes precedence over `msg` as the body. A body that does not
    start with "<" (after stripping whitespace) is passed through `markdown`
    before being set as HTML. `attachments` may be a single dict or an
    iterable of dicts; entries whose "fcontent" is None are skipped, the
    others are forwarded as keyword arguments to `EMail.add_attachment`.
    """
    body = content or msg

    # Replace missing collections with fresh lists (never share defaults).
    cc = [] if cc is None else cc
    bcc = [] if bcc is None else bcc
    inline_images = [] if inline_images is None else inline_images

    mail = EMail(
        sender,
        recipients,
        subject,
        reply_to=reply_to,
        cc=cc,
        bcc=bcc,
        email_account=email_account,
        expose_recipients=expose_recipients,
        x_priority=x_priority,
    )

    # Anything that does not look like markup is treated as markdown.
    looks_like_html = body.strip().startswith("<")
    if not looks_like_html:
        body = markdown(body)

    mail.set_html(
        body,
        text_content,
        footer=footer,
        header=header,
        print_html=print_html,
        formatted=formatted,
        inline_images=inline_images,
    )

    if isinstance(attachments, dict):
        attachment_list = [attachments]
    else:
        attachment_list = attachments or []

    for attachment in attachment_list:
        # cannot attach if no filecontent
        if attachment.get("fcontent") is not None:
            mail.add_attachment(**attachment)

    return mail
|
|
|
|
|
|
class EMail:
    """
    Thin wrapper around the standard `email` package representing one outgoing email.

    It owns a multipart/mixed root (`msg_root`) with a nested
    multipart/alternative part (`msg_alternative`) that carries the plain-text
    and HTML renditions, and offers helpers to add attachments and headers.
    """

    def __init__(
        self,
        sender="",
        recipients=(),
        subject="",
        alternative=0,
        reply_to=None,
        cc=(),
        bcc=(),
        email_account=None,
        expose_recipients=None,
        x_priority: Literal[1, 3, 5] = 3,
    ):
        """Set up the MIME skeleton and remember the addressing details.

        `recipients` may be a string (";" or "," separated) or an iterable of
        addresses; blank entries are dropped. `alternative` is accepted but not
        used by this constructor. When `email_account` is not given, the
        outgoing account is looked up via `EmailAccount.find_outgoing`.
        """
        from email import charset as email_charset

        # Register utf-8 with quoted-printable for both header and body encoding.
        email_charset.add_charset("utf-8", email_charset.QP, email_charset.QP, "utf-8")

        if isinstance(recipients, str):
            normalised = recipients.replace(";", ",").replace("\n", "")
            recipients = split_emails(normalised)

        # remove null
        cleaned_recipients = []
        for raw_recipient in recipients:
            stripped = strip(raw_recipient)
            if stripped:
                cleaned_recipients.append(stripped)

        self.sender = sender
        self.reply_to = reply_to or sender
        self.recipients = cleaned_recipients
        self.subject = subject
        self.expose_recipients = expose_recipients

        self.msg_root = MIMEMultipart("mixed", policy=policy.SMTP)
        self.msg_alternative = MIMEMultipart("alternative", policy=policy.SMTP)
        self.msg_root.attach(self.msg_alternative)
        self.cc = cc or []
        self.bcc = bcc or []
        self.html_set = False

        self.x_priority: Literal[1, 3, 5] = x_priority

        self.email_account = email_account or EmailAccount.find_outgoing(
            match_by_email=sender, _raise_error=True
        )

    def set_html(
        self,
        message,
        text_content=None,
        footer=None,
        print_html=None,
        formatted=None,
        inline_images=None,
        header=None,
    ):
        """Attach message in the html portion of multipart/alternative.

        If `formatted` is not supplied it is produced with
        `get_formatted_html`. The first time this is called a plain-text part
        is attached as well: `text_content` when given, otherwise a text
        rendition derived from the formatted HTML.
        """
        if not formatted:
            formatted = get_formatted_html(
                self.subject,
                message,
                footer,
                print_html,
                email_account=self.email_account,
                header=header,
                sender=self.sender,
            )

        # Only the first HTML part gets a plain-text companion.
        if not self.html_set:
            if text_content:
                self.set_text(expand_relative_urls(text_content))
            else:
                self.set_html_as_text(expand_relative_urls(formatted))

        self.set_part_html(formatted, inline_images)
        self.html_set = True

    def set_text(self, message):
        """
        Attach message in the text portion of multipart/alternative
        """
        from email.mime.text import MIMEText

        text_part = MIMEText(message, "plain", "utf-8", policy=policy.SMTP)
        self.msg_alternative.attach(text_part)

    def set_part_html(self, message, inline_images):
        """Attach the HTML part, wrapping it in multipart/related when it embeds images.

        Embedded images are discovered from `embed="..."` attributes in
        `message`; the `inline_images` argument is not consulted here.
        """
        from email.mime.text import MIMEText

        if not EMBED_PATTERN.search(message):
            # No embeds: the HTML goes straight into the alternative part.
            self.msg_alternative.attach(MIMEText(message, "html", "utf-8", policy=policy.SMTP))
            return

        # process inline images
        message, embedded_images = replace_filename_with_cid(message)

        # prepare parts
        related_part = MIMEMultipart("related", policy=policy.SMTP)
        related_part.attach(MIMEText(message, "html", "utf-8", policy=policy.SMTP))

        for image in embedded_images:
            self.add_attachment(
                image.get("filename"),
                image.get("filecontent"),
                content_id=image.get("content_id"),
                parent=related_part,
                inline=True,
            )

        self.msg_alternative.attach(related_part)

    def set_html_as_text(self, html):
        """Set plain text from HTML"""
        as_text = to_markdown(html)
        self.set_text(as_text)

    def set_message(self, message, mime_type="text/html", as_attachment=0, filename="attachment.html"):
        """Append the message with MIME content to the root node (as attachment)"""
        from email.mime.text import MIMEText

        _maintype, subtype = mime_type.split("/")
        text_part = MIMEText(message, _subtype=subtype, policy=policy.SMTP)

        if as_attachment:
            # Strip CR/LF so the filename cannot break out of the header.
            safe_name = re.sub("[\r\n]", "", str(filename))
            text_part.add_header("Content-Disposition", "attachment", filename=safe_name)

        self.msg_root.attach(text_part)

    def attach_file(self, n):
        """Attach the content of the `File` document whose file_name is `n`.

        Nothing is attached when the file has no content.
        """
        file_doc = frappe.get_doc("File", {"file_name": n})
        file_content = file_doc.get_content()
        if file_content:
            self.add_attachment(file_doc.file_name, file_content)

    def add_attachment(self, fname, fcontent, content_type=None, parent=None, content_id=None, inline=False):
        """Add an attachment to `parent`, defaulting to the root message."""
        target = parent or self.msg_root
        add_attachment(fname, fcontent, content_type, target, content_id, inline)

    def add_pdf_attachment(self, name, html, options=None):
        """Render `html` to PDF with `get_pdf` and attach it under `name`."""
        pdf_bytes = get_pdf(html, options)
        self.add_attachment(name, pdf_bytes, "application/octet-stream")

    def validate(self):
        """validate the Email Addresses"""

        def without_standard_users(addresses):
            return [strip(address) for address in addresses if address not in frappe.STANDARD_USERS]

        if not self.sender:
            self.sender = self.email_account.default_sender

        validate_email_address(strip(self.sender), True)
        self.validate_reply_to()

        if self.email_account.add_x_original_from:
            # Recorded before the sender is (possibly) rewritten below.
            self.set_header("X-Original-From", self.sender)

        self.replace_sender()
        self.replace_sender_name()

        self.recipients = without_standard_users(self.recipients)
        self.cc = without_standard_users(self.cc)
        self.bcc = without_standard_users(self.bcc)

        for address in self.recipients + self.cc + self.bcc:
            validate_email_address(address, True)

    def validate_reply_to(self) -> None:
        """Resolve `self.reply_to` according to the email account settings.

        - Cleared when the account does not add a Reply-To header.
        - Otherwise built from the account's configured reply-to addresses
          (None if none of them qualifies).
        - Otherwise the validated current reply-to, falling back to the sender.
        """
        account = self.email_account

        if not account.add_reply_to_header:
            self.reply_to = None
            return

        configured = account.reply_to_addresses
        if configured:
            formatted_addresses = []
            for entry in configured:
                if entry.email and validate_email_address(entry.email, True):
                    formatted_addresses.append(formataddr((entry._name, entry.email)))
            self.reply_to = ", ".join(formatted_addresses) if formatted_addresses else None
            return

        candidate = strip(self.reply_to) or self.sender
        self.reply_to = validate_email_address(candidate, True)

    def replace_sender(self):
        """Use the account's email id as sender address when the account demands it.

        The display name of the current sender is kept; the account name is
        used when the sender has no display name.
        """
        if not cint(self.email_account.always_use_account_email_id_as_sender):
            return

        sender_name, _ = parse_addr(self.sender)
        display_name = str(Header(sender_name or self.email_account.name, "utf-8"))
        self.sender = email.utils.formataddr((display_name, self.email_account.email_id))

    def replace_sender_name(self):
        """Use the account name as sender display name when the account demands it."""
        if not cint(self.email_account.always_use_account_name_as_sender_name):
            return

        _, sender_email = parse_addr(self.sender)
        display_name = str(Header(self.email_account.name, "utf-8"))
        self.sender = email.utils.formataddr((display_name, sender_email))

    def set_message_id(self, message_id, is_notification=False):
        """Set the Message-Id header, generating one when `message_id` is empty.

        The "isnotification" header is set when `is_notification` is true and
        also whenever the id had to be generated.
        """
        if message_id:
            message_id = "<" + message_id + ">"
        else:
            message_id = get_message_id()
            # NOTE(review): a generated id is flagged as a notification too - confirm this is intended.
            self.set_header("isnotification", "<notification>")

        if is_notification:
            self.set_header("isnotification", "<notification>")

        self.set_header("Message-Id", message_id)

    def set_in_reply_to(self, in_reply_to):
        """Used to send the Message-Id of a received email back as In-Reply-To"""
        self.set_header("In-Reply-To", in_reply_to)

    def add_headers(self, headers):
        """Add custom headers to the email.

        `headers` must be a dict. Entries with a None value are ignored and
        every key is prefixed with "X-" unless it already starts with it.
        """
        if not isinstance(headers, dict):
            frappe.throw(_("Headers must be a dictionary"))

        for name, value in headers.items():
            if value is None:
                continue
            if not name.startswith("X-"):
                name = "X-" + name
            self.set_header(name, value)

    def make(self):
        """build into msg_root"""
        if self.expose_recipients == "header":
            to_value = ", ".join(self.recipients)
        else:
            # Placeholder; presumably substituted per recipient when the mail is sent - TODO confirm.
            to_value = "<!--recipient-->"

        headers = {
            "Subject": strip(self.subject),
            "From": self.sender,
            "To": to_value,
            "Date": email.utils.formatdate(),
            "Reply-To": self.reply_to if self.reply_to else None,
            # cc should always be visible - as that is the semantic meaning of cc, this should not be dependent on expose_recipients
            "CC": ", ".join(self.cc) if self.cc else None,
            "X-Frappe-Site": get_url(),
        }

        # Priority 3 is the default, so only other values get a header.
        if self.x_priority != 3:
            headers["X-Priority"] = str(self.x_priority)

        # reset headers as values may be changed.
        for name, value in headers.items():
            if value:
                self.set_header(name, value)

        # call hook to enable apps to modify msg_root before sending
        for hook_path in frappe.get_hooks("make_email_body_message"):
            hook = frappe.get_attr(hook_path)
            hook(self)

    def set_header(self, key, value):
        """Set header `key` on the root message, replacing any existing value."""
        # Assigning an existing key would append a second header with the same
        # name instead of overriding it, so drop the old one first.
        if key in self.msg_root:
            del self.msg_root[key]

        self.msg_root[key] = sanitize_email_header(value)

    def as_string(self):
        """validate, build message and convert to string"""
        self.validate()
        self.make()
        return self.msg_root.as_string(policy=policy.SMTP)
|
|
|
|
|
|
def get_formatted_html(
    subject,
    message,
    footer=None,
    print_html=None,
    email_account=None,
    header=None,
    unsubscribe_link: frappe._dict | None = None,
    sender=None,
    with_container=False,
    raw_html=False,
    add_css=True,
):
    """Return the final HTML body for an email.

    Unless `raw_html` is set, `message` is rendered inside the
    "templates/emails/standard.html" template together with the header, footer
    and (when a container or header is requested) the brand logo. The result
    has its URLs scrubbed, the unsubscribe placeholder replaced when
    `unsubscribe_link` is given, and its styles inlined.
    """
    email_account = email_account or EmailAccount.find_outgoing(match_by_email=sender)

    params = {
        "site_url": get_url(),
        "title": subject,
        "print_html": print_html,
        "subject": subject,
    }

    if raw_html:
        rendered_email = message
    else:
        wants_logo = with_container or header
        params["brand_logo"] = get_brand_logo(email_account) if wants_logo else None
        params["with_container"] = with_container
        params["header"] = get_header(header)
        params["content"] = message
        params["footer"] = get_footer(email_account, footer)

        template = frappe.get_template("templates/emails/standard.html")
        rendered_email = template.render(params)

    html = scrub_urls(rendered_email)

    if unsubscribe_link:
        html = html.replace("<!--unsubscribe link here-->", unsubscribe_link.html)

    return inline_style_in_html(html, add_css=add_css)
|
|
|
|
|
|
@frappe.whitelist()
def get_email_html(
    template: str,
    args: str,
    subject: str,
    header: str | list | None = None,
    with_container: str | int | bool = False,
):
    """Render an email template and return it as fully formatted HTML.

    Args:
        template: Name of the email template to render.
        args: JSON-encoded template arguments.
        subject: Subject passed on to `get_formatted_html`.
        header: Header title, a `[title, indicator]` list, or such a list
            encoded as a JSON string (recognised by a leading "[").
        with_container: Truthy value (coerced with `cint`) to wrap the email
            in the standard container.

    Returns:
        The HTML produced by `get_formatted_html` for the rendered template.
    """
    import json

    with_container = cint(with_container)
    args = json.loads(args)

    # `header` may already be a list; only a string can carry JSON that needs
    # decoding, and calling `.startswith` on a list would raise AttributeError.
    if isinstance(header, str) and header.startswith("["):
        header = json.loads(header)

    # Named `rendered` rather than `email` to avoid shadowing the stdlib module.
    rendered = frappe.utils.jinja.get_email_from_template(template, args)
    return get_formatted_html(subject, rendered[0], header=header, with_container=with_container)
|
|
|
|
|
|
def inline_style_in_html(html, add_css=True):
    """Inline CSS into `html` using Premailer.

    With `add_css` enabled, the stylesheets registered under the "email_css"
    hook are resolved with `bundled_asset`, made relative by dropping leading
    slashes, and passed as external styles if they exist on disk. Otherwise no
    external styles are supplied.
    """
    from premailer import Premailer

    from frappe.utils.jinja_globals import bundled_asset

    css_files = None
    if add_css:
        # get email css files from hooks
        hook_paths = frappe.get_hooks("email_css")
        resolved_paths = [bundled_asset(path) for path in hook_paths]
        relative_paths = [path.lstrip("/") for path in resolved_paths]
        css_files = []
        for css_path in relative_paths:
            if os.path.exists(os.path.abspath(css_path)):
                css_files.append(css_path)

    premailer = Premailer(
        html=html, external_styles=css_files, strip_important=False, allow_loading_external_files=True
    )
    return premailer.transform()
|
|
|
|
|
|
def add_attachment(fname, fcontent, content_type=None, parent=None, content_id=None, inline=False):
    """Add an attachment to `parent`, which must be an email message object.

    Args:
        fname: File name; used to guess the content type and written to the
            Content-Disposition header (CR/LF removed). May be empty, in which
            case no Content-Disposition/Content-ID headers are added.
        fcontent: Payload as `str` or `bytes`.
        content_type: MIME type such as "image/png". Guessed from `fname` when
            omitted, falling back to "application/octet-stream".
        parent: Message the new part is attached to. Nothing happens when it
            is None.
        content_id: Optional value for the Content-ID header (wrapped in <>).
        inline: Use the "inline" disposition instead of "attachment".
    """
    import mimetypes
    from email import encoders
    from email.mime.audio import MIMEAudio
    from email.mime.base import MIMEBase
    from email.mime.image import MIMEImage
    from email.mime.text import MIMEText

    # Compare against None explicitly: email messages define __len__ as the
    # number of headers, so a message without headers would be falsy and the
    # attachment would be silently dropped.
    if parent is None:
        return

    # guess_type() cannot handle a missing name, so only guess when there is one.
    if not content_type and fname:
        content_type, _encoding = mimetypes.guess_type(fname)

    if not content_type:
        # No guess could be made, or the file is encoded (compressed), so
        # use a generic bag-of-bits type.
        content_type = "application/octet-stream"

    maintype, subtype = content_type.split("/", 1)
    if maintype == "text":
        # Note: we should handle calculating the charset
        if isinstance(fcontent, str):
            fcontent = fcontent.encode("utf-8")
        part = MIMEText(fcontent, _subtype=subtype, _charset="utf-8")
    elif maintype == "image":
        part = MIMEImage(fcontent, _subtype=subtype)
    elif maintype == "audio":
        part = MIMEAudio(fcontent, _subtype=subtype)
    else:
        part = MIMEBase(maintype, subtype)
        part.set_payload(fcontent)
        # Encode the payload using Base64
        encoders.encode_base64(part)

    # Set the filename parameter
    if fname:
        attachment_type = "inline" if inline else "attachment"
        # Strip CR/LF so the filename cannot inject additional headers.
        clean_filename = re.sub("[\r\n]", "", str(fname))
        part.add_header("Content-Disposition", attachment_type, filename=clean_filename)
        if content_id:
            part.add_header("Content-ID", f"<{content_id}>")

    parent.attach(part)
|
|
|
|
|
|
def get_message_id():
    """Return a newly generated Message-ID that uses the current site (`frappe.local.site`) as its domain."""
    site_domain = frappe.local.site
    return email.utils.make_msgid(domain=site_domain)
|
|
|
|
|
|
def get_signature(email_account):
    """Return the account signature prefixed with "<br>", or "" when none applies.

    A signature applies only when an account is given, its `add_signature`
    flag is truthy and its `signature` is non-empty.
    """
    has_signature = email_account and email_account.add_signature and email_account.signature
    if not has_signature:
        return ""
    return "<br>" + email_account.signature
|
|
|
|
|
|
def get_footer(email_account, footer=None):
    """Return `footer` with the rendered "email_footer" template appended.

    The template receives, when available: the email account's footer, the
    "email_footer_address" default, and - unless the
    "disable_standard_email_footer" default is set - the "default_mail_footer"
    hooks.
    """
    footer = footer or ""
    template_args = {}

    if email_account and email_account.footer:
        template_args["email_account_footer"] = email_account.footer

    sender_address = frappe.db.get_default("email_footer_address")
    if sender_address:
        template_args["sender_address"] = sender_address

    standard_footer_disabled = cint(frappe.db.get_default("disable_standard_email_footer"))
    if not standard_footer_disabled:
        template_args["default_mail_footer"] = frappe.get_hooks("default_mail_footer")

    rendered = frappe.utils.jinja.get_email_from_template("email_footer", template_args)
    footer += rendered[0]

    return footer
|
|
|
|
|
|
def replace_filename_with_cid(message):
    """Swap `embed` attributes for `cid:` sources and collect the referenced images.

    Every <img embed="assets/frappe/images/filename.jpg" ...> becomes
    <img src="cid:content_id" ...>. When the file content cannot be loaded the
    `embed` attribute is removed instead.

    Returns a tuple of the modified message and a list of inline images, each
    a dict with the keys filename, filecontent and content_id.
    """
    inline_images = []

    match = EMBED_PATTERN.search(message)
    while match is not None:
        img_path = match.groups()[0]
        img_path_escaped = frappe.utils.html_utils.unescape_html(img_path)
        filename = img_path_escaped.rsplit("/")[-1]

        # Matches every occurrence of this exact embed attribute.
        embed_attribute = f"""embed=['"]{re.escape(img_path)}['"]"""

        filecontent = get_filecontent_from_path(img_path_escaped)
        if filecontent:
            content_id = random_string(10)
            inline_images.append({"filename": filename, "filecontent": filecontent, "content_id": content_id})
            message = re.sub(embed_attribute, f'src="cid:{content_id}"', message)
        else:
            # Nothing to inline: drop the attribute so the loop makes progress.
            message = re.sub(embed_attribute, "", message)

        match = EMBED_PATTERN.search(message)

    return (message, inline_images)
|
|
|
|
|
|
def get_filecontent_from_path(path):
    """Return the bytes of the file referenced by `path`, or None if unavailable.

    One leading "/" is dropped, then the location is resolved by prefix:
    "assets/" relative to the current working directory, "files/" inside the
    site's public folder, "private/files/" inside the site folder; any other
    path is used as given.
    """
    if not path:
        return

    relative_path = path[1:] if path.startswith("/") else path

    if relative_path.startswith("assets/"):
        # from public folder
        full_path = os.path.abspath(relative_path)
    elif relative_path.startswith("files/"):
        # public file
        full_path = frappe.get_site_path("public", relative_path)
    elif relative_path.startswith("private/files/"):
        # private file
        full_path = frappe.get_site_path(relative_path)
    else:
        # NOTE(review): arbitrary paths are read as-is - confirm callers only pass trusted values.
        full_path = relative_path

    if not os.path.exists(full_path):
        return None

    with open(full_path, "rb") as file_handle:
        return file_handle.read()
|
|
|
|
|
|
def get_header(header=None):
    """Build header from template.

    Args:
        header: Either a title string, or a sequence `[title]` /
            `[title, indicator]`. The sequence is not modified.

    Returns:
        The rendered "email_header" template, or None when `header` is empty.
        When the title is empty, the last "app_title" hook value is used.
    """
    if not header:
        return None

    # Imported only once there is something to render.
    from frappe.utils.jinja import get_email_from_template

    if isinstance(header, str):
        # header = 'My Title'
        parts = [header, None]
    else:
        # Work on a copy so the caller's list is never mutated (and tuples work).
        parts = list(header)
        if len(parts) == 1:
            # header = ['My Title']
            parts.append(None)

    # header = ['My Title', 'orange']
    title, indicator = parts

    if not title:
        title = frappe.get_hooks("app_title")[-1]

    email_header, _text = get_email_from_template(
        "email_header", {"header_title": title, "indicator": indicator}
    )

    return email_header
|
|
|
|
|
|
def sanitize_email_header(header: str):
    """
    Remove every line boundary from a header value.

    The standard library's email policy splits header values with
    `splitlines` and raises ValueError when it finds line breaks
    (ref: https://github.com/python/cpython/blob/main/Lib/email/policy.py#L143).
    Stripping all line boundaries up front prevents such failures.
    The boundaries that are removed are listed here:
    https://docs.python.org/3/library/stdtypes.html#str.splitlines
    """
    segments = header.splitlines()
    return "".join(segments)
|
|
|
|
|
|
def get_brand_logo(email_account):
    """Return the "brand_logo" value of `email_account` (looked up with `.get`)."""
    brand_logo = email_account.get("brand_logo")
    return brand_logo
|