# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE

import email.utils
import os
import re
from email import policy
from email.header import Header
from email.mime.multipart import MIMEMultipart

import frappe
from frappe.email.doctype.email_account.email_account import EmailAccount
from frappe.utils import (
    cint,
    expand_relative_urls,
    get_url,
    markdown,
    parse_addr,
    random_string,
    scrub_urls,
    split_emails,
    strip,
    to_markdown,
)
from frappe.utils.pdf import get_pdf

# Matches an `embed="..."` / `embed='...'` attribute and captures the path.
EMBED_PATTERN = re.compile("""embed=["'](.*?)["']""")


def get_email(
    recipients,
    sender="",
    msg="",
    subject="[No Subject]",
    text_content=None,
    footer=None,
    print_html=None,
    formatted=None,
    attachments=None,
    content=None,
    reply_to=None,
    cc=None,
    bcc=None,
    email_account=None,
    expose_recipients=None,
    inline_images=None,
    header=None,
):
    """Prepare an email with the following format:

    - multipart/mixed
        - multipart/alternative
            - text/plain
            - multipart/related
                - text/html
                - inline image
        - attachment

    `content` takes precedence over `msg`. Content that does not start with
    "<" is passed through `markdown()` before being set as the HTML body.
    `attachments` may be a single dict or a list of dicts holding the keyword
    arguments of `EMail.add_attachment`; entries whose "fcontent" is None are
    skipped.

    Returns the populated `EMail` object.
    """
    # Fall back to an empty body so that `.strip()` below cannot fail on None.
    content = content or msg or ""

    if cc is None:
        cc = []
    if bcc is None:
        bcc = []
    if inline_images is None:
        inline_images = []

    emailobj = EMail(
        sender,
        recipients,
        subject,
        reply_to=reply_to,
        cc=cc,
        bcc=bcc,
        email_account=email_account,
        expose_recipients=expose_recipients,
    )

    if not content.strip().startswith("<"):
        content = markdown(content)

    emailobj.set_html(
        content,
        text_content,
        footer=footer,
        header=header,
        print_html=print_html,
        formatted=formatted,
        inline_images=inline_images,
    )

    if isinstance(attachments, dict):
        attachments = [attachments]

    for attach in attachments or []:
        # cannot attach if no filecontent
        if attach.get("fcontent") is None:
            continue
        emailobj.add_attachment(**attach)

    return emailobj


