seitime-frappe/frappe/email/email_body.py
2026-04-01 18:10:30 +05:30

701 lines
19 KiB
Python
Executable file

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from __future__ import annotations
import email.utils
import os
import re
from email import policy
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr
from typing import TYPE_CHECKING
import frappe
from frappe import _
from frappe.email.doctype.email_account.email_account import EmailAccount
from frappe.utils import (
cint,
expand_relative_urls,
get_url,
markdown,
parse_addr,
random_string,
scrub_urls,
split_emails,
strip,
to_markdown,
validate_email_address,
)
from frappe.utils.pdf import get_pdf
if TYPE_CHECKING:
from typing import Literal
# Matches an `embed="..."` / `embed='...'` attribute; group 1 captures the
# (non-greedy) text between the quotes.
EMBED_PATTERN = re.compile("""embed=["'](.*?)["']""")
def get_email(
    recipients,
    sender="",
    msg="",
    subject="[No Subject]",
    text_content=None,
    footer=None,
    print_html=None,
    formatted=None,
    attachments=None,
    content=None,
    reply_to=None,
    cc=None,
    bcc=None,
    email_account=None,
    expose_recipients=None,
    inline_images=None,
    header=None,
    x_priority: Literal[1, 3, 5] = 3,
):
    """Build and return an `EMail` object laid out as:

    - multipart/mixed
        - multipart/alternative
            - text/plain
            - multipart/related
                - text/html
                - inline image
        - attachment

    `content` takes precedence over `msg`. A body that does not start with
    "<" is run through `markdown` before being set as the HTML part.
    `attachments` may be a single dict or an iterable of dicts; entries whose
    "fcontent" is None are skipped.
    """
    body = content or msg

    mail = EMail(
        sender,
        recipients,
        subject,
        reply_to=reply_to,
        cc=[] if cc is None else cc,
        bcc=[] if bcc is None else bcc,
        email_account=email_account,
        expose_recipients=expose_recipients,
        x_priority=x_priority,
    )

    looks_like_html = body.strip().startswith("<")
    if not looks_like_html:
        body = markdown(body)

    mail.set_html(
        body,
        text_content,
        footer=footer,
        header=header,
        print_html=print_html,
        formatted=formatted,
        inline_images=[] if inline_images is None else inline_images,
    )

    # A lone dict is treated as a one-element list of attachments.
    attachment_specs = [attachments] if isinstance(attachments, dict) else (attachments or [])
    for spec in attachment_specs:
        # cannot attach if there is no file content
        if spec.get("fcontent") is not None:
            mail.add_attachment(**spec)

    return mail
