[2/3] file-api: code migration

migrate api from file_manager.py to file.py

Signed-off-by: Chinmay Pai <chinmaydpai@gmail.com>
This commit is contained in:
Chinmay Pai 2018-09-03 18:04:45 +05:30
parent 16a99f5472
commit 22ba310aaf
No known key found for this signature in database
GPG key ID: 75507BE256F40CED
20 changed files with 129 additions and 126 deletions

View file

@ -10,7 +10,7 @@ from email.utils import formataddr
from frappe.core.utils import get_parent_doc
from frappe.utils import (get_url, get_formatted_email, cint,
validate_email_add, split_emails, time_diff_in_seconds, parse_addr, get_datetime)
from frappe.utils.file_manager import get_file
from frappe.core.doctype.file.file import get_file
from frappe.email.queue import check_email_limit
from frappe.utils.scheduler import log
from frappe.email.email_body import get_message_id

View file

@ -272,7 +272,7 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
# header
filename, file_extension = ['','']
if not rows:
from frappe.utils.file_manager import get_file # get_file_doc
from frappe.core.doctype.file.file import get_file # get_file_doc
fname, fcontent = get_file(data_import_doc.import_file)
filename, file_extension = os.path.splitext(fname)

View file

@ -199,7 +199,7 @@ class File(NestedSet):
self.check_folder_is_empty()
self.check_reference_doc_permission()
super(File, self).on_trash()
self.delete_file()
self.call_delete_file()
def make_thumbnail(self, set_as_thumbnail=True, width=300, height=300, suffix="small", crop=False):
if self.file_url:
@ -265,14 +265,14 @@ class File(NestedSet):
except frappe.DoesNotExistError:
pass
def delete_file(self):
def call_delete_file(self):
"""If file not attached to any other record, delete it"""
if self.file_name and self.content_hash and (not frappe.db.count("File",
{"content_hash": self.content_hash, "name": ["!=", self.name]})):
delete_file_data_content(self)
self.delete_file_data_content()
elif self.file_url:
delete_file_data_content(self, only_thumbnail=True)
self.delete_file_data_content(only_thumbnail=True)
def on_rollback(self):
self.flags.on_rollback = True
@ -308,17 +308,6 @@ class File(NestedSet):
frappe.delete_doc('File', self.name)
def remove_file_by_url(self):
if self.attached_to_doctype and self.attached_to_name:
fid = frappe.db.get_value("File", {"file_url": self.file_url,
"attached_to_doctype": self.attached_to_doctype, "attached_to_name": self.attached_to_name})
else:
fid = frappe.db.get_value("File", {"file_url": self.file_url})
if fid:
return remove_file(fid)
def get_file_url(self):
data = frappe.db.get_value("File", self.file_data_name, ["file_name", "file_url"], as_dict=True)
return data.file_url or data.file_name
@ -434,10 +423,10 @@ class File(NestedSet):
self.content = self.content.split(b",")[1]
self.content = base64.b64decode(self.content)
file_size = check_max_file_size(self.content)
file_size = self.check_max_file_size()
self.content_hash = get_content_hash(self.content)
self.content_type = mimetypes.guess_type(self.fname)[0]
self.fname = self.get_file_name(content_hash[-6:])
self.fname = get_file_name(self.fname, self.content_hash[-6:])
self.is_private = is_private
file_data = self.get_file_data_from_hash(is_private=self.is_private)
if not file_data:
@ -477,7 +466,7 @@ class File(NestedSet):
def save_file_on_filesystem(self):
fpath = write_file(self.content, self.fname, self.is_private)
fpath = self.write_file(self, is_private=self.is_private)
if self.is_private:
file_url = "/private/files/{0}".format(self.fname)
@ -490,6 +479,89 @@ class File(NestedSet):
}
def check_max_file_size(self):
max_file_size = get_max_file_size()
file_size = len(self.content)
if file_size > max_file_size:
frappe.msgprint(_("File size exceeded the maximum allowed size of {0} MB").format(
max_file_size / 1048576),
raise_exception=MaxFileSizeReachedError)
return file_size
def write_file(self, is_private=0):
"""write file to disk with a random name (to compare)"""
file_path = get_files_path(is_private=is_private)
# create directory (if not exists)
frappe.create_folder(file_path)
# write the file
if isinstance(self.content, text_type):
self.content = self.content.encode()
with open(os.path.join(file_path.encode('utf-8'), self.fname.encode('utf-8')), 'wb+') as f:
f.write(self.content)
return get_files_path(self.fname, is_private=is_private)
def remove_file(self, attached_to_doctype=None, attached_to_name=None, from_delete=False):
"""Remove file and File entry"""
file_name = None
if not (attached_to_doctype and attached_to_name):
attached = frappe.db.get_value("File", fid,
["attached_to_doctype", "attached_to_name", "file_name"])
if attached:
attached_to_doctype, attached_to_name, file_name = attached
ignore_permissions, comment = False, None
if attached_to_doctype and attached_to_name and not from_delete:
doc = frappe.get_doc(attached_to_doctype, attached_to_name)
ignore_permissions = doc.has_permission("write") or False
if frappe.flags.in_web_form:
ignore_permissions = True
if not file_name:
file_name = frappe.db.get_value("File", fid, "file_name")
comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions)
return comment
def delete_file_data_content(self, only_thumbnail=False):
method = get_hook_method('delete_file_data_content', fallback=self.delete_file_from_filesystem)
method(self.doc, only_thumbnail=only_thumbnail)
def delete_file_from_filesystem(self, only_thumbnail=False):
"""Delete file, thumbnail from File document"""
if only_thumbnail:
self.delete_file(self.doc.thumbnail_url)
else:
self.delete_file(self.doc.file_url)
self.delete_file(self.doc.thumbnail_url)
def delete_file(self, path):
"""Delete file from `public folder`"""
if path:
if ".." in path.split("/"):
frappe.msgprint(_("It is risky to delete this file: {0}. Please contact your System Manager.").format(path))
parts = os.path.split(path.strip("/"))
if parts[0]=="files":
path = frappe.utils.get_site_path("public", "files", parts[-1])
else:
path = frappe.utils.get_site_path("private", "files", parts[-1])
path = encode(path)
if os.path.exists(path):
os.remove(path)
def on_doctype_update():
frappe.db.add_index("File", ["attached_to_doctype", "attached_to_name"])
@ -622,97 +694,27 @@ def get_max_file_size():
return conf.get('max_file_size') or 10485760
def check_max_file_size(content):
max_file_size = get_max_file_size()
file_size = len(content)
if file_size > max_file_size:
frappe.msgprint(_("File size exceeded the maximum allowed size of {0} MB").format(
max_file_size / 1048576),
raise_exception=MaxFileSizeReachedError)
return file_size
def write_file(content, fname, is_private=0):
"""write file to disk with a random name (to compare)"""
file_path = get_files_path(is_private=is_private)
# create directory (if not exists)
frappe.create_folder(file_path)
# write the file
if isinstance(content, text_type):
content = content.encode()
with open(os.path.join(file_path.encode('utf-8'), fname.encode('utf-8')), 'wb+') as f:
f.write(content)
return get_files_path(fname, is_private=is_private)
def remove_all(dt, dn, from_delete=False):
"""remove all files in a transaction"""
try:
for fid in frappe.db.sql_list("""select name from `tabFile` where
attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):
remove_file(fid, dt, dn, from_delete)
_file = frappe.get_doc("File", {"fid": fid, "attached_to_doctype": dt, "attached_to_name": dn})
_file.remove_file(from_delete=from_delete)
except Exception as e:
if e.args[0]!=1054: raise # (temp till for patched)
def remove_file(fid, attached_to_doctype=None, attached_to_name=None, from_delete=False):
"""Remove file and File entry"""
file_name = None
if not (attached_to_doctype and attached_to_name):
attached = frappe.db.get_value("File", fid,
["attached_to_doctype", "attached_to_name", "file_name"])
if attached:
attached_to_doctype, attached_to_name, file_name = attached
ignore_permissions, comment = False, None
if attached_to_doctype and attached_to_name and not from_delete:
doc = frappe.get_doc(attached_to_doctype, attached_to_name)
ignore_permissions = doc.has_permission("write") or False
if frappe.flags.in_web_form:
ignore_permissions = True
if not file_name:
file_name = frappe.db.get_value("File", fid, "file_name")
comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions)
return comment
def delete_file_data_content(doc, only_thumbnail=False):
method = get_hook_method('delete_file_data_content', fallback=delete_file_from_filesystem)
method(doc, only_thumbnail=only_thumbnail)
def delete_file_from_filesystem(doc, only_thumbnail=False):
"""Delete file, thumbnail from File document"""
if only_thumbnail:
delete_file(doc.thumbnail_url)
def remove_file_by_url(file_url, doctype=None, name=None):
if doctype and name:
fid = frappe.db.get_value("File", {"file_url": file_url,
"attached_to_doctype": doctype, "attached_to_name": name})
else:
delete_file(doc.file_url)
delete_file(doc.thumbnail_url)
fid = frappe.db.get_value("File", {"file_url": file_url})
def delete_file(path):
"""Delete file from `public folder`"""
if path:
if ".." in path.split("/"):
frappe.msgprint(_("It is risky to delete this file: {0}. Please contact your System Manager.").format(path))
parts = os.path.split(path.strip("/"))
if parts[0]=="files":
path = frappe.utils.get_site_path("public", "files", parts[-1])
else:
path = frappe.utils.get_site_path("private", "files", parts[-1])
path = encode(path)
if os.path.exists(path):
os.remove(path)
if fid:
_file = frappe.get_doc("File", {"fid": fid})
return _file.remove_file()
def get_file(fname):
@ -788,7 +790,7 @@ def download_file(file_url):
token is required to download private files.
Method : GET
Endpoint : frappe.utils.file_manager.download_file
Endpoint : frappe.core.doctype.file.file.download_file
URL Params : file_name = /path/to/file relative to site path
"""
file_doc = frappe.get_doc("File", {"file_url":file_url})

View file

@ -101,7 +101,7 @@ class TestFile(unittest.TestCase):
self.assertRaises(frappe.ValidationError, folder.delete)
def test_file_upload_limit(self):
from frappe.utils.file_manager import MaxFileSizeReachedError
from frappe.core.doctype.file.file import MaxFileSizeReachedError
from frappe.limits import update_limits, clear_limit
from frappe import _dict

View file

@ -11,10 +11,10 @@ import frappe
from frappe.model.document import Document
from frappe.utils.background_jobs import enqueue
from frappe.desk.query_report import generate_report_result, get_columns_dict
from frappe.utils.file_manager import remove_all
from frappe.core.doctype.file.file import remove_all
from frappe.utils.csvutils import to_csv, read_csv_content_from_attached_file
from frappe.desk.form.load import get_attachments
from frappe.utils.file_manager import download_file
from frappe.core.doctype.file.file import download_file
class PreparedReport(Document):

View file

@ -13,9 +13,10 @@ from six import string_types
@frappe.whitelist()
def remove_attach():
"""remove attachment"""
import frappe.utils.file_manager
fid = frappe.form_dict.get('fid')
return frappe.utils.file_manager.remove_file(fid)
_file = frappe.get_doc("File", {"fid": fid})
return _file.remove_file()
@frappe.whitelist()
def validate_link():

View file

@ -8,7 +8,6 @@ test_records = frappe.get_test_records('Email Account')
from frappe.core.doctype.communication.email import make
from frappe.desk.form.load import get_attachments
from frappe.utils.file_manager import delete_file_from_filesystem
from frappe.email.doctype.email_account.email_account import notify_unreplied
from datetime import datetime, timedelta
@ -55,7 +54,8 @@ class TestEmailAccount(unittest.TestCase):
frappe.db.sql("delete from tabCommunication where sender='test_sender@example.com'")
existing_file = frappe.get_doc({'doctype': 'File', 'file_name': 'erpnext-conf-14.png'})
frappe.delete_doc("File", existing_file.name)
delete_file_from_filesystem(existing_file)
_file = frappe.get_doc("File", {"doc": existing_file})
_file.delete_file_from_filesystem()
with open(os.path.join(os.path.dirname(__file__), "test_mails", "incoming-2.raw"), "r") as testfile:
test_mails = [testfile.read()]
@ -73,7 +73,8 @@ class TestEmailAccount(unittest.TestCase):
# cleanup
existing_file = frappe.get_doc({'doctype': 'File', 'file_name': 'erpnext-conf-14.png'})
frappe.delete_doc("File", existing_file.name)
delete_file_from_filesystem(existing_file)
_file = frappe.get_doc("File", {"doc": existing_file})
def test_incoming_attached_email_from_outlook_plain_text_only(self):
frappe.db.sql("delete from tabCommunication where sender='test_sender@example.com'")

View file

@ -144,7 +144,7 @@ class EMail:
def attach_file(self, n):
"""attach a file from the `FileData` table"""
from frappe.utils.file_manager import get_file
from frappe.core.doctype.file.file import get_file
res = get_file(n)
if not res:
return

View file

@ -11,7 +11,7 @@ from frappe.email.email_body import get_email, get_formatted_html, add_attachmen
from frappe.utils.verified_command import get_signed_params, verify_request
from html2text import html2text
from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr, cint
from frappe.utils.file_manager import get_file
from frappe.core.doctype.file.file import get_file
from rq.timeouts import JobTimeoutException
from frappe.utils.scheduler import log
from six import text_type, string_types

View file

@ -13,7 +13,7 @@ from frappe import _, safe_decode, safe_encode
from frappe.utils import (extract_email_id, convert_utc_to_user_timezone, now,
cint, cstr, strip, markdown, parse_addr)
from frappe.utils.scheduler import log
from frappe.utils.file_manager import get_random_filename, MaxFileSizeReachedError
from frappe.core.doctype.file.file import get_random_filename, MaxFileSizeReachedError
class EmailSizeExceededError(frappe.ValidationError): pass
class EmailTimeoutError(frappe.ValidationError): pass

View file

@ -6,7 +6,6 @@ import frappe
from frappe import _
import frappe.utils
import frappe.sessions
import frappe.utils.file_manager
import frappe.desk.form.run_method
from frappe.utils.response import build_response
from werkzeug.wrappers import Response

View file

@ -173,7 +173,7 @@ def clear_limit(key):
def validate_space_limit(file_size):
"""Stop from writing file if max space limit is reached"""
from frappe.utils.file_manager import MaxFileSizeReachedError
from frappe.core.doctype.file.file import MaxFileSizeReachedError
limits = get_limits()
if not limits.space:

View file

@ -7,7 +7,7 @@ import frappe
import frappe.model.meta
from frappe.model.dynamic_links import get_dynamic_link_map
import frappe.defaults
from frappe.utils.file_manager import remove_all
from frappe.core.doctype.file.file import remove_all
from frappe.utils.password import delete_all_passwords_for
from frappe import _
from frappe.model.naming import revert_series_if_last

View file

@ -6,7 +6,7 @@ from __future__ import unicode_literals, print_function
import frappe
import os
from frappe.utils import get_files_path
from frappe.utils.file_manager import get_content_hash, get_file
from frappe.core.doctype.file.file import get_content_hash, get_file
def execute():

View file

@ -5,7 +5,7 @@ from __future__ import unicode_literals, print_function
import frappe
import os
from frappe.utils.file_manager import get_content_hash, get_file, get_file_name
from frappe.core.doctype.file.file import get_content_hash, get_file, get_file_name
from frappe.utils import get_files_path, get_site_path
# The files missed by the previous patch might have been replaced with new files

View file

@ -317,7 +317,7 @@ frappe.upload = {
} else {
args.file_size = fileobj.size;
frappe.call({
method: 'frappe.utils.file_manager.validate_filename',
method: 'frappe.core.doctype.file.file.validate_filename',
args: {"filename": args.filename},
callback: function(r) {
args.filename = r.message;

View file

@ -6,7 +6,7 @@ import frappe
import os
import unittest
from frappe.utils.file_manager import get_file, get_files_path
from frappe.core.doctype.file.file import get_file, get_files_path
test_content1 = 'Hello'
test_content2 = 'Hello World'

View file

@ -30,7 +30,7 @@ def read_csv_content_from_attached_file(doc):
raise Exception
try:
from frappe.utils.file_manager import get_file
from frappe.core.doctype.file.file import get_file
fname, fcontent = get_file(fileid)
return read_csv_content(fcontent, frappe.form_dict.get('ignore_encoding_errors'))
except Exception:

View file

@ -70,7 +70,7 @@ def handle_html(data):
def read_xlsx_file_from_attached_file(file_id=None, fcontent=None, filepath=None):
if file_id:
from frappe.utils.file_manager import get_file_path
from frappe.core.doctype.file.file import get_file_path
filename = get_file_path(file_id)
elif fcontent:
from io import BytesIO

View file

@ -8,7 +8,8 @@ from frappe import _, scrub
from frappe.utils import cstr
from frappe.website.utils import get_comment_list
from frappe.custom.doctype.customize_form.customize_form import docfield_properties
from frappe.utils.file_manager import get_max_file_size
from frappe.core.doctype.file.file import get_max_file_size
from frappe.core.doctype.file.file import remove_file_by_url
from frappe.modules.utils import export_module_json, get_doc_module
from six.moves.urllib.parse import urlencode
from frappe.integrations.utils import get_payment_gateway_controller
@ -415,8 +416,7 @@ def accept(web_form, data, for_payment=False):
# remove earlier attached file (if exists)
if doc.get(fieldname):
file = frappe.get_doc("File", {"attached_to_doctype": doc.doctype, "file_url": doc.get(fieldname), "attached_to_name": doc.name})
file.remove_file_by_url()
remove_file_by_url(doc.get(fieldname), doctype=doc.doctype, name=doc.name)
# save new file
filename, dataurl = filedata.split(',', 1)
@ -432,8 +432,8 @@ def accept(web_form, data, for_payment=False):
if files_to_delete:
for f in files_to_delete:
if f:
file = frappe.get_doc("File", {"attached_to_doctype": doc.doctype, "file_url": doc.get(fieldname), "attached_to_name": doc.name})
file.remove_file_by_url()
remove_file_by_url(doc.get(fieldname), doctype=doc.doctype, name=doc.name)
frappe.flags.web_form_doc = doc