feat: UnixTimestamp QB function

This commit is contained in:
Ankush Menat 2023-01-09 11:04:33 +05:30
parent 45c80669bc
commit 075a2d778e
4 changed files with 124 additions and 25 deletions

View file

@ -1,6 +1,8 @@
from datetime import datetime
import frappe
from frappe.query_builder import Interval, Order
from frappe.query_builder.functions import Date, Sum, UnixTimestamp
from frappe.utils import getdate
@ -11,32 +13,18 @@ def get_energy_points_heatmap_data(user, date):
except Exception:
date = getdate()
if frappe.db.db_type == "mariadb":
timestamp_field = "unix_timestamp(date(creation))"
subdate_field_year = f"subdate('{date}', interval 1 year)"
subdate_field_minus_year = f"subdate('{date}', interval -1 year)"
else:
timestamp_field = "extract(epoch from date(creation))"
subdate_field_year = f"date('{date}') - INTERVAL '1' YEAR"
subdate_field_minus_year = f"date('{date}') - INTERVAL '-1' YEAR"
eps_log = frappe.qb.DocType("Energy Point Log")
return dict(
frappe.db.sql(
"""select {timestamp_field}, sum(points)
from `tabEnergy Point Log`
where
date(creation) > {subdate_field_year} and
date(creation) < {subdate_field_minus_year} and
user = %s and
type != 'Review'
group by date(creation)
order by creation asc""".format(
timestamp_field=timestamp_field,
subdate_field_year=subdate_field_year,
subdate_field_minus_year=subdate_field_minus_year,
),
user,
)
frappe.qb.from_(eps_log)
.select(UnixTimestamp(Date(eps_log.creation)), Sum(eps_log.points))
.where(eps_log.user == user)
.where(eps_log["type"] != "Review")
.where(Date(eps_log.creation) > Date(date) - Interval(years=1))
.where(Date(eps_log.creation) < Date(date) + Interval(years=1))
.groupby(Date(eps_log.creation))
.orderby(Date(eps_log.creation), order=Order.asc)
.run()
)

View file

@ -74,6 +74,22 @@ DateFormat = ImportMapper(
)
class _PostgresUnixTimestamp(Extract):
# Note: this is just a special case of "Extract" function with "epoch" hardcoded.
# Check super definition to see how it works.
def __init__(self, field, alias=None):
super().__init__("epoch", field=field, alias=alias)
self.field = field
UnixTimestamp = ImportMapper(
{
db_type_is.MARIADB: CustomFunction("unix_timestamp", ["date"]),
db_type_is.POSTGRES: _PostgresUnixTimestamp,
}
)
class Cast_(Function):
def __init__(self, value, as_type, alias=None):
if frappe.db.db_type == "mariadb" and (

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import frappe
from frappe.desk.form.assign_to import add as assign_to
from frappe.desk.page.user_profile.user_profile import get_energy_points_heatmap_data
from frappe.tests.utils import FrappeTestCase
from frappe.utils.testutils import add_custom_field, clear_custom_fields
@ -234,6 +235,10 @@ class TestEnergyPointLog(FrappeTestCase):
self.assertEqual(test2_user_after_points, test2_user_before_points + rule.points)
def test_eps_heatmap_query(self):
# Just asserts that query works, not correctness.
self.assertIsInstance(get_energy_points_heatmap_data(user="test@example.com", date=None), dict)
def test_points_on_field_value_change(self):
rule = create_energy_point_rule_for_todo(
for_doc_event="Value Change", field_to_check="description"

View file

@ -6,7 +6,15 @@ import frappe
from frappe.query_builder import Case
from frappe.query_builder.builder import Function
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import Cast_, Coalesce, CombineDatetime, GroupConcat, Match
from frappe.query_builder.functions import (
Cast_,
Coalesce,
CombineDatetime,
Date,
GroupConcat,
Match,
UnixTimestamp,
)
from frappe.query_builder.utils import db_type_is
from frappe.tests.utils import FrappeTestCase
@ -75,6 +83,47 @@ class TestCustomFunctionsMariaDB(FrappeTestCase):
str(select_query).lower(),
)
def test_unix_ts_mariadb(self):
# Simple Query
note = frappe.qb.DocType("Note")
self.assertEqual(
"unix_timestamp(posting_date)",
UnixTimestamp(note.posting_date).get_sql(),
)
# Complex multi table query
todo = frappe.qb.DocType("ToDo")
select_query = (
frappe.qb.from_(note)
.join(todo)
.on(todo.refernce_name == note.name)
.select(UnixTimestamp(note.posting_date))
)
self.assertIn("select unix_timestamp(`tabnote`.`posting_date`)", str(select_query).lower())
# Order by
select_query = select_query.orderby(UnixTimestamp(note.posting_date))
self.assertIn(
"order by unix_timestamp(`tabnote`.`posting_date`)",
str(select_query).lower(),
)
# Function comparison
select_query = select_query.where(
UnixTimestamp(note.posting_date) >= UnixTimestamp("2021-01-01")
)
self.assertIn(
"unix_timestamp(`tabnote`.`posting_date`)>=unix_timestamp('2021-01-01')",
str(select_query).lower(),
)
# aliasing
select_query = select_query.select(UnixTimestamp(note.posting_date, alias="unix_ts"))
self.assertIn(
"unix_timestamp(`tabnote`.`posting_date`) `unix_ts`",
str(select_query).lower(),
)
def test_time(self):
note = frappe.qb.DocType("Note")
self.assertEqual(
@ -162,6 +211,47 @@ class TestCustomFunctionsPostgres(FrappeTestCase):
'"tabnote"."posting_date"+"tabnote"."posting_time" "timestamp"', str(select_query).lower()
)
def test_unix_ts_postgres(self):
# Simple Query
note = frappe.qb.DocType("Note")
self.assertEqual(
"extract(epoch from posting_date)",
UnixTimestamp(note.posting_date).get_sql().lower(),
)
# Complex multi table query
todo = frappe.qb.DocType("ToDo")
select_query = (
frappe.qb.from_(note)
.join(todo)
.on(todo.refernce_name == note.name)
.select(UnixTimestamp(note.posting_date))
)
self.assertIn('extract(epoch from "tabnote"."posting_date")', str(select_query).lower())
# Order by
select_query = select_query.orderby(UnixTimestamp(note.posting_date))
self.assertIn(
'order by extract(epoch from "tabnote"."posting_date")',
str(select_query).lower(),
)
# Function comparison
select_query = select_query.where(
UnixTimestamp(note.posting_date) >= UnixTimestamp(Date("2021-01-01"))
)
self.assertIn(
'extract(epoch from "tabnote"."posting_date")>=extract(epoch from date(\'2021-01-01\'))',
str(select_query).lower(),
)
# aliasing
select_query = select_query.select(UnixTimestamp(note.posting_date, alias="unix_ts"))
self.assertIn(
'extract(epoch from "tabnote"."posting_date") "unix_ts"',
str(select_query).lower(),
)
def test_time(self):
note = frappe.qb.DocType("Note")