class EMail:
    """
    Wrapper around the standard `email` package representing one outgoing message.

    The message is a ``multipart/mixed`` root that holds a
    ``multipart/alternative`` child (plain-text and HTML renderings).
    Attachments are added through `add_attachment`, which targets the root
    unless another parent is given.
    """

    def __init__(
        self,
        sender="",
        recipients=(),
        subject="",
        alternative=0,
        reply_to=None,
        cc=(),
        bcc=(),
        email_account=None,
        expose_recipients=None,
        x_priority: Literal[1, 3, 5] = 3,
    ):
        from email import charset as Charset

        # Register utf-8 with quoted-printable for both header and body encoding.
        Charset.add_charset("utf-8", Charset.QP, Charset.QP, "utf-8")

        if isinstance(recipients, str):
            # accept ";" as a separator and drop embedded newlines before splitting
            normalised = recipients.replace(";", ",").replace("\n", "")
            recipients = split_emails(normalised)

        # strip every entry and discard the ones that end up empty
        cleaned_recipients = []
        for raw in recipients:
            address = strip(raw)
            if address:
                cleaned_recipients.append(address)

        self.sender = sender
        self.reply_to = reply_to or sender
        self.recipients = cleaned_recipients
        self.subject = subject
        self.expose_recipients = expose_recipients

        root = MIMEMultipart("mixed", policy=policy.SMTP)
        alternative_part = MIMEMultipart("alternative", policy=policy.SMTP)
        root.attach(alternative_part)
        self.msg_root = root
        self.msg_alternative = alternative_part

        self.cc = cc or []
        self.bcc = bcc or []
        self.html_set = False
        self.x_priority: Literal[1, 3, 5] = x_priority

        # Fall back to the outgoing account matching the sender when none is supplied.
        self.email_account = email_account or EmailAccount.find_outgoing(
            match_by_email=sender, _raise_error=True
        )

    def set_html(
        self,
        message,
        text_content=None,
        footer=None,
        print_html=None,
        formatted=None,
        inline_images=None,
        header=None,
    ):
        """Attach `message` as the HTML part of multipart/alternative.

        Unless `formatted` HTML is supplied, the message is rendered through
        `get_formatted_html`. On the first call a plain-text part is attached
        as well: `text_content` if given, otherwise a text version of the HTML.
        """
        html = formatted or get_formatted_html(
            self.subject,
            message,
            footer,
            print_html,
            email_account=self.email_account,
            header=header,
            sender=self.sender,
        )

        # first html part of the message: also provide the text rendering
        if not self.html_set:
            if text_content:
                self.set_text(expand_relative_urls(text_content))
            else:
                self.set_html_as_text(expand_relative_urls(html))

        self.set_part_html(html, inline_images)
        self.html_set = True

    def set_text(self, message):
        """Attach `message` as the text/plain part of multipart/alternative."""
        from email.mime.text import MIMEText

        self.msg_alternative.attach(MIMEText(message, "plain", "utf-8", policy=policy.SMTP))

    def set_part_html(self, message, inline_images):
        """Attach the HTML part, wrapping it in multipart/related when it embeds images.

        `inline_images` is an optional iterable of dicts with "filename" and
        "filecontent"; these are preferred over reading the image from disk.
        """
        from email.mime.text import MIMEText

        if not EMBED_PATTERN.search(message):
            # no embed="..." attributes: a bare text/html part is enough
            self.msg_alternative.attach(MIMEText(message, "html", "utf-8", policy=policy.SMTP))
            return

        # index supplied images by full path and by basename for flexible matching
        provided_images = {}
        for img in inline_images or []:
            if img.get("filename") and img.get("filecontent"):
                provided_images[img["filename"]] = img["filecontent"]
                basename = img["filename"].rsplit("/", 1)[-1]
                provided_images.setdefault(basename, img["filecontent"])

        # swap embed attributes for cid references, preferring provided images over disk reads
        message, embedded = replace_filename_with_cid(message, provided_images)

        related = MIMEMultipart("related", policy=policy.SMTP)
        related.attach(MIMEText(message, "html", "utf-8", policy=policy.SMTP))

        for image in embedded:
            self.add_attachment(
                image.get("filename"),
                image.get("filecontent"),
                content_id=image.get("content_id"),
                parent=related,
                inline=True,
            )

        self.msg_alternative.attach(related)

    def set_html_as_text(self, html):
        """Convert `html` with `to_markdown` and attach the result as the text part."""
        plain = to_markdown(html)
        self.set_text(plain)

    def set_message(self, message, mime_type="text/html", as_attachment=0, filename="attachment.html"):
        """Append `message` to the root node as a text part of the given MIME subtype.

        When `as_attachment` is truthy, a Content-Disposition header carrying
        `filename` (with CR/LF removed) is added.
        """
        from email.mime.text import MIMEText

        _main, subtype = mime_type.split("/")
        text_part = MIMEText(message, _subtype=subtype, policy=policy.SMTP)

        if as_attachment:
            safe_name = re.sub("[\r\n]", "", str(filename))
            text_part.add_header("Content-Disposition", "attachment", filename=safe_name)

        self.msg_root.attach(text_part)

    def attach_file(self, n):
        """Attach the `File` document whose file_name is `n`; do nothing if it has no content."""
        file_doc = frappe.get_doc("File", {"file_name": n})
        data = file_doc.get_content()
        if data:
            self.add_attachment(file_doc.file_name, data)

    def add_attachment(self, fname, fcontent, content_type=None, parent=None, content_id=None, inline=False):
        """Add an attachment to `parent`, defaulting to the message root."""
        target = parent or self.msg_root
        add_attachment(fname, fcontent, content_type, target, content_id, inline)

    def add_pdf_attachment(self, name, html, options=None):
        """Render `html` with `get_pdf` and attach the result under `name`."""
        pdf_data = get_pdf(html, options)
        self.add_attachment(name, pdf_data, "application/octet-stream")

    def validate(self):
        """Validate sender, reply-to and every recipient address.

        Also applies the account's sender-replacement settings and drops
        entries listed in `frappe.STANDARD_USERS` from recipients, cc and bcc.
        """

        def _drop_standard_users(addresses):
            return [strip(a) for a in addresses if a not in frappe.STANDARD_USERS]

        if not self.sender:
            self.sender = self.email_account.default_sender

        validate_email_address(strip(self.sender), True)
        self.validate_reply_to()

        if self.email_account.add_x_original_from:
            # record the sender before it may be rewritten below
            self.set_header("X-Original-From", self.sender)

        self.replace_sender()
        self.replace_sender_name()

        self.recipients = _drop_standard_users(self.recipients)
        self.cc = _drop_standard_users(self.cc)
        self.bcc = _drop_standard_users(self.bcc)

        for address in [*self.recipients, *self.cc, *self.bcc]:
            validate_email_address(address, True)

    def validate_reply_to(self) -> None:
        """Resolve `self.reply_to` from the email account settings.

        - Reply-To header disabled on the account: cleared to None.
        - Account lists reply-to addresses: the valid ones, comma separated
          (None if there are none).
        - Otherwise: the validated current reply-to, falling back to the sender.
        """
        account = self.email_account

        if not account.add_reply_to_header:
            self.reply_to = None
            return

        if account.reply_to_addresses:
            formatted_addresses = []
            for entry in account.reply_to_addresses:
                if entry.email and validate_email_address(entry.email, True):
                    formatted_addresses.append(formataddr((entry._name, entry.email)))

            if formatted_addresses:
                self.reply_to = ", ".join(formatted_addresses)
            else:
                self.reply_to = None
            return

        candidate = strip(self.reply_to) or self.sender
        self.reply_to = validate_email_address(candidate, True)

    def replace_sender(self):
        """Use the account's email id as sender address when the account demands it."""
        if not cint(self.email_account.always_use_account_email_id_as_sender):
            return

        display_name, _address = parse_addr(self.sender)
        encoded_name = str(Header(display_name or self.email_account.name, "utf-8"))
        self.sender = email.utils.formataddr((encoded_name, self.email_account.email_id))

    def replace_sender_name(self):
        """Use the account's name as sender display name when the account demands it."""
        if not cint(self.email_account.always_use_account_name_as_sender_name):
            return

        _name, sender_address = parse_addr(self.sender)
        encoded_name = str(Header(self.email_account.name, "utf-8"))
        self.sender = email.utils.formataddr((encoded_name, sender_address))

    def set_message_id(self, message_id, is_notification=False):
        """Set the Message-Id header, generating an id when none is given.

        The "isnotification" header is set when the id had to be generated
        and also when `is_notification` is truthy.
        """
        if not message_id:
            message_id = get_message_id()
            self.set_header("isnotification", "<notification>")
        else:
            message_id = "<" + message_id + ">"

        if is_notification:
            self.set_header("isnotification", "<notification>")

        self.set_header("Message-Id", message_id)

    def set_in_reply_to(self, in_reply_to):
        """Send the Message-Id of a received email back as In-Reply-To."""
        self.set_header("In-Reply-To", in_reply_to)

    def add_headers(self, headers):
        """Add custom headers; names lacking an "X-" prefix get one, None values are skipped."""
        if not isinstance(headers, dict):
            frappe.throw(_("Headers must be a dictionary"))

        for name, value in headers.items():
            if value is None:
                continue
            header_name = name if name.startswith("X-") else "X-" + name
            self.set_header(header_name, value)

    def make(self):
        """Write the standard headers onto msg_root and run the body hooks."""
        # Recipients only appear in "To" when explicitly exposed in the header;
        # otherwise a placeholder is written.
        headers = {
            "Subject": strip(self.subject),
            "From": self.sender,
            "To": ", ".join(self.recipients) if self.expose_recipients == "header" else "<!--recipient-->",
            "Date": email.utils.formatdate(),
            "Reply-To": self.reply_to or None,
            # cc is always visible - that is its semantic meaning - independent of expose_recipients
            "CC": ", ".join(self.cc) if self.cc else None,
            "X-Frappe-Site": get_url(),
        }

        if self.x_priority != 3:
            headers["X-Priority"] = str(self.x_priority)

        # reset headers as values may have changed; empty values are not written
        for name, value in headers.items():
            if value:
                self.set_header(name, value)

        # let apps modify msg_root before sending
        for hook in frappe.get_hooks("make_email_body_message"):
            frappe.get_attr(hook)(self)

    def set_header(self, key, value):
        """Set header `key` to the sanitized `value`, replacing any existing header."""
        root = self.msg_root
        if key in root:
            # assigning an existing key appends a duplicate header instead of
            # overriding it, so the old one is removed first
            del root[key]

        root[key] = sanitize_email_header(value)

    def as_string(self):
        """Validate, build the message and return it serialized as a string."""
        self.validate()
        self.make()
        return self.msg_root.as_string(policy=policy.SMTP)
