From c220c6063db078bcb3c8d47d4473b6e821b240e4 Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Fri, 12 Feb 2021 01:21:28 +0530 Subject: [PATCH] fix: Standardize anonymization Data * Set anonymization matrix for instance of Personal Data Deletion Request * During the initialization of the object, privacy docs are segregated into full match and partial data redaction types * Context for execution of _anonymize_data is set via __set_anonymization_data by default unless set_data is False --- .../personal_data_deletion_request.py | 89 ++++++++++++------- 1 file changed, 58 insertions(+), 31 deletions(-) diff --git a/frappe/website/doctype/personal_data_deletion_request/personal_data_deletion_request.py b/frappe/website/doctype/personal_data_deletion_request/personal_data_deletion_request.py index 7bbd535bfb..e36f18943f 100644 --- a/frappe/website/doctype/personal_data_deletion_request/personal_data_deletion_request.py +++ b/frappe/website/doctype/personal_data_deletion_request/personal_data_deletion_request.py @@ -11,9 +11,18 @@ from frappe.utils import get_fullname from frappe.utils.user import get_system_managers from frappe.utils.verified_command import get_signed_params, verify_request + class PersonalDataDeletionRequest(Document): - def validate(self): - validate_email_address(self.email, throw=True) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.user_data_fields = frappe.get_hooks("user_data_fields") + self.full_match_privacy_docs = [ + x for x in self.user_data_fields if x.get("redact_fields") + ] + self.partial_privacy_docs = [ + x for x in self.user_data_fields if x.get("partial") or not x.get("redact_fields") + ] def after_insert(self): self.send_verification_mail() @@ -69,12 +78,21 @@ class PersonalDataDeletionRequest(Document): if self.status != "Pending Approval": frappe.throw(_("This request has not yet been approved by the user.")) - privacy_docs = frappe.get_hooks("user_privacy_documents") + def __set_anonymization_data(self, email, anon): + self.anon = anon or self.name + self.full_name = get_fullname(email) + self.email_regex = get_pattern(email) + self.full_name_regex = get_pattern(self.full_name) + self.is_full_name_set = email != self.full_name - anonymize_value_map = { - 'Date': '1111-01-01', - 'Int': 0, - 'Code': 'http://xxxxx' + self.anonymization_value_map = { + "Code": "REDACTED: Removed due to Personal Data Deletion Request", + "Data": "REDACTED", + "Date": "1111-01-01", + "Email": self.anon, + "Int": 0, + "Phone": "+91 0000000000", + "Name": "REDACTED", } def trigger_data_deletion(self): @@ -93,35 +111,40 @@ class PersonalDataDeletionRequest(Document): now=frappe.flags.in_test, ) - for ref_doc in privacy_docs: - meta = frappe.get_meta(ref_doc['doctype']) - personal_fields = ref_doc.get('personal_fields', []) + def _anonymize_data(self, email=None, anon=None, set_data=True): + email = email or self.email + anon = anon or self.name - if ref_doc.get('applies_to_website_user') and 'Guest' not in frappe.get_roles(self.email): + if set_data: + self.__set_anonymization_data(email, anon) + + for doctype in self.full_match_privacy_docs: continue - anonymize_fields = '' - for field in personal_fields: - field_details = meta.get_field(field) - field_value = anonymize_value_map.get(field_details.fieldtype, str(field)) if not field_details.unique else self.name.split("@")[0] - anonymize_fields += ', `{0}`= \'{1}\''.format(field, field_value) + def generate_anonymization_dict(self, ref): + anonymize_fields_dict = {} + meta = frappe.get_meta(ref["doctype"]) - docs = frappe.get_all(ref_doc['doctype'], {ref_doc['match_field']:('like', '%'+self.email+'%')}, ['name', ref_doc['match_field']]) - for d in docs: - if not re.search(regex, d[ref_doc['match_field']]): - continue + for field in ref.get("redact_fields", []): + field_details = meta.get_field(field) + + if not field_details: + print(f"Incorrect personal_field {field} defined in hooks") + continue + + field_value = ( + self.anon + if field_details.unique + else ( + self.anonymization_value_map.get(field_details.options) + or self.anonymization_value_map.get(field_details.fieldtype) + or field + ) + ) + anonymize_fields_dict[field] = field_value + + return anonymize_fields_dict - anonymize_match_value = ', '.join(map(lambda x: self.name if re.search(regex, x) else x, d[ref_doc['match_field']].split())) - frappe.db.sql("""UPDATE `tab{0}` - SET `{1}` = '{2}' {3} - WHERE `name` = '{4}' """.format( #nosec - ref_doc['doctype'], - ref_doc['match_field'], - anonymize_match_value, - anonymize_fields, - d['name'] - )) - self.db_set('status', 'Deleted') def remove_unverified_record(): @@ -160,3 +183,7 @@ def confirm_deletion(email, name, host_name): _("This link has already been activated for verification."), indicator_color="red", ) + + +def get_pattern(full_match): + return re.compile(r"(?