diff --git a/frappe/core/doctype/file/file.py b/frappe/core/doctype/file/file.py
index effa475651..3f4c2069e1 100755
--- a/frappe/core/doctype/file/file.py
+++ b/frappe/core/doctype/file/file.py
@@ -6,7 +6,7 @@ import mimetypes
import os
import re
import shutil
-from typing import List
+from typing import Union, List
import zipfile
from requests.exceptions import HTTPError, SSLError
@@ -31,19 +31,10 @@ ImageFile.LOAD_TRUNCATED_IMAGES = True
class File(Document):
no_feed_on_delete = True
- def before_insert(self):
- frappe.local.rollback_observers.append(self)
- self.set_folder_name()
- if self.file_name:
- self.file_name = re.sub(r'/', '', self.file_name)
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
self.content = self.get("content", None)
self.decode = self.get("decode", False)
- if self.content:
- self.save_file(content=self.content, decode=self.decode)
-
- def get_name_based_on_parent_folder(self):
- if self.folder:
- return os.path.join(self.folder, self.file_name)
def autoname(self):
"""Set name for folder"""
@@ -56,90 +47,109 @@ class File(Document):
else:
self.name = frappe.generate_hash(length=10)
+ def before_insert(self):
+ frappe.local.rollback_observers.append(self)
+ self.set_folder_name()
+ self.set_file_name()
+ self.save_file(content=self.content, decode=self.decode)
+
def after_insert(self):
if not self.is_folder:
- self.add_comment_in_reference_doc('Attachment',
- _('Added {0}').format("{file_name}{icon}".format(**{
- "icon": ' ' if self.is_private else "",
- "file_url": quote(frappe.safe_encode(self.file_url)) if self.file_url else self.file_name,
- "file_name": self.file_name or self.file_url
- })))
-
- def after_rename(self, olddn, newdn, merge=False):
- for successor in self.get_successor():
- setup_folder_path(successor[0], self.name)
-
- def get_successor(self):
- return frappe.db.get_values(doctype='File',
- filters={'folder': self.name},
- fieldname='name')
+ self.create_attachment_record()
+ self.set_is_private()
+ self.set_file_name()
+ self.validate_duplicate_entry()
+ self.validate_attachment_limit()
def validate(self):
- if self.is_new():
- self.set_is_private()
- self.set_file_name()
- self.validate_duplicate_entry()
- self.validate_attachment_limit()
+ # Ensure correct formatting and type
+ self.file_url = unquote(self.file_url)
- self.validate_folder()
+ if not self.is_new() and self.has_value_changed("is_private"):
+ self.handle_is_private_changed()
if self.is_folder:
self.file_url = ""
else:
- self.validate_url()
+ self.validate_file_on_disk()
+ self.validate_file_url()
+ self.validate_file_path()
self.file_size = frappe.form_dict.file_size or self.file_size
- def validate_url(self):
- if not self.file_url or self.file_url.startswith(("http://", "https://")):
- if not self.flags.ignore_file_validate:
- self.validate_file()
+ def after_rename(self, *args, **kwargs):
+ for successor in self.get_successors():
+ setup_folder_path(successor, self.name)
- return
-
- # Probably an invalid web URL
- if not self.file_url.startswith(("/files/", "/private/files/")):
- frappe.throw(
- _("URL must start with http:// or https://"),
- title=_('Invalid URL')
+ def on_trash(self):
+ if self.is_home_folder or self.is_attachments_folder:
+ frappe.throw(_("Cannot delete Home and Attachments folders"))
+ self.validate_empty_folder()
+ self._delete_file_on_disk()
+ if not self.is_folder:
+ self.add_comment_in_reference_doc(
+ "Attachment Removed", _("Removed {0}").format(self.file_name)
)
- # Ensure correct formatting and type
- self.file_url = unquote(self.file_url)
- self.is_private = cint(self.is_private)
+ def on_rollback(self):
+ # if original_content flag is set, this rollback should revert the file to its original state
+ if self.flags.original_content:
+ file_path = self.get_full_path()
+ with open(file_path, "wb+") as f:
+ f.write(self.flags.original_content)
- self.handle_is_private_changed()
+ # following condition is only executed when an insert has been rolledback
+ else:
+ self.flags.on_rollback = True
+ self.on_trash()
+
+ def get_name_based_on_parent_folder(self) -> Union[str, None]:
+ if self.folder:
+ return os.path.join(self.folder, self.file_name)
+
+ def get_successors(self):
+ return frappe.get_all(
+ "File", filters={"folder": self.name}, pluck="name"
+ )
+
+ def validate_file_path(self):
+ if self.file_url.startswith(("http://", "https://")):
+ return
base_path = os.path.realpath(get_files_path(is_private=self.is_private))
if not os.path.realpath(self.get_full_path()).startswith(base_path):
frappe.throw(
_("The File URL you've entered is incorrect"),
- title=_('Invalid File URL')
+ title=_("Invalid File URL"),
+ )
+
+ def validate_file_url(self):
+ if self.file_url.startswith(("http://", "https://")):
+ return
+
+ if not self.file_url:
+ return
+
+ if not self.file_url.startswith(("/files/", "/private/files/")):
+ # Probably an invalid URL since it doesn't start with http either
+ frappe.throw(
+ _("URL must start with http:// or https://"), title=_("Invalid URL")
)
def handle_is_private_changed(self):
- if not frappe.db.exists(
- 'File', {
- 'name': self.name,
- 'is_private': cint(not self.is_private)
- }
- ):
- return
-
old_file_url = self.file_url
+ file_name = self.file_url.split("/")[-1]
+ private_file_path = frappe.get_site_path("private", "files", file_name)
+ public_file_path = frappe.get_site_path("public", "files", file_name)
- file_name = self.file_url.split('/')[-1]
- private_file_path = frappe.get_site_path('private', 'files', file_name)
- public_file_path = frappe.get_site_path('public', 'files', file_name)
-
- if self.is_private:
+ if cint(self.is_private):
shutil.move(public_file_path, private_file_path)
url_starts_with = "/private/files/"
else:
shutil.move(private_file_path, public_file_path)
url_starts_with = "/files/"
- self.file_url = "{0}{1}".format(url_starts_with, file_name)
+ self.file_url = f"{url_starts_with}{file_name}"
update_existing_file_docs(self)
if (
@@ -149,8 +159,12 @@ class File(Document):
):
return
- frappe.db.set_value(self.attached_to_doctype, self.attached_to_name,
- self.attached_to_field, self.file_url)
+ frappe.db.set_value(
+ self.attached_to_doctype,
+ self.attached_to_name,
+ self.attached_to_field,
+ self.file_url,
+ )
def fetch_attached_to_field(self, old_file_url):
if self.attached_to_field:
@@ -186,21 +200,21 @@ class File(Document):
def set_folder_name(self):
"""Make parent folders if not exists based on reference doctype and name"""
- if self.attached_to_doctype and not self.folder:
+ if self.folder:
+ return
+
+ if self.attached_to_doctype:
self.folder = frappe.db.get_value("File", {"is_attachments_folder": 1})
- def validate_folder(self):
- if not self.is_home_folder and not self.folder and \
- not self.flags.ignore_folder_validate:
+ if not self.is_home_folder:
self.folder = "Home"
- def validate_file(self):
- """Validates existence of public file
- TODO: validate for private file
+ def validate_file_on_disk(self):
+ """Validates existence file
"""
full_path = self.get_full_path()
- if full_path.startswith('http'):
+ if full_path.startswith("http"):
return True
if not os.path.exists(full_path):
@@ -214,33 +228,37 @@ class File(Document):
# check duplicate name
# check duplicate assignment
filters = {
- 'content_hash': self.content_hash,
- 'is_private': self.is_private,
- 'name': ('!=', self.name)
+ "content_hash": self.content_hash,
+ "is_private": self.is_private,
+ "name": ("!=", self.name),
}
if self.attached_to_doctype and self.attached_to_name:
- filters.update({
- 'attached_to_doctype': self.attached_to_doctype,
- 'attached_to_name': self.attached_to_name
- })
- duplicate_file = frappe.db.get_value('File', filters, ['name', 'file_url'], as_dict=1)
+ filters.update(
+ {
+ "attached_to_doctype": self.attached_to_doctype,
+ "attached_to_name": self.attached_to_name,
+ }
+ )
+ duplicate_file = frappe.db.get_value(
+ "File", filters, ["name", "file_url"], as_dict=1
+ )
if duplicate_file:
- duplicate_file_doc = frappe.get_cached_doc('File', duplicate_file.name)
+ duplicate_file_doc = frappe.get_cached_doc("File", duplicate_file.name)
if duplicate_file_doc.exists_on_disk():
- # just use the url, to avoid uploading a duplicate
- self.file_url = duplicate_file.file_url
+ # just use the url, to avoid uploading a duplicate
+ self.file_url = duplicate_file.file_url
def set_file_name(self):
if not self.file_name and self.file_url:
- self.file_name = self.file_url.split('/')[-1]
+ self.file_name = self.file_url.split("/")[-1]
else:
- self.file_name = re.sub(r'/', '', self.file_name)
+ self.file_name = re.sub(r"/", "", self.file_name)
def generate_content_hash(self):
- if self.content_hash or not self.file_url or self.file_url.startswith('http'):
+ if self.content_hash or not self.file_url or self.file_url.startswith("http"):
return
- file_name = self.file_url.split('/')[-1]
+ file_name = self.file_url.split("/")[-1]
try:
file_path = get_files_path(file_name, is_private=self.is_private)
with open(file_path, "rb") as f:
@@ -248,73 +266,68 @@ class File(Document):
except IOError:
frappe.throw(_("File {0} does not exist").format(file_path))
- def on_trash(self):
- if self.is_home_folder or self.is_attachments_folder:
- frappe.throw(_("Cannot delete Home and Attachments folders"))
- self.check_folder_is_empty()
- self.call_delete_file()
- if not self.is_folder:
- self.add_comment_in_reference_doc('Attachment Removed', _("Removed {0}").format(self.file_name))
+ def make_thumbnail(
+ self,
+ set_as_thumbnail: bool = True,
+ width: int = 300,
+ height: int = 300,
+ suffix: str = "small",
+ crop: bool = False,
+ ) -> str:
+ if not self.file_url:
+ return
- def make_thumbnail(self, set_as_thumbnail=True, width=300, height=300, suffix="small", crop=False):
- if self.file_url:
- try:
- if self.file_url.startswith(("/files", "/private/files")):
- image, filename, extn = get_local_image(self.file_url)
- else:
- image, filename, extn = get_web_image(self.file_url)
- except (HTTPError, SSLError, IOError, TypeError):
- return
-
- size = width, height
- if crop:
- image = ImageOps.fit(image, size, Image.ANTIALIAS)
+ try:
+ if self.file_url.startswith(("/files", "/private/files")):
+ image, filename, extn = get_local_image(self.file_url)
else:
- image.thumbnail(size, Image.ANTIALIAS)
+ image, filename, extn = get_web_image(self.file_url)
+ except (HTTPError, SSLError, IOError, TypeError):
+ return
- thumbnail_url = filename + "_" + suffix + "." + extn
- path = os.path.abspath(frappe.get_site_path("public", thumbnail_url.lstrip("/")))
+ size = width, height
+ if crop:
+ image = ImageOps.fit(image, size, Image.ANTIALIAS)
+ else:
+ image.thumbnail(size, Image.ANTIALIAS)
- try:
- image.save(path)
- if set_as_thumbnail:
- self.db_set("thumbnail_url", thumbnail_url)
+ thumbnail_url = f"{filename}_{suffix}.{extn}"
+ path = os.path.abspath(
+ frappe.get_site_path("public", thumbnail_url.lstrip("/"))
+ )
- except IOError:
- frappe.msgprint(_("Unable to write file format for {0}").format(path))
- return
+ try:
+ image.save(path)
+ if set_as_thumbnail:
+ self.db_set("thumbnail_url", thumbnail_url)
- return thumbnail_url
+ except IOError:
+ frappe.msgprint(_("Unable to write file format for {0}").format(path))
+ return
- def check_folder_is_empty(self):
+ return thumbnail_url
+
+ def validate_empty_folder(self):
"""Throw exception if folder is not empty"""
- files = frappe.get_all("File", filters={"folder": self.name}, fields=("name", "file_name"))
-
- if self.is_folder and files:
+ if self.is_folder and frappe.get_all("File", filters={"folder": self.name}, limit=1):
frappe.throw(_("Folder {0} is not empty").format(self.name), FolderNotEmpty)
- def call_delete_file(self):
+ def _delete_file_on_disk(self):
"""If file not attached to any other record, delete it"""
- if self.file_name and self.content_hash and (not frappe.db.count("File",
- {"content_hash": self.content_hash, "name": ["!=", self.name]})):
- self.delete_file_data_content()
+ on_disk_file_not_shared = (
+ self.content_hash
+ and not frappe.get_all("File",
+ filters={"content_hash": self.content_hash, "name": ["!=", self.name]},
+ limit=1,
+ )
+ )
+ if self.file_name and on_disk_file_not_shared:
+ self.delete_file_data_content()
elif self.file_url:
self.delete_file_data_content(only_thumbnail=True)
- def on_rollback(self):
- # if original_content flag is set, this rollback should revert the file to its original state
- if self.flags.original_content:
- file_path = self.get_full_path()
- with open(file_path, "wb+") as f:
- f.write(self.flags.original_content)
-
- # following condition is only executed when an insert has been rolledback
- else:
- self.flags.on_rollback = True
- self.on_trash()
-
def unzip(self) -> List["File"]:
- '''Unzip current file and replace it by its children'''
+ """Unzip current file and replace it by its children"""
if not self.file_url.endswith(".zip"):
frappe.throw(_("{0} is not a zip file").format(self.file_name))
@@ -323,16 +336,16 @@ class File(Document):
files = []
with zipfile.ZipFile(zip_path) as z:
for file in z.filelist:
- if file.is_dir() or file.filename.startswith('__MACOSX/'):
+ if file.is_dir() or file.filename.startswith("__MACOSX/"):
# skip directories and macos hidden directory
continue
filename = os.path.basename(file.filename)
- if filename.startswith('.'):
+ if filename.startswith("."):
# skip hidden files
continue
- file_doc = frappe.new_doc('File')
+ file_doc = frappe.new_doc("File")
file_doc.content = z.read(file.filename)
file_doc.file_name = filename
file_doc.folder = self.folder
@@ -342,28 +355,25 @@ class File(Document):
file_doc.save()
files.append(file_doc)
- frappe.delete_doc('File', self.name)
+ frappe.delete_doc("File", self.name)
return files
-
def exists_on_disk(self):
- exists = os.path.exists(self.get_full_path())
- return exists
+ return os.path.exists(self.get_full_path())
-
- def get_content(self):
+ def get_content(self) -> bytes:
"""Returns [`file_name`, `content`] for given file name `fname`"""
if self.is_folder:
frappe.throw(_("Cannot get file contents of a Folder"))
- if self.get('content'):
+ if self.get("content"):
return self.content
- self.validate_url()
+ self.validate_file_url()
file_path = self.get_full_path()
# read the file
- with io.open(encode(file_path), mode='rb') as f:
+ with io.open(encode(file_path), mode="rb") as f:
content = f.read()
try:
# for plain text files
@@ -383,7 +393,9 @@ class File(Document):
file_path = "/files/" + file_path
if file_path.startswith("/private/files/"):
- file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)
+ file_path = get_files_path(
+ *file_path.split("/private/files/", 1)[1].split("/"), is_private=1
+ )
elif file_path.startswith("/files/"):
file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))
@@ -392,7 +404,9 @@ class File(Document):
pass
elif not self.file_url:
- frappe.throw(_("There is some problem with the file url: {0}").format(file_path))
+ frappe.throw(
+ _("There is some problem with the file url: {0}").format(file_path)
+ )
if not is_safe_path(file_path):
frappe.throw(f"Cannot access file path {file_path}")
@@ -404,7 +418,7 @@ class File(Document):
file_path = get_files_path(is_private=self.is_private)
if os.path.sep in self.file_name:
- frappe.throw(_('File name cannot have {0}').format(os.path.sep))
+ frappe.throw(_("File name cannot have {0}").format(os.path.sep))
# create directory (if not exists)
frappe.create_folder(file_path)
@@ -412,12 +426,16 @@ class File(Document):
self.content = self.get_content()
if isinstance(self.content, str):
self.content = self.content.encode()
- with open(os.path.join(file_path.encode('utf-8'), self.file_name.encode('utf-8')), 'wb+') as f:
+ _file_path = os.path.join(file_path.encode("utf-8"), self.file_name.encode("utf-8"))
+ with open(_file_path, "wb+") as f:
f.write(self.content)
return get_files_path(self.file_name, is_private=self.is_private)
def save_file(self, content=None, decode=False, ignore_existing_file_check=False):
+ if not self.content:
+ return
+
file_exists = False
duplicate_file = None
@@ -437,7 +455,8 @@ class File(Document):
# transform file content based on site settings
if (
- self.content_type and self.content_type == "image/jpeg"
+ self.content_type
+ and self.content_type == "image/jpeg"
and frappe.get_system_settings("strip_exif_metadata_from_uploaded_images")
):
self.content = strip_exif_data(self.content, self.content_type)
@@ -446,57 +465,57 @@ class File(Document):
# check if a file exists with the same content hash and is also in the same folder (public or private)
if not ignore_existing_file_check:
- duplicate_file = frappe.get_value("File", {
- "content_hash": self.content_hash,
- "is_private": self.is_private
- },
- ["file_url", "name"], as_dict=True)
+ duplicate_file = frappe.get_value(
+ "File",
+ {"content_hash": self.content_hash, "is_private": self.is_private},
+ ["file_url", "name"],
+ as_dict=True,
+ )
if duplicate_file:
- file_doc = frappe.get_cached_doc('File', duplicate_file.name)
+ file_doc = frappe.get_cached_doc("File", duplicate_file.name)
if file_doc.exists_on_disk():
- self.file_url = duplicate_file.file_url
+ self.file_url = duplicate_file.file_url
file_exists = True
if not file_exists:
self.file_name = generate_file_name(
name=self.file_name,
suffix=self.content_hash[-6:],
- is_private=self.is_private
+ is_private=self.is_private,
)
call_hook_method("before_write_file", file_size=self.file_size)
- write_file_method = get_hook_method('write_file')
+ write_file_method = get_hook_method("write_file")
if write_file_method:
return write_file_method(self)
return self.save_file_on_filesystem()
-
def save_file_on_filesystem(self):
fpath = self.write_file()
if self.is_private:
- self.file_url = "/private/files/{0}".format(self.file_name)
+ self.file_url = f"/private/files/{self.file_name}"
else:
- self.file_url = "/files/{0}".format(self.file_name)
+ self.file_url = f"/files/{self.file_name}"
- return {
- 'file_name': os.path.basename(fpath),
- 'file_url': self.file_url
- }
+ return {"file_name": os.path.basename(fpath), "file_url": self.file_url}
def check_max_file_size(self):
max_file_size = get_max_file_size()
file_size = len(self.content)
if file_size > max_file_size:
- frappe.msgprint(_("File size exceeded the maximum allowed size of {0} MB").format(
- max_file_size / 1048576),
- raise_exception=MaxFileSizeReachedError)
+ frappe.throw(
+ _("File size exceeded the maximum allowed size of {0} MB").format(
+ max_file_size / 1048576
+ ),
+ exc=MaxFileSizeReachedError,
+ )
return file_size
def delete_file_data_content(self, only_thumbnail=False):
- method = get_hook_method('delete_file_data_content')
+ method = get_hook_method("delete_file_data_content")
if method:
method(self, only_thumbnail=only_thumbnail)
else:
@@ -511,12 +530,22 @@ class File(Document):
delete_file(self.thumbnail_url)
def is_downloadable(self):
- return has_permission(self, 'read')
+ return has_permission(self, "read")
def get_extension(self):
- '''returns split filename and extension'''
+ """returns split filename and extension"""
return os.path.splitext(self.file_name)
+ def create_attachment_record(self):
+ icon = ' ' if self.is_private else ""
+ file_url = quote(frappe.safe_encode(self.file_url)) if self.file_url else self.file_name
+ file_name = self.file_name or self.file_url
+
+ self.add_comment_in_reference_doc(
+ "Attachment",
+ _("Added {0}").format(f"{file_name}{icon}"),
+ )
+
def add_comment_in_reference_doc(self, comment_type, text):
if self.attached_to_doctype and self.attached_to_name:
try:
@@ -527,28 +556,28 @@ class File(Document):
def set_is_private(self):
if self.file_url:
- self.is_private = cint(self.file_url.startswith('/private'))
+ self.is_private = cint(self.file_url.startswith("/private"))
@frappe.whitelist()
def optimize_file(self):
if self.is_folder:
- raise TypeError('Folders cannot be optimized')
+ raise TypeError("Folders cannot be optimized")
content_type = mimetypes.guess_type(self.file_name)[0]
- is_local_image = content_type.startswith('image/') and self.file_size > 0
- is_svg = content_type == 'image/svg+xml'
+ is_local_image = content_type.startswith("image/") and self.file_size > 0
+ is_svg = content_type == "image/svg+xml"
if not is_local_image:
- raise NotImplementedError('Only local image files can be optimized')
+ raise NotImplementedError("Only local image files can be optimized")
if is_svg:
- raise TypeError('Optimization of SVG images is not supported')
+ raise TypeError("Optimization of SVG images is not supported")
content = self.get_content()
file_path = self.get_full_path()
optimized_content = optimize_image(content, content_type)
- with open(file_path, 'wb+') as f:
+ with open(file_path, "wb+") as f:
f.write(optimized_content)
self.file_size = len(optimized_content)
@@ -611,5 +640,6 @@ def has_permission(doc, ptype=None, user=None):
return has_access
+
# Note: kept at the end to not cause circular, partial imports & maintain backwards compatibility
from frappe.core.api.file import *