fix: add utm utils back

This commit is contained in:
sokumon 2025-06-24 18:43:28 +05:30
parent ec79747fa7
commit 2b8d21d483
2 changed files with 139 additions and 0 deletions

View file

@ -35,6 +35,7 @@ from frappe.utils import (
get_sites,
get_url,
money_in_words,
parse_and_map_trackers_from_url,
parse_timedelta,
random_string,
remove_blanks,
@ -51,6 +52,7 @@ from frappe.utils.change_log import (
)
from frappe.utils.data import (
add_to_date,
add_trackers_to_url,
add_years,
cast,
cint,
@ -67,6 +69,7 @@ from frappe.utils.data import (
get_year_ending,
getdate,
is_invalid_date_string,
map_trackers,
now_datetime,
nowtime,
pretty_date,
@ -1383,3 +1386,55 @@ class TestCrypto(IntegrationTestCase):
sha256_hash(b"The quick brown fox jumps over the lazy dog"),
"d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592",
)
class TestURLTrackers(IntegrationTestCase):
def test_add_trackers_to_url(self):
url = "https://example.com"
source = "test_source"
campaign = "test_campaign"
medium = "test_medium"
content = "test_content"
with patch("frappe.db.get_value") as mock_get_value:
mock_get_value.side_effect = lambda *args: args[1] # Return unslugged input value
result = add_trackers_to_url(url, source, campaign, medium, content)
expected = "https://example.com?utm_source=test_source&utm_medium=test_medium&utm_campaign=test_campaign&utm_content=test_content"
self.assertEqual(result, expected)
def test_parse_and_map_trackers_from_url(self):
url = "https://example.com?utm_source=test_source&utm_medium=test_medium&utm_campaign=test_campaign&utm_content=test_content"
with patch("frappe.db.get_value") as mock_get_value:
mock_get_value.return_value = None # Simulate no existing records
result = parse_and_map_trackers_from_url(url)
expected = {
"utm_source": "test_source",
"utm_medium": "test_medium",
"utm_campaign": "test_campaign",
"utm_content": "test_content",
}
self.assertEqual(result, expected)
def test_map_trackers(self):
url_trackers = {
"utm_source": "test_source",
"utm_medium": "test_medium",
"utm_campaign": "test_campaign",
"utm_content": "test_content",
}
result = map_trackers(url_trackers, create=True)
expected = {
"utm_source": frappe.get_doc("UTM Source", "test_source"),
"utm_medium": frappe.get_doc("UTM Medium", "test_medium"),
"utm_campaign": frappe.get_doc("UTM Campaign", "test_campaign"),
"utm_content": "test_content",
}
self.assertDocumentEqual(result["utm_source"], expected["utm_source"])
self.assertDocumentEqual(result["utm_medium"], expected["utm_medium"])
self.assertDocumentEqual(result["utm_campaign"], expected["utm_campaign"])
self.assertEqual(result["utm_content"], expected["utm_content"])

View file

@ -2639,3 +2639,87 @@ def _get_rss_memory_usage():
rss = psutil.Process().memory_info().rss // (1024 * 1024)
return rss
def add_trackers_to_url(
url: str,
source: str,
campaign: str | None = None,
medium: str | None = None,
content: str | None = None,
) -> str:
def get_utm_values():
_source = frappe.db.get_value("UTM Source", source, "slug") or source
_campaign = frappe.db.get_value("UTM Campaign", campaign, "slug") or campaign
_medium = frappe.db.get_value("UTM Medium", medium, "slug") or medium
return _source, _medium, _campaign
url_parts = list(urlparse(url))
if url_parts[0] == "mailto":
return url
source, medium, campaign = frappe.cache.get_value(
"utm_" + cstr(source) + "_" + cstr(medium) + "_" + cstr(campaign),
get_utm_values,
shared=True,
)
trackers = {"utm_source": source}
if medium:
trackers["utm_medium"] = medium
if campaign:
trackers["utm_campaign"] = campaign
if content:
trackers["utm_content"] = content
query = dict(parse_qsl(url_parts[4])) | trackers
url_parts[4] = urlencode(query)
return urlunparse(url_parts)
def parse_and_map_trackers_from_url(url: str, create: bool = False) -> dict:
query = urlparse(url).query
url_trackers = dict(parse_qsl(query))
return map_trackers(url_trackers, create)
def map_trackers(url_trackers: dict, create: bool = False):
frappe_trackers = {}
if url_source := url_trackers.get("utm_source", url_trackers.get("source")):
source = frappe.db.get_value("UTM Source", {"slug": slug(url_source)}, "name") or url_source
if create and source == url_source and not frappe.db.exists("UTM Source", source):
source = frappe.new_doc("UTM Source")
source.name = url_source
source.description = f"Autogenerated from {url_trackers}"
source.save(ignore_permissions=True)
frappe_trackers["utm_source"] = source
if url_medium := url_trackers.get("utm_medium", url_trackers.get("medium")):
medium = frappe.db.get_value("UTM Medium", {"slug": slug(url_medium)}, "name") or url_medium
if create and medium == url_medium and not frappe.db.exists("UTM Medium", medium):
medium = frappe.new_doc("UTM Medium")
medium.name = url_medium
medium.description = f"Autogenerated from {url_trackers}"
medium.save(ignore_permissions=True)
frappe_trackers["utm_medium"] = medium
if url_campaign := url_trackers.get("utm_campaign", url_trackers.get("campaign")):
campaign = frappe.db.get_value("UTM Campaign", {"slug": slug(url_campaign)}, "name") or url_campaign
if create and campaign == url_campaign and not frappe.db.exists("UTM Campaign", campaign):
campaign = frappe.new_doc("UTM Campaign")
campaign.name = url_campaign
campaign.campaign_description = f"Autogenerated from {url_trackers}"
campaign.save(ignore_permissions=True)
frappe_trackers["utm_campaign"] = campaign
if url_content := url_trackers.get("utm_content", url_trackers.get("content")):
frappe_trackers["utm_content"] = url_content
return frappe_trackers