class EMail:
    """
    Wrapper on the email module. Email object represents emails to be sent to the client.
    Also provides a clean way to add binary `FileData` attachments.
    Also sets all messages as multipart/alternative for cleaner reading in text-only clients.
    """

    def __init__(
        self,
        sender="",
        recipients=(),
        subject="",
        alternative=0,
        reply_to=None,
        cc=(),
        bcc=(),
        email_account=None,
        expose_recipients=None,
    ):
        """Create the multipart/mixed root with a nested multipart/alternative.

        `recipients` may be an iterable of addresses or a single string;
        in a string, ";" is treated like "," and newlines are removed before
        it is split with `split_emails`. Empty entries are dropped.
        `alternative` is accepted but not used by this constructor.
        When `email_account` is not given, the outgoing account is looked up
        by `sender` via `EmailAccount.find_outgoing`.
        """
        from email import charset as Charset

        Charset.add_charset("utf-8", Charset.QP, Charset.QP, "utf-8")

        if isinstance(recipients, str):
            recipients = recipients.replace(";", ",").replace("\n", "")
            recipients = split_emails(recipients)

        # remove null; materialise as a list so the value can be iterated
        # more than once (a lazy filter object would be exhausted after one use)
        stripped = (strip(r) for r in recipients)
        recipients = [r for r in stripped if r]

        self.sender = sender
        self.reply_to = reply_to or sender
        self.recipients = recipients
        self.subject = subject
        self.expose_recipients = expose_recipients

        self.msg_root = MIMEMultipart("mixed", policy=policy.SMTPUTF8)
        self.msg_alternative = MIMEMultipart("alternative", policy=policy.SMTPUTF8)
        self.msg_root.attach(self.msg_alternative)
        self.cc = cc or []
        self.bcc = bcc or []
        self.html_set = False

        self.email_account = email_account or EmailAccount.find_outgoing(
            match_by_email=sender, _raise_error=True
        )

    def set_html(
        self,
        message,
        text_content=None,
        footer=None,
        print_html=None,
        formatted=None,
        inline_images=None,
        header=None,
    ):
        """Attach message in the html portion of multipart/alternative

        If `formatted` is not supplied it is rendered from `message` with
        `get_formatted_html`. On the first call a text/plain part is added
        as well, from `text_content` if given, otherwise derived from the
        formatted HTML.
        """
        if not formatted:
            formatted = get_formatted_html(
                self.subject,
                message,
                footer,
                print_html,
                email_account=self.email_account,
                header=header,
                sender=self.sender,
            )

        # this is the first html part of a multi-part message,
        # convert to text as well
        if not self.html_set:
            if text_content:
                self.set_text(expand_relative_urls(text_content))
            else:
                self.set_html_as_text(expand_relative_urls(formatted))

        self.set_part_html(formatted, inline_images)
        self.html_set = True

    def set_text(self, message):
        """
        Attach message in the text portion of multipart/alternative
        """
        from email.mime.text import MIMEText

        part = MIMEText(message, "plain", "utf-8", policy=policy.SMTPUTF8)
        self.msg_alternative.attach(part)

    def set_part_html(self, message, inline_images):
        """Attach `message` as text/html, embedding images marked with `embed=`.

        When the HTML contains an `embed="..."` attribute, the HTML and the
        referenced images are wrapped in a multipart/related part; otherwise
        a plain text/html part is attached. The `inline_images` argument is
        accepted but not read here: the images are collected from `message`.
        """
        from email.mime.text import MIMEText

        has_inline_images = EMBED_PATTERN.search(message)

        if has_inline_images:
            # process inline images
            message, _inline_images = replace_filename_with_cid(message)

            # prepare parts
            msg_related = MIMEMultipart("related", policy=policy.SMTPUTF8)

            html_part = MIMEText(message, "html", "utf-8", policy=policy.SMTPUTF8)
            msg_related.attach(html_part)

            for image in _inline_images:
                self.add_attachment(
                    image.get("filename"),
                    image.get("filecontent"),
                    content_id=image.get("content_id"),
                    parent=msg_related,
                    inline=True,
                )

            self.msg_alternative.attach(msg_related)
        else:
            self.msg_alternative.attach(MIMEText(message, "html", "utf-8", policy=policy.SMTPUTF8))

    def set_html_as_text(self, html):
        """Set plain text from HTML"""
        self.set_text(to_markdown(html))

    def set_message(
        self, message, mime_type="text/html", as_attachment=0, filename="attachment.html"
    ):
        """Append the message with MIME content to the root node (as attachment)"""
        from email.mime.text import MIMEText

        # Only the subtype is used; MIMEText always produces a text/* part.
        maintype, subtype = mime_type.split("/")
        part = MIMEText(message, _subtype=subtype, policy=policy.SMTPUTF8)

        if as_attachment:
            part.add_header("Content-Disposition", "attachment", filename=filename)

        self.msg_root.attach(part)

    def attach_file(self, n):
        """attach a file from the `FileData` table

        `n` is matched against the `file_name` field of the "File" doctype.
        Nothing is attached when the file has no content.
        """
        _file = frappe.get_doc("File", {"file_name": n})
        content = _file.get_content()
        if not content:
            return

        self.add_attachment(_file.file_name, content)

    def add_attachment(
        self, fname, fcontent, content_type=None, parent=None, content_id=None, inline=False
    ):
        """add attachment (to the root message unless `parent` is given)"""

        if not parent:
            parent = self.msg_root

        add_attachment(fname, fcontent, content_type, parent, content_id, inline)

    def add_pdf_attachment(self, name, html, options=None):
        """Render `html` with `get_pdf` and attach it as application/octet-stream."""
        self.add_attachment(name, get_pdf(html, options), "application/octet-stream")

    def validate(self):
        """validate the Email Addresses

        Fills in the default sender from the email account when none is set,
        records the sender in "X-Original-From", applies the account's sender
        overrides, drops addresses listed in `frappe.STANDARD_USERS` and
        validates every remaining address.
        """
        from frappe.utils import validate_email_address

        if not self.sender:
            self.sender = self.email_account.default_sender

        validate_email_address(strip(self.sender), True)
        self.reply_to = validate_email_address(strip(self.reply_to) or self.sender, True)

        # Record the sender before replace_sender*() may overwrite it.
        self.set_header("X-Original-From", self.sender)

        self.replace_sender()
        self.replace_sender_name()

        self.recipients = [strip(r) for r in self.recipients if r not in frappe.STANDARD_USERS]
        self.cc = [strip(r) for r in self.cc if r not in frappe.STANDARD_USERS]
        self.bcc = [strip(r) for r in self.bcc if r not in frappe.STANDARD_USERS]

        for e in self.recipients + (self.cc or []) + (self.bcc or []):
            validate_email_address(e, True)

    def replace_sender(self):
        """Use the account's email id as sender address if the account requires it."""
        if cint(self.email_account.always_use_account_email_id_as_sender):
            sender_name, _ = parse_addr(self.sender)
            self.sender = email.utils.formataddr(
                (str(Header(sender_name or self.email_account.name, "utf-8")), self.email_account.email_id)
            )

    def replace_sender_name(self):
        """Use the account's name as sender display name if the account requires it."""
        if cint(self.email_account.always_use_account_name_as_sender_name):
            _, sender_email = parse_addr(self.sender)
            self.sender = email.utils.formataddr(
                (str(Header(self.email_account.name, "utf-8")), sender_email)
            )

    def set_message_id(self, message_id, is_notification=False):
        """Set the "Message-Id" header, generating an id when none is given.

        The "isnotification" header is set when the id had to be generated
        or when `is_notification` is true.
        """
        # NOTE(review): the "isnotification" value was an empty string in the
        # whitespace-collapsed source, where tag-like text appears to have been
        # stripped. Restored as "<notification>"; confirm against the original.
        if message_id:
            message_id = "<" + message_id + ">"
        else:
            message_id = get_message_id()
            self.set_header("isnotification", "<notification>")

        if is_notification:
            self.set_header("isnotification", "<notification>")

        self.set_header("Message-Id", message_id)

    def set_in_reply_to(self, in_reply_to):
        """Used to send the Message-Id of a received email back as In-Reply-To"""
        self.set_header("In-Reply-To", in_reply_to)

    def make(self):
        """build into msg_root"""
        # NOTE(review): the fallback "To" value was an empty string in the
        # whitespace-collapsed source, where tag-like text appears to have been
        # stripped. Restored as "<!--recipient-->"; confirm against the original.
        headers = {
            "Subject": strip(self.subject),
            "From": self.sender,
            "To": ", ".join(self.recipients) if self.expose_recipients == "header" else "<!--recipient-->",
            "Date": email.utils.formatdate(),
            "Reply-To": self.reply_to if self.reply_to else None,
            "CC": ", ".join(self.cc) if self.cc and self.expose_recipients == "header" else None,
            "X-Frappe-Site": get_url(),
        }

        # reset headers as values may be changed.
        for key, val in headers.items():
            if val:
                self.set_header(key, val)

        # call hook to enable apps to modify msg_root before sending
        for hook in frappe.get_hooks("make_email_body_message"):
            frappe.get_attr(hook)(self)

    def set_header(self, key, value):
        """Set header `key` on the root message, replacing any existing value."""
        if key in self.msg_root:
            # delete key if found
            # this is done because adding the same key doesn't override
            # the existing key, rather appends another header with same key.
            del self.msg_root[key]

        self.msg_root[key] = sanitize_email_header(value)

    def as_string(self):
        """validate, build message and convert to string"""
        self.validate()
        self.make()
        return self.msg_root.as_string(policy=policy.SMTPUTF8)


