refactor: File controller

* Simplify object lifecycle flows
* Re-organize and separate distinct actions / APIs
* Style black-ish
* Simplify logic in methods "touched"
This commit is contained in:
Gavin D'souza 2022-03-15 19:34:26 +05:30
parent c6bad81f24
commit b87f31a94e

View file

@ -6,7 +6,7 @@ import mimetypes
import os
import re
import shutil
from typing import List
from typing import Union, List
import zipfile
from requests.exceptions import HTTPError, SSLError
@ -31,19 +31,10 @@ ImageFile.LOAD_TRUNCATED_IMAGES = True
class File(Document):
no_feed_on_delete = True
def before_insert(self):
frappe.local.rollback_observers.append(self)
self.set_folder_name()
if self.file_name:
self.file_name = re.sub(r'/', '', self.file_name)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.content = self.get("content", None)
self.decode = self.get("decode", False)
if self.content:
self.save_file(content=self.content, decode=self.decode)
def get_name_based_on_parent_folder(self):
if self.folder:
return os.path.join(self.folder, self.file_name)
def autoname(self):
"""Set name for folder"""
@ -56,90 +47,109 @@ class File(Document):
else:
self.name = frappe.generate_hash(length=10)
def before_insert(self):
frappe.local.rollback_observers.append(self)
self.set_folder_name()
self.set_file_name()
self.save_file(content=self.content, decode=self.decode)
def after_insert(self):
if not self.is_folder:
self.add_comment_in_reference_doc('Attachment',
_('Added {0}').format("<a href='{file_url}' target='_blank'>{file_name}</a>{icon}".format(**{
"icon": ' <i class="fa fa-lock text-warning"></i>' if self.is_private else "",
"file_url": quote(frappe.safe_encode(self.file_url)) if self.file_url else self.file_name,
"file_name": self.file_name or self.file_url
})))
def after_rename(self, olddn, newdn, merge=False):
for successor in self.get_successor():
setup_folder_path(successor[0], self.name)
def get_successor(self):
return frappe.db.get_values(doctype='File',
filters={'folder': self.name},
fieldname='name')
self.create_attachment_record()
self.set_is_private()
self.set_file_name()
self.validate_duplicate_entry()
self.validate_attachment_limit()
def validate(self):
if self.is_new():
self.set_is_private()
self.set_file_name()
self.validate_duplicate_entry()
self.validate_attachment_limit()
# Ensure correct formatting and type
self.file_url = unquote(self.file_url)
self.validate_folder()
if not self.is_new() and self.has_value_changed("is_private"):
self.handle_is_private_changed()
if self.is_folder:
self.file_url = ""
else:
self.validate_url()
self.validate_file_on_disk()
self.validate_file_url()
self.validate_file_path()
self.file_size = frappe.form_dict.file_size or self.file_size
def validate_url(self):
if not self.file_url or self.file_url.startswith(("http://", "https://")):
if not self.flags.ignore_file_validate:
self.validate_file()
def after_rename(self, *args, **kwargs):
for successor in self.get_successors():
setup_folder_path(successor, self.name)
return
# Probably an invalid web URL
if not self.file_url.startswith(("/files/", "/private/files/")):
frappe.throw(
_("URL must start with http:// or https://"),
title=_('Invalid URL')
def on_trash(self):
if self.is_home_folder or self.is_attachments_folder:
frappe.throw(_("Cannot delete Home and Attachments folders"))
self.validate_empty_folder()
self._delete_file_on_disk()
if not self.is_folder:
self.add_comment_in_reference_doc(
"Attachment Removed", _("Removed {0}").format(self.file_name)
)
# Ensure correct formatting and type
self.file_url = unquote(self.file_url)
self.is_private = cint(self.is_private)
def on_rollback(self):
# if original_content flag is set, this rollback should revert the file to its original state
if self.flags.original_content:
file_path = self.get_full_path()
with open(file_path, "wb+") as f:
f.write(self.flags.original_content)
self.handle_is_private_changed()
# following condition is only executed when an insert has been rolledback
else:
self.flags.on_rollback = True
self.on_trash()
def get_name_based_on_parent_folder(self) -> Union[str, None]:
if self.folder:
return os.path.join(self.folder, self.file_name)
def get_successors(self):
return frappe.get_all(
"File", filters={"folder": self.name}, pluck="name"
)
def validate_file_path(self):
if self.file_url.startswith(("http://", "https://")):
return
base_path = os.path.realpath(get_files_path(is_private=self.is_private))
if not os.path.realpath(self.get_full_path()).startswith(base_path):
frappe.throw(
_("The File URL you've entered is incorrect"),
title=_('Invalid File URL')
title=_("Invalid File URL"),
)
def validate_file_url(self):
if self.file_url.startswith(("http://", "https://")):
return
if not self.file_url:
return
if not self.file_url.startswith(("/files/", "/private/files/")):
# Probably an invalid URL since it doesn't start with http either
frappe.throw(
_("URL must start with http:// or https://"), title=_("Invalid URL")
)
def handle_is_private_changed(self):
if not frappe.db.exists(
'File', {
'name': self.name,
'is_private': cint(not self.is_private)
}
):
return
old_file_url = self.file_url
file_name = self.file_url.split("/")[-1]
private_file_path = frappe.get_site_path("private", "files", file_name)
public_file_path = frappe.get_site_path("public", "files", file_name)
file_name = self.file_url.split('/')[-1]
private_file_path = frappe.get_site_path('private', 'files', file_name)
public_file_path = frappe.get_site_path('public', 'files', file_name)
if self.is_private:
if cint(self.is_private):
shutil.move(public_file_path, private_file_path)
url_starts_with = "/private/files/"
else:
shutil.move(private_file_path, public_file_path)
url_starts_with = "/files/"
self.file_url = "{0}{1}".format(url_starts_with, file_name)
self.file_url = f"{url_starts_with}{file_name}"
update_existing_file_docs(self)
if (
@ -149,8 +159,12 @@ class File(Document):
):
return
frappe.db.set_value(self.attached_to_doctype, self.attached_to_name,
self.attached_to_field, self.file_url)
frappe.db.set_value(
self.attached_to_doctype,
self.attached_to_name,
self.attached_to_field,
self.file_url,
)
def fetch_attached_to_field(self, old_file_url):
if self.attached_to_field:
@ -186,21 +200,21 @@ class File(Document):
def set_folder_name(self):
"""Make parent folders if not exists based on reference doctype and name"""
if self.attached_to_doctype and not self.folder:
if self.folder:
return
if self.attached_to_doctype:
self.folder = frappe.db.get_value("File", {"is_attachments_folder": 1})
def validate_folder(self):
if not self.is_home_folder and not self.folder and \
not self.flags.ignore_folder_validate:
if not self.is_home_folder:
self.folder = "Home"
def validate_file(self):
"""Validates existence of public file
TODO: validate for private file
def validate_file_on_disk(self):
"""Validates existence file
"""
full_path = self.get_full_path()
if full_path.startswith('http'):
if full_path.startswith("http"):
return True
if not os.path.exists(full_path):
@ -214,33 +228,37 @@ class File(Document):
# check duplicate name
# check duplicate assignment
filters = {
'content_hash': self.content_hash,
'is_private': self.is_private,
'name': ('!=', self.name)
"content_hash": self.content_hash,
"is_private": self.is_private,
"name": ("!=", self.name),
}
if self.attached_to_doctype and self.attached_to_name:
filters.update({
'attached_to_doctype': self.attached_to_doctype,
'attached_to_name': self.attached_to_name
})
duplicate_file = frappe.db.get_value('File', filters, ['name', 'file_url'], as_dict=1)
filters.update(
{
"attached_to_doctype": self.attached_to_doctype,
"attached_to_name": self.attached_to_name,
}
)
duplicate_file = frappe.db.get_value(
"File", filters, ["name", "file_url"], as_dict=1
)
if duplicate_file:
duplicate_file_doc = frappe.get_cached_doc('File', duplicate_file.name)
duplicate_file_doc = frappe.get_cached_doc("File", duplicate_file.name)
if duplicate_file_doc.exists_on_disk():
# just use the url, to avoid uploading a duplicate
self.file_url = duplicate_file.file_url
# just use the url, to avoid uploading a duplicate
self.file_url = duplicate_file.file_url
def set_file_name(self):
if not self.file_name and self.file_url:
self.file_name = self.file_url.split('/')[-1]
self.file_name = self.file_url.split("/")[-1]
else:
self.file_name = re.sub(r'/', '', self.file_name)
self.file_name = re.sub(r"/", "", self.file_name)
def generate_content_hash(self):
if self.content_hash or not self.file_url or self.file_url.startswith('http'):
if self.content_hash or not self.file_url or self.file_url.startswith("http"):
return
file_name = self.file_url.split('/')[-1]
file_name = self.file_url.split("/")[-1]
try:
file_path = get_files_path(file_name, is_private=self.is_private)
with open(file_path, "rb") as f:
@ -248,73 +266,68 @@ class File(Document):
except IOError:
frappe.throw(_("File {0} does not exist").format(file_path))
def on_trash(self):
if self.is_home_folder or self.is_attachments_folder:
frappe.throw(_("Cannot delete Home and Attachments folders"))
self.check_folder_is_empty()
self.call_delete_file()
if not self.is_folder:
self.add_comment_in_reference_doc('Attachment Removed', _("Removed {0}").format(self.file_name))
def make_thumbnail(
self,
set_as_thumbnail: bool = True,
width: int = 300,
height: int = 300,
suffix: str = "small",
crop: bool = False,
) -> str:
if not self.file_url:
return
def make_thumbnail(self, set_as_thumbnail=True, width=300, height=300, suffix="small", crop=False):
if self.file_url:
try:
if self.file_url.startswith(("/files", "/private/files")):
image, filename, extn = get_local_image(self.file_url)
else:
image, filename, extn = get_web_image(self.file_url)
except (HTTPError, SSLError, IOError, TypeError):
return
size = width, height
if crop:
image = ImageOps.fit(image, size, Image.ANTIALIAS)
try:
if self.file_url.startswith(("/files", "/private/files")):
image, filename, extn = get_local_image(self.file_url)
else:
image.thumbnail(size, Image.ANTIALIAS)
image, filename, extn = get_web_image(self.file_url)
except (HTTPError, SSLError, IOError, TypeError):
return
thumbnail_url = filename + "_" + suffix + "." + extn
path = os.path.abspath(frappe.get_site_path("public", thumbnail_url.lstrip("/")))
size = width, height
if crop:
image = ImageOps.fit(image, size, Image.ANTIALIAS)
else:
image.thumbnail(size, Image.ANTIALIAS)
try:
image.save(path)
if set_as_thumbnail:
self.db_set("thumbnail_url", thumbnail_url)
thumbnail_url = f"{filename}_{suffix}.{extn}"
path = os.path.abspath(
frappe.get_site_path("public", thumbnail_url.lstrip("/"))
)
except IOError:
frappe.msgprint(_("Unable to write file format for {0}").format(path))
return
try:
image.save(path)
if set_as_thumbnail:
self.db_set("thumbnail_url", thumbnail_url)
return thumbnail_url
except IOError:
frappe.msgprint(_("Unable to write file format for {0}").format(path))
return
def check_folder_is_empty(self):
return thumbnail_url
def validate_empty_folder(self):
"""Throw exception if folder is not empty"""
files = frappe.get_all("File", filters={"folder": self.name}, fields=("name", "file_name"))
if self.is_folder and files:
if self.is_folder and frappe.get_all("File", filters={"folder": self.name}, limit=1):
frappe.throw(_("Folder {0} is not empty").format(self.name), FolderNotEmpty)
def call_delete_file(self):
def _delete_file_on_disk(self):
"""If file not attached to any other record, delete it"""
if self.file_name and self.content_hash and (not frappe.db.count("File",
{"content_hash": self.content_hash, "name": ["!=", self.name]})):
self.delete_file_data_content()
on_disk_file_not_shared = (
self.content_hash
and not frappe.get_all("File",
filters={"content_hash": self.content_hash, "name": ["!=", self.name]},
limit=1,
)
)
if self.file_name and on_disk_file_not_shared:
self.delete_file_data_content()
elif self.file_url:
self.delete_file_data_content(only_thumbnail=True)
def on_rollback(self):
# if original_content flag is set, this rollback should revert the file to its original state
if self.flags.original_content:
file_path = self.get_full_path()
with open(file_path, "wb+") as f:
f.write(self.flags.original_content)
# following condition is only executed when an insert has been rolledback
else:
self.flags.on_rollback = True
self.on_trash()
def unzip(self) -> List["File"]:
'''Unzip current file and replace it by its children'''
"""Unzip current file and replace it by its children"""
if not self.file_url.endswith(".zip"):
frappe.throw(_("{0} is not a zip file").format(self.file_name))
@ -323,16 +336,16 @@ class File(Document):
files = []
with zipfile.ZipFile(zip_path) as z:
for file in z.filelist:
if file.is_dir() or file.filename.startswith('__MACOSX/'):
if file.is_dir() or file.filename.startswith("__MACOSX/"):
# skip directories and macos hidden directory
continue
filename = os.path.basename(file.filename)
if filename.startswith('.'):
if filename.startswith("."):
# skip hidden files
continue
file_doc = frappe.new_doc('File')
file_doc = frappe.new_doc("File")
file_doc.content = z.read(file.filename)
file_doc.file_name = filename
file_doc.folder = self.folder
@ -342,28 +355,25 @@ class File(Document):
file_doc.save()
files.append(file_doc)
frappe.delete_doc('File', self.name)
frappe.delete_doc("File", self.name)
return files
def exists_on_disk(self):
exists = os.path.exists(self.get_full_path())
return exists
return os.path.exists(self.get_full_path())
def get_content(self):
def get_content(self) -> bytes:
"""Returns [`file_name`, `content`] for given file name `fname`"""
if self.is_folder:
frappe.throw(_("Cannot get file contents of a Folder"))
if self.get('content'):
if self.get("content"):
return self.content
self.validate_url()
self.validate_file_url()
file_path = self.get_full_path()
# read the file
with io.open(encode(file_path), mode='rb') as f:
with io.open(encode(file_path), mode="rb") as f:
content = f.read()
try:
# for plain text files
@ -383,7 +393,9 @@ class File(Document):
file_path = "/files/" + file_path
if file_path.startswith("/private/files/"):
file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)
file_path = get_files_path(
*file_path.split("/private/files/", 1)[1].split("/"), is_private=1
)
elif file_path.startswith("/files/"):
file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))
@ -392,7 +404,9 @@ class File(Document):
pass
elif not self.file_url:
frappe.throw(_("There is some problem with the file url: {0}").format(file_path))
frappe.throw(
_("There is some problem with the file url: {0}").format(file_path)
)
if not is_safe_path(file_path):
frappe.throw(f"Cannot access file path {file_path}")
@ -404,7 +418,7 @@ class File(Document):
file_path = get_files_path(is_private=self.is_private)
if os.path.sep in self.file_name:
frappe.throw(_('File name cannot have {0}').format(os.path.sep))
frappe.throw(_("File name cannot have {0}").format(os.path.sep))
# create directory (if not exists)
frappe.create_folder(file_path)
@ -412,12 +426,16 @@ class File(Document):
self.content = self.get_content()
if isinstance(self.content, str):
self.content = self.content.encode()
with open(os.path.join(file_path.encode('utf-8'), self.file_name.encode('utf-8')), 'wb+') as f:
_file_path = os.path.join(file_path.encode("utf-8"), self.file_name.encode("utf-8"))
with open(_file_path, "wb+") as f:
f.write(self.content)
return get_files_path(self.file_name, is_private=self.is_private)
def save_file(self, content=None, decode=False, ignore_existing_file_check=False):
if not self.content:
return
file_exists = False
duplicate_file = None
@ -437,7 +455,8 @@ class File(Document):
# transform file content based on site settings
if (
self.content_type and self.content_type == "image/jpeg"
self.content_type
and self.content_type == "image/jpeg"
and frappe.get_system_settings("strip_exif_metadata_from_uploaded_images")
):
self.content = strip_exif_data(self.content, self.content_type)
@ -446,57 +465,57 @@ class File(Document):
# check if a file exists with the same content hash and is also in the same folder (public or private)
if not ignore_existing_file_check:
duplicate_file = frappe.get_value("File", {
"content_hash": self.content_hash,
"is_private": self.is_private
},
["file_url", "name"], as_dict=True)
duplicate_file = frappe.get_value(
"File",
{"content_hash": self.content_hash, "is_private": self.is_private},
["file_url", "name"],
as_dict=True,
)
if duplicate_file:
file_doc = frappe.get_cached_doc('File', duplicate_file.name)
file_doc = frappe.get_cached_doc("File", duplicate_file.name)
if file_doc.exists_on_disk():
self.file_url = duplicate_file.file_url
self.file_url = duplicate_file.file_url
file_exists = True
if not file_exists:
self.file_name = generate_file_name(
name=self.file_name,
suffix=self.content_hash[-6:],
is_private=self.is_private
is_private=self.is_private,
)
call_hook_method("before_write_file", file_size=self.file_size)
write_file_method = get_hook_method('write_file')
write_file_method = get_hook_method("write_file")
if write_file_method:
return write_file_method(self)
return self.save_file_on_filesystem()
def save_file_on_filesystem(self):
fpath = self.write_file()
if self.is_private:
self.file_url = "/private/files/{0}".format(self.file_name)
self.file_url = f"/private/files/{self.file_name}"
else:
self.file_url = "/files/{0}".format(self.file_name)
self.file_url = f"/files/{self.file_name}"
return {
'file_name': os.path.basename(fpath),
'file_url': self.file_url
}
return {"file_name": os.path.basename(fpath), "file_url": self.file_url}
def check_max_file_size(self):
max_file_size = get_max_file_size()
file_size = len(self.content)
if file_size > max_file_size:
frappe.msgprint(_("File size exceeded the maximum allowed size of {0} MB").format(
max_file_size / 1048576),
raise_exception=MaxFileSizeReachedError)
frappe.throw(
_("File size exceeded the maximum allowed size of {0} MB").format(
max_file_size / 1048576
),
exc=MaxFileSizeReachedError,
)
return file_size
def delete_file_data_content(self, only_thumbnail=False):
method = get_hook_method('delete_file_data_content')
method = get_hook_method("delete_file_data_content")
if method:
method(self, only_thumbnail=only_thumbnail)
else:
@ -511,12 +530,22 @@ class File(Document):
delete_file(self.thumbnail_url)
def is_downloadable(self):
return has_permission(self, 'read')
return has_permission(self, "read")
def get_extension(self):
'''returns split filename and extension'''
"""returns split filename and extension"""
return os.path.splitext(self.file_name)
def create_attachment_record(self):
icon = ' <i class="fa fa-lock text-warning"></i>' if self.is_private else ""
file_url = quote(frappe.safe_encode(self.file_url)) if self.file_url else self.file_name
file_name = self.file_name or self.file_url
self.add_comment_in_reference_doc(
"Attachment",
_("Added {0}").format(f"<a href='{file_url}' target='_blank'>{file_name}</a>{icon}"),
)
def add_comment_in_reference_doc(self, comment_type, text):
if self.attached_to_doctype and self.attached_to_name:
try:
@ -527,28 +556,28 @@ class File(Document):
def set_is_private(self):
if self.file_url:
self.is_private = cint(self.file_url.startswith('/private'))
self.is_private = cint(self.file_url.startswith("/private"))
@frappe.whitelist()
def optimize_file(self):
if self.is_folder:
raise TypeError('Folders cannot be optimized')
raise TypeError("Folders cannot be optimized")
content_type = mimetypes.guess_type(self.file_name)[0]
is_local_image = content_type.startswith('image/') and self.file_size > 0
is_svg = content_type == 'image/svg+xml'
is_local_image = content_type.startswith("image/") and self.file_size > 0
is_svg = content_type == "image/svg+xml"
if not is_local_image:
raise NotImplementedError('Only local image files can be optimized')
raise NotImplementedError("Only local image files can be optimized")
if is_svg:
raise TypeError('Optimization of SVG images is not supported')
raise TypeError("Optimization of SVG images is not supported")
content = self.get_content()
file_path = self.get_full_path()
optimized_content = optimize_image(content, content_type)
with open(file_path, 'wb+') as f:
with open(file_path, "wb+") as f:
f.write(optimized_content)
self.file_size = len(optimized_content)
@ -611,5 +640,6 @@ def has_permission(doc, ptype=None, user=None):
return has_access
# Note: kept at the end to not cause circular, partial imports & maintain backwards compatibility
from frappe.core.api.file import *