The license.txt file has been replaced with LICENSE for quite a while now. INAL but it didn't seem accurate to say "hey, checkout license.txt although there's no such file". Apart from this, there were inconsistencies in the headers altogether...this change brings consistency.
198 lines
5.6 KiB
Python
198 lines
5.6 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
import frappe
|
|
from frappe import msgprint, _
|
|
import json
|
|
import csv
|
|
import requests
|
|
from io import StringIO
|
|
from frappe.utils import cstr, cint, flt, comma_or
|
|
|
|
def read_csv_content_from_attached_file(doc):
|
|
fileid = frappe.get_all("File", fields = ["name"], filters = {"attached_to_doctype": doc.doctype,
|
|
"attached_to_name":doc.name}, order_by="creation desc")
|
|
|
|
if fileid : fileid = fileid[0].name
|
|
|
|
if not fileid:
|
|
msgprint(_("File not attached"))
|
|
raise Exception
|
|
|
|
try:
|
|
_file = frappe.get_doc("File", fileid)
|
|
fcontent = _file.get_content()
|
|
return read_csv_content(fcontent, frappe.form_dict.get('ignore_encoding_errors'))
|
|
except Exception:
|
|
frappe.throw(_("Unable to open attached file. Did you export it as CSV?"), title=_('Invalid CSV Format'))
|
|
|
|
def read_csv_content(fcontent, ignore_encoding=False):
|
|
rows = []
|
|
|
|
if not isinstance(fcontent, str):
|
|
decoded = False
|
|
for encoding in ["utf-8", "windows-1250", "windows-1252"]:
|
|
try:
|
|
fcontent = str(fcontent, encoding)
|
|
decoded = True
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
|
|
if not decoded:
|
|
frappe.msgprint(_("Unknown file encoding. Tried utf-8, windows-1250, windows-1252."), raise_exception=True)
|
|
|
|
fcontent = fcontent.encode("utf-8")
|
|
content = [ ]
|
|
for line in fcontent.splitlines(True):
|
|
content.append(frappe.safe_decode(line))
|
|
|
|
try:
|
|
rows = []
|
|
for row in csv.reader(content):
|
|
r = []
|
|
for val in row:
|
|
# decode everything
|
|
val = val.strip()
|
|
|
|
if val=="":
|
|
# reason: in maraidb strict config, one cannot have blank strings for non string datatypes
|
|
r.append(None)
|
|
else:
|
|
r.append(val)
|
|
|
|
rows.append(r)
|
|
|
|
return rows
|
|
|
|
except Exception:
|
|
frappe.msgprint(_("Not a valid Comma Separated Value (CSV File)"))
|
|
raise
|
|
|
|
@frappe.whitelist()
|
|
def send_csv_to_client(args):
|
|
if isinstance(args, str):
|
|
args = json.loads(args)
|
|
|
|
args = frappe._dict(args)
|
|
|
|
frappe.response["result"] = cstr(to_csv(args.data))
|
|
frappe.response["doctype"] = args.filename
|
|
frappe.response["type"] = "csv"
|
|
|
|
def to_csv(data):
|
|
writer = UnicodeWriter()
|
|
for row in data:
|
|
writer.writerow(row)
|
|
|
|
return writer.getvalue()
|
|
|
|
def build_csv_response(data, filename):
|
|
frappe.response["result"] = cstr(to_csv(data))
|
|
frappe.response["doctype"] = filename
|
|
frappe.response["type"] = "csv"
|
|
|
|
class UnicodeWriter:
|
|
def __init__(self, encoding="utf-8", quoting=csv.QUOTE_NONNUMERIC):
|
|
self.encoding = encoding
|
|
self.queue = StringIO()
|
|
self.writer = csv.writer(self.queue, quoting=quoting)
|
|
|
|
def writerow(self, row):
|
|
self.writer.writerow(row)
|
|
|
|
def getvalue(self):
|
|
return self.queue.getvalue()
|
|
|
|
def check_record(d):
|
|
"""check for mandatory, select options, dates. these should ideally be in doclist"""
|
|
from frappe.utils.dateutils import parse_date
|
|
doc = frappe.get_doc(d)
|
|
|
|
for key in d:
|
|
docfield = doc.meta.get_field(key)
|
|
val = d[key]
|
|
if docfield:
|
|
if docfield.reqd and (val=='' or val==None):
|
|
frappe.msgprint(_("{0} is required").format(docfield.label), raise_exception=1)
|
|
|
|
if docfield.fieldtype=='Select' and val and docfield.options:
|
|
if val not in docfield.options.split('\n'):
|
|
frappe.throw(_("{0} must be one of {1}").format(_(docfield.label), comma_or(docfield.options.split("\n"))))
|
|
|
|
if val and docfield.fieldtype=='Date':
|
|
d[key] = parse_date(val)
|
|
elif val and docfield.fieldtype in ["Int", "Check"]:
|
|
d[key] = cint(val)
|
|
elif val and docfield.fieldtype in ["Currency", "Float", "Percent"]:
|
|
d[key] = flt(val)
|
|
|
|
def import_doc(d, doctype, overwrite, row_idx, submit=False, ignore_links=False):
|
|
"""import main (non child) document"""
|
|
if d.get("name") and frappe.db.exists(doctype, d['name']):
|
|
if overwrite:
|
|
doc = frappe.get_doc(doctype, d['name'])
|
|
doc.flags.ignore_links = ignore_links
|
|
doc.update(d)
|
|
if d.get("docstatus") == 1:
|
|
doc.update_after_submit()
|
|
elif d.get("docstatus") == 0 and submit:
|
|
doc.submit()
|
|
else:
|
|
doc.save()
|
|
return 'Updated row (#%d) %s' % (row_idx + 1, getlink(doctype, d['name']))
|
|
else:
|
|
return 'Ignored row (#%d) %s (exists)' % (row_idx + 1,
|
|
getlink(doctype, d['name']))
|
|
else:
|
|
doc = frappe.get_doc(d)
|
|
doc.flags.ignore_links = ignore_links
|
|
doc.insert()
|
|
|
|
if submit:
|
|
doc.submit()
|
|
|
|
return 'Inserted row (#%d) %s' % (row_idx + 1, getlink(doctype,
|
|
doc.get('name')))
|
|
|
|
def getlink(doctype, name):
|
|
return '<a href="/app/Form/%(doctype)s/%(name)s">%(name)s</a>' % locals()
|
|
|
|
def get_csv_content_from_google_sheets(url):
|
|
# https://docs.google.com/spreadsheets/d/{sheetid}}/edit#gid={gid}
|
|
validate_google_sheets_url(url)
|
|
# get gid, defaults to first sheet
|
|
if "gid=" in url:
|
|
gid = url.rsplit('gid=', 1)[1]
|
|
else:
|
|
gid = 0
|
|
# remove /edit path
|
|
url = url.rsplit('/edit', 1)[0]
|
|
# add /export path,
|
|
url = url + '/export?format=csv&gid={0}'.format(gid)
|
|
|
|
headers = {
|
|
'Accept': 'text/csv'
|
|
}
|
|
response = requests.get(url, headers=headers)
|
|
|
|
if response.ok:
|
|
# if it returns html, it couldn't find the CSV content
|
|
# because of invalid url or no access
|
|
if response.text.strip().endswith('</html>'):
|
|
frappe.throw(
|
|
_('Google Sheets URL is invalid or not publicly accessible.'),
|
|
title=_("Invalid URL")
|
|
)
|
|
return response.content
|
|
elif response.status_code == 400:
|
|
frappe.throw(_('Google Sheets URL must end with "gid={number}". Copy and paste the URL from the browser address bar and try again.'),
|
|
title=_("Incorrect URL"))
|
|
else:
|
|
response.raise_for_status()
|
|
|
|
def validate_google_sheets_url(url):
|
|
if "docs.google.com/spreadsheets" not in url:
|
|
frappe.throw(
|
|
_('"{0}" is not a valid Google Sheets URL').format(url),
|
|
title=_("Invalid URL"),
|
|
)
|