diff --git a/frappe/tests/test_utils.py b/frappe/tests/test_utils.py index eda1dfa012..1a80e92a5c 100644 --- a/frappe/tests/test_utils.py +++ b/frappe/tests/test_utils.py @@ -35,6 +35,7 @@ from frappe.utils import ( get_sites, get_url, money_in_words, + parse_and_map_trackers_from_url, parse_timedelta, random_string, remove_blanks, @@ -51,6 +52,7 @@ from frappe.utils.change_log import ( ) from frappe.utils.data import ( add_to_date, + add_trackers_to_url, add_years, cast, cint, @@ -67,6 +69,7 @@ from frappe.utils.data import ( get_year_ending, getdate, is_invalid_date_string, + map_trackers, now_datetime, nowtime, pretty_date, @@ -1383,3 +1386,55 @@ class TestCrypto(IntegrationTestCase): sha256_hash(b"The quick brown fox jumps over the lazy dog"), "d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592", ) + + +class TestURLTrackers(IntegrationTestCase): + def test_add_trackers_to_url(self): + url = "https://example.com" + source = "test_source" + campaign = "test_campaign" + medium = "test_medium" + content = "test_content" + + with patch("frappe.db.get_value") as mock_get_value: + mock_get_value.side_effect = lambda *args: args[1] # Return unslugged input value + result = add_trackers_to_url(url, source, campaign, medium, content) + + expected = "https://example.com?utm_source=test_source&utm_medium=test_medium&utm_campaign=test_campaign&utm_content=test_content" + self.assertEqual(result, expected) + + def test_parse_and_map_trackers_from_url(self): + url = "https://example.com?utm_source=test_source&utm_medium=test_medium&utm_campaign=test_campaign&utm_content=test_content" + + with patch("frappe.db.get_value") as mock_get_value: + mock_get_value.return_value = None # Simulate no existing records + result = parse_and_map_trackers_from_url(url) + + expected = { + "utm_source": "test_source", + "utm_medium": "test_medium", + "utm_campaign": "test_campaign", + "utm_content": "test_content", + } + self.assertEqual(result, expected) + + def test_map_trackers(self): + url_trackers = { + "utm_source": "test_source", + "utm_medium": "test_medium", + "utm_campaign": "test_campaign", + "utm_content": "test_content", + } + + result = map_trackers(url_trackers, create=True) + + expected = { + "utm_source": frappe.get_doc("UTM Source", "test_source"), + "utm_medium": frappe.get_doc("UTM Medium", "test_medium"), + "utm_campaign": frappe.get_doc("UTM Campaign", "test_campaign"), + "utm_content": "test_content", + } + self.assertDocumentEqual(result["utm_source"], expected["utm_source"]) + self.assertDocumentEqual(result["utm_medium"], expected["utm_medium"]) + self.assertDocumentEqual(result["utm_campaign"], expected["utm_campaign"]) + self.assertEqual(result["utm_content"], expected["utm_content"]) diff --git a/frappe/utils/data.py b/frappe/utils/data.py index 9f775fad1d..d70920ee3a 100644 --- a/frappe/utils/data.py +++ b/frappe/utils/data.py @@ -2639,3 +2639,87 @@ def _get_rss_memory_usage(): rss = psutil.Process().memory_info().rss // (1024 * 1024) return rss + + +def add_trackers_to_url( + url: str, + source: str, + campaign: str | None = None, + medium: str | None = None, + content: str | None = None, +) -> str: + def get_utm_values(): + _source = frappe.db.get_value("UTM Source", source, "slug") or source + _campaign = frappe.db.get_value("UTM Campaign", campaign, "slug") or campaign + _medium = frappe.db.get_value("UTM Medium", medium, "slug") or medium + return _source, _medium, _campaign + + url_parts = list(urlparse(url)) + if url_parts[0] == "mailto": + return url + + source, medium, campaign = frappe.cache.get_value( + "utm_" + cstr(source) + "_" + cstr(medium) + "_" + cstr(campaign), + get_utm_values, + shared=True, + ) + + trackers = {"utm_source": source} + + if medium: + trackers["utm_medium"] = medium + + if campaign: + trackers["utm_campaign"] = campaign + + if content: + trackers["utm_content"] = content + + query = dict(parse_qsl(url_parts[4])) | trackers + + url_parts[4] = urlencode(query) + return urlunparse(url_parts) + + +def parse_and_map_trackers_from_url(url: str, create: bool = False) -> dict: + query = urlparse(url).query + + url_trackers = dict(parse_qsl(query)) + + return map_trackers(url_trackers, create) + + +def map_trackers(url_trackers: dict, create: bool = False): + frappe_trackers = {} + + if url_source := url_trackers.get("utm_source", url_trackers.get("source")): + source = frappe.db.get_value("UTM Source", {"slug": slug(url_source)}, "name") or url_source + if create and source == url_source and not frappe.db.exists("UTM Source", source): + source = frappe.new_doc("UTM Source") + source.name = url_source + source.description = f"Autogenerated from {url_trackers}" + source.save(ignore_permissions=True) + frappe_trackers["utm_source"] = source + + if url_medium := url_trackers.get("utm_medium", url_trackers.get("medium")): + medium = frappe.db.get_value("UTM Medium", {"slug": slug(url_medium)}, "name") or url_medium + if create and medium == url_medium and not frappe.db.exists("UTM Medium", medium): + medium = frappe.new_doc("UTM Medium") + medium.name = url_medium + medium.description = f"Autogenerated from {url_trackers}" + medium.save(ignore_permissions=True) + frappe_trackers["utm_medium"] = medium + + if url_campaign := url_trackers.get("utm_campaign", url_trackers.get("campaign")): + campaign = frappe.db.get_value("UTM Campaign", {"slug": slug(url_campaign)}, "name") or url_campaign + if create and campaign == url_campaign and not frappe.db.exists("UTM Campaign", campaign): + campaign = frappe.new_doc("UTM Campaign") + campaign.name = url_campaign + campaign.campaign_description = f"Autogenerated from {url_trackers}" + campaign.save(ignore_permissions=True) + frappe_trackers["utm_campaign"] = campaign + + if url_content := url_trackers.get("utm_content", url_trackers.get("content")): + frappe_trackers["utm_content"] = url_content + + return frappe_trackers