# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # MIT License. See license.txt from __future__ import unicode_literals import frappe from frappe import msgprint, _ import json import csv import six import requests from six import StringIO, text_type, string_types from frappe.utils import encode, cstr, cint, flt, comma_or def read_csv_content_from_uploaded_file(ignore_encoding=False): if getattr(frappe, "uploaded_file", None): with open(frappe.uploaded_file, "r") as upfile: fcontent = upfile.read() else: _file = frappe.new_doc("File") fcontent = _file.get_uploaded_content() return read_csv_content(fcontent, ignore_encoding) def read_csv_content_from_attached_file(doc): fileid = frappe.get_all("File", fields = ["name"], filters = {"attached_to_doctype": doc.doctype, "attached_to_name":doc.name}, order_by="creation desc") if fileid : fileid = fileid[0].name if not fileid: msgprint(_("File not attached")) raise Exception try: _file = frappe.get_doc("File", fileid) fcontent = _file.get_content() return read_csv_content(fcontent, frappe.form_dict.get('ignore_encoding_errors')) except Exception: frappe.throw(_("Unable to open attached file. Did you export it as CSV?"), title=_('Invalid CSV Format')) def read_csv_content(fcontent, ignore_encoding=False): rows = [] if not isinstance(fcontent, text_type): decoded = False for encoding in ["utf-8", "windows-1250", "windows-1252"]: try: fcontent = text_type(fcontent, encoding) decoded = True break except UnicodeDecodeError: continue if not decoded: frappe.msgprint(_("Unknown file encoding. Tried utf-8, windows-1250, windows-1252."), raise_exception=True) fcontent = fcontent.encode("utf-8") content = [ ] for line in fcontent.splitlines(True): if six.PY2: content.append(line) else: content.append(frappe.safe_decode(line)) try: rows = [] for row in csv.reader(content): r = [] for val in row: # decode everything val = val.strip() if val=="": # reason: in maraidb strict config, one cannot have blank strings for non string datatypes r.append(None) else: r.append(val) rows.append(r) return rows except Exception: frappe.msgprint(_("Not a valid Comma Separated Value (CSV File)")) raise @frappe.whitelist() def send_csv_to_client(args): if isinstance(args, string_types): args = json.loads(args) args = frappe._dict(args) frappe.response["result"] = cstr(to_csv(args.data)) frappe.response["doctype"] = args.filename frappe.response["type"] = "csv" def to_csv(data): writer = UnicodeWriter() for row in data: writer.writerow(row) return writer.getvalue() def build_csv_response(data, filename): frappe.response["result"] = cstr(to_csv(data)) frappe.response["doctype"] = filename frappe.response["type"] = "csv" class UnicodeWriter: def __init__(self, encoding="utf-8"): self.encoding = encoding self.queue = StringIO() self.writer = csv.writer(self.queue, quoting=csv.QUOTE_NONNUMERIC) def writerow(self, row): if six.PY2: row = encode(row, self.encoding) self.writer.writerow(row) def getvalue(self): return self.queue.getvalue() def check_record(d): """check for mandatory, select options, dates. these should ideally be in doclist""" from frappe.utils.dateutils import parse_date doc = frappe.get_doc(d) for key in d: docfield = doc.meta.get_field(key) val = d[key] if docfield: if docfield.reqd and (val=='' or val==None): frappe.msgprint(_("{0} is required").format(docfield.label), raise_exception=1) if docfield.fieldtype=='Select' and val and docfield.options: if val not in docfield.options.split('\n'): frappe.throw(_("{0} must be one of {1}").format(_(docfield.label), comma_or(docfield.options.split("\n")))) if val and docfield.fieldtype=='Date': d[key] = parse_date(val) elif val and docfield.fieldtype in ["Int", "Check"]: d[key] = cint(val) elif val and docfield.fieldtype in ["Currency", "Float", "Percent"]: d[key] = flt(val) def import_doc(d, doctype, overwrite, row_idx, submit=False, ignore_links=False): """import main (non child) document""" if d.get("name") and frappe.db.exists(doctype, d['name']): if overwrite: doc = frappe.get_doc(doctype, d['name']) doc.flags.ignore_links = ignore_links doc.update(d) if d.get("docstatus") == 1: doc.update_after_submit() elif d.get("docstatus") == 0 and submit: doc.submit() else: doc.save() return 'Updated row (#%d) %s' % (row_idx + 1, getlink(doctype, d['name'])) else: return 'Ignored row (#%d) %s (exists)' % (row_idx + 1, getlink(doctype, d['name'])) else: doc = frappe.get_doc(d) doc.flags.ignore_links = ignore_links doc.insert() if submit: doc.submit() return 'Inserted row (#%d) %s' % (row_idx + 1, getlink(doctype, doc.get('name'))) def getlink(doctype, name): return '%(name)s' % locals() def get_csv_content_from_google_sheets(url): # https://docs.google.com/spreadsheets/d/{sheetid}}/edit#gid={gid} validate_google_sheets_url(url) # get gid, defaults to first sheet if "gid=" in url: gid = url.rsplit('gid=', 1)[1] else: gid = 0 # remove /edit path url = url.rsplit('/edit', 1)[0] # add /export path, url = url + '/export?format=csv&gid={0}'.format(gid) headers = { 'Accept': 'text/csv' } response = requests.get(url, headers=headers) if response.ok: # if it returns html, it couldn't find the CSV content # because of invalid url or no access if response.text.strip().endswith(''): frappe.throw( _('Google Sheets URL is invalid or not publicly accessible.'), title=_("Invalid URL") ) return response.content elif response.status_code == 400: frappe.throw(_('Google Sheets URL must end with "gid={number}". Copy and paste the URL from the browser address bar and try again.'), title=_("Incorrect URL")) else: response.raise_for_status() def validate_google_sheets_url(url): if "docs.google.com/spreadsheets" not in url: frappe.throw( _('"{0}" is not a valid Google Sheets URL').format(url), title=_("Invalid URL"), )