def get_formatted_html(
    subject,
    message,
    footer=None,
    print_html=None,
    email_account=None,
    header=None,
    unsubscribe_link: frappe._dict | None = None,
    sender=None,
    with_container=False,
):
    """Render `message` into the standard email template and inline its CSS.

    The outgoing email account is looked up by `sender` when `email_account`
    is not given. The brand logo is only included when `with_container` or
    `header` is set. If `unsubscribe_link` is given, its `.html` replaces the
    unsubscribe placeholder in the rendered HTML.
    """
    email_account = email_account or EmailAccount.find_outgoing(match_by_email=sender)

    rendered_email = frappe.get_template("templates/emails/standard.html").render(
        {
            "brand_logo": get_brand_logo(email_account) if with_container or header else None,
            "with_container": with_container,
            "site_url": get_url(),
            "header": get_header(header),
            "content": message,
            "footer": get_footer(email_account, footer),
            "title": subject,
            "print_html": print_html,
            "subject": subject,
        }
    )

    html = scrub_urls(rendered_email)

    if unsubscribe_link:
        # NOTE(review): the placeholder was an empty string in the
        # whitespace-collapsed source; replacing "" would insert the link
        # between every character. Restored as the HTML comment below;
        # confirm it matches the marker used by templates/emails/standard.html.
        html = html.replace("<!--unsubscribe link here-->", unsubscribe_link.html)

    return inline_style_in_html(html)