def get_formatted_html(
    subject,
    message,
    footer=None,
    print_html=None,
    email_account=None,
    header=None,
    unsubscribe_link: frappe._dict | None = None,
    sender=None,
    with_container=False,
    raw_html=False,
    add_css=True,
):
    """Return the final, style-inlined HTML for an email.

    Unless `raw_html` is set, `message` is rendered inside the standard email
    template together with header, footer and brand logo. URLs are scrubbed,
    the unsubscribe placeholder is filled in when `unsubscribe_link` is given,
    and styles are inlined by `inline_style_in_html`.
    """
    account = email_account or EmailAccount.find_outgoing(match_by_email=sender)

    context = {
        "site_url": get_url(),
        "title": subject,
        "print_html": print_html,
        "subject": subject,
    }

    if not raw_html:
        # the brand logo is only looked up when a container or header is shown
        context["brand_logo"] = get_brand_logo(account) if with_container or header else None
        context["with_container"] = with_container
        context["header"] = get_header(header)
        context["content"] = message
        context["footer"] = get_footer(account, footer)
        message = frappe.get_template("templates/emails/standard.html").render(context)

    html = scrub_urls(message)

    if unsubscribe_link:
        html = html.replace("<!--unsubscribe link here-->", unsubscribe_link.html)

    return inline_style_in_html(html, add_css=add_css)
