Merge pull request #6055 from Thunderbottom/file-api
Streamlined File API based on ORM
This commit is contained in:
commit
171c650ddf
27 changed files with 818 additions and 123 deletions
|
|
@ -9,7 +9,6 @@ import frappe.utils
|
|||
import json, os
|
||||
|
||||
from six import iteritems, string_types, integer_types
|
||||
from frappe.utils.file_manager import save_file
|
||||
|
||||
'''
|
||||
Handle RESTful requests that are mapped to the `/api/resource` route.
|
||||
|
|
@ -351,10 +350,20 @@ def attach_file(filename=None, filedata=None, doctype=None, docname=None, folder
|
|||
if not doc.has_permission():
|
||||
frappe.throw(_("Not permitted"), frappe.PermissionError)
|
||||
|
||||
f = save_file(filename, filedata, doctype, docname, folder, decode_base64, is_private, docfield)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": filename,
|
||||
"attached_to_doctype": doctype,
|
||||
"attached_to_name": docname,
|
||||
"attached_to_field": docfield,
|
||||
"folder": folder,
|
||||
"is_private": is_private,
|
||||
"content": filedata,
|
||||
"decode": decode_base64})
|
||||
_file.save()
|
||||
|
||||
if docfield and doctype:
|
||||
doc.set(docfield, f.file_url)
|
||||
doc.set(docfield, _file.file_url)
|
||||
doc.save()
|
||||
|
||||
return f.as_dict()
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ from email.utils import formataddr
|
|||
from frappe.core.utils import get_parent_doc
|
||||
from frappe.utils import (get_url, get_formatted_email, cint,
|
||||
validate_email_add, split_emails, time_diff_in_seconds, parse_addr, get_datetime)
|
||||
from frappe.utils.file_manager import get_file
|
||||
from frappe.email.queue import check_email_limit
|
||||
from frappe.utils.scheduler import log
|
||||
from frappe.email.email_body import get_message_id
|
||||
|
|
@ -281,7 +280,8 @@ def prepare_to_notify(doc, print_html=None, print_format=None, attachments=None)
|
|||
# is it a filename?
|
||||
try:
|
||||
# keep this for error handling
|
||||
file = get_file(a)
|
||||
_file = frappe.get_doc("File", {"file_name": a})
|
||||
_file.get_content()
|
||||
# these attachments will be attached on-demand
|
||||
# and won't be stored in the message
|
||||
doc.attachments.append({"fid": a})
|
||||
|
|
@ -393,8 +393,6 @@ def get_bcc(doc, recipients=None, fetched_from_email_account=False):
|
|||
|
||||
def add_attachments(name, attachments):
|
||||
'''Add attachments to the given Communiction'''
|
||||
from frappe.utils.file_manager import save_url
|
||||
|
||||
# loop through attachments
|
||||
for a in attachments:
|
||||
if isinstance(a, string_types):
|
||||
|
|
@ -402,8 +400,14 @@ def add_attachments(name, attachments):
|
|||
["file_name", "file_url", "is_private"], as_dict=1)
|
||||
|
||||
# save attachments to new doc
|
||||
save_url(attach.file_url, attach.file_name, "Communication", name,
|
||||
"Home/Attachments", attach.is_private)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_url": attach.file_url,
|
||||
"attached_to_doctype": "Communication",
|
||||
"attached_to_name": name,
|
||||
"folder": "Home/Attachments"})
|
||||
_file.save()
|
||||
|
||||
|
||||
def filter_email_list(doc, email_list, exclude, is_cc=False, is_bcc=False):
|
||||
# temp variables
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ from frappe import _
|
|||
|
||||
from frappe.utils.csvutils import getlink
|
||||
from frappe.utils.dateutils import parse_date
|
||||
from frappe.utils.file_manager import save_url
|
||||
|
||||
from frappe.utils import cint, cstr, flt, getdate, get_datetime, get_url, get_url_to_form
|
||||
from six import text_type, string_types
|
||||
|
|
@ -265,14 +264,22 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
|
|||
# file is already attached
|
||||
return
|
||||
|
||||
save_url(file_url, None, doctype, docname, "Home/Attachments", 0)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_url": file_url,
|
||||
"attached_to_name": docname,
|
||||
"attached_to_doctype": doctype,
|
||||
"attached_to_field": 0,
|
||||
"folder": "Home/Attachments"})
|
||||
_file.save()
|
||||
|
||||
|
||||
# header
|
||||
filename, file_extension = ['','']
|
||||
if not rows:
|
||||
from frappe.utils.file_manager import get_file # get_file_doc
|
||||
fname, fcontent = get_file(data_import_doc.import_file)
|
||||
filename, file_extension = os.path.splitext(fname)
|
||||
_file = frappe.get_doc("File", {"file_url": data_import_doc.import_file})
|
||||
fcontent = _file.get_content()
|
||||
filename, file_extension = _file.get_extension()
|
||||
|
||||
if file_extension == '.xlsx' and from_data_import == 'Yes':
|
||||
from frappe.utils.xlsxutils import read_xlsx_file_from_attached_file
|
||||
|
|
@ -455,7 +462,6 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
|
|||
if error_flag and data_import_doc.skip_errors and len(data) != len(data_rows_with_error):
|
||||
import_status = "Partially Successful"
|
||||
# write the file with the faulty row
|
||||
from frappe.utils.file_manager import save_file
|
||||
file_name = 'error_' + filename + file_extension
|
||||
if file_extension == '.xlsx':
|
||||
from frappe.utils.xlsxutils import make_xlsx
|
||||
|
|
@ -464,9 +470,15 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
|
|||
else:
|
||||
from frappe.utils.csvutils import to_csv
|
||||
file_data = to_csv(data_rows_with_error)
|
||||
error_data_file = save_file(file_name, file_data, "Data Import",
|
||||
data_import_doc.name, "Home/Attachments")
|
||||
data_import_doc.error_file = error_data_file.file_url
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": file_name,
|
||||
"attached_to_doctype": "Data Import",
|
||||
"attached_to_name": data_import_doc.name,
|
||||
"folder": "Home/Attachments",
|
||||
"content": file_data})
|
||||
_file.save()
|
||||
data_import_doc.error_file = _file.file_url
|
||||
|
||||
elif error_flag:
|
||||
import_status = "Failed"
|
||||
|
|
|
|||
|
|
@ -11,24 +11,36 @@ naming for same name files: file.gif, file-1.gif, file-2.gif etc
|
|||
import frappe
|
||||
import json
|
||||
import os
|
||||
import base64
|
||||
import re
|
||||
import hashlib
|
||||
import mimetypes
|
||||
import io
|
||||
import shutil
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import mimetypes, imghdr
|
||||
import imghdr
|
||||
|
||||
from frappe.utils.file_manager import delete_file_data_content, get_content_hash, get_random_filename
|
||||
from frappe.utils import get_hook_method, get_files_path, random_string, encode, cstr, call_hook_method, cint
|
||||
from frappe import _
|
||||
from frappe import conf
|
||||
from frappe.utils.nestedset import NestedSet
|
||||
from frappe.utils import strip, get_files_path
|
||||
from frappe.utils import strip
|
||||
from PIL import Image, ImageOps
|
||||
from six import StringIO, string_types
|
||||
from six.moves.urllib.parse import unquote
|
||||
from six import text_type, PY2
|
||||
import zipfile
|
||||
|
||||
class MaxFileSizeReachedError(frappe.ValidationError):
|
||||
pass
|
||||
|
||||
|
||||
class FolderNotEmpty(frappe.ValidationError): pass
|
||||
|
||||
exclude_from_linked_with = True
|
||||
|
||||
|
||||
class File(NestedSet):
|
||||
nsm_parent_field = 'folder'
|
||||
no_feed_on_delete = True
|
||||
|
|
@ -36,6 +48,10 @@ class File(NestedSet):
|
|||
def before_insert(self):
|
||||
frappe.local.rollback_observers.append(self)
|
||||
self.set_folder_name()
|
||||
self.content = self.get("content", None)
|
||||
self.decode = self.get("decode", False)
|
||||
if self.content:
|
||||
self.save_file(content=self.content, decode=self.decode)
|
||||
|
||||
def get_name_based_on_parent_folder(self):
|
||||
path = get_breadcrumbs(self.folder)
|
||||
|
|
@ -69,10 +85,12 @@ class File(NestedSet):
|
|||
self.validate_folder()
|
||||
|
||||
if not self.flags.ignore_file_validate:
|
||||
self.validate_file()
|
||||
if not self.is_folder:
|
||||
self.validate_file()
|
||||
self.generate_content_hash()
|
||||
|
||||
self.set_folder_size()
|
||||
self.validate_url()
|
||||
|
||||
if frappe.db.exists('File', {'name': self.name, 'is_folder': 0}):
|
||||
old_file_url = self.file_url
|
||||
|
|
@ -104,7 +122,8 @@ class File(NestedSet):
|
|||
break
|
||||
self.attached_to_field = field_name
|
||||
if self.attached_to_field:
|
||||
frappe.db.set_value(self.attached_to_doctype, self.attached_to_name, self.attached_to_field, self.file_url)
|
||||
frappe.db.set_value(self.attached_to_doctype, self.attached_to_name,
|
||||
self.attached_to_field, self.file_url)
|
||||
|
||||
|
||||
def set_folder_size(self):
|
||||
|
|
@ -139,18 +158,16 @@ class File(NestedSet):
|
|||
def validate_folder(self):
|
||||
if not self.is_home_folder and not self.folder and \
|
||||
not self.flags.ignore_folder_validate:
|
||||
frappe.throw(_("Folder is mandatory"))
|
||||
self.folder = "Home"
|
||||
|
||||
def validate_file(self):
|
||||
"""Validates existence of public file
|
||||
TODO: validate for private file
|
||||
"""
|
||||
if (self.file_url or "").startswith("/files/"):
|
||||
if not self.file_name:
|
||||
self.file_name = self.file_url.split("/files/")[-1]
|
||||
full_path = self.get_full_path()
|
||||
|
||||
if not os.path.exists(get_files_path(frappe.as_unicode(self.file_name.lstrip("/")))):
|
||||
frappe.throw(_("File {0} does not exist").format(self.file_url), IOError)
|
||||
if not os.path.exists(full_path):
|
||||
frappe.throw(_("File {0} does not exist").format(self.file_url), IOError)
|
||||
|
||||
def validate_duplicate_entry(self):
|
||||
if not self.flags.ignore_duplicate_entry_error and not self.is_folder:
|
||||
|
|
@ -165,7 +182,8 @@ class File(NestedSet):
|
|||
self.attached_to_name))
|
||||
if len(n_records) > 0:
|
||||
self.duplicate_entry = n_records[0][0]
|
||||
frappe.throw(frappe._("Same file has already been attached to the record"), frappe.DuplicateEntryError)
|
||||
frappe.throw(frappe._("Same file has already been attached to the record"),
|
||||
frappe.DuplicateEntryError)
|
||||
|
||||
def generate_content_hash(self):
|
||||
if self.content_hash or not self.file_url:
|
||||
|
|
@ -185,7 +203,7 @@ class File(NestedSet):
|
|||
self.check_folder_is_empty()
|
||||
self.check_reference_doc_permission()
|
||||
super(File, self).on_trash()
|
||||
self.delete_file()
|
||||
self.call_delete_file()
|
||||
|
||||
def make_thumbnail(self, set_as_thumbnail=True, width=300, height=300, suffix="small", crop=False):
|
||||
if self.file_url:
|
||||
|
|
@ -246,19 +264,20 @@ class File(NestedSet):
|
|||
if not self.flags.ignore_permissions and \
|
||||
not frappe.has_permission(self.attached_to_doctype,
|
||||
"write", self.attached_to_name):
|
||||
frappe.throw(frappe._("Cannot delete file as it belongs to {0} {1} for which you do not have permissions").format(self.attached_to_doctype, self.attached_to_name),
|
||||
frappe.throw(frappe._(
|
||||
"Cannot delete file as it belongs to {0} {1} for which you do not have permissions").format(
|
||||
self.attached_to_doctype, self.attached_to_name),
|
||||
frappe.PermissionError)
|
||||
except frappe.DoesNotExistError:
|
||||
pass
|
||||
|
||||
def delete_file(self):
|
||||
def call_delete_file(self):
|
||||
"""If file not attached to any other record, delete it"""
|
||||
if self.file_name and self.content_hash and (not frappe.db.count("File",
|
||||
{"content_hash": self.content_hash, "name": ["!=", self.name]})):
|
||||
delete_file_data_content(self)
|
||||
|
||||
self.delete_file_data_content()
|
||||
elif self.file_url:
|
||||
delete_file_data_content(self, only_thumbnail=True)
|
||||
self.delete_file_data_content(only_thumbnail=True)
|
||||
|
||||
def on_rollback(self):
|
||||
self.flags.on_rollback = True
|
||||
|
|
@ -293,6 +312,253 @@ class File(NestedSet):
|
|||
|
||||
frappe.delete_doc('File', self.name)
|
||||
|
||||
|
||||
def get_file_url(self):
|
||||
data = frappe.db.get_value("File", self.file_data_name, ["file_name", "file_url"], as_dict=True)
|
||||
return data.file_url or data.file_name
|
||||
|
||||
|
||||
def upload(self):
|
||||
# get record details
|
||||
self.attached_to_doctype = frappe.form_dict.doctype
|
||||
self.attached_to_name = frappe.form_dict.docname
|
||||
self.attached_to_field = frappe.form_dict.docfield
|
||||
self.file_url = frappe.form_dict.file_url
|
||||
self.file_name = frappe.form_dict.filename
|
||||
frappe.form_dict.is_private = cint(frappe.form_dict.is_private)
|
||||
|
||||
if not self.file_name and not self.file_url:
|
||||
frappe.msgprint(_("Please select a file or url"),
|
||||
raise_exception=True)
|
||||
|
||||
file_doc = self.get_file_doc()
|
||||
|
||||
comment = {}
|
||||
if self.attached_to_doctype and self.attached_to_name:
|
||||
comment = frappe.get_doc(self.attached_to_doctype, self.attached_to_name).add_comment("Attachment",
|
||||
_ ("added {0}").format("<a href='{file_url}' target='_blank'>{file_name}</a>{icon}".format(**{
|
||||
"icon": ' <i class="fa fa-lock text-warning"></i>' \
|
||||
if file_doc.is_private else "",
|
||||
"file_url": file_doc.file_url.replace("#", "%23") \
|
||||
if file_doc.file_name else file_doc.file_url,
|
||||
"file_name": file_doc.file_name or file_doc.file_url
|
||||
})))
|
||||
|
||||
return {
|
||||
"name": file_doc.name,
|
||||
"file_name": file_doc.file_name,
|
||||
"file_url": file_doc.file_url,
|
||||
"is_private": file_doc.is_private,
|
||||
"comment": comment.as_dict() if comment else {}
|
||||
}
|
||||
|
||||
def get_content(self):
|
||||
"""Returns [`file_name`, `content`] for given file name `fname`"""
|
||||
if self.get('content'):
|
||||
return self.content
|
||||
file_path = self.get_full_path()
|
||||
|
||||
# read the file
|
||||
if PY2:
|
||||
with open(encode(file_path)) as f:
|
||||
content = f.read()
|
||||
else:
|
||||
with io.open(encode(file_path), mode='rb') as f:
|
||||
content = f.read()
|
||||
try:
|
||||
# for plain text files
|
||||
content = content.decode()
|
||||
except UnicodeDecodeError:
|
||||
# for .png, .jpg, etc
|
||||
pass
|
||||
|
||||
return content
|
||||
|
||||
def get_full_path(self):
|
||||
"""Returns file path from given file name"""
|
||||
|
||||
file_path = self.file_url or self.file_name
|
||||
|
||||
if "/" not in file_path:
|
||||
file_path = "/files/" + file_path
|
||||
|
||||
if file_path.startswith("/private/files/"):
|
||||
file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)
|
||||
|
||||
elif file_path.startswith("/files/"):
|
||||
file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))
|
||||
|
||||
else:
|
||||
frappe.throw(_("There is some problem with the file url: {0}").format(file_path))
|
||||
|
||||
return file_path
|
||||
|
||||
def write_file(self):
|
||||
"""write file to disk with a random name (to compare)"""
|
||||
file_path = get_files_path(is_private=self.is_private)
|
||||
|
||||
# create directory (if not exists)
|
||||
frappe.create_folder(file_path)
|
||||
# write the file
|
||||
self.content = self.get_content()
|
||||
if isinstance(self.content, text_type):
|
||||
self.content = self.content.encode()
|
||||
with open(os.path.join(file_path.encode('utf-8'), self.file_name.encode('utf-8')), 'wb+') as f:
|
||||
f.write(self.content)
|
||||
|
||||
return get_files_path(self.file_name, is_private=self.is_private)
|
||||
|
||||
def get_file_doc(self):
|
||||
'''returns File object (Document) from given parameters or form_dict'''
|
||||
r = frappe.form_dict
|
||||
|
||||
if self.file_url is None: self.file_url = r.file_url
|
||||
if self.file_name is None: self.file_name = r.file_name
|
||||
if self.attached_to_doctype is None: self.attached_to_doctype = r.doctype
|
||||
if self.attached_to_name is None: self.attached_to_name = r.docname
|
||||
if self.attached_to_field is None: self.attached_to_field = r.docfield
|
||||
if self.folder is None: self.folder = r.folder
|
||||
if self.is_private is None: self.is_private = r.is_private
|
||||
|
||||
if r.filedata:
|
||||
file_doc = self.save_uploaded()
|
||||
|
||||
elif r.file_url:
|
||||
file_doc = self.save()
|
||||
|
||||
return file_doc
|
||||
|
||||
|
||||
def save_uploaded(self):
|
||||
self.content = self.get_uploaded_content()
|
||||
if self.content:
|
||||
return self.save()
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
|
||||
def validate_url(self, df=None):
|
||||
if self.file_url:
|
||||
if not self.file_url.startswith(("http://", "https://", "/files/", "/private/files/")):
|
||||
frappe.throw("URL must start with 'http://' or 'https://'")
|
||||
return
|
||||
|
||||
self.file_url = unquote(self.file_url)
|
||||
self.file_size = frappe.form_dict.file_size
|
||||
|
||||
|
||||
def get_uploaded_content(self):
|
||||
# should not be unicode when reading a file, hence using frappe.form
|
||||
if 'filedata' in frappe.form_dict:
|
||||
if "," in frappe.form_dict.filedata:
|
||||
frappe.form_dict.filedata = frappe.form_dict.filedata.rsplit(",", 1)[1]
|
||||
frappe.uploaded_content = base64.b64decode(frappe.form_dict.filedata)
|
||||
return frappe.uploaded_content
|
||||
elif self.content:
|
||||
return self.content
|
||||
frappe.msgprint(_('No file attached'))
|
||||
return None
|
||||
|
||||
|
||||
def save_file(self, content=None, decode=False):
|
||||
file_exists = False
|
||||
self.content = content
|
||||
if decode:
|
||||
if isinstance(content, text_type):
|
||||
self.content = content.encode("utf-8")
|
||||
|
||||
if b"," in self.content:
|
||||
self.content = self.content.split(b",")[1]
|
||||
self.content = base64.b64decode(self.content)
|
||||
|
||||
if not self.is_private:
|
||||
self.is_private = 0
|
||||
self.file_size = self.check_max_file_size()
|
||||
self.content_hash = get_content_hash(self.content)
|
||||
self.content_type = mimetypes.guess_type(self.file_name)[0]
|
||||
|
||||
_file = frappe.get_value("File", {"content_hash": self.content_hash}, ["file_url"])
|
||||
if _file:
|
||||
self.file_url = _file
|
||||
file_exists = True
|
||||
|
||||
if not file_exists:
|
||||
if os.path.exists(encode(get_files_path(self.file_name))):
|
||||
self.file_name = get_file_name(self.file_name, self.content_hash[-6:])
|
||||
|
||||
call_hook_method("before_write_file", file_size=self.file_size)
|
||||
write_file_method = get_hook_method('write_file')
|
||||
if write_file_method:
|
||||
return write_file_method(self)
|
||||
return self.save_file_on_filesystem()
|
||||
|
||||
|
||||
def save_file_on_filesystem(self):
|
||||
fpath = self.write_file()
|
||||
|
||||
if self.is_private:
|
||||
self.file_url = "/private/files/{0}".format(self.file_name)
|
||||
else:
|
||||
self.file_url = "/files/{0}".format(self.file_name)
|
||||
|
||||
return {
|
||||
'file_name': os.path.basename(fpath),
|
||||
'file_url': self.file_url
|
||||
}
|
||||
|
||||
def get_file_data_from_hash(self):
|
||||
for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s",
|
||||
(self.content_hash, self.is_private)):
|
||||
b = frappe.get_doc('File', name)
|
||||
return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']}
|
||||
return False
|
||||
|
||||
|
||||
def check_max_file_size(self):
|
||||
max_file_size = get_max_file_size()
|
||||
file_size = len(self.content)
|
||||
|
||||
if file_size > max_file_size:
|
||||
frappe.msgprint(_("File size exceeded the maximum allowed size of {0} MB").format(
|
||||
max_file_size / 1048576),
|
||||
raise_exception=MaxFileSizeReachedError)
|
||||
|
||||
return file_size
|
||||
|
||||
|
||||
def delete_file_data_content(self, only_thumbnail=False):
|
||||
method = get_hook_method('delete_file_data_content')
|
||||
if method:
|
||||
method(self, only_thumbnail=only_thumbnail)
|
||||
else:
|
||||
self.delete_file_from_filesystem(only_thumbnail=only_thumbnail)
|
||||
|
||||
|
||||
def delete_file_from_filesystem(self, only_thumbnail=False):
|
||||
"""Delete file, thumbnail from File document"""
|
||||
if only_thumbnail:
|
||||
delete_file(self.thumbnail_url)
|
||||
else:
|
||||
delete_file(self.file_url)
|
||||
delete_file(self.thumbnail_url)
|
||||
|
||||
|
||||
def check_file_permission(self):
|
||||
for file in frappe.get_all("File", filters={"file_url": self.file_url, "is_private": 1},
|
||||
fields=["name", "attached_to_doctype", "attached_to_name"]):
|
||||
|
||||
if (frappe.has_permission("File", ptype="read", doc=file.name)
|
||||
or frappe.has_permission(file.attached_to_doctype, ptype="read", doc=file.attached_to_name)):
|
||||
return True
|
||||
|
||||
raise frappe.PermissionError
|
||||
|
||||
|
||||
def get_extension(self):
|
||||
'''returns split filename and extension'''
|
||||
return os.path.splitext(self.file_name)
|
||||
|
||||
|
||||
def on_doctype_update():
|
||||
frappe.db.add_index("File", ["attached_to_doctype", "attached_to_name"])
|
||||
frappe.db.add_index("File", ["lft", "rgt"])
|
||||
|
|
@ -334,6 +600,7 @@ def create_new_folder(file_name, folder):
|
|||
|
||||
@frappe.whitelist()
|
||||
def move_file(file_list, new_parent, old_parent):
|
||||
|
||||
if isinstance(file_list, string_types):
|
||||
file_list = json.loads(file_list)
|
||||
|
||||
|
|
@ -421,14 +688,171 @@ def get_web_image(file_url):
|
|||
|
||||
return image, filename, extn
|
||||
|
||||
def check_file_permission(file_url):
|
||||
for file in frappe.get_all("File", filters={"file_url": file_url, "is_private": 1}, fields=["name", "attached_to_doctype", "attached_to_name"]):
|
||||
|
||||
if (frappe.has_permission("File", ptype="read", doc=file.name)
|
||||
or frappe.has_permission(file.attached_to_doctype, ptype="read", doc=file.attached_to_name)):
|
||||
return True
|
||||
def delete_file(path):
|
||||
"""Delete file from `public folder`"""
|
||||
if path:
|
||||
if ".." in path.split("/"):
|
||||
frappe.msgprint(_("It is risky to delete this file: {0}. Please contact your System Manager.").format(path))
|
||||
|
||||
parts = os.path.split(path.strip("/"))
|
||||
if parts[0]=="files":
|
||||
path = frappe.utils.get_site_path("public", "files", parts[-1])
|
||||
|
||||
else:
|
||||
path = frappe.utils.get_site_path("private", "files", parts[-1])
|
||||
|
||||
path = encode(path)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
|
||||
|
||||
def remove_file(fid=None, attached_to_doctype=None, attached_to_name=None, from_delete=False):
|
||||
"""Remove file and File entry"""
|
||||
file_name = None
|
||||
if not (attached_to_doctype and attached_to_name):
|
||||
attached = frappe.db.get_value("File", fid,
|
||||
["attached_to_doctype", "attached_to_name", "file_name"])
|
||||
if attached:
|
||||
attached_to_doctype, attached_to_name, file_name = attached
|
||||
|
||||
ignore_permissions, comment = False, None
|
||||
if attached_to_doctype and attached_to_name and not from_delete:
|
||||
doc = frappe.get_doc(attached_to_doctype, attached_to_name)
|
||||
ignore_permissions = doc.has_permission("write") or False
|
||||
if frappe.flags.in_web_form:
|
||||
ignore_permissions = True
|
||||
if not file_name:
|
||||
file_name = frappe.db.get_value("File", fid, "file_name")
|
||||
comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
|
||||
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions)
|
||||
|
||||
return comment
|
||||
|
||||
|
||||
def get_max_file_size():
|
||||
return conf.get('max_file_size') or 10485760
|
||||
|
||||
|
||||
def remove_all(dt, dn, from_delete=False):
|
||||
"""remove all files in a transaction"""
|
||||
try:
|
||||
for fid in frappe.db.sql_list("""select name from `tabFile` where
|
||||
attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):
|
||||
remove_file(fid=fid, attached_to_doctype=dt, attached_to_name=dn, from_delete=from_delete)
|
||||
except Exception as e:
|
||||
if e.args[0]!=1054: raise # (temp till for patched)
|
||||
|
||||
|
||||
def remove_file_by_url(file_url, doctype=None, name=None):
|
||||
if doctype and name:
|
||||
fid = frappe.db.get_value("File", {
|
||||
"file_url": file_url,
|
||||
"attached_to_doctype": doctype,
|
||||
"attached_to_name": name})
|
||||
else:
|
||||
fid = frappe.db.get_value("File", {"file_url": file_url})
|
||||
|
||||
if fid:
|
||||
return remove_file(fid=fid)
|
||||
|
||||
|
||||
def get_content_hash(content):
|
||||
if isinstance(content, text_type):
|
||||
content = content.encode()
|
||||
return hashlib.md5(content).hexdigest() #nosec
|
||||
|
||||
|
||||
def get_file_name(fname, optional_suffix):
|
||||
# convert to unicode
|
||||
fname = cstr(fname)
|
||||
|
||||
f = fname.rsplit('.', 1)
|
||||
if len(f) == 1:
|
||||
partial, extn = f[0], ""
|
||||
else:
|
||||
partial, extn = f[0], "." + f[1]
|
||||
return '{partial}{suffix}{extn}'.format(partial=partial, extn=extn, suffix=optional_suffix)
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def download_file(file_url):
|
||||
"""
|
||||
Download file using token and REST API. Valid session or
|
||||
token is required to download private files.
|
||||
|
||||
Method : GET
|
||||
Endpoint : frappe.core.doctype.file.file.download_file
|
||||
URL Params : file_name = /path/to/file relative to site path
|
||||
"""
|
||||
file_doc = frappe.get_doc("File", {"file_url": file_url})
|
||||
file_doc.check_permission("read")
|
||||
path = os.path.join(get_files_path(), os.path.basename(file_url))
|
||||
|
||||
with open(path, "rb") as fileobj:
|
||||
filedata = fileobj.read()
|
||||
frappe.local.response.filename = os.path.basename(file_url)
|
||||
frappe.local.response.filecontent = filedata
|
||||
frappe.local.response.type = "download"
|
||||
|
||||
def extract_images_from_doc(doc, fieldname):
|
||||
content = doc.get(fieldname)
|
||||
content = extract_images_from_html(doc, content)
|
||||
if frappe.flags.has_dataurl:
|
||||
doc.set(fieldname, content)
|
||||
|
||||
|
||||
def extract_images_from_html(doc, content):
|
||||
frappe.flags.has_dataurl = False
|
||||
|
||||
def _save_file(match):
|
||||
data = match.group(1)
|
||||
data = data.split("data:")[1]
|
||||
headers, content = data.split(",")
|
||||
|
||||
if "filename=" in headers:
|
||||
filename = headers.split("filename=")[-1]
|
||||
|
||||
# decode filename
|
||||
if not isinstance(filename, text_type):
|
||||
filename = text_type(filename, 'utf-8')
|
||||
else:
|
||||
mtype = headers.split(";")[0]
|
||||
filename = get_random_filename(content_type=mtype)
|
||||
|
||||
doctype = doc.parenttype if doc.parent else doc.doctype
|
||||
name = doc.parent or doc.name
|
||||
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": filename,
|
||||
"attached_to_doctype": doctype,
|
||||
"attached_to_name": name,
|
||||
"content": content,
|
||||
"decode": True})
|
||||
_file.save()
|
||||
file_url = _file.file_url
|
||||
if not frappe.flags.has_dataurl:
|
||||
frappe.flags.has_dataurl = True
|
||||
|
||||
return '<img src="{file_url}"'.format(file_url=file_url)
|
||||
|
||||
if content:
|
||||
content = re.sub('<img[^>]*src\s*=\s*["\'](?=data:)(.*?)["\']', _save_file, content)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def get_random_filename(extn=None, content_type=None):
|
||||
if extn:
|
||||
if not extn.startswith("."):
|
||||
extn = "." + extn
|
||||
|
||||
elif content_type:
|
||||
extn = mimetypes.guess_extension(content_type)
|
||||
|
||||
return random_string(7) + (extn or "")
|
||||
|
||||
raise frappe.PermissionError
|
||||
|
||||
@frappe.whitelist()
|
||||
def unzip_file(name):
|
||||
|
|
@ -436,6 +860,7 @@ def unzip_file(name):
|
|||
file_obj = frappe.get_doc('File', name)
|
||||
file_obj.unzip()
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def get_attached_images(doctype, names):
|
||||
'''get list of image urls attached in form
|
||||
|
|
@ -456,3 +881,11 @@ def get_attached_images(doctype, names):
|
|||
out[i.docname].append(i.file_url)
|
||||
|
||||
return out
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def validate_filename(filename):
|
||||
from frappe.utils import now_datetime
|
||||
timestamp = now_datetime().strftime(" %Y-%m-%d %H:%M:%S")
|
||||
fname = get_file_name(filename, timestamp)
|
||||
return fname
|
||||
|
|
|
|||
|
|
@ -3,33 +3,191 @@
|
|||
# See license.txt
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import base64
|
||||
import frappe
|
||||
import os
|
||||
import unittest
|
||||
from frappe.utils.file_manager import save_file, get_files_path
|
||||
from frappe import _
|
||||
from frappe.core.doctype.file.file import move_file
|
||||
from frappe.utils import get_files_path
|
||||
# test_records = frappe.get_test_records('File')
|
||||
|
||||
test_content1 = 'Hello'
|
||||
test_content2 = 'Hello World'
|
||||
|
||||
|
||||
def make_test_doc():
|
||||
d = frappe.new_doc('ToDo')
|
||||
d.description = 'Test'
|
||||
d.save()
|
||||
return d.doctype, d.name
|
||||
|
||||
|
||||
class TestSimpleFile(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.attached_to_doctype, self.attached_to_docname = make_test_doc()
|
||||
self.test_content = test_content1
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "test1.txt",
|
||||
"attached_to_doctype": self.attached_to_doctype,
|
||||
"attached_to_name": self.attached_to_docname,
|
||||
"content": self.test_content})
|
||||
_file.save()
|
||||
self.saved_file_url = _file.file_url
|
||||
|
||||
|
||||
def test_save(self):
|
||||
_file = frappe.get_doc("File", {"file_url": self.saved_file_url})
|
||||
content = _file.get_content()
|
||||
self.assertEqual(content, self.test_content)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# File gets deleted on rollback, so blank
|
||||
pass
|
||||
|
||||
|
||||
class TestBase64File(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.attached_to_doctype, self.attached_to_docname = make_test_doc()
|
||||
self.test_content = base64.b64encode(test_content1.encode('utf-8'))
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "test_base64.txt",
|
||||
"attached_to_doctype": self.attached_to_doctype,
|
||||
"attached_to_docname": self.attached_to_docname,
|
||||
"content": self.test_content,
|
||||
"decode": True})
|
||||
_file.save()
|
||||
self.saved_file_url = _file.file_url
|
||||
|
||||
|
||||
def test_saved_content(self):
|
||||
_file = frappe.get_doc("File", {"file_url": self.saved_file_url})
|
||||
content = _file.get_content()
|
||||
self.assertEqual(content, test_content1)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# File gets deleted on rollback, so blank
|
||||
pass
|
||||
|
||||
|
||||
class TestSameFileName(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.attached_to_doctype, self.attached_to_docname = make_test_doc()
|
||||
self.test_content1 = test_content1
|
||||
self.test_content2 = test_content2
|
||||
_file1 = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "testing.txt",
|
||||
"attached_to_doctype": self.attached_to_doctype,
|
||||
"attached_to_name": self.attached_to_docname,
|
||||
"content": self.test_content1})
|
||||
_file1.save()
|
||||
_file2 = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "testing.txt",
|
||||
"attached_to_doctype": self.attached_to_doctype,
|
||||
"attached_to_name": self.attached_to_docname,
|
||||
"content": self.test_content2})
|
||||
_file2.save()
|
||||
self.saved_file_url1 = _file1.file_url
|
||||
self.saved_file_url2 = _file2.file_url
|
||||
|
||||
|
||||
def test_saved_content(self):
|
||||
_file = frappe.get_doc("File", {"file_url": self.saved_file_url1})
|
||||
content1 = _file.get_content()
|
||||
self.assertEqual(content1, self.test_content1)
|
||||
_file = frappe.get_doc("File", {"file_url": self.saved_file_url2})
|
||||
content2 = _file.get_content()
|
||||
self.assertEqual(content2, self.test_content2)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# File gets deleted on rollback, so blank
|
||||
pass
|
||||
|
||||
|
||||
class TestSameContent(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.attached_to_doctype1, self.attached_to_docname1 = make_test_doc()
|
||||
self.attached_to_doctype2, self.attached_to_docname2 = make_test_doc()
|
||||
self.test_content1 = test_content1
|
||||
self.test_content2 = test_content1
|
||||
self.orig_filename = 'hello.txt'
|
||||
self.dup_filename = 'hello2.txt'
|
||||
_file1 = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": self.orig_filename,
|
||||
"attached_to_doctype": self.attached_to_doctype1,
|
||||
"attached_to_name": self.attached_to_docname1,
|
||||
"content": self.test_content1})
|
||||
_file1.save()
|
||||
|
||||
_file2 = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": self.dup_filename,
|
||||
"attached_to_doctype": self.attached_to_doctype2,
|
||||
"attached_to_name": self.attached_to_docname2,
|
||||
"content": self.test_content2})
|
||||
|
||||
_file2.save()
|
||||
|
||||
|
||||
def test_saved_content(self):
|
||||
self.assertFalse(os.path.exists(get_files_path(self.dup_filename)))
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# File gets deleted on rollback, so blank
|
||||
pass
|
||||
|
||||
|
||||
class TestFile(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.delete_test_data()
|
||||
self.upload_file()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
frappe.get_doc("File", {"file_name": "file_copy.txt"}).delete()
|
||||
except frappe.DoesNotExistError:
|
||||
pass
|
||||
|
||||
|
||||
def delete_test_data(self):
|
||||
for f in frappe.db.sql('''select name, file_name from tabFile where
|
||||
is_home_folder = 0 and is_attachments_folder = 0 order by rgt-lft asc'''):
|
||||
frappe.delete_doc("File", f[0])
|
||||
|
||||
|
||||
def upload_file(self):
|
||||
self.saved_file = save_file('file_copy.txt', "Testing file copy example.",\
|
||||
"", "", self.get_folder("Test Folder 1", "Home").name)
|
||||
self.saved_filename = get_files_path(self.saved_file.file_name)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "file_copy.txt",
|
||||
"attached_to_name": "",
|
||||
"attached_to_doctype": "",
|
||||
"folder": self.get_folder("Test Folder 1", "Home").name,
|
||||
"content": "Testing file copy example."})
|
||||
_file.save()
|
||||
self.saved_folder = _file.folder
|
||||
self.saved_name = _file.name
|
||||
self.saved_filename = get_files_path(_file.file_name)
|
||||
|
||||
|
||||
def get_folder(self, folder_name, parent_folder="Home"):
|
||||
return frappe.get_doc({
|
||||
|
|
@ -39,30 +197,39 @@ class TestFile(unittest.TestCase):
|
|||
"folder": _(parent_folder)
|
||||
}).insert()
|
||||
|
||||
|
||||
def tests_after_upload(self):
|
||||
self.assertEqual(self.saved_file.folder, _("Home/Test Folder 1"))
|
||||
self.assertEqual(self.saved_folder, _("Home/Test Folder 1"))
|
||||
|
||||
folder_size = frappe.db.get_value("File", _("Home/Test Folder 1"), "file_size")
|
||||
saved_file_size = frappe.db.get_value("File", self.saved_file.name, "file_size")
|
||||
saved_file_size = frappe.db.get_value("File", self.saved_name, "file_size")
|
||||
|
||||
self.assertEqual(folder_size, saved_file_size)
|
||||
|
||||
|
||||
def test_file_copy(self):
|
||||
folder = self.get_folder("Test Folder 2", "Home")
|
||||
|
||||
file = frappe.get_doc("File", {"file_name":"file_copy.txt"})
|
||||
file = frappe.get_doc("File", {"file_name": "file_copy.txt"})
|
||||
move_file([{"name": file.name}], folder.name, file.folder)
|
||||
file = frappe.get_doc("File", {"file_name":"file_copy.txt"})
|
||||
file = frappe.get_doc("File", {"file_name": "file_copy.txt"})
|
||||
|
||||
self.assertEqual(_("Home/Test Folder 2"), file.folder)
|
||||
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 2"), "file_size"), file.file_size)
|
||||
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 1"), "file_size"), 0)
|
||||
|
||||
|
||||
def test_folder_copy(self):
|
||||
folder = self.get_folder("Test Folder 2", "Home")
|
||||
folder = self.get_folder("Test Folder 3", "Home/Test Folder 2")
|
||||
|
||||
self.saved_file = save_file('folder_copy.txt', "Testing folder copy example.", "", "", folder.name)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "folder_copy.txt",
|
||||
"attached_to_name": "",
|
||||
"attached_to_doctype": "",
|
||||
"folder": folder.name,
|
||||
"content": "Testing folder copy example"})
|
||||
_file.save()
|
||||
|
||||
move_file([{"name": folder.name}], 'Home/Test Folder 1', folder.folder)
|
||||
|
||||
|
|
@ -75,29 +242,39 @@ class TestFile(unittest.TestCase):
|
|||
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 1"), "file_size"), file.file_size)
|
||||
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 2"), "file_size"), 0)
|
||||
|
||||
def test_non_parent_folder(self):
|
||||
|
||||
def test_default_folder(self):
|
||||
d = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": _("Test_Folder"),
|
||||
"is_folder": 1
|
||||
})
|
||||
d.save()
|
||||
self.assertEqual(d.folder, "Home")
|
||||
|
||||
self.assertRaises(frappe.ValidationError, d.save)
|
||||
|
||||
def test_on_delete(self):
|
||||
file = frappe.get_doc("File", {"file_name":"file_copy.txt"})
|
||||
file = frappe.get_doc("File", {"file_name": "file_copy.txt"})
|
||||
file.delete()
|
||||
|
||||
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 1"), "file_size"), 0)
|
||||
|
||||
folder = self.get_folder("Test Folder 3", "Home/Test Folder 1")
|
||||
self.saved_file = save_file('folder_copy.txt', "Testing folder copy example.", "", "", folder.name)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "folder_copy.txt",
|
||||
"attached_to_name": "",
|
||||
"attached_to_doctype": "",
|
||||
"folder": folder.name,
|
||||
"content": "Testing folder copy example"})
|
||||
_file.save()
|
||||
|
||||
folder = frappe.get_doc("File", "Home/Test Folder 1/Test Folder 3")
|
||||
self.assertRaises(frappe.ValidationError, folder.delete)
|
||||
|
||||
|
||||
def test_file_upload_limit(self):
|
||||
from frappe.utils.file_manager import MaxFileSizeReachedError
|
||||
from frappe.core.doctype.file.file import MaxFileSizeReachedError
|
||||
from frappe.limits import update_limits, clear_limit
|
||||
from frappe import _dict
|
||||
|
||||
|
|
@ -114,8 +291,15 @@ class TestFile(unittest.TestCase):
|
|||
# Rebuild the frappe.local.conf to take up the changes from site_config
|
||||
frappe.local.conf = _dict(frappe.get_site_config())
|
||||
|
||||
self.assertRaises(MaxFileSizeReachedError, save_file, '_test_max_space.txt',
|
||||
'This files test for max space usage', "", "", self.get_folder("Test Folder 2", "Home").name)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": "_test_max_space.txt",
|
||||
"attached_to_name": "",
|
||||
"attached_to_doctype": "",
|
||||
"folder": self.get_folder("Test Folder 2", "Home").name,
|
||||
"content": "This file tests for max space usage"})
|
||||
self.assertRaises(MaxFileSizeReachedError,
|
||||
_file.save)
|
||||
|
||||
# Scrub the site_config and rebuild frappe.local.conf
|
||||
clear_limit("space")
|
||||
|
|
|
|||
|
|
@ -11,10 +11,10 @@ import frappe
|
|||
from frappe.model.document import Document
|
||||
from frappe.utils.background_jobs import enqueue
|
||||
from frappe.desk.query_report import generate_report_result, get_columns_dict
|
||||
from frappe.utils.file_manager import save_file, remove_all
|
||||
from frappe.core.doctype.file.file import remove_all
|
||||
from frappe.utils.csvutils import to_csv, read_csv_content_from_attached_file
|
||||
from frappe.desk.form.load import get_attachments
|
||||
from frappe.utils.file_manager import download_file
|
||||
from frappe.core.doctype.file.file import download_file
|
||||
|
||||
|
||||
class PreparedReport(Document):
|
||||
|
|
@ -61,15 +61,15 @@ def create_csv_file(columns, data, dt, dn):
|
|||
csv_filename = '{0}.csv'.format(frappe.utils.data.format_datetime(frappe.utils.now(), "Y-m-d-H:M"))
|
||||
rows = [tuple(columns)] + data
|
||||
encoded = base64.b64encode(frappe.safe_encode(to_csv(rows)))
|
||||
# Call save_file function to upload and attach the file
|
||||
save_file(
|
||||
fname=csv_filename,
|
||||
content=encoded,
|
||||
dt=dt,
|
||||
dn=dn,
|
||||
folder=None,
|
||||
decode=True,
|
||||
is_private=False)
|
||||
# Call save() file function to upload and attach the file
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": csv_filename,
|
||||
"attached_to_doctype": dt,
|
||||
"attached_to_name": dn,
|
||||
"content": encoded,
|
||||
"decode": True})
|
||||
_file.save()
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
|
|
|
|||
|
|
@ -13,9 +13,11 @@ from six import string_types
|
|||
@frappe.whitelist()
|
||||
def remove_attach():
|
||||
"""remove attachment"""
|
||||
import frappe.utils.file_manager
|
||||
fid = frappe.form_dict.get('fid')
|
||||
return frappe.utils.file_manager.remove_file(fid)
|
||||
file_name = frappe.form_dict.get('file_name')
|
||||
frappe.delete_doc('File', fid)
|
||||
return doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def validate_link():
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import frappe, json, os
|
|||
from frappe.utils import strip, cint
|
||||
from frappe.translate import (set_default_language, get_dict, send_translations)
|
||||
from frappe.geo.country_info import get_country_info
|
||||
from frappe.utils.file_manager import save_file
|
||||
from frappe.utils.password import update_password
|
||||
from werkzeug.useragents import UserAgent
|
||||
from . import install_fixtures
|
||||
|
|
@ -187,7 +186,15 @@ def update_user_name(args):
|
|||
attach_user = args.get("attach_user").split(",")
|
||||
if len(attach_user)==3:
|
||||
filename, filetype, content = attach_user
|
||||
fileurl = save_file(filename, content, "User", args.get("name"), decode=True).file_url
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": filename,
|
||||
"attached_to_doctype": "User",
|
||||
"attached_to_name": args.get("name"),
|
||||
"content": content,
|
||||
"decode": True})
|
||||
_file.save()
|
||||
fileurl = _file.file_url
|
||||
frappe.db.set_value("User", args.get("name"), "user_image", fileurl)
|
||||
|
||||
if args.get('name'):
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ test_records = frappe.get_test_records('Email Account')
|
|||
|
||||
from frappe.core.doctype.communication.email import make
|
||||
from frappe.desk.form.load import get_attachments
|
||||
from frappe.utils.file_manager import delete_file_from_filesystem
|
||||
from frappe.email.doctype.email_account.email_account import notify_unreplied
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
|
@ -55,7 +54,6 @@ class TestEmailAccount(unittest.TestCase):
|
|||
frappe.db.sql("DELETE FROM `tabCommunication` WHERE sender='test_sender@example.com'")
|
||||
existing_file = frappe.get_doc({'doctype': 'File', 'file_name': 'erpnext-conf-14.png'})
|
||||
frappe.delete_doc("File", existing_file.name)
|
||||
delete_file_from_filesystem(existing_file)
|
||||
|
||||
with open(os.path.join(os.path.dirname(__file__), "test_mails", "incoming-2.raw"), "r") as testfile:
|
||||
test_mails = [testfile.read()]
|
||||
|
|
@ -73,7 +71,7 @@ class TestEmailAccount(unittest.TestCase):
|
|||
# cleanup
|
||||
existing_file = frappe.get_doc({'doctype': 'File', 'file_name': 'erpnext-conf-14.png'})
|
||||
frappe.delete_doc("File", existing_file.name)
|
||||
delete_file_from_filesystem(existing_file)
|
||||
|
||||
|
||||
def test_incoming_attached_email_from_outlook_plain_text_only(self):
|
||||
frappe.db.sql("delete from tabCommunication where sender='test_sender@example.com'")
|
||||
|
|
|
|||
|
|
@ -144,12 +144,12 @@ class EMail:
|
|||
|
||||
def attach_file(self, n):
|
||||
"""attach a file from the `FileData` table"""
|
||||
from frappe.utils.file_manager import get_file
|
||||
res = get_file(n)
|
||||
if not res:
|
||||
_file = frappe.get_doc("File", {"file_name": n})
|
||||
content = _file.get_content()
|
||||
if not content:
|
||||
return
|
||||
|
||||
self.add_attachment(res[0], res[1])
|
||||
self.add_attachment(_file.file_name, content)
|
||||
|
||||
def add_attachment(self, fname, fcontent, content_type=None,
|
||||
parent=None, content_id=None, inline=False):
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ from frappe.email.email_body import get_email, get_formatted_html, add_attachmen
|
|||
from frappe.utils.verified_command import get_signed_params, verify_request
|
||||
from html2text import html2text
|
||||
from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr, cint
|
||||
from frappe.utils.file_manager import get_file
|
||||
from rq.timeouts import JobTimeoutException
|
||||
from frappe.utils.scheduler import log
|
||||
from six import text_type, string_types
|
||||
|
|
@ -531,9 +530,10 @@ def prepare_message(email, recipient, recipients_list):
|
|||
|
||||
fid = attachment.get("fid")
|
||||
if fid:
|
||||
fname, fcontent = get_file(fid)
|
||||
_file = frappe.get_doc("File", {"file_name": fid})
|
||||
fcontent = _file.get_content()
|
||||
attachment.update({
|
||||
'fname': fname,
|
||||
'fname': _file.file_name,
|
||||
'fcontent': fcontent,
|
||||
'parent': msg_obj
|
||||
})
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ from frappe import _, safe_decode, safe_encode
|
|||
from frappe.utils import (extract_email_id, convert_utc_to_user_timezone, now,
|
||||
cint, cstr, strip, markdown, parse_addr)
|
||||
from frappe.utils.scheduler import log
|
||||
from frappe.utils.file_manager import get_random_filename, save_file, MaxFileSizeReachedError
|
||||
from frappe.core.doctype.file.file import get_random_filename, MaxFileSizeReachedError
|
||||
|
||||
class EmailSizeExceededError(frappe.ValidationError): pass
|
||||
class EmailTimeoutError(frappe.ValidationError): pass
|
||||
|
|
@ -523,12 +523,18 @@ class Email:
|
|||
|
||||
for attachment in self.attachments:
|
||||
try:
|
||||
file_data = save_file(attachment['fname'], attachment['fcontent'],
|
||||
doc.doctype, doc.name, is_private=1)
|
||||
saved_attachments.append(file_data)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": attachment['fname'],
|
||||
"attached_to_doctype": doc.doctype,
|
||||
"attached_to_name": doc.name,
|
||||
"is_private": 1,
|
||||
"content": attachment['fcontent']})
|
||||
_file.save()
|
||||
saved_attachments.append(_file)
|
||||
|
||||
if attachment['fname'] in self.cid_map:
|
||||
self.cid_map[file_data.name] = self.cid_map[attachment['fname']]
|
||||
self.cid_map[_file.name] = self.cid_map[attachment['fname']]
|
||||
|
||||
except MaxFileSizeReachedError:
|
||||
# WARNING: bypass max file size exception
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import frappe
|
|||
from frappe import _
|
||||
import frappe.utils
|
||||
import frappe.sessions
|
||||
import frappe.utils.file_manager
|
||||
import frappe.desk.form.run_method
|
||||
from frappe.utils.response import build_response
|
||||
from werkzeug.wrappers import Response
|
||||
|
|
@ -111,7 +110,18 @@ def uploadfile():
|
|||
try:
|
||||
if frappe.form_dict.get('from_form'):
|
||||
try:
|
||||
ret = frappe.utils.file_manager.upload()
|
||||
ret = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"attached_to_name": frappe.form_dict.docname,
|
||||
"attached_to_doctype": frappe.form_dict.doctype,
|
||||
"attached_to_field": frappe.form_dict.docfield,
|
||||
"file_url": frappe.form_dict.file_url,
|
||||
"file_name": frappe.form_dict.filename,
|
||||
"is_private": frappe.utils.cint(frappe.form_dict.is_private),
|
||||
"content": frappe.form_dict.filedata,
|
||||
"decode": True
|
||||
})
|
||||
ret.save()
|
||||
except frappe.DuplicateEntryError:
|
||||
# ignore pass
|
||||
ret = None
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import frappe
|
|||
from frappe import _
|
||||
from frappe.model.document import Document
|
||||
from frappe.integrations.doctype.gsuite_settings.gsuite_settings import run_gsuite_script
|
||||
from frappe.utils.file_manager import save_url
|
||||
|
||||
class GSuiteTemplates(Document):
|
||||
pass
|
||||
|
|
@ -25,7 +24,16 @@ def create_gsuite_doc(doctype, docname, gs_template=None):
|
|||
|
||||
r = run_gsuite_script('new', filename, templ.template_id, templ.destination_id, json_data)
|
||||
|
||||
filedata = save_url(r['url'], filename, doctype, docname, "Home/Attachments", True)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_url": r['url'],
|
||||
"file_name": filename,
|
||||
"attached_to_doctype": doctype,
|
||||
"attached_to_name": docname,
|
||||
"attached_to_field": True,
|
||||
"folder": "Home/Attachments"})
|
||||
_file.save()
|
||||
|
||||
comment = frappe.get_doc(doctype, docname).add_comment("Attachment",
|
||||
_("added {0}").format("<a href='{file_url}' target='_blank'>{file_name}</a>{icon}".format(**{
|
||||
"icon": ' <i class="fa fa-lock text-warning"></i>' if filedata.is_private else "",
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ def clear_limit(key):
|
|||
|
||||
def validate_space_limit(file_size):
|
||||
"""Stop from writing file if max space limit is reached"""
|
||||
from frappe.utils.file_manager import MaxFileSizeReachedError
|
||||
from frappe.core.doctype.file.file import MaxFileSizeReachedError
|
||||
|
||||
limits = get_limits()
|
||||
if not limits.space:
|
||||
|
|
|
|||
|
|
@ -774,7 +774,7 @@ class BaseDocument(object):
|
|||
return cast_fieldtype(df.fieldtype, value)
|
||||
|
||||
def _extract_images_from_text_editor(self):
|
||||
from frappe.utils.file_manager import extract_images_from_doc
|
||||
from frappe.core.doctype.file.file import extract_images_from_doc
|
||||
if self.doctype != "DocType":
|
||||
for df in self.meta.get("fields", {"fieldtype": ('=', "Text Editor")}):
|
||||
extract_images_from_doc(self, df.fieldname)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import frappe
|
|||
import frappe.model.meta
|
||||
from frappe.model.dynamic_links import get_dynamic_link_map
|
||||
import frappe.defaults
|
||||
from frappe.utils.file_manager import remove_all
|
||||
from frappe.core.doctype.file.file import remove_all
|
||||
from frappe.utils.password import delete_all_passwords_for
|
||||
from frappe import _
|
||||
from frappe.model.naming import revert_series_if_last
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ from werkzeug.exceptions import NotFound, Forbidden
|
|||
import hashlib, json
|
||||
from frappe.model import optional_fields
|
||||
from frappe.model.workflow import validate_workflow
|
||||
from frappe.utils.file_manager import save_url
|
||||
from frappe.utils.global_search import update_global_search
|
||||
from frappe.integrations.doctype.webhook import run_webhooks
|
||||
|
||||
|
|
@ -322,7 +321,15 @@ class Document(BaseDocument):
|
|||
for attach_item in get_attachments(self.doctype, self.amended_from):
|
||||
|
||||
#save attachments to new doc
|
||||
save_url(attach_item.file_url, attach_item.file_name, self.doctype, self.name, "Home/Attachments", attach_item.is_private)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_url": attach_item.file_url,
|
||||
"file_name": attach_item.file_name,
|
||||
"attached_to_name": self.name,
|
||||
"attached_to_doctype": self.doctype,
|
||||
"folder": "Home/Attachments"})
|
||||
_file.save()
|
||||
|
||||
|
||||
def update_children(self):
|
||||
'''update child tables'''
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ from __future__ import unicode_literals, print_function
|
|||
import frappe
|
||||
import os
|
||||
from frappe.utils import get_files_path
|
||||
from frappe.utils.file_manager import get_content_hash, get_file
|
||||
from frappe.core.doctype.file.file import get_content_hash
|
||||
|
||||
|
||||
def execute():
|
||||
|
|
@ -22,7 +22,8 @@ def execute():
|
|||
else:
|
||||
b.file_url = os.path.normpath('/files/' + old_file_name)
|
||||
try:
|
||||
_file_name, content = get_file(name)
|
||||
_file = frappe.get_doc("File", {"file_name": name})
|
||||
content = _file.get_content()
|
||||
b.content_hash = get_content_hash(content)
|
||||
except IOError:
|
||||
print('Warning: Error processing ', name)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from __future__ import unicode_literals, print_function
|
|||
|
||||
import frappe
|
||||
import os
|
||||
from frappe.utils.file_manager import get_content_hash, get_file, get_file_name
|
||||
from frappe.core.doctype.file.file import get_content_hash, get_file_name
|
||||
from frappe.utils import get_files_path, get_site_path
|
||||
|
||||
# The files missed by the previous patch might have been replaced with new files
|
||||
|
|
@ -36,7 +36,8 @@ def execute():
|
|||
else:
|
||||
b.file_url = os.path.normpath('/files/' + old_file_name)
|
||||
try:
|
||||
_file_name, content = get_file(name)
|
||||
_file = frappe.get_doc("File", {"file_name": name})
|
||||
content = _file.get_content()
|
||||
b.content_hash = get_content_hash(content)
|
||||
except IOError:
|
||||
print('Warning: Error processing ', name)
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class PrintFormat(Document):
|
|||
validate_template(self.html)
|
||||
|
||||
def extract_images(self):
|
||||
from frappe.utils.file_manager import extract_images_from_html
|
||||
from frappe.core.doctype.file.file import extract_images_from_html
|
||||
if self.format_data:
|
||||
data = json.loads(self.format_data)
|
||||
for df in data:
|
||||
|
|
|
|||
|
|
@ -317,7 +317,7 @@ frappe.upload = {
|
|||
} else {
|
||||
args.file_size = fileobj.size;
|
||||
frappe.call({
|
||||
method: 'frappe.utils.file_manager.validate_filename',
|
||||
method: 'frappe.core.doctype.file.file.validate_filename',
|
||||
args: {"filename": args.filename},
|
||||
callback: function(r) {
|
||||
args.filename = r.message;
|
||||
|
|
|
|||
|
|
@ -337,13 +337,19 @@ def get_qr_svg_code(totp_uri):
|
|||
|
||||
def qrcode_as_png(user, totp_uri):
|
||||
'''Save temporary Qrcode to server.'''
|
||||
from frappe.utils.file_manager import save_file
|
||||
folder = create_barcode_folder()
|
||||
png_file_name = '{}.png'.format(frappe.generate_hash(length=20))
|
||||
file_obj = save_file(png_file_name, png_file_name, 'User', user, folder=folder)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": png_file_name,
|
||||
"attached_to_doctype": 'User',
|
||||
"attached_to_name": user,
|
||||
"folder": folder,
|
||||
"content": png_file_name})
|
||||
_file.save()
|
||||
frappe.db.commit()
|
||||
file_url = get_url(file_obj.file_url)
|
||||
file_path = os.path.join(frappe.get_site_path('public', 'files'), file_obj.file_name)
|
||||
file_url = get_url(_file.file_url)
|
||||
file_path = os.path.join(frappe.get_site_path('public', 'files'), _file.file_name)
|
||||
url = qrcreate(totp_uri)
|
||||
with open(file_path, 'w') as png_file:
|
||||
url.png(png_file, scale=8, module_color=[0, 0, 0, 180], background=[0xff, 0xff, 0xcc])
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ def read_csv_content_from_uploaded_file(ignore_encoding=False):
|
|||
with open(frappe.uploaded_file, "r") as upfile:
|
||||
fcontent = upfile.read()
|
||||
else:
|
||||
from frappe.utils.file_manager import get_uploaded_content
|
||||
fname, fcontent = get_uploaded_content()
|
||||
_file = frappe.new_doc("File")
|
||||
fcontent = _file.get_uploaded_content()
|
||||
return read_csv_content(fcontent, ignore_encoding)
|
||||
|
||||
def read_csv_content_from_attached_file(doc):
|
||||
|
|
@ -30,8 +30,8 @@ def read_csv_content_from_attached_file(doc):
|
|||
raise Exception
|
||||
|
||||
try:
|
||||
from frappe.utils.file_manager import get_file
|
||||
fname, fcontent = get_file(fileid)
|
||||
_file = frappe.get_doc("File", {"file_name": fileid})
|
||||
fcontent = _file.get_content()
|
||||
return read_csv_content(fcontent, frappe.form_dict.get('ignore_encoding_errors'))
|
||||
except Exception:
|
||||
frappe.throw(_("Unable to open attached file. Did you export it as CSV?"), title=_('Invalid CSV Format'))
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ from werkzeug.local import LocalProxy
|
|||
from werkzeug.wsgi import wrap_file
|
||||
from werkzeug.wrappers import Response
|
||||
from werkzeug.exceptions import NotFound, Forbidden
|
||||
from frappe.core.doctype.file.file import check_file_permission
|
||||
from frappe.website.render import render
|
||||
from frappe.utils import cint
|
||||
from six import text_type
|
||||
|
|
@ -147,7 +146,8 @@ def download_backup(path):
|
|||
def download_private_file(path):
|
||||
"""Checks permissions and sends back private file"""
|
||||
try:
|
||||
check_file_permission(path)
|
||||
_file = frappe.get_doc("File", {"file_url": path})
|
||||
_file.check_file_permission()
|
||||
|
||||
except frappe.PermissionError:
|
||||
raise Forbidden(_("You don't have permission to access this file"))
|
||||
|
|
|
|||
|
|
@ -70,8 +70,8 @@ def handle_html(data):
|
|||
|
||||
def read_xlsx_file_from_attached_file(file_id=None, fcontent=None, filepath=None):
|
||||
if file_id:
|
||||
from frappe.utils.file_manager import get_file_path
|
||||
filename = get_file_path(file_id)
|
||||
_file = frappe.get_doc("File", {"file_name": file_id})
|
||||
filename = _file.get_full_path()
|
||||
elif fcontent:
|
||||
from io import BytesIO
|
||||
filename = BytesIO(fcontent)
|
||||
|
|
|
|||
|
|
@ -6,10 +6,10 @@ import frappe, json, os
|
|||
from frappe.website.website_generator import WebsiteGenerator
|
||||
from frappe import _, scrub
|
||||
from frappe.utils import cstr
|
||||
from frappe.utils.file_manager import save_file, remove_file_by_url
|
||||
from frappe.website.utils import get_comment_list
|
||||
from frappe.custom.doctype.customize_form.customize_form import docfield_properties
|
||||
from frappe.utils.file_manager import get_max_file_size
|
||||
from frappe.core.doctype.file.file import get_max_file_size
|
||||
from frappe.core.doctype.file.file import remove_file_by_url
|
||||
from frappe.modules.utils import export_module_json, get_doc_module
|
||||
from six.moves.urllib.parse import urlencode
|
||||
from frappe.integrations.utils import get_payment_gateway_controller
|
||||
|
|
@ -416,22 +416,29 @@ def accept(web_form, data, for_payment=False):
|
|||
|
||||
# remove earlier attached file (if exists)
|
||||
if doc.get(fieldname):
|
||||
remove_file_by_url(doc.get(fieldname), doc.doctype, doc.name)
|
||||
remove_file_by_url(doc.get(fieldname), doctype=doc.doctype, name=doc.name)
|
||||
|
||||
# save new file
|
||||
filename, dataurl = filedata.split(',', 1)
|
||||
filedoc = save_file(filename, dataurl,
|
||||
doc.doctype, doc.name, decode=True)
|
||||
_file = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": filename,
|
||||
"attached_to_doctype": doc.doctype,
|
||||
"attached_to_name": doc.name,
|
||||
"content": dataurl,
|
||||
"decode": True})
|
||||
_file.save()
|
||||
|
||||
# update values
|
||||
doc.set(fieldname, filedoc.file_url)
|
||||
doc.set(fieldname, _file.file_url)
|
||||
|
||||
doc.save(ignore_permissions = True)
|
||||
|
||||
if files_to_delete:
|
||||
for f in files_to_delete:
|
||||
if f:
|
||||
remove_file_by_url(f, doc.doctype, doc.name)
|
||||
remove_file_by_url(doc.get(fieldname), doctype=doc.doctype, name=doc.name)
|
||||
|
||||
|
||||
frappe.flags.web_form_doc = doc
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue