From 03bff35b5e89d6e11571de08ab3b4624925ba614 Mon Sep 17 00:00:00 2001 From: Safwan Samsudeen Date: Mon, 25 Aug 2025 13:23:32 +0530 Subject: [PATCH] chore: pre-commit --- frappe/core/doctype/file/file.py | 1732 ++++++++--------- .../file_uploader/file_uploader.bundle.js | 2 +- frappe/utils/file_manager.py | 2 +- 3 files changed, 850 insertions(+), 886 deletions(-) diff --git a/frappe/core/doctype/file/file.py b/frappe/core/doctype/file/file.py index adaf37d8b1..d111498c49 100755 --- a/frappe/core/doctype/file/file.py +++ b/frappe/core/doctype/file/file.py @@ -18,20 +18,20 @@ from frappe.exceptions import DoesNotExistError from frappe.model.document import Document from frappe.permissions import SYSTEM_USER_ROLE, get_doctypes_with_read from frappe.utils import ( - call_hook_method, - cint, - get_files_path, - get_hook_method, - get_url, + call_hook_method, + cint, + get_files_path, + get_hook_method, + get_url, ) from frappe.utils.file_manager import is_safe_path from frappe.utils.image import optimize_image, strip_exif_data from .exceptions import ( - AttachmentLimitReached, - FileTypeNotAllowed, - FolderNotEmpty, - MaxFileSizeReachedError, + AttachmentLimitReached, + FileTypeNotAllowed, + FolderNotEmpty, + MaxFileSizeReachedError, ) from .utils import * @@ -42,899 +42,863 @@ FILE_ENCODING_OPTIONS = ("utf-8-sig", "utf-8", "windows-1250", "windows-1252") class File(Document): - # begin: auto-generated types - # This code is auto-generated. Do not modify anything in this block. - - from typing import TYPE_CHECKING - - if TYPE_CHECKING: - from frappe.types import DF - - attached_to_doctype: DF.Link | None - attached_to_field: DF.Data | None - attached_to_name: DF.Data | None - content_hash: DF.Data | None - file_name: DF.Data | None - file_size: DF.Int - file_type: DF.Data | None - file_url: DF.Code | None - folder: DF.Link | None - is_attachments_folder: DF.Check - is_folder: DF.Check - is_home_folder: DF.Check - is_private: DF.Check - old_parent: DF.Data | None - thumbnail_url: DF.SmallText | None - uploaded_to_dropbox: DF.Check - uploaded_to_google_drive: DF.Check - # end: auto-generated types - - no_feed_on_delete = True - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # if content is set, file_url will be generated - # decode comes in the picture if content passed has to be decoded before writing to disk - - self.content = self.get("content") or b"" - self.decode = self.get("decode", False) - - @property - def is_remote_file(self): - if self.file_url: - return self.file_url.startswith(URL_PREFIXES) - return not self.content - - def autoname(self): - """Set name for folder""" - if self.is_folder: - if self.folder: - self.name = self.get_name_based_on_parent_folder() - else: - # home - self.name = self.file_name - else: - self.name = frappe.generate_hash(length=10) - - def before_insert(self): - # Ensure correct formatting and type - self.file_url = unquote(self.file_url) if self.file_url else "" - - self.set_folder_name() - self.set_is_private() - self.set_file_name() - self.validate_attachment_limit() - self.set_file_type() - self.validate_file_extension() - - if self.is_folder: - return - - if self.is_remote_file: - self.validate_remote_file() - else: - self.save_file(content=self.get_content()) - self.flags.new_file = True - frappe.db.after_rollback.add(self.on_rollback) - - self.validate_duplicate_entry() # Hash is generated in save_file - - def after_insert(self): - if not self.is_folder: - self.create_attachment_record() - - def validate(self): - if self.is_folder: - return - - self.validate_attachment_references() - - # when dict is passed to get_doc for creation of new_doc, is_new returns None - # this case is handled inside handle_is_private_changed - if not self.is_new() and self.has_value_changed("is_private"): - self.handle_is_private_changed() - - self.validate_file_path() - self.validate_file_url() - self.validate_file_on_disk() - - self.file_size = frappe.form_dict.file_size or self.file_size - - def validate_attachment_references(self): - if not self.attached_to_doctype: - return - - if not self.attached_to_name or not isinstance( - self.attached_to_name, str | int - ): - frappe.throw( - _("Attached To Name must be a string or an integer"), - frappe.ValidationError, - ) - - if self.attached_to_field and SPECIAL_CHAR_PATTERN.search( - self.attached_to_field - ): - frappe.throw( - _("The fieldname you've specified in Attached To Field is invalid") - ) - - def after_rename(self, *args, **kwargs): - for successor in self.get_successors(): - setup_folder_path(successor, self.name) - - def on_trash(self): - if self.is_home_folder or self.is_attachments_folder: - frappe.throw(_("Cannot delete Home and Attachments folders")) - self.validate_empty_folder() - self.validate_protected_file() - self._delete_file_on_disk() - if not self.is_folder: - self.add_comment_in_reference_doc("Attachment Removed", self.file_name) - - def on_rollback(self): - rollback_flags = ("new_file", "original_content", "original_path") - - def pop_rollback_flags(): - for flag in rollback_flags: - self.flags.pop(flag, None) - - # following condition is only executed when an insert has been rolledback - if self.flags.new_file: - self._delete_file_on_disk() - pop_rollback_flags() - return - - # if original_content flag is set, this rollback should revert the file to its original state - if self.flags.original_content: - file_path = self.get_full_path() - - if isinstance(self.flags.original_content, bytes): - mode = "wb+" - elif isinstance(self.flags.original_content, str): - mode = "w+" - - with open(file_path, mode) as f: - f.write(self.flags.original_content) - os.fsync(f.fileno()) - pop_rollback_flags() - - # used in case file path (File.file_url) has been changed - if self.flags.original_path: - target = self.flags.original_path["old"] - source = self.flags.original_path["new"] - shutil.move(source, target) - pop_rollback_flags() - - def get_name_based_on_parent_folder(self) -> str | None: - if self.folder: - return os.path.join(self.folder, self.file_name) - - def get_successors(self): - return frappe.get_all("File", filters={"folder": self.name}, pluck="name") - - def validate_file_path(self): - if self.is_remote_file: - return - - base_path = os.path.realpath(get_files_path(is_private=self.is_private)) - if not os.path.realpath(self.get_full_path()).startswith(base_path): - frappe.throw( - _("The File URL you've entered is incorrect"), - title=_("Invalid File URL"), - ) - - def validate_file_url(self): - if self.is_remote_file or not self.file_url: - return - - if not self.file_url.startswith(("/files/", "/private/files/", "/api/method/")): - # Probably an invalid URL since it doesn't start with http and isn't an internal URL either - frappe.throw( - _("URL must start with http:// or https://"), - title=_("Invalid URL"), - ) - - def handle_is_private_changed(self): - if self.is_remote_file: - return - - from pathlib import Path - - old_file_url = self.file_url - file_name = self.file_url.split("/")[-1] - private_file_path = Path(frappe.get_site_path("private", "files", file_name)) - public_file_path = Path(frappe.get_site_path("public", "files", file_name)) - - if cint(self.is_private): - source = public_file_path - target = private_file_path - url_starts_with = "/private/files/" - else: - source = private_file_path - target = public_file_path - url_starts_with = "/files/" - updated_file_url = f"{url_starts_with}{file_name}" - - # if a file document is created by passing dict throught get_doc and __local is not set, - # handle_is_private_changed would be executed; we're checking if updated_file_url is same - # as old_file_url to avoid a FileNotFoundError for this case. - if updated_file_url == old_file_url: - return - - if not source.exists(): - frappe.throw( - _("Cannot find file {} on disk").format(source), - exc=FileNotFoundError, - ) - if target.exists(): - frappe.throw( - _("A file with same name {} already exists").format(target), - exc=FileExistsError, - ) - - # Uses os.rename which is an atomic operation - shutil.move(source, target) - self.flags.original_path = {"old": source, "new": target} - frappe.db.after_rollback.add(self.on_rollback) - - self.file_url = updated_file_url - update_existing_file_docs(self) - - if ( - not self.attached_to_doctype - or not self.attached_to_name - or not self.fetch_attached_to_field(old_file_url) - ): - return - - if frappe.get_meta(self.attached_to_doctype).issingle: - frappe.db.set_single_value( - self.attached_to_doctype, - self.attached_to_field, - self.file_url, - ) - else: - frappe.db.set_value( - self.attached_to_doctype, - self.attached_to_name, - self.attached_to_field, - self.file_url, - ) - - def fetch_attached_to_field(self, old_file_url): - if self.attached_to_field: - return True - - reference_dict = frappe.get_doc( - self.attached_to_doctype, self.attached_to_name - ).as_dict() - - for key, value in reference_dict.items(): - if value == old_file_url: - self.attached_to_field = key - return True - - def validate_attachment_limit(self): - attachment_limit = 0 - if self.attached_to_doctype and self.attached_to_name: - attachment_limit = cint( - frappe.get_meta(self.attached_to_doctype).max_attachments - ) - - if attachment_limit: - current_attachment_count = len( - frappe.get_all( - "File", - filters={ - "attached_to_doctype": self.attached_to_doctype, - "attached_to_name": self.attached_to_name, - }, - limit=attachment_limit + 1, - ) - ) - - if current_attachment_count >= attachment_limit: - frappe.throw( - _( - "Maximum Attachment Limit of {0} has been reached for {1} {2}." - ).format( - frappe.bold(attachment_limit), - self.attached_to_doctype, - self.attached_to_name, - ), - exc=AttachmentLimitReached, - title=_("Attachment Limit Reached"), - ) - - def validate_remote_file(self): - """Validates if file uploaded using URL already exist""" - site_url = get_url() - if ( - self.file_url - and "/files/" in self.file_url - and self.file_url.startswith(site_url) - ): - self.file_url = self.file_url.split(site_url, 1)[1] - - def set_folder_name(self): - """Make parent folders if not exists based on reference doctype and name""" - if self.folder: - return - - if self.attached_to_doctype: - self.folder = frappe.db.get_value("File", {"is_attachments_folder": 1}) - - elif not self.is_home_folder: - self.folder = "Home" - - def set_file_type(self): - if self.is_folder: - return - - file_type = mimetypes.guess_type(self.file_name)[0] - if not file_type: - return - - file_extension = mimetypes.guess_extension(file_type) - self.file_type = file_extension.lstrip(".").upper() if file_extension else None - - def validate_file_on_disk(self): - """Validates existence file""" - full_path = self.get_full_path() - - if full_path.startswith(URL_PREFIXES): - return True - - if not os.path.exists(full_path): - frappe.throw(_("File {0} does not exist").format(self.file_url), IOError) - - def validate_file_extension(self): - # Only validate uploaded files, not generated by code/integrations. - if not self.file_type or not frappe.request: - return - - allowed_extensions = frappe.get_system_settings("allowed_file_extensions") - if not allowed_extensions: - return - - if self.file_type not in allowed_extensions.splitlines(): - frappe.throw( - _("File type of {0} is not allowed").format(self.file_type), - exc=FileTypeNotAllowed, - ) - - def validate_duplicate_entry(self): - if not self.flags.ignore_duplicate_entry_error and not self.is_folder: - if not self.content_hash: - self.generate_content_hash() - - # check duplicate name - # check duplicate assignment - filters = { - "content_hash": self.content_hash, - "is_private": self.is_private, - } - - if self.name: - filters.update({"name": ("!=", self.name)}) - - if self.attached_to_doctype and self.attached_to_name: - filters.update( - { - "attached_to_doctype": self.attached_to_doctype, - "attached_to_name": self.attached_to_name, - } - ) - duplicate_file = frappe.db.get_value( - "File", filters, ["name", "file_url"], as_dict=1 - ) - - if duplicate_file: - duplicate_file_doc = frappe.get_cached_doc("File", duplicate_file.name) - if duplicate_file_doc.exists_on_disk(): - # just use the url, to avoid uploading a duplicate - self.file_url = duplicate_file.file_url - - def set_file_name(self): - if not self.file_name and not self.file_url: - frappe.throw( - _("Fields `file_name` or `file_url` must be set for File"), - exc=frappe.MandatoryError, - ) - elif not self.file_name and self.file_url: - self.file_name = self.file_url.split("/")[-1] - else: - self.file_name = re.sub(r"/", "", self.file_name) - - def generate_content_hash(self): - if self.content_hash or not self.file_url or self.is_remote_file: - return - file_name = self.file_url.split("/")[-1] - try: - file_path = get_files_path(file_name, is_private=self.is_private) - with open(file_path, "rb") as f: - self.content_hash = get_content_hash(f.read()) - except OSError: - frappe.throw(_("File {0} does not exist").format(file_path)) - - def make_thumbnail( - self, - set_as_thumbnail: bool = True, - width: int = 300, - height: int = 300, - suffix: str = "small", - crop: bool = False, - ) -> str: - from requests.exceptions import HTTPError, SSLError - - if not self.file_url: - return - - try: - if self.file_url.startswith(("/files", "/private/files")): - image, filename, extn = get_local_image(self.file_url) - else: - image, filename, extn = get_web_image(self.file_url) - except (HTTPError, SSLError, OSError, TypeError): - return - - size = width, height - if crop: - image = ImageOps.fit(image, size, Image.Resampling.LANCZOS) - else: - image.thumbnail(size, Image.Resampling.LANCZOS) - - thumbnail_url = f"{filename}_{suffix}.{extn}" - path = os.path.abspath( - frappe.get_site_path("public", thumbnail_url.lstrip("/")) - ) - - try: - image.save(path) - if set_as_thumbnail: - self.db_set("thumbnail_url", thumbnail_url) - - except OSError: - frappe.msgprint(_("Unable to write file format for {0}").format(path)) - return - - return thumbnail_url - - def validate_empty_folder(self): - """Throw exception if folder is not empty""" - if self.is_folder and frappe.get_all( - "File", filters={"folder": self.name}, limit=1 - ): - frappe.throw(_("Folder {0} is not empty").format(self.name), FolderNotEmpty) - - def validate_protected_file(self): - """Throw an exception if this file is attached to a doctype that protects files. - - Allows deleting the attached file if the linked document is in draft. If submitted, - deletion is not allowed. If canceled, requires delete permissions on the linked document. - """ - if not (self.attached_to_doctype and self.attached_to_name): - return - - try: - ref_doc = frappe.get_doc(self.attached_to_doctype, self.attached_to_name) - except DoesNotExistError: - return - - if ref_doc.docstatus == 0: - # If the document is not submitted yet, users can correct wrong attachments - return - - if not ref_doc.meta.protect_attached_files: - return - - if ref_doc.docstatus == 2 and ref_doc.has_permission("delete"): - # Deletion must still be possible if users have the permission to delete the linked document - return - - frappe.throw( - msg=_( - "This file is attached to a protected document and cannot be deleted." - ), - title=_("Protected File"), - ) - - def _delete_file_on_disk(self): - """If file not attached to any other record, delete it""" - on_disk_file_not_shared = self.content_hash and not frappe.get_all( - "File", - filters={ - "content_hash": self.content_hash, - "name": ["!=", self.name], - # NOTE: Some old Files might share file_urls while not sharing the is_private value - # "is_private": self.is_private, - }, - limit=1, - ) - if on_disk_file_not_shared: - self.delete_file_data_content() - else: - self.delete_file_data_content(only_thumbnail=True) - - def unzip(self) -> list["File"]: - """Unzip current file and replace it by its children""" - if not self.file_url.endswith(".zip"): - frappe.throw(_("{0} is not a zip file").format(self.file_name)) - - zip_path = self.get_full_path() - - files = [] - with zipfile.ZipFile(zip_path) as z: - for file in z.filelist: - if file.is_dir() or file.filename.startswith("__MACOSX/"): - # skip directories and macos hidden directory - continue - - filename = os.path.basename(file.filename) - if filename.startswith("."): - # skip hidden files - continue - - file_doc = frappe.new_doc("File") - try: - file_doc.content = z.read(file.filename) - except zipfile.BadZipFile: - frappe.throw( - _("{0} is a not a valid zip file").format(self.file_name) - ) - file_doc.file_name = filename - file_doc.folder = self.folder - file_doc.is_private = self.is_private - file_doc.attached_to_doctype = self.attached_to_doctype - file_doc.attached_to_name = self.attached_to_name - file_doc.save() - files.append(file_doc) - - frappe.delete_doc("File", self.name) - return files - - def exists_on_disk(self): - return os.path.exists(self.get_full_path()) - - def get_content(self, encodings=None) -> bytes | str: - if self.is_folder: - frappe.throw(_("Cannot get file contents of a Folder")) - - # if doc was just created, content field is already populated, return it as-is - if self.get("content"): - self._content = self.content - if self.decode: - self._content = decode_file_content(self._content) - self.decode = False - # self.content = None # TODO: This needs to happen; make it happen somehow - return self._content - - if self.file_url: - self.validate_file_url() - file_path = self.get_full_path() - - if encodings is None: - encodings = FILE_ENCODING_OPTIONS - with open(file_path, mode="rb") as f: - self._content = f.read() - # looping will not result in slowdown, as the content is usually utf-8 or utf-8-sig - # encoded so the first iteration will be enough most of the time - for encoding in encodings: - try: - # read file with proper encoding - self._content = self._content.decode(encoding) - break - except UnicodeDecodeError: - # for .png, .jpg, etc - continue - - return self._content - - def get_full_path(self): - """Return file path using the set file name.""" - - file_path = self.file_url or self.file_name - - site_url = get_url() - if "/files/" in file_path and file_path.startswith(site_url): - file_path = file_path.split(site_url, 1)[1] - - if "/" not in file_path: - if self.is_private: - file_path = f"/private/files/{file_path}" - else: - file_path = f"/files/{file_path}" - - if file_path.startswith("/private/files/"): - file_path = get_files_path( - *file_path.split("/private/files/", 1)[1].split("/"), is_private=1 - ) - - elif file_path.startswith("/files/"): - file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/")) - - elif file_path.startswith(URL_PREFIXES): - pass - - elif not self.file_url: - frappe.throw( - _("There is some problem with the file url: {0}").format(file_path) - ) - - if not is_safe_path(file_path): - frappe.throw(_("Cannot access file path {0}").format(file_path)) - - if os.path.sep in self.file_name: - frappe.throw(_("File name cannot have {0}").format(os.path.sep)) - - return file_path - - def write_file(self): - """write file to disk with a random name (to compare)""" - if self.is_remote_file: - return - - file_path = self.get_full_path() - - if isinstance(self._content, str): - self._content = self._content.encode() - - with open(file_path, "wb+") as f: - f.write(self._content) - os.fsync(f.fileno()) - - frappe.db.after_rollback.add(self.on_rollback) - - return file_path - - def save_file( - self, - content: bytes | str | None = None, - decode=False, - ignore_existing_file_check=False, - overwrite=False, - ): - if self.is_remote_file: - return - - if not self.flags.new_file: - self.flags.original_content = self.get_content() - - if content: - self.content = content - self.decode = decode - self.get_content() - - if not self._content: - return - - file_exists = False - duplicate_file = None - - self.is_private = cint(self.is_private) - self.content_type = mimetypes.guess_type(self.file_name)[0] - - # transform file content based on site settings - if ( - self.content_type - and self.content_type == "image/jpeg" - and frappe.get_system_settings("strip_exif_metadata_from_uploaded_images") - ): - self._content = strip_exif_data(self._content, self.content_type) - - self.file_size = self.check_max_file_size() - self.content_hash = get_content_hash(self._content) - - # check if a file exists with the same content hash and is also in the same folder (public or private) - if not ignore_existing_file_check: - duplicate_file = frappe.get_value( - "File", - {"content_hash": self.content_hash, "is_private": self.is_private}, - ["file_url", "name"], - as_dict=True, - ) - - if duplicate_file: - file_doc: File = frappe.get_cached_doc("File", duplicate_file.name) - if file_doc.exists_on_disk(): - if self.exists_on_disk(): - if not self.file_url: - self.file_url = duplicate_file.file_url - else: - self.file_url = duplicate_file.file_url - file_exists = True - - if not file_exists: - if not overwrite: - self.file_name = generate_file_name( - name=self.file_name, - suffix=self.content_hash[-6:], - is_private=self.is_private, - ) - call_hook_method("before_write_file", file_size=self.file_size) - write_file_method = get_hook_method("write_file") - if write_file_method: - return write_file_method(self) - return self.save_file_on_filesystem() - - def save_file_on_filesystem(self): - safe_file_name = re.sub(r"[/\\%?#]", "_", self.file_name) - if self.is_private: - self.file_url = f"/private/files/{safe_file_name}" - else: - self.file_url = f"/files/{safe_file_name}" - - fpath = self.write_file() - - return {"file_name": os.path.basename(fpath), "file_url": self.file_url} - - def check_max_file_size(self): - from frappe.core.api.file import get_max_file_size - - max_file_size = get_max_file_size() - file_size = len(self._content or b"") - - if file_size > max_file_size: - msg = _("File size exceeded the maximum allowed size of {0} MB").format( - max_file_size / 1048576 - ) - if frappe.has_permission("System Settings", "write"): - msg += ".
" + _("You can increase the limit from System Settings.") - frappe.throw(msg, exc=MaxFileSizeReachedError) - - return file_size - - def delete_file_data_content(self, only_thumbnail=False): - method = get_hook_method("delete_file_data_content") - if method: - method(self, only_thumbnail=only_thumbnail) - else: - self.delete_file_from_filesystem(only_thumbnail=only_thumbnail) - - def delete_file_from_filesystem(self, only_thumbnail=False): - """Delete file, thumbnail from File document""" - if only_thumbnail: - delete_file(self.thumbnail_url) - else: - delete_file(self.file_url) - delete_file(self.thumbnail_url) - - def is_downloadable(self): - return has_permission(self, "read") - - def get_extension(self): - """Split and return filename and extension for the set `file_name`.""" - return os.path.splitext(self.file_name) - - def create_attachment_record(self): - icon = ' ' if self.is_private else "" - file_url = ( - quote(frappe.safe_encode(self.file_url), safe="/:") - if self.file_url - else self.file_name - ) - file_name = self.file_name or self.file_url - - self.add_comment_in_reference_doc( - "Attachment", - f"{file_name}{icon}", - ) - - def add_comment_in_reference_doc(self, comment_type, text): - if self.attached_to_doctype and self.attached_to_name: - try: - doc = frappe.get_doc(self.attached_to_doctype, self.attached_to_name) - doc.add_comment(comment_type, text) - except frappe.DoesNotExistError: - frappe.clear_messages() - - def set_is_private(self): - if self.is_private: - return - - if self.file_url: - self.is_private = cint(self.file_url.startswith("/private")) - - @frappe.whitelist() - def optimize_file(self): - if self.is_folder: - raise TypeError("Folders cannot be optimized") - - content_type = mimetypes.guess_type(self.file_name)[0] - is_local_image = content_type.startswith("image/") and self.file_size > 0 - is_svg = content_type == "image/svg+xml" - - if not is_local_image: - raise NotImplementedError("Only local image files can be optimized") - - if is_svg: - raise TypeError("Optimization of SVG images is not supported") - - original_content = self.get_content() - optimized_content = optimize_image( - content=original_content, - content_type=content_type, - ) - - self.save_file(content=optimized_content, overwrite=True) - self.save() - - @property - def unique_url(self) -> str: - """Unique URL contains file ID in URL to speed up permisison checks.""" - from urllib.parse import urlencode - - if self.is_private: - return self.file_url + "?" + urlencode({"fid": self.name}) - else: - return self.file_url - - @staticmethod - def zip_files(files): - zip_file = io.BytesIO() - zf = zipfile.ZipFile(zip_file, "w", zipfile.ZIP_DEFLATED) - for _file in files: - if isinstance(_file, str): - _file = frappe.get_doc("File", _file) - if not isinstance(_file, File): - continue - if _file.is_folder: - continue - if not has_permission(_file, "read"): - continue - zf.writestr(_file.file_name, _file.get_content()) - zf.close() - return zip_file.getvalue() + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + attached_to_doctype: DF.Link | None + attached_to_field: DF.Data | None + attached_to_name: DF.Data | None + content_hash: DF.Data | None + file_name: DF.Data | None + file_size: DF.Int + file_type: DF.Data | None + file_url: DF.Code | None + folder: DF.Link | None + is_attachments_folder: DF.Check + is_folder: DF.Check + is_home_folder: DF.Check + is_private: DF.Check + old_parent: DF.Data | None + thumbnail_url: DF.SmallText | None + uploaded_to_dropbox: DF.Check + uploaded_to_google_drive: DF.Check + # end: auto-generated types + + no_feed_on_delete = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # if content is set, file_url will be generated + # decode comes in the picture if content passed has to be decoded before writing to disk + + self.content = self.get("content") or b"" + self.decode = self.get("decode", False) + + @property + def is_remote_file(self): + if self.file_url: + return self.file_url.startswith(URL_PREFIXES) + return not self.content + + def autoname(self): + """Set name for folder""" + if self.is_folder: + if self.folder: + self.name = self.get_name_based_on_parent_folder() + else: + # home + self.name = self.file_name + else: + self.name = frappe.generate_hash(length=10) + + def before_insert(self): + # Ensure correct formatting and type + self.file_url = unquote(self.file_url) if self.file_url else "" + + self.set_folder_name() + self.set_is_private() + self.set_file_name() + self.validate_attachment_limit() + self.set_file_type() + self.validate_file_extension() + + if self.is_folder: + return + + if self.is_remote_file: + self.validate_remote_file() + else: + self.save_file(content=self.get_content()) + self.flags.new_file = True + frappe.db.after_rollback.add(self.on_rollback) + + self.validate_duplicate_entry() # Hash is generated in save_file + + def after_insert(self): + if not self.is_folder: + self.create_attachment_record() + + def validate(self): + if self.is_folder: + return + + self.validate_attachment_references() + + # when dict is passed to get_doc for creation of new_doc, is_new returns None + # this case is handled inside handle_is_private_changed + if not self.is_new() and self.has_value_changed("is_private"): + self.handle_is_private_changed() + + self.validate_file_path() + self.validate_file_url() + self.validate_file_on_disk() + + self.file_size = frappe.form_dict.file_size or self.file_size + + def validate_attachment_references(self): + if not self.attached_to_doctype: + return + + if not self.attached_to_name or not isinstance(self.attached_to_name, str | int): + frappe.throw( + _("Attached To Name must be a string or an integer"), + frappe.ValidationError, + ) + + if self.attached_to_field and SPECIAL_CHAR_PATTERN.search(self.attached_to_field): + frappe.throw(_("The fieldname you've specified in Attached To Field is invalid")) + + def after_rename(self, *args, **kwargs): + for successor in self.get_successors(): + setup_folder_path(successor, self.name) + + def on_trash(self): + if self.is_home_folder or self.is_attachments_folder: + frappe.throw(_("Cannot delete Home and Attachments folders")) + self.validate_empty_folder() + self.validate_protected_file() + self._delete_file_on_disk() + if not self.is_folder: + self.add_comment_in_reference_doc("Attachment Removed", self.file_name) + + def on_rollback(self): + rollback_flags = ("new_file", "original_content", "original_path") + + def pop_rollback_flags(): + for flag in rollback_flags: + self.flags.pop(flag, None) + + # following condition is only executed when an insert has been rolledback + if self.flags.new_file: + self._delete_file_on_disk() + pop_rollback_flags() + return + + # if original_content flag is set, this rollback should revert the file to its original state + if self.flags.original_content: + file_path = self.get_full_path() + + if isinstance(self.flags.original_content, bytes): + mode = "wb+" + elif isinstance(self.flags.original_content, str): + mode = "w+" + + with open(file_path, mode) as f: + f.write(self.flags.original_content) + os.fsync(f.fileno()) + pop_rollback_flags() + + # used in case file path (File.file_url) has been changed + if self.flags.original_path: + target = self.flags.original_path["old"] + source = self.flags.original_path["new"] + shutil.move(source, target) + pop_rollback_flags() + + def get_name_based_on_parent_folder(self) -> str | None: + if self.folder: + return os.path.join(self.folder, self.file_name) + + def get_successors(self): + return frappe.get_all("File", filters={"folder": self.name}, pluck="name") + + def validate_file_path(self): + if self.is_remote_file: + return + + base_path = os.path.realpath(get_files_path(is_private=self.is_private)) + if not os.path.realpath(self.get_full_path()).startswith(base_path): + frappe.throw( + _("The File URL you've entered is incorrect"), + title=_("Invalid File URL"), + ) + + def validate_file_url(self): + if self.is_remote_file or not self.file_url: + return + + if not self.file_url.startswith(("/files/", "/private/files/", "/api/method/")): + # Probably an invalid URL since it doesn't start with http and isn't an internal URL either + frappe.throw( + _("URL must start with http:// or https://"), + title=_("Invalid URL"), + ) + + def handle_is_private_changed(self): + if self.is_remote_file: + return + + from pathlib import Path + + old_file_url = self.file_url + file_name = self.file_url.split("/")[-1] + private_file_path = Path(frappe.get_site_path("private", "files", file_name)) + public_file_path = Path(frappe.get_site_path("public", "files", file_name)) + + if cint(self.is_private): + source = public_file_path + target = private_file_path + url_starts_with = "/private/files/" + else: + source = private_file_path + target = public_file_path + url_starts_with = "/files/" + updated_file_url = f"{url_starts_with}{file_name}" + + # if a file document is created by passing dict throught get_doc and __local is not set, + # handle_is_private_changed would be executed; we're checking if updated_file_url is same + # as old_file_url to avoid a FileNotFoundError for this case. + if updated_file_url == old_file_url: + return + + if not source.exists(): + frappe.throw( + _("Cannot find file {} on disk").format(source), + exc=FileNotFoundError, + ) + if target.exists(): + frappe.throw( + _("A file with same name {} already exists").format(target), + exc=FileExistsError, + ) + + # Uses os.rename which is an atomic operation + shutil.move(source, target) + self.flags.original_path = {"old": source, "new": target} + frappe.db.after_rollback.add(self.on_rollback) + + self.file_url = updated_file_url + update_existing_file_docs(self) + + if ( + not self.attached_to_doctype + or not self.attached_to_name + or not self.fetch_attached_to_field(old_file_url) + ): + return + + if frappe.get_meta(self.attached_to_doctype).issingle: + frappe.db.set_single_value( + self.attached_to_doctype, + self.attached_to_field, + self.file_url, + ) + else: + frappe.db.set_value( + self.attached_to_doctype, + self.attached_to_name, + self.attached_to_field, + self.file_url, + ) + + def fetch_attached_to_field(self, old_file_url): + if self.attached_to_field: + return True + + reference_dict = frappe.get_doc(self.attached_to_doctype, self.attached_to_name).as_dict() + + for key, value in reference_dict.items(): + if value == old_file_url: + self.attached_to_field = key + return True + + def validate_attachment_limit(self): + attachment_limit = 0 + if self.attached_to_doctype and self.attached_to_name: + attachment_limit = cint(frappe.get_meta(self.attached_to_doctype).max_attachments) + + if attachment_limit: + current_attachment_count = len( + frappe.get_all( + "File", + filters={ + "attached_to_doctype": self.attached_to_doctype, + "attached_to_name": self.attached_to_name, + }, + limit=attachment_limit + 1, + ) + ) + + if current_attachment_count >= attachment_limit: + frappe.throw( + _("Maximum Attachment Limit of {0} has been reached for {1} {2}.").format( + frappe.bold(attachment_limit), + self.attached_to_doctype, + self.attached_to_name, + ), + exc=AttachmentLimitReached, + title=_("Attachment Limit Reached"), + ) + + def validate_remote_file(self): + """Validates if file uploaded using URL already exist""" + site_url = get_url() + if self.file_url and "/files/" in self.file_url and self.file_url.startswith(site_url): + self.file_url = self.file_url.split(site_url, 1)[1] + + def set_folder_name(self): + """Make parent folders if not exists based on reference doctype and name""" + if self.folder: + return + + if self.attached_to_doctype: + self.folder = frappe.db.get_value("File", {"is_attachments_folder": 1}) + + elif not self.is_home_folder: + self.folder = "Home" + + def set_file_type(self): + if self.is_folder: + return + + file_type = mimetypes.guess_type(self.file_name)[0] + if not file_type: + return + + file_extension = mimetypes.guess_extension(file_type) + self.file_type = file_extension.lstrip(".").upper() if file_extension else None + + def validate_file_on_disk(self): + """Validates existence file""" + full_path = self.get_full_path() + + if full_path.startswith(URL_PREFIXES): + return True + + if not os.path.exists(full_path): + frappe.throw(_("File {0} does not exist").format(self.file_url), IOError) + + def validate_file_extension(self): + # Only validate uploaded files, not generated by code/integrations. + if not self.file_type or not frappe.request: + return + + allowed_extensions = frappe.get_system_settings("allowed_file_extensions") + if not allowed_extensions: + return + + if self.file_type not in allowed_extensions.splitlines(): + frappe.throw( + _("File type of {0} is not allowed").format(self.file_type), + exc=FileTypeNotAllowed, + ) + + def validate_duplicate_entry(self): + if not self.flags.ignore_duplicate_entry_error and not self.is_folder: + if not self.content_hash: + self.generate_content_hash() + + # check duplicate name + # check duplicate assignment + filters = { + "content_hash": self.content_hash, + "is_private": self.is_private, + } + + if self.name: + filters.update({"name": ("!=", self.name)}) + + if self.attached_to_doctype and self.attached_to_name: + filters.update( + { + "attached_to_doctype": self.attached_to_doctype, + "attached_to_name": self.attached_to_name, + } + ) + duplicate_file = frappe.db.get_value("File", filters, ["name", "file_url"], as_dict=1) + + if duplicate_file: + duplicate_file_doc = frappe.get_cached_doc("File", duplicate_file.name) + if duplicate_file_doc.exists_on_disk(): + # just use the url, to avoid uploading a duplicate + self.file_url = duplicate_file.file_url + + def set_file_name(self): + if not self.file_name and not self.file_url: + frappe.throw( + _("Fields `file_name` or `file_url` must be set for File"), + exc=frappe.MandatoryError, + ) + elif not self.file_name and self.file_url: + self.file_name = self.file_url.split("/")[-1] + else: + self.file_name = re.sub(r"/", "", self.file_name) + + def generate_content_hash(self): + if self.content_hash or not self.file_url or self.is_remote_file: + return + file_name = self.file_url.split("/")[-1] + try: + file_path = get_files_path(file_name, is_private=self.is_private) + with open(file_path, "rb") as f: + self.content_hash = get_content_hash(f.read()) + except OSError: + frappe.throw(_("File {0} does not exist").format(file_path)) + + def make_thumbnail( + self, + set_as_thumbnail: bool = True, + width: int = 300, + height: int = 300, + suffix: str = "small", + crop: bool = False, + ) -> str: + from requests.exceptions import HTTPError, SSLError + + if not self.file_url: + return + + try: + if self.file_url.startswith(("/files", "/private/files")): + image, filename, extn = get_local_image(self.file_url) + else: + image, filename, extn = get_web_image(self.file_url) + except (HTTPError, SSLError, OSError, TypeError): + return + + size = width, height + if crop: + image = ImageOps.fit(image, size, Image.Resampling.LANCZOS) + else: + image.thumbnail(size, Image.Resampling.LANCZOS) + + thumbnail_url = f"{filename}_{suffix}.{extn}" + path = os.path.abspath(frappe.get_site_path("public", thumbnail_url.lstrip("/"))) + + try: + image.save(path) + if set_as_thumbnail: + self.db_set("thumbnail_url", thumbnail_url) + + except OSError: + frappe.msgprint(_("Unable to write file format for {0}").format(path)) + return + + return thumbnail_url + + def validate_empty_folder(self): + """Throw exception if folder is not empty""" + if self.is_folder and frappe.get_all("File", filters={"folder": self.name}, limit=1): + frappe.throw(_("Folder {0} is not empty").format(self.name), FolderNotEmpty) + + def validate_protected_file(self): + """Throw an exception if this file is attached to a doctype that protects files. + + Allows deleting the attached file if the linked document is in draft. If submitted, + deletion is not allowed. If canceled, requires delete permissions on the linked document. + """ + if not (self.attached_to_doctype and self.attached_to_name): + return + + try: + ref_doc = frappe.get_doc(self.attached_to_doctype, self.attached_to_name) + except DoesNotExistError: + return + + if ref_doc.docstatus == 0: + # If the document is not submitted yet, users can correct wrong attachments + return + + if not ref_doc.meta.protect_attached_files: + return + + if ref_doc.docstatus == 2 and ref_doc.has_permission("delete"): + # Deletion must still be possible if users have the permission to delete the linked document + return + + frappe.throw( + msg=_("This file is attached to a protected document and cannot be deleted."), + title=_("Protected File"), + ) + + def _delete_file_on_disk(self): + """If file not attached to any other record, delete it""" + on_disk_file_not_shared = self.content_hash and not frappe.get_all( + "File", + filters={ + "content_hash": self.content_hash, + "name": ["!=", self.name], + # NOTE: Some old Files might share file_urls while not sharing the is_private value + # "is_private": self.is_private, + }, + limit=1, + ) + if on_disk_file_not_shared: + self.delete_file_data_content() + else: + self.delete_file_data_content(only_thumbnail=True) + + def unzip(self) -> list["File"]: + """Unzip current file and replace it by its children""" + if not self.file_url.endswith(".zip"): + frappe.throw(_("{0} is not a zip file").format(self.file_name)) + + zip_path = self.get_full_path() + + files = [] + with zipfile.ZipFile(zip_path) as z: + for file in z.filelist: + if file.is_dir() or file.filename.startswith("__MACOSX/"): + # skip directories and macos hidden directory + continue + + filename = os.path.basename(file.filename) + if filename.startswith("."): + # skip hidden files + continue + + file_doc = frappe.new_doc("File") + try: + file_doc.content = z.read(file.filename) + except zipfile.BadZipFile: + frappe.throw(_("{0} is a not a valid zip file").format(self.file_name)) + file_doc.file_name = filename + file_doc.folder = self.folder + file_doc.is_private = self.is_private + file_doc.attached_to_doctype = self.attached_to_doctype + file_doc.attached_to_name = self.attached_to_name + file_doc.save() + files.append(file_doc) + + frappe.delete_doc("File", self.name) + return files + + def exists_on_disk(self): + return os.path.exists(self.get_full_path()) + + def get_content(self, encodings=None) -> bytes | str: + if self.is_folder: + frappe.throw(_("Cannot get file contents of a Folder")) + + # if doc was just created, content field is already populated, return it as-is + if self.get("content"): + self._content = self.content + if self.decode: + self._content = decode_file_content(self._content) + self.decode = False + # self.content = None # TODO: This needs to happen; make it happen somehow + return self._content + + if self.file_url: + self.validate_file_url() + file_path = self.get_full_path() + + if encodings is None: + encodings = FILE_ENCODING_OPTIONS + with open(file_path, mode="rb") as f: + self._content = f.read() + # looping will not result in slowdown, as the content is usually utf-8 or utf-8-sig + # encoded so the first iteration will be enough most of the time + for encoding in encodings: + try: + # read file with proper encoding + self._content = self._content.decode(encoding) + break + except UnicodeDecodeError: + # for .png, .jpg, etc + continue + + return self._content + + def get_full_path(self): + """Return file path using the set file name.""" + + file_path = self.file_url or self.file_name + + site_url = get_url() + if "/files/" in file_path and file_path.startswith(site_url): + file_path = file_path.split(site_url, 1)[1] + + if "/" not in file_path: + if self.is_private: + file_path = f"/private/files/{file_path}" + else: + file_path = f"/files/{file_path}" + + if file_path.startswith("/private/files/"): + file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1) + + elif file_path.startswith("/files/"): + file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/")) + + elif file_path.startswith(URL_PREFIXES): + pass + + elif not self.file_url: + frappe.throw(_("There is some problem with the file url: {0}").format(file_path)) + + if not is_safe_path(file_path): + frappe.throw(_("Cannot access file path {0}").format(file_path)) + + if os.path.sep in self.file_name: + frappe.throw(_("File name cannot have {0}").format(os.path.sep)) + + return file_path + + def write_file(self): + """write file to disk with a random name (to compare)""" + if self.is_remote_file: + return + + file_path = self.get_full_path() + + if isinstance(self._content, str): + self._content = self._content.encode() + + with open(file_path, "wb+") as f: + f.write(self._content) + os.fsync(f.fileno()) + + frappe.db.after_rollback.add(self.on_rollback) + + return file_path + + def save_file( + self, + content: bytes | str | None = None, + decode=False, + ignore_existing_file_check=False, + overwrite=False, + ): + if self.is_remote_file: + return + + if not self.flags.new_file: + self.flags.original_content = self.get_content() + + if content: + self.content = content + self.decode = decode + self.get_content() + + if not self._content: + return + + file_exists = False + duplicate_file = None + + self.is_private = cint(self.is_private) + self.content_type = mimetypes.guess_type(self.file_name)[0] + + # transform file content based on site settings + if ( + self.content_type + and self.content_type == "image/jpeg" + and frappe.get_system_settings("strip_exif_metadata_from_uploaded_images") + ): + self._content = strip_exif_data(self._content, self.content_type) + + self.file_size = self.check_max_file_size() + self.content_hash = get_content_hash(self._content) + + # check if a file exists with the same content hash and is also in the same folder (public or private) + if not ignore_existing_file_check: + duplicate_file = frappe.get_value( + "File", + {"content_hash": self.content_hash, "is_private": self.is_private}, + ["file_url", "name"], + as_dict=True, + ) + + if duplicate_file: + file_doc: File = frappe.get_cached_doc("File", duplicate_file.name) + if file_doc.exists_on_disk(): + if self.exists_on_disk(): + if not self.file_url: + self.file_url = duplicate_file.file_url + else: + self.file_url = duplicate_file.file_url + file_exists = True + + if not file_exists: + if not overwrite: + self.file_name = generate_file_name( + name=self.file_name, + suffix=self.content_hash[-6:], + is_private=self.is_private, + ) + call_hook_method("before_write_file", file_size=self.file_size) + write_file_method = get_hook_method("write_file") + if write_file_method: + return write_file_method(self) + return self.save_file_on_filesystem() + + def save_file_on_filesystem(self): + safe_file_name = re.sub(r"[/\\%?#]", "_", self.file_name) + if self.is_private: + self.file_url = f"/private/files/{safe_file_name}" + else: + self.file_url = f"/files/{safe_file_name}" + + fpath = self.write_file() + + return {"file_name": os.path.basename(fpath), "file_url": self.file_url} + + def check_max_file_size(self): + from frappe.core.api.file import get_max_file_size + + max_file_size = get_max_file_size() + file_size = len(self._content or b"") + + if file_size > max_file_size: + msg = _("File size exceeded the maximum allowed size of {0} MB").format(max_file_size / 1048576) + if frappe.has_permission("System Settings", "write"): + msg += ".
" + _("You can increase the limit from System Settings.") + frappe.throw(msg, exc=MaxFileSizeReachedError) + + return file_size + + def delete_file_data_content(self, only_thumbnail=False): + method = get_hook_method("delete_file_data_content") + if method: + method(self, only_thumbnail=only_thumbnail) + else: + self.delete_file_from_filesystem(only_thumbnail=only_thumbnail) + + def delete_file_from_filesystem(self, only_thumbnail=False): + """Delete file, thumbnail from File document""" + if only_thumbnail: + delete_file(self.thumbnail_url) + else: + delete_file(self.file_url) + delete_file(self.thumbnail_url) + + def is_downloadable(self): + return has_permission(self, "read") + + def get_extension(self): + """Split and return filename and extension for the set `file_name`.""" + return os.path.splitext(self.file_name) + + def create_attachment_record(self): + icon = ' ' if self.is_private else "" + file_url = quote(frappe.safe_encode(self.file_url), safe="/:") if self.file_url else self.file_name + file_name = self.file_name or self.file_url + + self.add_comment_in_reference_doc( + "Attachment", + f"{file_name}{icon}", + ) + + def add_comment_in_reference_doc(self, comment_type, text): + if self.attached_to_doctype and self.attached_to_name: + try: + doc = frappe.get_doc(self.attached_to_doctype, self.attached_to_name) + doc.add_comment(comment_type, text) + except frappe.DoesNotExistError: + frappe.clear_messages() + + def set_is_private(self): + if self.is_private: + return + + if self.file_url: + self.is_private = cint(self.file_url.startswith("/private")) + + @frappe.whitelist() + def optimize_file(self): + if self.is_folder: + raise TypeError("Folders cannot be optimized") + + content_type = mimetypes.guess_type(self.file_name)[0] + is_local_image = content_type.startswith("image/") and self.file_size > 0 + is_svg = content_type == "image/svg+xml" + + if not is_local_image: + raise NotImplementedError("Only local image files can be optimized") + + if is_svg: + raise TypeError("Optimization of SVG images is not supported") + + original_content = self.get_content() + optimized_content = optimize_image( + content=original_content, + content_type=content_type, + ) + + self.save_file(content=optimized_content, overwrite=True) + self.save() + + @property + def unique_url(self) -> str: + """Unique URL contains file ID in URL to speed up permisison checks.""" + from urllib.parse import urlencode + + if self.is_private: + return self.file_url + "?" + urlencode({"fid": self.name}) + else: + return self.file_url + + @staticmethod + def zip_files(files): + zip_file = io.BytesIO() + zf = zipfile.ZipFile(zip_file, "w", zipfile.ZIP_DEFLATED) + for _file in files: + if isinstance(_file, str): + _file = frappe.get_doc("File", _file) + if not isinstance(_file, File): + continue + if _file.is_folder: + continue + if not has_permission(_file, "read"): + continue + zf.writestr(_file.file_name, _file.get_content()) + zf.close() + return zip_file.getvalue() def on_doctype_update(): - frappe.db.add_index("File", ["attached_to_doctype", "attached_to_name"]) - frappe.db.add_index("File", ["file_url(100)"]) + frappe.db.add_index("File", ["attached_to_doctype", "attached_to_name"]) + frappe.db.add_index("File", ["file_url(100)"]) def has_permission(doc, ptype=None, user=None, debug=False): - user = user or frappe.session.user + user = user or frappe.session.user - if user == "Administrator": - return True + if user == "Administrator": + return True - if not doc.is_private and ptype in ("read", "select"): - return True + if not doc.is_private and ptype in ("read", "select"): + return True - if user != "Guest" and doc.owner == user: - return True + if user != "Guest" and doc.owner == user: + return True - if doc.attached_to_doctype and doc.attached_to_name: - attached_to_doctype = doc.attached_to_doctype - attached_to_name = doc.attached_to_name + if doc.attached_to_doctype and doc.attached_to_name: + attached_to_doctype = doc.attached_to_doctype + attached_to_name = doc.attached_to_name - try: - ref_doc = frappe.get_doc(attached_to_doctype, attached_to_name) - except ModuleNotFoundError: - return False - except frappe.DoesNotExistError: - frappe.clear_last_message() - return False + try: + ref_doc = frappe.get_doc(attached_to_doctype, attached_to_name) + except ModuleNotFoundError: + return False + except frappe.DoesNotExistError: + frappe.clear_last_message() + return False - if ptype in ["write", "create", "delete"]: - return ref_doc.has_permission("write", debug=debug, user=user) - else: - return ref_doc.has_permission("read", debug=debug, user=user) + if ptype in ["write", "create", "delete"]: + return ref_doc.has_permission("write", debug=debug, user=user) + else: + return ref_doc.has_permission("read", debug=debug, user=user) - return False + return False def get_permission_query_conditions(user: str | None = None) -> str: - user = user or frappe.session.user - if user == "Administrator": - return "" + user = user or frappe.session.user + if user == "Administrator": + return "" - if SYSTEM_USER_ROLE not in frappe.get_roles(user): - return f""" `tabFile`.`owner` = {frappe.db.escape(user)} """ + if SYSTEM_USER_ROLE not in frappe.get_roles(user): + return f""" `tabFile`.`owner` = {frappe.db.escape(user)} """ - readable_doctypes = ", ".join(repr(dt) for dt in get_doctypes_with_read()) - return f""" + readable_doctypes = ", ".join(repr(dt) for dt in get_doctypes_with_read()) + return f""" (`tabFile`.`is_private` = 0) OR (`tabFile`.`attached_to_doctype` IS NULL AND `tabFile`.`owner` = {frappe.db.escape(user)}) OR (`tabFile`.`attached_to_doctype` IN ({readable_doctypes})) diff --git a/frappe/public/js/frappe/file_uploader/file_uploader.bundle.js b/frappe/public/js/frappe/file_uploader/file_uploader.bundle.js index fac859dbc6..323e608d3b 100644 --- a/frappe/public/js/frappe/file_uploader/file_uploader.bundle.js +++ b/frappe/public/js/frappe/file_uploader/file_uploader.bundle.js @@ -3,7 +3,7 @@ import FileUploaderComponent from "./FileUploader.vue"; import { watch } from "vue"; class FileUploader { - static UploadOptions = [] + static UploadOptions = []; constructor({ wrapper, method, diff --git a/frappe/utils/file_manager.py b/frappe/utils/file_manager.py index 08b22757d9..7867b612a1 100644 --- a/frappe/utils/file_manager.py +++ b/frappe/utils/file_manager.py @@ -417,7 +417,7 @@ def add_attachments(doctype, name, attachments): def is_safe_path(path: str) -> bool: - if path.startswith(("http://", "https://", "/api/method/")): + if path.startswith(("http://", "https://", "/api/method/")): return True basedir = frappe.get_site_path()