@frappe.whitelist()
def get_email_html(
    template: str,
    args: str,
    subject: str,
    header: str | list | None = None,
    with_container: str | int | bool = False,
):
    """Render an email template and return the fully formatted HTML.

    Args:
        template: Name of the email template to render.
        args: JSON-encoded template arguments.
        subject: Subject passed on to `get_formatted_html`.
        header: A title string, a JSON-encoded list string (starting with
            "["), or an already decoded list.
        with_container: Truthy to wrap the content in the container layout;
            coerced with `cint`.
    """
    import json

    with_container = cint(with_container)
    args = json.loads(args)

    # Only a string can carry a JSON-encoded list; a header that is already a
    # list has no `startswith` and must be passed through untouched.
    if isinstance(header, str) and header.startswith("["):
        header = json.loads(header)

    # Local name deliberately not `email`, which would shadow the imported module.
    rendered = frappe.utils.jinja.get_email_from_template(template, args)
    return get_formatted_html(subject, rendered[0], header=header, with_container=with_container)
def inline_style_in_html(html, add_css=True):
    """Return `html` with CSS converted to inline styles via Premailer.

    When `add_css` is truthy, the stylesheets registered under the
    "email_css" hook that exist on disk are passed as external styles;
    otherwise no external styles are used.
    """
    from premailer import Premailer

    from frappe.utils.jinja_globals import bundled_asset

    css_files = None
    if add_css:
        # resolve hook entries to bundled asset paths, made relative by dropping leading "/"
        relative_paths = [bundled_asset(path) for path in frappe.get_hooks("email_css")]
        relative_paths = [path.lstrip("/") for path in relative_paths]
        # keep only stylesheets that are actually present
        css_files = [path for path in relative_paths if os.path.exists(os.path.abspath(path))]

    premailer = Premailer(
        html=html,
        external_styles=css_files,
        strip_important=False,
        allow_loading_external_files=True,
    )
    return premailer.transform()