@frappe.whitelist()
def get_email_html(template, args, subject, header=None, with_container=False):
    """Render an email template to formatted HTML.

    `args` is a JSON string with the template context. `header` is decoded
    as JSON too when it starts with "[".
    """
    import json

    with_container = cint(with_container)
    args = json.loads(args)
    if header and header.startswith("["):
        header = json.loads(header)
    email = frappe.utils.jinja.get_email_from_template(template, args)
    return get_formatted_html(subject, email[0], header=header, with_container=with_container)


def inline_style_in_html(html):
    """Convert email.css and html to inline-styled html"""
    from premailer import Premailer

    from frappe.utils.jinja_globals import bundled_asset

    # get email css files from hooks
    css_files = frappe.get_hooks("email_css")
    css_files = [bundled_asset(path) for path in css_files]
    css_files = [path.lstrip("/") for path in css_files]
    # keep only the stylesheets that actually exist on disk
    css_files = [css_file for css_file in css_files if os.path.exists(os.path.abspath(css_file))]

    p = Premailer(
        html=html, external_styles=css_files, strip_important=False, allow_loading_external_files=True
    )

    return p.transform()


def add_attachment(fname, fcontent, content_type=None, parent=None, content_id=None, inline=False):
    """Add attachment to parent which must be an email object

    The content type is guessed from `fname` when not given and falls back to
    "application/octet-stream". Nothing is attached when `parent` is falsy.
    `content_id`, if given, is written as a "Content-ID" header; `inline`
    selects an "inline" instead of "attachment" Content-Disposition.
    """
    import mimetypes
    from email.mime.audio import MIMEAudio
    from email.mime.base import MIMEBase
    from email.mime.image import MIMEImage
    from email.mime.text import MIMEText

    if not content_type:
        content_type, encoding = mimetypes.guess_type(fname)

    if not parent:
        return

    if content_type is None:
        # No guess could be made, or the file is encoded (compressed), so
        # use a generic bag-of-bits type.
        content_type = "application/octet-stream"

    maintype, subtype = content_type.split("/", 1)
    if maintype == "text":
        # Note: we should handle calculating the charset
        if isinstance(fcontent, str):
            fcontent = fcontent.encode("utf-8")
        part = MIMEText(fcontent, _subtype=subtype, _charset="utf-8")
    elif maintype == "image":
        part = MIMEImage(fcontent, _subtype=subtype)
    elif maintype == "audio":
        part = MIMEAudio(fcontent, _subtype=subtype)
    else:
        part = MIMEBase(maintype, subtype)
        part.set_payload(fcontent)
        # Encode the payload using Base64
        from email import encoders

        encoders.encode_base64(part)

    # Set the filename parameter
    if fname:
        attachment_type = "inline" if inline else "attachment"
        part.add_header("Content-Disposition", attachment_type, filename=str(fname))
    if content_id:
        part.add_header("Content-ID", f"<{content_id}>")

    parent.attach(part)


def get_message_id():
    """Return a new Message-ID generated with `frappe.local.site` as its domain."""
    return email.utils.make_msgid(domain=frappe.local.site)


def get_signature(email_account):
    """Return the account's signature prefixed with line breaks, or "".

    The signature is only returned when the account has `add_signature`
    enabled and a non-empty `signature`.
    """
    if email_account and email_account.add_signature and email_account.signature:
        # NOTE(review): the prefix was a raw line break inside the string
        # literal in the whitespace-collapsed source (not valid Python).
        # Restored as HTML line breaks; confirm against the original.
        return "<br><br>" + email_account.signature
    else:
        return ""