def add_attachment(fname, fcontent, content_type=None, parent=None, content_id=None, inline=False):
    """Add an attachment to `parent`, which must be an email message object.

    Args:
        fname: File name; used to guess the content type and written to the
            Content-Disposition header. May be empty.
        fcontent: Attachment content as `str` or `bytes`.
        content_type: MIME type such as "image/png". Guessed from `fname`
            when omitted, falling back to "application/octet-stream".
        parent: Message to attach to. Nothing happens when it is falsy.
        content_id: Optional value for the Content-ID header (written in
            angle brackets).
        inline: Use "inline" instead of "attachment" as the disposition.

    Text attachments given as bytes are decoded as UTF-8. Bytes that are not
    valid UTF-8 are attached base64-encoded under the same content type
    instead of raising `UnicodeDecodeError`.
    """
    import mimetypes
    from email import encoders
    from email.mime.audio import MIMEAudio
    from email.mime.base import MIMEBase
    from email.mime.image import MIMEImage
    from email.mime.text import MIMEText

    if not parent:
        return

    # Guess the content type only when there is a file name to guess from;
    # mimetypes.guess_type rejects None.
    if not content_type and fname:
        content_type, _encoding = mimetypes.guess_type(fname)

    if not content_type:
        # No guess could be made, or the file is encoded (compressed), so
        # use a generic bag-of-bits type.
        content_type = "application/octet-stream"

    maintype, subtype = content_type.split("/", 1)

    part = None
    if maintype == "text":
        if isinstance(fcontent, bytes):
            try:
                # If bytes are provided, assume UTF-8
                fcontent = fcontent.decode("utf-8")
            except UnicodeDecodeError:
                # Some other encoding: keep the raw bytes and let the generic
                # base64 branch below carry them unchanged.
                pass
        if isinstance(fcontent, str):
            part = MIMEText(fcontent, _subtype=subtype, _charset="utf-8")

    if part is None:
        if isinstance(fcontent, str):
            fcontent = fcontent.encode("utf-8")

        if maintype == "image":
            part = MIMEImage(fcontent, _subtype=subtype)
        elif maintype == "audio":
            part = MIMEAudio(fcontent, _subtype=subtype)
        else:
            part = MIMEBase(maintype, subtype)
            part.set_payload(fcontent)
            encoders.encode_base64(part)

    # Set the filename parameter
    if fname:
        attachment_type = "inline" if inline else "attachment"
        # CR/LF are removed so the name cannot break out of the header line
        clean_filename = re.sub(r"[\r\n]", "", str(fname))
        part.add_header("Content-Disposition", attachment_type, filename=clean_filename)

    if content_id:
        part.add_header("Content-ID", f"<{content_id}>")

    parent.attach(part)
def get_message_id():
    """Return a newly generated Message-ID that uses the current site as its domain."""
    site_domain = frappe.local.site
    return email.utils.make_msgid(domain=site_domain)
def get_signature(email_account):
    """Return the account signature prefixed with "<br>", or "" when it is not to be added.

    A signature is only returned when an account is given, its
    `add_signature` flag is truthy and its `signature` is non-empty.
    """
    wants_signature = email_account and email_account.add_signature and email_account.signature
    if not wants_signature:
        return ""
    return "<br>" + email_account.signature
def get_footer(email_account, footer=None):
    """Return `footer` with the rendered "email_footer" template appended.

    The template receives the account footer, the "email_footer_address"
    default and, unless "disable_standard_email_footer" is set, the
    "default_mail_footer" hooks.
    """
    template_args = {}

    if email_account and email_account.footer:
        template_args["email_account_footer"] = email_account.footer

    sender_address = frappe.db.get_default("email_footer_address")
    if sender_address:
        template_args["sender_address"] = sender_address

    standard_footer_disabled = cint(frappe.db.get_default("disable_standard_email_footer"))
    if not standard_footer_disabled:
        template_args["default_mail_footer"] = frappe.get_hooks("default_mail_footer")

    rendered_footer = frappe.utils.jinja.get_email_from_template("email_footer", template_args)[0]
    return (footer or "") + rendered_footer