def get_footer(email_account, footer=None):
    """append a footer (signature)

    Renders the "email_footer" template with the account footer, the
    "email_footer_address" default and, unless the
    "disable_standard_email_footer" default is set, the "default_mail_footer"
    hooks, and appends the result to `footer`.
    """
    footer = footer or ""

    args = {}

    if email_account and email_account.footer:
        args.update({"email_account_footer": email_account.footer})

    sender_address = frappe.db.get_default("email_footer_address")

    if sender_address:
        args.update({"sender_address": sender_address})

    if not cint(frappe.db.get_default("disable_standard_email_footer")):
        args.update({"default_mail_footer": frappe.get_hooks("default_mail_footer")})

    footer += frappe.utils.jinja.get_email_from_template("email_footer", args)[0]

    return footer


def replace_filename_with_cid(message):
    """Replace every `embed="<path>"` attribute with `src="cid:<content_id>"`.

    Returns the modified message and a list of inline_images with
    {filename, filecontent, content_id}. An `embed` attribute whose file
    cannot be read is removed from the message without adding an image.
    """

    inline_images = []

    while True:
        matches = EMBED_PATTERN.search(message)
        if not matches:
            break

        # found match
        img_path = matches.group(1)
        filename = img_path.rsplit("/")[-1]

        # Escape the path: it is file-name text, not a regex. Without this a
        # path containing metacharacters such as "(" or "+" would never match
        # itself, nothing would be substituted and this loop would not end.
        embed_attr = re.compile(f"""embed=['"]{re.escape(img_path)}['"]""")

        filecontent = get_filecontent_from_path(img_path)
        if not filecontent:
            message = embed_attr.sub("", message)
            continue

        content_id = random_string(10)

        inline_images.append(
            {"filename": filename, "filecontent": filecontent, "content_id": content_id}
        )

        message = embed_attr.sub(f'src="cid:{content_id}"', message)

    return (message, inline_images)


def get_filecontent_from_path(path):
    """Return the bytes of the file behind `path`, or None if unavailable.

    A leading "/" is dropped. Paths starting with "assets/" are resolved
    relative to the current working directory, "files/" inside the site's
    "public" folder and "private/files/" inside the site folder; any other
    path is used as given.
    """
    if not path:
        return

    if path.startswith("/"):
        path = path[1:]

    if path.startswith("assets/"):
        # from public folder
        full_path = os.path.abspath(path)
    elif path.startswith("files/"):
        # public file
        full_path = frappe.get_site_path("public", path)
    elif path.startswith("private/files/"):
        # private file
        full_path = frappe.get_site_path(path)
    else:
        full_path = path

    if os.path.exists(full_path):
        with open(full_path, "rb") as f:
            filecontent = f.read()

        return filecontent
    else:
        return None


def get_header(header=None):
    """Build header from template

    `header` may be a title string, a one-item sequence `[title]` or a
    two-item sequence `[title, indicator]`. An empty title falls back to the
    last "app_title" hook value. Returns None when `header` is falsy.
    """
    from frappe.utils.jinja import get_email_from_template

    if not header:
        return None

    if isinstance(header, str):
        # header = 'My Title'
        header = [header, None]
    elif len(header) == 1:
        # header = ['My Title']
        # build a new list instead of appending, so the caller's object is
        # left untouched and one-item tuples work too
        header = [header[0], None]
    # header = ['My Title', 'orange']
    title, indicator = header

    if not title:
        title = frappe.get_hooks("app_title")[-1]

    email_header, text = get_email_from_template(
        "email_header", {"header_title": title, "indicator": indicator}
    )

    return email_header


def sanitize_email_header(header: str):
    """
    Removes all line boundaries in the headers.

    Email Policy (python's std) has some bugs in it which uses splitlines
    and raises ValueError (ref: https://github.com/python/cpython/blob/main/Lib/email/policy.py#L143).
    Hence removing all line boundaries while sanitization of headers to prevent such failures.
    The line boundaries which are removed can be found here: https://docs.python.org/3/library/stdtypes.html#str.splitlines
    """

    return "".join(header.splitlines())


def get_brand_logo(email_account):
    """Return the "brand_logo" value of the given email account."""
    return email_account.get("brand_logo")