def replace_filename_with_cid(message, provided_images=None):
    """Rewrite <img embed="assets/frappe/images/filename.jpg" ...> to
    <img src="cid:content_id" ...>.

    Returns a tuple of the modified message and a list of inline images, each
    a dict with "filename", "filecontent" and "content_id". An embed whose
    image content cannot be found has its embed attribute removed.

    Args:
        message: The HTML message to process
        provided_images: A dictionary of images to use instead of reading from disk
            Example:
                {
                    "assets/frappe/images/filename.jpg": filecontent,
                    "filename.jpg": filecontent,
                }
    """
    images_by_name = {} if provided_images is None else provided_images
    collected = []

    found = EMBED_PATTERN.search(message)
    while found:
        raw_path = found.group(1)
        # matches every embed attribute that carries this exact path
        embed_attr = f"""embed=['"]{re.escape(raw_path)}['"]"""

        unescaped_path = frappe.utils.html_utils.unescape_html(raw_path)
        basename = unescaped_path.rsplit("/")[-1]

        # provided images (by full path, then basename) win over a disk read
        data = (
            images_by_name.get(unescaped_path)
            or images_by_name.get(basename)
            or get_filecontent_from_path(unescaped_path)
        )

        if data:
            cid = random_string(10)
            collected.append({"filename": basename, "filecontent": data, "content_id": cid})
            replacement = f'src="cid:{cid}"'
        else:
            replacement = ""

        message = re.sub(embed_attr, replacement, message)
        found = EMBED_PATTERN.search(message)

    return (message, collected)
def get_filecontent_from_path(path):
    """Return the bytes of the file that `path` refers to, or None.

    One leading "/" is dropped, then the path is resolved by prefix:

    - "assets/..."        -> absolute path relative to the working directory
    - "files/..."         -> the site's "public" folder
    - "private/files/..." -> the site folder
    - anything else       -> used as given

    None is returned for an empty path and for a path that is not an existing
    regular file. That includes directories, which previously made `open`
    raise.
    """
    if not path:
        return None

    if path.startswith("/"):
        path = path[1:]

    if path.startswith("assets/"):
        # from public folder
        full_path = os.path.abspath(path)
    elif path.startswith("files/"):
        # public file
        full_path = frappe.get_site_path("public", path)
    elif path.startswith("private/files/"):
        # private file
        full_path = frappe.get_site_path(path)
    else:
        full_path = path

    # isfile (not exists): a directory exists but cannot be opened for reading
    if not os.path.isfile(full_path):
        return None

    with open(full_path, "rb") as f:
        return f.read()
def get_header(header=None):
    """Build the email header HTML from the "email_header" template.

    Args:
        header: A title string, or a sequence `[title]` / `[title, indicator]`.
            The sequence is not modified.

    Returns:
        The rendered header, or None when `header` is empty. An empty title
        falls back to the last "app_title" hook value.
    """
    if not header:
        return None

    # Imported after the guard so that an empty header needs no template machinery.
    from frappe.utils.jinja import get_email_from_template

    if isinstance(header, str):
        # header = 'My Title'
        title, indicator = header, None
    else:
        # header = ['My Title'] or ['My Title', 'orange'];
        # pad a copy so neither the caller's list is mutated nor tuples rejected
        title, indicator = [*header, None][:2]

    if not title:
        title = frappe.get_hooks("app_title")[-1]

    email_header, _text = get_email_from_template(
        "email_header", {"header_title": title, "indicator": indicator}
    )
    return email_header
def sanitize_email_header(header: str):
    """
    Remove all line boundaries from a header value.

    Python's standard email policy uses `splitlines` and raises ValueError for
    header values containing line boundaries
    (ref: https://github.com/python/cpython/blob/main/Lib/email/policy.py#L143).
    Removing every line boundary during sanitization prevents such failures.

    The line boundaries which are removed are listed here:
    https://docs.python.org/3/library/stdtypes.html#str.splitlines

    Non-string values (for example numbers passed as custom header values)
    are converted with `str()` first.
    """
    if not isinstance(header, str):
        header = str(header)

    return "".join(header.splitlines())
def get_brand_logo(email_account):
    """Return the account's "brand_logo" value, or None when no account is given."""
    # The account lookup can come back empty; the other helpers in this module
    # tolerate a missing account, so this one does too.
    if email_account is None:
        return None

    return email_account.get("brand_logo")