Merge branch 'develop' into fix-webform-get-fields-description
This commit is contained in:
commit
20d2e2dced
50 changed files with 9781 additions and 7379 deletions
|
|
@ -227,17 +227,17 @@ context("Control Link", () => {
|
|||
field_name: "assigned_by",
|
||||
property: "default",
|
||||
property_type: "Text",
|
||||
value: "Administrator",
|
||||
value: cy.config("testUser"),
|
||||
},
|
||||
true
|
||||
);
|
||||
cy.reload();
|
||||
cy.new_form("ToDo");
|
||||
cy.fill_field("description", "new", "Text Editor").wait(200);
|
||||
cy.fill_field("description", "new", "Text Editor").blur().wait(200);
|
||||
cy.save();
|
||||
cy.get(".frappe-control[data-fieldname=assigned_by_full_name] .control-value").should(
|
||||
"contain",
|
||||
"Administrator"
|
||||
"Frappe"
|
||||
);
|
||||
// if user clears default value explicitly, system should not reset default again
|
||||
cy.get_field("assigned_by").clear().blur();
|
||||
|
|
|
|||
|
|
@ -437,14 +437,6 @@ def validate_link(doctype: str, docname: str, fields=None):
|
|||
if not values.name:
|
||||
return values
|
||||
|
||||
if not frappe.has_permission(doctype, "read", doc=values.name):
|
||||
frappe.throw(
|
||||
_("You do not have permission to access {0} {1}").format(
|
||||
frappe.bold(doctype), frappe.bold(docname)
|
||||
),
|
||||
frappe.PermissionError,
|
||||
)
|
||||
|
||||
if not fields:
|
||||
frappe.local.response_headers.set("Cache-Control", "private,max-age=1800,stale-while-revalidate=7200")
|
||||
return values
|
||||
|
|
|
|||
|
|
@ -1027,6 +1027,13 @@ def list_sites(context: CliCtxObj, output_json=False):
|
|||
click.echo("No sites found")
|
||||
|
||||
|
||||
@click.command("setup-chrome")
|
||||
def setup_chrome():
|
||||
from frappe.utils.print_utils import setup_chromium
|
||||
|
||||
setup_chromium()
|
||||
|
||||
|
||||
commands = [
|
||||
build,
|
||||
clear_cache,
|
||||
|
|
@ -1059,4 +1066,5 @@ commands = [
|
|||
add_to_email_queue,
|
||||
rebuild_global_search,
|
||||
list_sites,
|
||||
setup_chrome,
|
||||
]
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ on_logout = "frappe.core.doctype.session_default_settings.session_default_settin
|
|||
pdf_header_html = "frappe.utils.pdf.pdf_header_html"
|
||||
pdf_body_html = "frappe.utils.pdf.pdf_body_html"
|
||||
pdf_footer_html = "frappe.utils.pdf.pdf_footer_html"
|
||||
|
||||
pdf_generator = "frappe.utils.pdf.get_chrome_pdf"
|
||||
# permissions
|
||||
|
||||
permission_query_conditions = {
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
|
@ -8,7 +8,11 @@ import json
|
|||
import re
|
||||
from collections import Counter
|
||||
from collections.abc import Mapping, Sequence
|
||||
from functools import cached_property
|
||||
from functools import cached_property, lru_cache
|
||||
|
||||
import sqlparse
|
||||
from sqlparse import tokens
|
||||
from sqlparse.sql import Function, Parenthesis, Statement
|
||||
|
||||
import frappe
|
||||
import frappe.defaults
|
||||
|
|
@ -33,6 +37,22 @@ from frappe.utils import (
|
|||
)
|
||||
from frappe.utils.data import DateTimeLikeObject, get_datetime, getdate, sbool
|
||||
|
||||
|
||||
@lru_cache(maxsize=128)
|
||||
def _parse_sql(field: str) -> Statement | None:
|
||||
"""
|
||||
Parse a given SQL statement using `sqlparse`.
|
||||
|
||||
Args:
|
||||
field (str): The SQL statement string to parse.
|
||||
|
||||
Returns:
|
||||
Statement | None: A `sqlparse.sql.Statement` object if parsing succeeds, otherwise `None`.
|
||||
"""
|
||||
if parsed := sqlparse.parse(field):
|
||||
return parsed[0]
|
||||
|
||||
|
||||
LOCATE_PATTERN = re.compile(r"locate\([^,]+,\s*[`\"]?name[`\"]?\s*\)", flags=re.IGNORECASE)
|
||||
LOCATE_CAST_PATTERN = re.compile(r"locate\(([^,]+),\s*([`\"]?name[`\"]?)\s*\)", flags=re.IGNORECASE)
|
||||
FUNC_IFNULL_PATTERN = re.compile(r"(strpos|ifnull|coalesce)\(\s*[`\"]?name[`\"]?\s*,", flags=re.IGNORECASE)
|
||||
|
|
@ -456,6 +476,46 @@ from {tables}
|
|||
"sleep",
|
||||
]
|
||||
|
||||
def _find_subqueries(parsed: Statement) -> list:
|
||||
"""
|
||||
Recursively find all subqueries in a parsed SQL statement.
|
||||
"""
|
||||
subqueries = []
|
||||
|
||||
for token in parsed.tokens:
|
||||
if isinstance(token, Parenthesis):
|
||||
# Check for DML token for subquery check
|
||||
is_subquery = False
|
||||
for sub_token in token.tokens:
|
||||
if sub_token.ttype is tokens.DML:
|
||||
is_subquery = True
|
||||
break
|
||||
if is_subquery:
|
||||
subqueries.append(token)
|
||||
# Recursively check for nested subqueries
|
||||
subqueries.extend(_find_subqueries(token))
|
||||
elif token.is_group:
|
||||
subqueries.extend(_find_subqueries(token))
|
||||
|
||||
return subqueries
|
||||
|
||||
def _check_sql_token(statement: Statement) -> None:
|
||||
"""
|
||||
Checks the output of `sqlparse.parse()` to detect blocked functions and subqueries.
|
||||
"""
|
||||
if _find_subqueries(statement):
|
||||
_raise_exception()
|
||||
|
||||
for token in statement.tokens:
|
||||
if isinstance(token, Function):
|
||||
if (name := (token.get_name())) and name.lower() in blacklisted_functions:
|
||||
_raise_exception()
|
||||
if token.ttype == tokens.Keyword:
|
||||
if token.value.lower() in blacklisted_keywords:
|
||||
_raise_exception()
|
||||
if token.is_group:
|
||||
_check_sql_token(token)
|
||||
|
||||
def _raise_exception():
|
||||
frappe.throw(_("Use of sub-query or function is restricted"), frappe.DataError)
|
||||
|
||||
|
|
@ -470,21 +530,8 @@ from {tables}
|
|||
lower_field = field.lower().strip()
|
||||
|
||||
if SUB_QUERY_PATTERN.match(field):
|
||||
# Check for subquery anywhere in the field, not just at the beginning
|
||||
if "(" in lower_field:
|
||||
# Check all parentheses pairs, not just the first one
|
||||
paren_start = 0
|
||||
while True:
|
||||
location = lower_field.find("(", paren_start)
|
||||
if location == -1:
|
||||
break
|
||||
token = lower_field[location + 1 :].lstrip().split(" ", 1)[0]
|
||||
if any(
|
||||
re.search(r"\b" + re.escape(keyword) + r"\b", token)
|
||||
for keyword in blacklisted_keywords + blacklisted_functions
|
||||
):
|
||||
_raise_exception()
|
||||
paren_start = location + 1
|
||||
# Check all tokens for subquery detection
|
||||
_check_sql_token(_parse_sql(field))
|
||||
|
||||
if "@" in lower_field:
|
||||
# prevent access to global variables
|
||||
|
|
|
|||
|
|
@ -268,7 +268,7 @@
|
|||
"fieldname": "pdf_generator",
|
||||
"fieldtype": "Select",
|
||||
"label": "PDF Generator",
|
||||
"options": "wkhtmltopdf"
|
||||
"options": "wkhtmltopdf\nchrome"
|
||||
},
|
||||
{
|
||||
"default": "DocType",
|
||||
|
|
@ -292,7 +292,7 @@
|
|||
"icon": "fa fa-print",
|
||||
"idx": 1,
|
||||
"links": [],
|
||||
"modified": "2025-09-16 11:20:20.151669",
|
||||
"modified": "2025-09-23 10:39:51.123539",
|
||||
"modified_by": "Administrator",
|
||||
"module": "Printing",
|
||||
"name": "Print Format",
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class PrintFormat(Document):
|
|||
page_number: DF.Literal[
|
||||
"Hide", "Top Left", "Top Center", "Top Right", "Bottom Left", "Bottom Center", "Bottom Right"
|
||||
]
|
||||
pdf_generator: DF.Literal["wkhtmltopdf"]
|
||||
pdf_generator: DF.Literal["wkhtmltopdf", "chrome"]
|
||||
print_format_builder: DF.Check
|
||||
print_format_builder_beta: DF.Check
|
||||
print_format_for: DF.Literal["DocType", "Report"]
|
||||
|
|
|
|||
|
|
@ -680,11 +680,15 @@ frappe.ui.form.PrintView = class {
|
|||
}
|
||||
} else {
|
||||
this.is_wkhtmltopdf_valid();
|
||||
this.render_page("/api/method/frappe.utils.print_format.download_pdf?");
|
||||
this.render_page(
|
||||
"/api/method/frappe.utils.print_format.download_pdf?",
|
||||
false,
|
||||
print_format?.pdf_generator
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
render_page(method, printit = false) {
|
||||
render_page(method, printit = false, pdf_generator = "wkhtmltopdf") {
|
||||
let w = window.open(
|
||||
frappe.urllib.get_full_url(
|
||||
method +
|
||||
|
|
@ -701,7 +705,9 @@ frappe.ui.form.PrintView = class {
|
|||
encodeURIComponent(this.get_letterhead()) +
|
||||
"&settings=" +
|
||||
encodeURIComponent(JSON.stringify(this.additional_settings)) +
|
||||
(this.lang_code ? "&_lang=" + this.lang_code : "")
|
||||
(this.lang_code ? "&_lang=" + this.lang_code : "") +
|
||||
"&pdf_generator=" +
|
||||
encodeURIComponent(pdf_generator)
|
||||
)
|
||||
);
|
||||
if (!w) {
|
||||
|
|
|
|||
|
|
@ -1585,7 +1585,10 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
|
|||
fields: this.get_dialog_fields(),
|
||||
primary_action: (values) => {
|
||||
// doctype fields
|
||||
let fields = values[this.doctype].map((f) => [f, this.doctype]);
|
||||
let fields = (values[this.doctype] || []).map((f) => [
|
||||
f,
|
||||
this.doctype,
|
||||
]);
|
||||
delete values[this.doctype];
|
||||
|
||||
// child table fields
|
||||
|
|
@ -1610,6 +1613,18 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
|
|||
},
|
||||
});
|
||||
|
||||
const $bulk = $(`
|
||||
<div class="mb-3">
|
||||
<button class="btn btn-default btn-xs" data-action="select_all">${__("Select All")}</button>
|
||||
<button class="btn btn-default btn-xs" data-action="unselect_all">${__("Unselect All")}</button>
|
||||
</div>
|
||||
`);
|
||||
const toggleAll = (checked) =>
|
||||
d.$wrapper.find(":checkbox").prop("checked", checked).trigger("change");
|
||||
$bulk.on("click", "[data-action=select_all]", () => toggleAll(true));
|
||||
$bulk.on("click", "[data-action=unselect_all]", () => toggleAll(false));
|
||||
d.$body.prepend($bulk);
|
||||
|
||||
d.$body.prepend(`
|
||||
<div class="columns-search">
|
||||
<input type="text" placeholder="${__(
|
||||
|
|
|
|||
|
|
@ -677,8 +677,7 @@
|
|||
}
|
||||
.column-limit-reached {
|
||||
background-color: var(--subtle-accent);
|
||||
border-top-left-radius: var(--border-radius-md);
|
||||
border-top-right-radius: var(--border-radius-md);
|
||||
border-radius: var(--border-radius-md);
|
||||
border: 1px solid var(--table-border-color);
|
||||
overflow-y: hidden;
|
||||
.form-grid {
|
||||
|
|
@ -688,6 +687,9 @@
|
|||
&:focus-visible {
|
||||
@include grid-focus();
|
||||
}
|
||||
.grid-static-col {
|
||||
height: 38px;
|
||||
}
|
||||
.grid-static-col.col-xs-1 {
|
||||
flex: 1 0 60px;
|
||||
width: 60px;
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ a.card-link:hover {
|
|||
}
|
||||
|
||||
.text-extra-muted {
|
||||
color: #d1d8dd !important;
|
||||
color: var(--gray-500) !important;
|
||||
}
|
||||
|
||||
.no-underline {
|
||||
|
|
|
|||
49
frappe/templates/print_formats/chrome_pdf_header_footer.html
Normal file
49
frappe/templates/print_formats/chrome_pdf_header_footer.html
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang={{ lang }} dir={{ layout_direction }}>
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
{% for tag in head -%}
|
||||
{{ tag | string }}
|
||||
{%- endfor %}
|
||||
|
||||
<style>
|
||||
body {
|
||||
margin: 0 !important;
|
||||
border: 0 !important;
|
||||
padding-top: 1mm !important;
|
||||
}
|
||||
.letter-head,
|
||||
.letter-head-footer {
|
||||
margin-top: -12mm !important;
|
||||
}
|
||||
/* Dont show explicit links for <a> tags */
|
||||
@media print {
|
||||
/* padding is added to simulate old wkhtmltopdf format prints */
|
||||
.wrapper {
|
||||
box-sizing: border-box;
|
||||
padding: 1mm 0 1mm !important;
|
||||
page-break-after: always !important;
|
||||
}
|
||||
[document-status] {
|
||||
margin-bottom: 0 !important;
|
||||
}
|
||||
a[href]:after {
|
||||
content: none;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
{% for tag in styles -%}
|
||||
{{ tag | string }}
|
||||
{%- endfor %}
|
||||
|
||||
</head>
|
||||
<body>
|
||||
<div class="print-format">
|
||||
<div class="wrapper">
|
||||
{% for tag in content -%}
|
||||
{{ tag | string }}
|
||||
{%- endfor %}
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
|
|
@ -489,6 +489,23 @@ class TestDBQuery(IntegrationTestCase):
|
|||
)
|
||||
self.assertTrue("_relevance" in data[0])
|
||||
|
||||
# Test that fields with keywords in strings are allowed
|
||||
data = DatabaseQuery("DocType").execute(
|
||||
fields=["name", "locate('select', name)"],
|
||||
limit_start=0,
|
||||
limit_page_length=1,
|
||||
)
|
||||
self.assertTrue(data)
|
||||
|
||||
# Test that subqueries with other DML are blocked
|
||||
self.assertRaises(
|
||||
frappe.DataError,
|
||||
DatabaseQuery("DocType").execute,
|
||||
fields=["name", "issingle", "(insert into tabUser values (1))"],
|
||||
limit_start=0,
|
||||
limit_page_length=1,
|
||||
)
|
||||
|
||||
data = DatabaseQuery("DocType").execute(
|
||||
fields=["name", "issingle", "date(creation) as creation"],
|
||||
limit_start=0,
|
||||
|
|
@ -554,6 +571,16 @@ class TestDBQuery(IntegrationTestCase):
|
|||
limit_page_length=1,
|
||||
)
|
||||
|
||||
# Ensure search terms aren't blocked as functions
|
||||
from frappe.desk.search import search_link
|
||||
|
||||
search_terms = ("global", "user")
|
||||
|
||||
for term in search_terms:
|
||||
with self.subTest(term=term):
|
||||
result = search_link("ToDo", term)
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
def test_nested_permission(self):
|
||||
frappe.set_user("Administrator")
|
||||
create_nested_doctype()
|
||||
|
|
|
|||
|
|
@ -131,6 +131,37 @@ def get_pdf(html, options=None, output: PdfWriter | None = None):
|
|||
return filedata
|
||||
|
||||
|
||||
def measure_time(func):
|
||||
import time
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
start_time = time.time()
|
||||
result = func(*args, **kwargs)
|
||||
end_time = time.time()
|
||||
print(f"Function {func.__name__} took {end_time - start_time:.4f} seconds")
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@measure_time
|
||||
def get_chrome_pdf(print_format, html, options, output, pdf_generator=None):
|
||||
from frappe.utils.pdf_generator.browser import Browser
|
||||
from frappe.utils.pdf_generator.chrome_pdf_generator import ChromePDFGenerator
|
||||
from frappe.utils.pdf_generator.pdf_merge import PDFTransformer
|
||||
|
||||
if pdf_generator != "chrome":
|
||||
# Use the default pdf generator
|
||||
return
|
||||
# scrubbing url to expand url is not required as we have set url.
|
||||
# also, planning to remove network requests anyway 🤞
|
||||
generator = ChromePDFGenerator()
|
||||
browser = Browser(generator, print_format, html, options)
|
||||
transformer = PDFTransformer(browser)
|
||||
# transforms and merges header, footer into body pdf and returns merged pdf
|
||||
return transformer.transform_pdf(output=output)
|
||||
|
||||
|
||||
def get_file_data_from_writer(writer_obj):
|
||||
# https://docs.python.org/3/library/io.html
|
||||
stream = io.BytesIO()
|
||||
|
|
|
|||
465
frappe/utils/pdf_generator/browser.py
Normal file
465
frappe/utils/pdf_generator/browser.py
Normal file
|
|
@ -0,0 +1,465 @@
|
|||
from typing import ClassVar
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
import frappe
|
||||
from frappe.utils.print_utils import convert_uom, parse_float_and_unit
|
||||
|
||||
|
||||
class Browser:
|
||||
def __init__(self, generator, print_format, html, options):
|
||||
self.is_print_designer = frappe.get_cached_value("Print Format", print_format, "print_designer")
|
||||
self.browserID = frappe.utils.random_string(10)
|
||||
generator.add_browser(self.browserID)
|
||||
# sets soup from html
|
||||
self.set_html(html)
|
||||
# sets wkhtmltopdf options
|
||||
self.set_options(options)
|
||||
# start cdp connection and create browser context ( kind of like new window / incognito mode)
|
||||
self.open(generator)
|
||||
# opens header and footer pages and sets content ( not waiting for it to load)
|
||||
self.prepare_header_footer()
|
||||
# opens body page and sets content and waits for it to finshing load
|
||||
self.setup_body_page()
|
||||
# prepare options as per chrome for pdf
|
||||
self.prepare_options_for_pdf()
|
||||
# generate header and footer pages if they are not dynamic ( first, odd, even, last)
|
||||
self.update_header_footer_page_pd()
|
||||
# if header and footer are not dynamic start generating pdf for them (non-blocking)
|
||||
self.try_async_header_footer_pdf()
|
||||
# now wait for page to load as we need DOM to generate pdf
|
||||
self.body_page.wait_for_set_content()
|
||||
self.body_pdf = self.body_page.generate_pdf(raw=not self.header_page and not self.footer_page)
|
||||
self.body_page.close()
|
||||
self.update_header_footer_page()
|
||||
|
||||
if self.header_page:
|
||||
if not self.is_header_dynamic:
|
||||
self.header_pdf = self.header_page.get_pdf_from_stream(self.header_page.get_pdf_stream_id())
|
||||
else:
|
||||
self.header_pdf = self.header_page.generate_pdf()
|
||||
self.header_page.close()
|
||||
|
||||
if self.footer_page:
|
||||
if not self.is_footer_dynamic:
|
||||
self.footer_pdf = self.footer_page.get_pdf_from_stream(self.footer_page.get_pdf_stream_id())
|
||||
else:
|
||||
self.footer_pdf = self.footer_page.generate_pdf()
|
||||
self.footer_page.close()
|
||||
|
||||
self.close()
|
||||
|
||||
generator.remove_browser(self.browserID)
|
||||
|
||||
def open(self, generator):
|
||||
from frappe.utils.pdf_generator.cdp_connection import CDPSocketClient
|
||||
|
||||
# checking because if we share browser accross request _devtools_url will already be set for subsequent requests.
|
||||
if not generator._devtools_url:
|
||||
generator._set_devtools_url()
|
||||
# start the CDP websocket connection to browser
|
||||
self.session = CDPSocketClient(generator._devtools_url)
|
||||
|
||||
self.session.connect()
|
||||
self.create_browser_context()
|
||||
|
||||
def create_browser_context(self):
|
||||
# create browser context
|
||||
result, error = self.session.send("Target.createBrowserContext", {"disposeOnDetach": True})
|
||||
if error:
|
||||
frappe.log_error(title="Error creating browser context:", message=f"{error}")
|
||||
self.browser_context_id = result["browserContextId"]
|
||||
|
||||
def set_html(self, html):
|
||||
self.soup = BeautifulSoup(html, "html5lib")
|
||||
|
||||
def set_options(self, options):
|
||||
self.options = options
|
||||
|
||||
def new_page(self, page_type):
|
||||
"""
|
||||
# create a new page in the browser inside browser context
|
||||
----
|
||||
TODO: Implement Deterministic rendering for headless-chrome via DevTools Protocol ( waiting for macos support )
|
||||
https://docs.google.com/document/d/1PppegrpXhOzKKAuNlP6XOEnviXFGUiX2hop00Cxcv4o/edit?tab=t.0#bookmark=id.dukbomwxpb3j
|
||||
|
||||
NOTE: In theory this will make it faster but more importantly use less cpu, ram etc.
|
||||
"""
|
||||
|
||||
from frappe.utils.pdf_generator.page import Page
|
||||
|
||||
page = Page(self.session, self.browser_context_id, page_type)
|
||||
page.is_print_designer = self.is_print_designer
|
||||
|
||||
return page
|
||||
|
||||
def setup_body_page(self):
|
||||
self.body_page = self.new_page("body")
|
||||
self.body_page.set_tab_url(frappe.request.host_url)
|
||||
self.body_page.wait_for_navigate()
|
||||
self.body_page.set_content(str(self.soup))
|
||||
|
||||
def close_page(self, type):
|
||||
page = getattr(self, f"{type}_page")
|
||||
page.close()
|
||||
|
||||
def is_page_no_used(self, soup):
|
||||
# Check if any of the classes exist
|
||||
classes_to_check = [
|
||||
"page",
|
||||
"frompage",
|
||||
"topage",
|
||||
"page_info_page",
|
||||
"page_info_frompage",
|
||||
"page_info_topage",
|
||||
]
|
||||
|
||||
# Loop through the classes to check
|
||||
for class_name in classes_to_check:
|
||||
if soup.find(class_=class_name): # Check if any element with the class is found
|
||||
return True # Return True if class is found
|
||||
|
||||
return False
|
||||
|
||||
def prepare_header_footer(self):
|
||||
# code is structured like this to improve performance by running commands in chrome as soon as possible.
|
||||
soup = self.soup
|
||||
options = self.options
|
||||
# open header and footer pages
|
||||
self._open_header_footer_pages()
|
||||
|
||||
# get tags to pass to header template.
|
||||
head = soup.find("head").contents
|
||||
styles = soup.find_all("style")
|
||||
|
||||
# set header and footer content ( not waiting for it to load yet).
|
||||
if self.header_page:
|
||||
self.header_page.wait_for_navigate()
|
||||
self.header_page.set_content(
|
||||
self.get_rendered_header_footer(self.header_content, "header", head, styles, css=[])
|
||||
)
|
||||
|
||||
if self.footer_page:
|
||||
self.footer_page.wait_for_navigate()
|
||||
self.footer_page.set_content(
|
||||
self.get_rendered_header_footer(self.footer_content, "footer", head, styles, css=[])
|
||||
)
|
||||
if self.header_page:
|
||||
self.header_page.wait_for_set_content()
|
||||
self.header_height = self.header_page.get_element_height()
|
||||
self.is_header_dynamic = self.is_page_no_used(self.header_content)
|
||||
del self.header_content
|
||||
else:
|
||||
# bad implicit setting of margin #backwards-compatibility
|
||||
options["margin-top"] = "15mm"
|
||||
|
||||
if self.footer_page:
|
||||
self.footer_page.wait_for_set_content()
|
||||
self.footer_height = self.footer_page.get_element_height()
|
||||
self.is_footer_dynamic = self.is_page_no_used(self.footer_content)
|
||||
del self.footer_content
|
||||
else:
|
||||
# bad implicit setting of margin #backwards-compatibility
|
||||
options["margin-bottom"] = "15mm"
|
||||
|
||||
# Remove instances of them from main content for render_template
|
||||
for html_id in ["header-html", "footer-html"]:
|
||||
for tag in soup.find_all(id=html_id):
|
||||
tag.extract()
|
||||
|
||||
def try_async_header_footer_pdf(self):
|
||||
if self.header_page and not self.is_header_dynamic:
|
||||
self.header_page.generate_pdf(wait_for_pdf=False)
|
||||
if self.footer_page and not self.is_footer_dynamic:
|
||||
self.footer_page.generate_pdf(wait_for_pdf=False)
|
||||
|
||||
def _get_converted_num(self, num_str, unit="px"):
|
||||
parsed = parse_float_and_unit(num_str)
|
||||
if parsed:
|
||||
return convert_uom(parsed["value"], parsed["unit"], unit, only_number=True)
|
||||
|
||||
def _parse_pdf_options_from_html(self):
|
||||
from frappe.utils.pdf import get_print_format_styles
|
||||
|
||||
soup: BeautifulSoup = self.soup
|
||||
options = {}
|
||||
print_format_css = get_print_format_styles(soup)
|
||||
attrs = (
|
||||
"margin-top",
|
||||
"margin-bottom",
|
||||
"margin-left",
|
||||
"margin-right",
|
||||
"page-size",
|
||||
"header-spacing",
|
||||
"orientation",
|
||||
"page-width",
|
||||
"page-height",
|
||||
)
|
||||
options |= {style.name: style.value for style in print_format_css if style.name in attrs}
|
||||
self.options.update(options)
|
||||
|
||||
def _set_default_page_size(self):
|
||||
options = self.options
|
||||
pdf_page_size = (
|
||||
options.get("page-size") or frappe.db.get_single_value("Print Settings", "pdf_page_size") or "A4"
|
||||
)
|
||||
|
||||
if pdf_page_size == "Custom":
|
||||
options["page-height"] = options.get("page-height") or frappe.db.get_single_value(
|
||||
"Print Settings", "pdf_page_height"
|
||||
)
|
||||
options["page-width"] = options.get("page-width") or frappe.db.get_single_value(
|
||||
"Print Settings", "pdf_page_width"
|
||||
)
|
||||
else:
|
||||
options["page-size"] = pdf_page_size
|
||||
|
||||
def prepare_options_for_pdf(self):
|
||||
self._parse_pdf_options_from_html()
|
||||
self._set_default_page_size()
|
||||
|
||||
options = self.options
|
||||
|
||||
updated_options = {
|
||||
"scale": 1,
|
||||
"printBackground": True,
|
||||
"transferMode": "ReturnAsStream",
|
||||
"marginTop": 0,
|
||||
"marginBottom": 0,
|
||||
"marginLeft": 0,
|
||||
"marginRight": 0,
|
||||
"landscape": options.get("orientation", "Portrait") == "Landscape",
|
||||
"preferCSSPageSize": False,
|
||||
"pageRanges": options.get("page-ranges", ""),
|
||||
# Experimental
|
||||
"generateTaggedPDF": options.get("generate-tagged-pdf", False),
|
||||
"generateOutline": options.get("generate-outline", False),
|
||||
}
|
||||
|
||||
# bad implicit setting of margin #backwards-compatibility
|
||||
if not self.is_print_designer:
|
||||
if not options.get("margin-right"):
|
||||
options["margin-right"] = "15mm"
|
||||
|
||||
if not options.get("margin-left"):
|
||||
options["margin-left"] = "15mm"
|
||||
|
||||
if not options.get("page-height") or not options.get("page-width"):
|
||||
if not (page_size := self.options.get("page-size")):
|
||||
raise frappe.ValidationError("Page size is required")
|
||||
if page_size == "CUSTOM":
|
||||
raise frappe.ValidationError("Custom page size requires page-height and page-width")
|
||||
size = PageSize.get(page_size)
|
||||
if not size:
|
||||
raise frappe.ValidationError("Invalid page size")
|
||||
|
||||
options["page-height"] = convert_uom(size["height"], "mm", "px", only_number=True)
|
||||
options["page-width"] = convert_uom(size["width"], "mm", "px", only_number=True)
|
||||
|
||||
if isinstance(options["page-height"], str):
|
||||
options["page-height"] = self._get_converted_num(options["page-height"])
|
||||
|
||||
if isinstance(options["page-width"], str):
|
||||
options["page-width"] = self._get_converted_num(options["page-width"])
|
||||
|
||||
updated_options["paperWidth"] = convert_uom(options["page-width"], "px", "in", only_number=True)
|
||||
|
||||
if options.get("margin-left"):
|
||||
updated_options["marginLeft"] = convert_uom(
|
||||
self._get_converted_num(options["margin-left"]), "px", "in", only_number=True
|
||||
)
|
||||
|
||||
if options.get("margin-right"):
|
||||
updated_options["marginRight"] = convert_uom(
|
||||
self._get_converted_num(options["margin-right"]), "px", "in", only_number=True
|
||||
)
|
||||
|
||||
# make copy of options to update them in header, body, footer.
|
||||
self.body_page.options = updated_options.copy()
|
||||
if self.header_page:
|
||||
self.header_page.options = updated_options.copy()
|
||||
if self.footer_page:
|
||||
self.footer_page.options = updated_options.copy()
|
||||
|
||||
margin_top = self._get_converted_num(options.get("margin-top", 0))
|
||||
margin_bottom = self._get_converted_num(options.get("margin-bottom", 0))
|
||||
|
||||
header_with_top_margin = 0
|
||||
header_with_spacing_top_margin = 0
|
||||
footer_with_bottom_margin = 0
|
||||
footer_height = 0
|
||||
|
||||
if self.header_page:
|
||||
header_with_top_margin = self.header_height + margin_top
|
||||
header_spacing = options.get("header-spacing", 0)
|
||||
header_with_spacing_top_margin = header_with_top_margin + header_spacing
|
||||
self.header_page.options["paperHeight"] = (
|
||||
convert_uom(header_with_spacing_top_margin, "px", "in", only_number=True)
|
||||
if header_with_spacing_top_margin
|
||||
else 0
|
||||
)
|
||||
|
||||
margin_top = convert_uom(margin_top, "px", "in", only_number=True)
|
||||
|
||||
if self.header_page:
|
||||
self.header_page.options["marginTop"] = margin_top
|
||||
else:
|
||||
self.body_page.options["marginTop"] = margin_top
|
||||
|
||||
if self.footer_page:
|
||||
footer_height = self.footer_height
|
||||
self.footer_page.options["paperHeight"] = (
|
||||
convert_uom(footer_height, "px", "in", only_number=True) if footer_height else 0
|
||||
)
|
||||
footer_with_bottom_margin = self.footer_height + margin_bottom
|
||||
|
||||
margin_bottom = convert_uom(margin_bottom, "px", "in", only_number=True)
|
||||
|
||||
if self.footer_page:
|
||||
self.footer_page.options["marginBottom"] = margin_bottom
|
||||
else:
|
||||
self.body_page.options["marginBottom"] = margin_bottom
|
||||
|
||||
body_height = options.get("page-height") - (
|
||||
header_with_spacing_top_margin + footer_with_bottom_margin
|
||||
)
|
||||
|
||||
"""
|
||||
matching scale for some old formats is 1.46 #backwards-compatibility ( scale 1 is better in my opinion)
|
||||
If we face issues in custom formats then only we should enable this.
|
||||
"""
|
||||
|
||||
self.body_page.options["paperHeight"] = convert_uom(body_height, "px", "in", only_number=True)
|
||||
|
||||
def get_rendered_header_footer(self, content, type, head, styles, css):
|
||||
from frappe.utils.pdf import toggle_visible_pdf
|
||||
|
||||
html_id = f"{type}-html"
|
||||
content = content.extract()
|
||||
toggle_visible_pdf(content)
|
||||
id_map = {"header": "pdf_header_html", "footer": "pdf_footer_html"}
|
||||
hook_func = frappe.get_hooks(id_map.get(type))
|
||||
return frappe.call(
|
||||
hook_func[-1],
|
||||
soup=self.soup,
|
||||
head=head,
|
||||
content=content,
|
||||
styles=styles,
|
||||
html_id=html_id,
|
||||
css=css,
|
||||
path="templates/print_formats/chrome_pdf_header_footer.html",
|
||||
)
|
||||
|
||||
def update_header_footer_page(self):
|
||||
if not self.header_page and not self.footer_page:
|
||||
return
|
||||
total_pages = len(self.body_pdf.pages)
|
||||
# function is added to html from update_page_no.js
|
||||
if self.header_page:
|
||||
if self.is_header_dynamic:
|
||||
self.header_page.evaluate(
|
||||
f"clone_and_update('{'#header-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Header', 1);",
|
||||
await_promise=True,
|
||||
)
|
||||
|
||||
if self.footer_page:
|
||||
if self.is_footer_dynamic:
|
||||
self.footer_page.evaluate(
|
||||
f"clone_and_update('{'#footer-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Footer', 1);",
|
||||
await_promise=True,
|
||||
)
|
||||
|
||||
def update_header_footer_page_pd(self):
|
||||
if not self.is_print_designer:
|
||||
return
|
||||
if not self.header_page and not self.footer_page:
|
||||
return
|
||||
# function is added to html from update_page_no.js
|
||||
if self.header_page and not self.is_header_dynamic:
|
||||
self.header_page.evaluate(
|
||||
"clone_and_update('#header-render-container', 0, 1, 'Header', 0);",
|
||||
await_promise=True,
|
||||
)
|
||||
|
||||
if self.footer_page and not self.is_footer_dynamic:
|
||||
self.footer_page.evaluate(
|
||||
"clone_and_update('#footer-render-container', 0, 1, 'Footer', 0);",
|
||||
await_promise=True,
|
||||
)
|
||||
|
||||
def _open_header_footer_pages(self):
|
||||
self.header_page = None
|
||||
self.footer_page = None
|
||||
# open new page for header/footer if they exist.
|
||||
# It sends CDP command to the browser to open a new tab.
|
||||
if header_content := self.soup.find(id="header-html"):
|
||||
self.header_page = self.new_page("header")
|
||||
self.header_page.set_tab_url(frappe.request.host_url)
|
||||
|
||||
if footer_content := self.soup.find(id="footer-html"):
|
||||
self.footer_page = self.new_page("footer")
|
||||
self.footer_page.set_tab_url(frappe.request.host_url)
|
||||
|
||||
self.header_content = header_content
|
||||
self.footer_content = footer_content
|
||||
|
||||
def close(self):
    """Tear down the CDP websocket session owned by this generator."""
    self.session.disconnect()
class PageSize:
    """Lookup table of standard paper sizes.

    Dimensions are ``(width, height)`` in millimetres.
    NOTE(review): the ISO B and C series entries are stored landscape-first
    (width > height) unlike the A series — confirm this is intentional before
    relying on orientation.
    """

    page_sizes: ClassVar[dict[str, tuple[int, int]]] = {
        "A10": (26, 37),
        "A1": (594, 841),
        "A0": (841, 1189),
        "A3": (297, 420),
        "A2": (420, 594),
        "A5": (148, 210),
        "A4": (210, 297),
        "A7": (74, 105),
        "A6": (105, 148),
        "A9": (37, 52),
        "A8": (52, 74),
        "B10": (44, 31),
        "B1+": (1020, 720),
        "B4": (353, 250),
        "B5": (250, 176),
        "B6": (176, 125),
        "B7": (125, 88),
        "B0": (1414, 1000),
        "B1": (1000, 707),
        "B2": (707, 500),
        "B3": (500, 353),
        "B2+": (720, 520),
        "B8": (88, 62),
        "B9": (62, 44),
        "C10": (40, 28),
        "C9": (57, 40),
        "C8": (81, 57),
        "C3": (458, 324),
        "C2": (648, 458),
        "C1": (917, 648),
        "C0": (1297, 917),
        "C7": (114, 81),
        "C6": (162, 114),
        "C5": (229, 162),
        "C4": (324, 229),
        "Legal": (216, 356),
        "Junior Legal": (127, 203),
        "Letter": (216, 279),
        "Tabloid": (279, 432),
        "Ledger": (432, 279),
        "ANSI C": (432, 559),
        "ANSI A (letter)": (216, 279),
        "ANSI B (ledger & tabloid)": (279, 432),
        "ANSI E": (864, 1118),
        "ANSI D": (559, 864),
    }

    @classmethod
    def get(cls, name):
        """Return ``{"width": w, "height": h}`` (mm) for *name*, or None if unknown.

        Uses a single ``dict.get`` lookup instead of a membership test followed
        by a second subscript lookup.
        """
        size = cls.page_sizes.get(name)
        if size is None:
            return None  # Return None if the page size is not found
        width, height = size
        return {"width": width, "height": height}
179
frappe/utils/pdf_generator/cdp_connection.py
Normal file
179
frappe/utils/pdf_generator/cdp_connection.py
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
import asyncio
|
||||
|
||||
import websockets
|
||||
|
||||
import frappe
|
||||
|
||||
|
||||
class CDPSocketClient:
    """
    Manages WebSocket communications with Chrome DevTools Protocol.
    Ensures robust error handling and consistent logging.
    """

    def __init__(self, websocket_url):
        # URL of the browser's DevTools websocket endpoint.
        self.websocket_url = websocket_url
        self.connection = None
        # Monotonically increasing id attached to every outgoing CDP command.
        self.message_id = 0
        # Futures awaiting a reply, keyed by message id and/or a composite key
        # of (method, sessionId, targetId, frameId).
        self.pending_messages = {}
        # Event-method name -> list of (callback, future, filters) registrations.
        self.listeners = {}
        self.listen_task = None
        # Each client owns a private event loop, driven synchronously from the
        # blocking wrappers below (connect/send/wait_for_event/disconnect).
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def connect(self):
        """Open the WebSocket connection and start listening for messages."""
        self.loop.run_until_complete(self._connect())
        self.listen_task = self.loop.create_task(self._listen())

    async def _connect(self):
        # Establish the websocket; failures are logged then re-raised to the caller.
        try:
            self.connection = await websockets.connect(self.websocket_url)
        except Exception:
            frappe.log_error(title="Failed to connect to WebSocket:", message=f"{frappe.get_traceback()}")
            raise

    async def _listen(self):
        # Pump incoming CDP frames for the lifetime of the connection.
        try:
            async for message in self.connection:
                self._handle_message(frappe.json.loads(message))
        except Exception:
            frappe.log_error(title="WebSocket listening error:", message=f"{frappe.get_traceback()}")

    def _handle_message(self, response):
        """Route one incoming CDP frame to its pending future and/or listeners."""
        method = response.get("method")
        params = response.get("params", {})
        session_id = response.get("sessionId")
        target_id = params.get("targetId")
        frame_id = params.get("frameId")
        message_id = response.get("id")

        composite_key = (method, session_id, target_id, frame_id)

        # Handle responses with `id`
        if message_id and message_id in self.pending_messages:
            future = self.pending_messages.pop(message_id)
            # A command registered under both keys must drop its composite entry too.
            if composite_key in self.pending_messages:
                self.pending_messages.pop(composite_key)
            future.set_result(response)

        # Handle responses without `id` using a composite key
        elif method:
            if composite_key in self.pending_messages:
                # print("matched using composite_key", composite_key)
                future = self.pending_messages.pop(composite_key)
                future.set_result(response)

            if method in self.listeners:
                for callback, future, filters in self.listeners[method]:
                    # An unset filter key matches everything; NOTE(review): this
                    # may allow cross-talk between different sessions.
                    if (
                        (not session_id or not filters["sessionId"] or filters["sessionId"] == session_id)
                        and (not target_id or not filters["targetId"] or filters["targetId"] == target_id)
                        and (not frame_id or not filters["frameId"] or filters["frameId"] == frame_id)
                    ):
                        callback(future, response)

    def disconnect(self):
        """Cancel the listener task, close the websocket, and drain the loop."""
        try:
            if self.listen_task:
                self.listen_task.cancel()
            self.loop.run_until_complete(self._disconnect())
            # Cancel all pending tasks before stopping the loop; leaving them
            # un-cancelled was degrading performance over time.
            pending_tasks = [task for task in asyncio.all_tasks(self.loop) if not task.done()]
            for task in pending_tasks:
                task.cancel()
                try:
                    self.loop.run_until_complete(task)  # Ensure tasks finish before loop stops
                except asyncio.CancelledError:
                    pass  # Ignore cancellation errors
        except Exception:
            frappe.log_error(title="Error while disconnecting:", message=f"{frappe.get_traceback()}")
            raise

    async def _disconnect(self):
        # Close the socket if it is still open; errors are logged, not raised.
        try:
            if self.connection and not self.connection.closed:
                await self.connection.close()
            self.connection = None
        except Exception:
            frappe.log_error(
                title="Error during WebSocket disconnection:", message=f"{frappe.get_traceback()}"
            )

    def send(self, method, params=None, session_id=None, return_future=False):
        """Send a CDP command.

        With return_future=True the command is scheduled on the loop and an
        asyncio future is returned immediately; otherwise this blocks until the
        browser replies and returns a (result, error) pair.
        """
        if return_future:
            return asyncio.ensure_future(
                self._send(method, params, session_id, wait_future_fulfill=False), loop=self.loop
            )
        future = self.loop.run_until_complete(self._send(method, params, session_id))
        return self._destructure_response(future.result())

    async def _send(self, method, params=None, session_id=None, wait_future_fulfill=True):
        # Build and transmit one CDP command frame; registers the reply future
        # under the message id and (when derivable) a composite key.
        self.message_id += 1
        message_id = self.message_id
        message = {
            "id": message_id,
            "method": method,
            "params": params or {},
        }

        if session_id:
            message["sessionId"] = session_id

        if self.connection is None:
            raise RuntimeError("WebSocket connection is not open.")

        future = asyncio.Future()
        self.pending_messages[message_id] = future

        # Dynamically create the composite key
        if any(
            [
                method,
                session_id,
                params.get("targetId") if params else None,
                params.get("frameId") if params else None,
            ]
        ):
            composite_key = (
                method,
                session_id,
                params.get("targetId") if params else None,
                params.get("frameId") if params else None,
            )
            self.pending_messages[composite_key] = future

        await self.connection.send(frappe.json.dumps(message))
        if wait_future_fulfill:
            await future
        return future

    def _destructure_response(self, response):
        """Destructure the response to extract useful information."""
        result = response.get("result", None)
        error = response.get("error", None)
        return result, error

    def start_listener(self, method, callback, session_id=None, target_id=None, frame_id=None):
        """Register a listener for a specific CDP event with optional filtering."""
        if method not in self.listeners:
            self.listeners[method] = []
        future = self.loop.create_future()

        event = (callback, future, {"sessionId": session_id, "targetId": target_id, "frameId": frame_id})
        if event not in self.listeners[method]:
            self.listeners[method].append(event)
        return event

    def wait_for_event(self, event, timeout=3):
        """Block until the listener's future resolves (or *timeout* seconds).

        Accepts either the (callback, future, filters) tuple returned by
        start_listener or a bare future. Timeouts are logged, not raised.
        """
        if type(event) is tuple:
            event = event[1]
        try:
            self.loop.run_until_complete(asyncio.wait_for(event, timeout))
        except asyncio.TimeoutError:
            frappe.log_error(title="Timeout waiting for event", message=f"{frappe.get_traceback()}")

    def remove_listener(self, method, event):
        """Remove a listener for a specific CDP event."""
        self.listeners[method].remove(event)
284
frappe/utils/pdf_generator/chrome_pdf_generator.py
Normal file
284
frappe/utils/pdf_generator/chrome_pdf_generator.py
Normal file
|
|
@ -0,0 +1,284 @@
|
|||
import os
|
||||
import platform
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import ClassVar
|
||||
|
||||
import requests
|
||||
|
||||
import frappe
|
||||
from frappe import _
|
||||
|
||||
# TODO: close browser when worker is killed.
|
||||
|
||||
|
||||
class ChromePDFGenerator:
    """Singleton that owns one headless Chromium process per worker and hands
    out its DevTools websocket URL to browser sessions."""

    # Per-platform relative path (inside the bench "chromium" folder) of the
    # headless shell binary.
    EXECUTABLE_PATHS: ClassVar[dict[str, list[str]]] = {
        "linux": ["chrome-linux", "headless_shell"],
        "darwin": ["chrome-mac", "headless_shell"],
        "windows": ["chrome-win", "headless_shell.exe"],
    }

    # Singleton instance (recreated when the chromium process has gone away).
    _instance = None

    # Active browser sessions using this process; prevents premature shutdown.
    _browsers: ClassVar[list] = []

    def add_browser(self, browser):
        # Register an active browser session.
        self._browsers.append(browser)

    def remove_browser(self, browser):
        # Deregister a browser session once it is closed.
        self._browsers.remove(browser)

    def __new__(cls):
        # if instance or _chromium_process is not available create object else
        # return current instance stored in cls._instance
        if cls._instance is None or not cls._instance._chromium_process:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize only once."""
        if hasattr(self, "_initialized"):  # Prevent multiple initializations
            return
        self._initialized = True  # Mark as initialized

        self._chromium_process = None
        self._chromium_path = None
        self._devtools_url = None
        self._initialize_chromium()

    def _initialize_chromium(self):
        """Read site config and start (or point at) a Chromium instance."""
        # ideally browser is initailized from before request hook.
        # if _chromium_process is not available then initialize it.
        if self._chromium_process:
            return
        # get site config and load chromium settings.
        site_config = frappe.get_common_site_config()

        # only when we want to chromium on separate docker / server ( not implemented/tested yet )
        self.CHROMIUM_WEBSOCKET_URL = site_config.get("chromium_websocket_url", "")
        if self.CHROMIUM_WEBSOCKET_URL:
            frappe.warn("Using external chromium websocket url. Make sure it is accessible.")
            self._devtools_url = self.CHROMIUM_WEBSOCKET_URL
            return

        # only when we want to use chromium from a specific path ( incase we don't have chromium in bench folder )
        self.CHROMIUM_BINARY_PATH = site_config.get("chromium_binary_path", "")
        """
        Number of allowed open websocket connections to chromium.
        This number will basically define how many concurrent requests can be handled by one chromium instance.
        #TODO: Implement/Modify logic to handle multiple chromium instance in one class / per worker. currently we are starting one chromium.
        """
        self.CHROME_OPEN_CONNECTIONS = site_config.get("chromium_max_concurrent", 1)
        # if we want to use persistent ( long running ) chromium for all sites.
        # current approach starts chrome per worker process.
        # TODO: Better implement logic to support a persistent chrome process.
        self.USE_PERSISTENT_CHROMIUM = site_config.get("use_persistent_chromium", False)
        # time to wait for chromium to start and provide dev tools url used in _set_devtools_url.
        self.START_TIMEOUT = site_config.get("chromium_start_timeout", 3)

        self._chromium_path = (
            self._find_chromium_executable() if not self.CHROMIUM_BINARY_PATH else self.CHROMIUM_BINARY_PATH
        )
        if self._verify_chromium_installation():
            if not self._devtools_url:
                self.start_chromium_process()

    def _find_chromium_executable(self):
        """Finds the Chromium executable or raises an error if not found."""
        bench_path = frappe.utils.get_bench_path()
        # Chromium is downloaded into <bench>/chromium by the setup command.
        chromium_dir = os.path.join(bench_path, "chromium")

        if not os.path.exists(chromium_dir):
            frappe.throw(_("Chromium is not downloaded. Please run the setup first."))

        platform_name = platform.system().lower()

        if platform_name not in ["linux", "darwin", "windows"]:
            frappe.throw(f"Unsupported platform: {platform_name}")

        executable_name = self.EXECUTABLE_PATHS.get(platform_name)

        # Construct the full path to the executable
        exec_path = Path(chromium_dir).joinpath(*executable_name)
        if not exec_path.exists():
            frappe.throw(
                f"Chromium executable not found: {exec_path}. please run bench setup-new-pdf-backend"
            )

        return str(exec_path)

    def _verify_chromium_installation(self):
        """Ensures Chromium is available and executable, raising clearer errors if not."""
        if not os.path.exists(self._chromium_path):
            frappe.throw(
                f"Chromium not available at the specified path. Please check the path: {self._chromium_path}"
            )
        if not os.access(self._chromium_path, os.X_OK):
            frappe.throw(f"Chromium not executable at {self._chromium_path}")
        return True

    def start_chromium_process(self, debug=False):
        """
        Launches Chromium in headless mode with robust logging and error handling.
        chrome switches
        https://peter.sh/experiments/chromium-command-line-switches/

        NOTE: dbus issue in docker
        https://source.chromium.org/chromium/chromium/src/+/main:content/app/content_main.cc;l=229-241?q=DBUS_SESSION_BUS_ADDRESS&ss=chromium
        """
        try:
            if debug:
                # Debug mode uses a locally installed, headful Chrome instead of
                # the bundled headless shell.
                command_args = [
                    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",  # path to locally installed chrome browser for debugging.
                    "--remote-debugging-port=0",
                    "--user-data-dir=/tmp/chromium-{}-user-data".format(
                        frappe.local.site + frappe.utils.random_string(10)
                    ),
                    "--disable-gpu",
                    "--no-sandbox",
                    "--no-first-run",
                    "",
                ]
            else:
                command_args = [
                    self._chromium_path,
                    # 0 will automatically select a random open port from the ephemeral port range.
                    "--remote-debugging-port=0",
                    "--disable-gpu",  # GPU is not available in production environment.
                    "--disable-field-trial-config",
                    "--disable-background-networking",
                    "--disable-background-timer-throttling",
                    "--disable-backgrounding-occluded-windows",
                    "--disable-back-forward-cache",
                    "--disable-breakpad",
                    "--disable-client-side-phishing-detection",
                    "--disable-component-extensions-with-background-pages",
                    "--disable-component-update",
                    "--no-default-browser-check",
                    "--disable-default-apps",
                    "--disable-dev-shm-usage",
                    "--disable-extensions",
                    "--disable-features=ImprovedCookieControls,LazyFrameLoading,GlobalMediaControls,DestroyProfileOnBrowserClose,MediaRouter,DialMediaRouteProvider,AcceptCHFrame,AutoExpandDetailsElement,CertificateTransparencyComponentUpdater,AvoidUnnecessaryBeforeUnloadCheckSync,Translate,HttpsUpgrades,PaintHolding,ThirdPartyStoragePartitioning,LensOverlay,PlzDedicatedWorker",
                    "--allow-pre-commit-input",
                    "--disable-hang-monitor",
                    "--disable-ipc-flooding-protection",
                    "--disable-popup-blocking",
                    "--disable-prompt-on-repost",
                    "--disable-renderer-backgrounding",
                    "--force-color-profile=srgb",
                    "--metrics-recording-only",
                    "--no-first-run",
                    "--password-store=basic",
                    "--use-mock-keychain",
                    "--no-service-autorun",
                    "--export-tagged-pdf",
                    "--disable-search-engine-choice-screen",
                    "--unsafely-disable-devtools-self-xss-warnings",
                    "--enable-use-zoom-for-dsf=false",
                    "--use-angle",
                    "--headless",
                    "--hide-scrollbars",
                    "--mute-audio",
                    "--blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4",
                    "--no-sandbox",
                    "--no-startup-window",
                    # related to HeadlessExperimental flag enable when Implement Deterministic rendering. check page class for more info.
                    # "--enable-surface-synchronization",
                    # "--run-all-compositor-stages-before-draw",
                    # "--disable-threaded-animation",
                    # "--disable-threaded-scrolling",
                    # "--disable-checker-imaging",
                ]

            self._start_chromium_process(command_args)

        except Exception as e:
            frappe.log_error(f"Error starting Chromium: {e}")
            frappe.throw(_("Could not start Chromium. Check logs for details."))

    # Apply the decorator to monitor Chromium subprocess usage for development / debugging purposes.
    # it will print and write usage data to a file ( defaults to chrome_process_usage.json).
    # from print_designer.pdf_generator.monitor_subprocess import monitor_subprocess_usage
    # @monitor_subprocess_usage(interval=0.1)
    def _start_chromium_process(self, command_args):
        """Spawn the Chromium subprocess (piped stdout/stderr) and return it."""
        if platform.system().lower() == "windows":
            # hide cmd window
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE
            self._chromium_process = subprocess.Popen(
                command_args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                startupinfo=startupinfo,
                text=True,
            )
        else:
            self._chromium_process = subprocess.Popen(
                command_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
            )
        return self._chromium_process

    def _set_devtools_url(self):
        """
        Monitor Chromium's stderr for the DevTools WebSocket URL
        ----------------
        other approach: if we choose port using find_available_port we can avoid this entirely and fetch_devtools_url() method.

        NOTE: 1) in current approach output to stderr is pretty consistent.
        2) other approach may seem reliable but it is slow compared to this in testing.

        TODO:
        final approach can be decided later after testing in production.
        """
        stderr = self._chromium_process.stderr
        start_time = time.time()

        while time.time() - start_time < self.START_TIMEOUT:
            # Read a single line from stderr and check if it contains the DevTools URL.
            # Not using select() because it is not supported on Windows for non-socket file descriptors.
            line = stderr.readline()
            # not sure if "DevTools listening on" is consistent in all chromium versions.
            if "DevTools listening on" in line:
                url_start = line.find("ws://")
                if url_start != -1:
                    self._devtools_url = line[url_start:].strip()
                    break

        if not self._devtools_url:
            self._chromium_process.terminate()
            raise TimeoutError("Chromium took too long to start.")

    def _close_browser(self):
        """
        Close the headless Chromium browser.
        """
        # Refuse to shut down while sessions are still registered.
        if self._browsers:
            frappe.log("Cannot close Chromium as there are active browser instances.")
            return
        if self._chromium_process:
            self._chromium_process.terminate()
            # Reset singleton state so the next caller spawns a fresh process.
            ChromePDFGenerator._instance = None
            self._chromium_process = None
            self._devtools_url = None
            frappe.log("Headless Chromium closed successfully.")

    # not used anywhere in the code. read _set_devtools_url for more info. useful in case we want to take different approach to fetch devtools url.
    def fetch_devtools_url(self, port):
        """Query http://127.0.0.1:<port>/json/version for the websocket URL."""
        if not port:
            return None
        url = f"http://127.0.0.1:{port}/json/version"
        try:
            response = requests.get(url)
            response.raise_for_status()  # Raise an exception for HTTP errors
            response_data = response.json()
            return response_data["webSocketDebuggerUrl"].strip()
        except requests.ConnectionError:
            frappe.log_error(
                f"Failed to connect to the Chrome DevTools Protocol. Is Chrome running with --remote-debugging-port={port}"
            )
        except requests.RequestException as e:
            frappe.log_error(f"An error occurred: {e}")
        return None
356
frappe/utils/pdf_generator/page.py
Normal file
356
frappe/utils/pdf_generator/page.py
Normal file
|
|
@ -0,0 +1,356 @@
|
|||
import base64
|
||||
import time
|
||||
import urllib
|
||||
|
||||
import frappe
|
||||
|
||||
"""
|
||||
CDP commands documentation can be found here.
|
||||
https://chromedevtools.github.io/devtools-protocol/
|
||||
"""
|
||||
|
||||
|
||||
class Page:
|
||||
def __init__(self, session, browser_context_id, page_type):
    """Create and attach to a new browser tab (CDP target).

    session: the CDP socket client used for all commands.
    browser_context_id: isolated browser context to create the target in.
    page_type: caller-assigned label for this page (e.g. "header", "footer").
    """
    self.session = session
    # Create an empty tab in the requested browser context.
    result, error = self.session.send(
        "Target.createTarget", {"url": "", "browserContextId": browser_context_id}
    )
    if error:
        frappe.log_error(title="Error creating new page:", message=f"{error}")

    # NOTE(review): on error, result may be missing "targetId" and the next
    # line would raise — confirm whether create failures should abort instead.
    self.target_id = result["targetId"]
    self.type = page_type
    # flatten=True lets subsequent commands be addressed by sessionId alone.
    result, error = self.session.send(
        "Target.attachToTarget", {"targetId": self.target_id, "flatten": True}
    )
    if error:
        raise RuntimeError(f"Error attaching to target: {error}")
    self.session_id = result["sessionId"]
    self.send("Page.enable")
    self.frame_id = None
    self.get_frame_id_on_demand()
    # Render with print CSS and forward the caller's frappe session cookie.
    self.set_media_emulation("print")
    self.set_cookies()
# TODO: make send to return future and don't wait for it by default.
|
||||
def send(self, method, params=None, return_future=False):
    """Dispatch one CDP command scoped to this page's session."""
    payload = {} if params is None else params
    return self.session.send(method, payload, self.session_id, return_future)
def get_frame_id_on_demand(self):
    """Return this page's main frame id, querying the browser once and caching it."""
    if self.frame_id:
        return self.frame_id
    try:
        result, error = self.send("Page.getFrameTree")
        if error:
            raise RuntimeError(f"Error fetching frameId: {error}")
        # Cache the root frame's id for all later frame-scoped commands.
        self.frame_id = result["frameTree"]["frame"]["id"]
    except Exception:
        frappe.log_error(title="Error fetching frameId:", message=f"{frappe.get_traceback()}")
        raise
    return self.frame_id
def _ensure_frame_id(self):
    """Return the frame id, resolving it lazily on first use."""
    return self.frame_id or self.get_frame_id_on_demand()
def set_media_emulation(self, media_type: str = "print"):
    """Emulate the given CSS media type (default "print") for rendering."""
    params = {"media": media_type}
    return self.send("Emulation.setEmulatedMedia", params)
def set_cookies(self):
    """Forward the caller's frappe `sid` cookie into this browser tab.

    Without the cookie, any page the tab loads would render as a logged-out
    user. Does nothing when there is no authenticated session or no HTTP
    request context.
    """
    if not frappe.session or not frappe.session.sid or not hasattr(frappe.local, "request"):
        return

    # Cookie domain must not carry the port suffix.
    domain = frappe.utils.get_host_name().split(":", 1)[0]
    cookie = {
        "name": "sid",
        "value": frappe.session.sid,
        "domain": domain,
        "sameSite": "Strict",
    }
    # The Network domain only needs to stay enabled long enough to set the cookie.
    _result, error = self.send("Network.enable")
    if error:
        raise RuntimeError(f"Error enabling network: {error}")
    _result, error = self.send("Network.setCookie", cookie)
    if error:
        raise RuntimeError(f"Error setting cookie: {error}")
    _result, error = self.send("Network.disable")
    if error:
        raise RuntimeError(f"Error disabling network: {error}")
def intercept_request_and_fulfill(self, url_pattern):
    """Starts intercepting network requests for the given target_id and URL pattern.

    Returns a callable that blocks until a matching request is paused, answers
    it with an empty 200 response, and removes the listener.
    """
    # Shared slot so the inner closures can exchange the intercepted request id.
    data = {}

    def on_request_paused_event(future, response):
        """Callback for when a request is paused (intercepted)."""
        params = response.get("params")
        if params and params.get("requestId"):
            data["request_id"] = params["requestId"]
            if not future.done():
                future.set_result(data["request_id"])

    # Start listening for requestPaused event
    event = self.session.start_listener(
        "Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
    )

    # Enable request interception for the specified URL pattern
    self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})

    def intercept_and_fulfill():
        # Block until the request pauses, reply with a bare 200, then clean up.
        self.session.wait_for_event(event)
        self.session.send(
            "Fetch.fulfillRequest",
            {"requestId": event[1].result(), "responseCode": 200},
            return_future=True,
        )
        self.session.remove_listener("Fetch.requestPaused", event)

    return intercept_and_fulfill
def intercept_request_for_local_resources(self, url_pattern="*"):
    """Starts intercepting network requests for the given target_id and URL pattern.

    Same-host assets/ and files/ URLs are served straight from disk (avoiding a
    round-trip through the web server); everything else is allowed to continue.
    """
    data = {}

    def on_request_paused_event(future, response):
        """Callback for when a request is paused (intercepted)."""
        params = response.get("params")
        if params and params.get("requestId"):
            data["request_id"] = params["requestId"]
            url = params["request"]["url"]

            if url.startswith(frappe.request.host_url):
                # Strip host and any cache-busting "?v..." query suffix.
                path = url.replace(frappe.request.host_url, "").split("?v", 1)[0]
                if path.startswith("assets/") or path.startswith("files/"):
                    path = urllib.parse.unquote(path)
                    if path.startswith("files/"):
                        # Uploaded files live under the site's public folder.
                        path = frappe.utils.get_site_path("public", path)
                    content = frappe.read_file(path, as_base64=True)
                    response_headers = []
                    # TODO: handle further file types as required.
                    if path.endswith(".svg"):
                        response_headers.append({"name": "Content-Type", "value": "image/svg+xml"})
                    if content:
                        self.session.send(
                            "Fetch.fulfillRequest",
                            {
                                "requestId": data["request_id"],
                                "responseCode": 200,  # TODO: propagate the real response code.
                                "responseHeaders": response_headers,
                                "body": content,
                            },
                            return_future=True,
                        )
                        return
            # Anything not served from disk proceeds over the network as usual.
            self.session.send(
                "Fetch.continueRequest",
                {"requestId": data["request_id"]},
                return_future=True,
            )

    # Start listening for requestPaused event
    self.session.start_listener(
        "Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
    )

    # Enable request interception for the specified URL pattern
    self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})
def set_tab_url(self, url):
    """Navigate to a URL and fulfill the request with status code 200.

    The tab only needs the URL for cookie/origin scoping — the request is
    intercepted and answered with an empty 200 so no real page is fetched.
    """
    # Intercept and fulfill request with 200 status code
    wait_and_fulfill = self.intercept_request_and_fulfill(url)
    # Now, navigate after intercepting the request
    wait_start = self.wait_for_load(wait_for="load")
    page_navigate = self.send("Page.navigate", {"url": url}, return_future=True)
    wait_and_fulfill()

    def wait_for_navigate():
        # Deferred: callers invoke this only when navigation must have finished.
        self.session.wait_for_event(page_navigate, 3)
        wait_start()

    self.wait_for_navigate = wait_for_navigate
def evaluate(self, expression, await_promise=False):
    """Evaluate a JavaScript expression in the page and return the CDP result.

    expression: JavaScript source to run via Runtime.evaluate.
    await_promise: when True, the browser resolves a returned promise before
        replying.

    Retries up to 3 times (500ms apart) on transient failures.
    Raises RuntimeError only if every attempt failed.
    """
    self.send("Runtime.enable")
    result, error = self.send(
        "Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
    )
    if error:
        # retry if error in 500ms for 3 times (safe guard for rare edge cases
        # where the first evaluate fails). Waiting for network idle instead
        # would be slower than this.
        for _i in range(3):
            print(f"Error evaluating expression: {error}. Retrying in 500ms")
            time.sleep(0.5)
            result, error = self.send(
                "Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
            )
            if not error:
                break
        if error:
            # BUGFIX: raise only when all retries failed — an unconditional
            # raise here would discard a successful retry's result.
            raise RuntimeError(f"Error evaluating expression: {error}")

    self.send("Runtime.disable")
    return result
|
||||
# set wait_for to networkIdle if pdf is not rendering correctly.
|
||||
# if you face header Height to be incorrect as some external script is changing elements.
|
||||
# networkIdle is most stable option but make it a lot slower so avoiding for now. enable if not stable
|
||||
def set_content(self, html, wait_for=None):
    """Inject *html* as this tab's document content.

    wait_for: lifecycle event name(s) to wait on; defaults to
    ["load", "DOMContentLoaded"]. The armed waiter is stored on
    self.wait_for_set_content so the caller can block later.
    """
    events = wait_for or ["load", "DOMContentLoaded"]
    # Serve same-host assets from disk while the injected HTML loads.
    self.intercept_request_for_local_resources()
    waiter = self.wait_for_load(wait_for=events)
    self.send("Page.setDocumentContent", {"frameId": self._ensure_frame_id(), "html": html})
    self.wait_for_set_content = waiter
def wait_for_load(self, wait_for, timeout=60):
    """Arm a waiter for page lifecycle event(s) and return a blocking callable.

    wait_for: a single event name or a list of names (e.g. "load",
    "DOMContentLoaded"); the waiter resolves only once ALL of them have fired.
    timeout: seconds the returned callable will block at most.
    """
    self.send("Page.setLifecycleEventsEnabled", {"enabled": True})
    # Track which of the requested lifecycle events have been observed.
    status = {}
    if isinstance(wait_for, str):
        status[wait_for] = False
    if isinstance(wait_for, list):
        for event in wait_for:
            status[event] = False

    def on_lifecycle_event(future, response):
        # Mark the event seen; resolve the future once every event has fired.
        params = response.get("params", {})
        if params.get("name") in status.keys():
            status[params.get("name")] = True
            if all(status.values()):
                if not future.done():
                    future.set_result(response)

    event = self.session.start_listener(
        "Page.lifecycleEvent", on_lifecycle_event, self.session_id, self.target_id, self.frame_id
    )

    def start_wait():
        # Block until all requested events fired (or timeout), then clean up.
        self.session.wait_for_event(event, timeout)
        self.session.remove_listener("Page.lifecycleEvent", event)

    return start_wait
||||
def get_element_height(self, selector="body"):
    """Return the rendered height (CSS px) of *selector* via the DOM box model."""
    try:
        # NOTE(review): self.is_print_designer is not assigned in __init__ of
        # this class — presumably set externally before this is called; confirm.
        if not self.is_print_designer:
            # Non-Print-Designer documents measure the standard wrapper element.
            selector = ".wrapper"
        self.send("DOM.enable")
        doc_result, doc_error = self.send("DOM.getDocument")
        if doc_error:
            raise RuntimeError(f"Error getting document node: {doc_error}")
        doc_node_id = doc_result["root"]["nodeId"]
        result, error = self.send("DOM.querySelector", {"nodeId": doc_node_id, "selector": selector})
        if error:
            raise RuntimeError(f"Error querying selector: {error}")
        node_id = result["nodeId"]
        result, error = self.send("DOM.getBoxModel", {"nodeId": node_id})
        if error:
            raise RuntimeError(f"Error getting computed style: {error}")
        height = result["model"]["height"]
    finally:
        # Always switch the DOM agent back off, even on failure.
        self.send("DOM.disable")
    return height
def add_page_size_css(self):
    """Inject an @page rule matching self.options' paper size and margins."""
    width = f"{self.options['paperWidth']}in"
    height = f"{self.options['paperHeight']}in"
    margin_left = f"{self.options['marginLeft']}in"
    margin_right = f"{self.options['marginRight']}in"
    margin_top = f"{self.options['marginTop']}in"
    margin_bottom = f"{self.options['marginBottom']}in"

    # Enable DOM and CSS agents
    result, error = self.send("DOM.enable")
    if error:
        raise RuntimeError(f"Error enabling DOM: {error}")

    result, error = self.send("CSS.enable")
    if error:
        raise RuntimeError(f"Error enabling CSS: {error}")

    # Create a new stylesheet
    result, error = self.send("CSS.createStyleSheet", {"frameId": self._ensure_frame_id()})
    if error:
        raise RuntimeError(f"Error creating stylesheet: {error}")

    style_sheet_id = result["styleSheetId"]

    # Define the CSS rule for the page size
    css_rule = f"""
    @page {{
        size: {width} {height};
        margin: {margin_top} {margin_right} {margin_bottom} {margin_left};
    }}
    """

    # Apply the CSS rule to the created stylesheet
    result, error = self.send("CSS.setStyleSheetText", {"styleSheetId": style_sheet_id, "text": css_rule})

    if error:
        raise RuntimeError(f"Error setting stylesheet text: {error}")

    self.send("CSS.disable")
    self.send("DOM.disable")
||||
def generate_pdf(self, wait_for_pdf=True, raw=False):
	"""Print the current page to PDF via CDP.

	With ``wait_for_pdf=False`` the print request is issued asynchronously:
	the pending future is stashed on ``self.wait_for_pdf`` and nothing is
	returned. Otherwise the resulting stream is drained and the PDF is
	returned (raw bytes when ``raw`` is true, else a reader object).
	"""
	self.add_page_size_css()

	if wait_for_pdf:
		outcome, failure = self.send("Page.printToPDF", self.options)
		if failure:
			raise RuntimeError(f"Error generating PDF: {failure}")
		if "stream" not in outcome:
			raise ValueError("Stream handle not returned from Page.printToPDF")
		return self.get_pdf_from_stream(outcome["stream"], raw)

	# Fire-and-forget: keep the future so get_pdf_stream_id() can resolve it later.
	self.wait_for_pdf = self.send("Page.printToPDF", self.options, return_future=True)
|
||||
|
||||
def get_pdf_stream_id(self):
	"""Resolve the stream handle of a PDF requested via ``generate_pdf(wait_for_pdf=False)``."""
	# wait for task to complete
	self.session.wait_for_event(self.wait_for_pdf)
	# wait for event to complete
	task = self.wait_for_pdf.result()
	# NOTE(review): the outer future resolves to a task whose own result holds
	# the CDP payload — confirm against the session implementation.
	future = task.result()
	stream_id = future["result"]["stream"]
	return stream_id
|
||||
|
||||
def get_pdf_from_stream(self, stream_id, raw=False):
	"""Drain a CDP IO stream into memory and return the PDF.

	Returns the raw bytes when ``raw`` is true, otherwise a ``PdfReader``
	over the downloaded document.
	"""
	from io import BytesIO

	from pypdf import PdfReader

	buffer = bytearray()
	position = 0
	reached_eof = False
	while not reached_eof:
		chunk, failure = self.send("IO.read", {"handle": stream_id, "offset": position, "size": 4096})
		if failure:
			raise RuntimeError(f"Error reading PDF chunk: {failure}")
		payload = chunk["data"]
		# we don't use base64Encode option but added check anyway as it is one of the valid options.
		if chunk.get("base64Encoded", False):
			payload = base64.b64decode(payload)
		buffer.extend(payload)
		position += len(payload)
		reached_eof = chunk.get("eof", False)

	_closed, failure = self.send("IO.close", {"handle": stream_id})
	if failure:
		raise RuntimeError(f"Error closing PDF stream: {failure}")

	pdf_bytes = bytes(buffer)
	return pdf_bytes if raw else PdfReader(BytesIO(pdf_bytes))
|
||||
|
||||
def close(self):
	"""Tear down request interception and close the browser target."""
	self.session.send("Fetch.disable")
	_response, failure = self.send("Target.closeTarget", {"targetId": self.target_id})
	if failure:
		raise RuntimeError(f"Error closing target: {failure}")
|
||||
117
frappe/utils/pdf_generator/pdf_merge.py
Normal file
117
frappe/utils/pdf_generator/pdf_merge.py
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
class PDFTransformer:
	"""Overlay separately rendered header/footer PDFs onto a body PDF.

	Chrome renders header, body and footer as separate PDF documents; this
	class enlarges each body page, shifts its content, and merges the
	header/footer pages over it so every page carries its header and footer.
	"""

	def __init__(self, browser):
		# `browser` is the chrome wrapper that rendered the documents; it
		# exposes body_pdf and (optionally) header_pdf / footer_pdf readers.
		self.browser = browser
		self.body_pdf = browser.body_pdf
		self.is_print_designer = browser.is_print_designer
		self._set_header_pdf()
		self._set_footer_pdf()
		if not self.header_pdf and not self.footer_pdf:
			# Nothing to overlay; transform_pdf() will return the body as-is.
			return
		self.no_of_pages = len(self.body_pdf.pages)
		self.encrypt_password = self.browser.options.get("password", None)
		# if not header / footer then return body pdf

	def _set_header_pdf(self):
		# Header is optional. is_header_dynamic indicates one rendered header
		# page per body page (rather than a fixed template).
		self.header_pdf = None
		if hasattr(self.browser, "header_pdf"):
			self.header_pdf = self.browser.header_pdf
			self.is_header_dynamic = self.browser.is_header_dynamic

	def _set_footer_pdf(self):
		# Footer is optional; mirrors _set_header_pdf.
		self.footer_pdf = None
		if hasattr(self.browser, "footer_pdf"):
			self.footer_pdf = self.browser.footer_pdf
			self.is_footer_dynamic = self.browser.is_footer_dynamic

	def transform_pdf(self, output=None):
		"""Merge header/footer onto every body page.

		Args:
			output: optional PdfWriter; when given, the merged pages are
				appended to it and the writer is returned. Otherwise the
				merged document's raw bytes are returned (encrypted when a
				"password" option was set).
		"""
		from pypdf import PdfWriter

		header = self.header_pdf
		body = self.body_pdf
		footer = self.footer_pdf

		if not header and not footer:
			return body

		# Page heights come from each document's media box (PDF user units).
		body_height = body.pages[0].mediabox.top
		body_transform = header_height = footer_height = header_body_top = 0

		if footer:
			footer_height = footer.pages[0].mediabox.top
			# Body content shifts up by the footer's height.
			body_transform = footer_height

		if header:
			header_height = header.pages[0].mediabox.top
			# Header sits above body (and footer); the final page is
			# header + body + footer tall.
			header_transform = body_height + footer_height
			header_body_top = header_height + body_height + footer_height

		if header and not self.is_header_dynamic:
			# Static header: pre-shift its pages once, outside the page loop.
			for h in header.pages:
				self._transform(h, header_body_top, header_transform)

		for p in body.pages:
			if header_body_top:
				self._transform(p, header_body_top, body_transform)
			if header:
				if self.is_header_dynamic:
					p.merge_page(
						self._transform(header.pages[p.page_number], header_body_top, header_transform)
					)
				elif self.is_print_designer:
					# Print designer supplies 4 header variants:
					# index 0 = first page, 3 = last page,
					# 2 = even page index, 1 = odd page index.
					if p.page_number == 0:
						p.merge_page(header.pages[0])
					elif p.page_number == self.no_of_pages - 1:
						p.merge_page(header.pages[3])
					elif p.page_number % 2 == 0:
						p.merge_page(header.pages[2])
					else:
						p.merge_page(header.pages[1])
				else:
					p.merge_page(header.pages[0])

			if footer:
				if self.is_footer_dynamic:
					p.merge_page(footer.pages[p.page_number])
				elif self.is_print_designer:
					# Same 4-variant scheme as the header.
					if p.page_number == 0:
						p.merge_page(footer.pages[0])
					elif p.page_number == self.no_of_pages - 1:
						p.merge_page(footer.pages[3])
					elif p.page_number % 2 == 0:
						p.merge_page(footer.pages[2])
					else:
						p.merge_page(footer.pages[1])
				else:
					p.merge_page(footer.pages[0])

		if output:
			output.append_pages_from_reader(body)
			return output

		writer = PdfWriter()
		writer.append_pages_from_reader(body)
		if self.encrypt_password:
			writer.encrypt(self.encrypt_password)

		return self.get_file_data_from_writer(writer)

	def _transform(self, page, page_top, ty):
		"""Shift *page* content up by *ty* and raise its media box top to *page_top*."""
		from pypdf import PdfWriter, Transformation

		transform = Transformation().translate(ty=ty)
		page.mediabox.upper_right = (page.mediabox.right, page_top)
		page.add_transformation(transform)
		return page

	def get_file_data_from_writer(self, writer_obj):
		"""Serialize a PdfWriter and return its bytes."""
		from io import BytesIO

		# https://docs.python.org/3/library/io.html
		stream = BytesIO()
		writer_obj.write(stream)

		# Change the stream position to start of the stream
		stream.seek(0)

		# Read up to size bytes from the object and return them
		return stream.read()
|
||||
|
|
@ -229,6 +229,9 @@ def download_pdf(
|
|||
letterhead=None,
|
||||
pdf_generator: Literal["wkhtmltopdf", "chrome"] | None = None,
|
||||
):
|
||||
if pdf_generator is None:
|
||||
pdf_generator = "wkhtmltopdf"
|
||||
|
||||
doc = doc or frappe.get_doc(doctype, name)
|
||||
validate_print_permission(doc)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,18 @@
|
|||
import os
|
||||
import re
|
||||
from typing import Literal
|
||||
|
||||
import click
|
||||
|
||||
import frappe
|
||||
from frappe.utils.data import cint, cstr
|
||||
|
||||
# Per-platform relative location (extracted folder name, executable name) of
# the Chromium headless-shell binary inside the bench-level "chromium" dir.
EXECUTABLE_PATHS = {
	"linux": ["chrome-linux", "headless_shell"],
	"darwin": ["chrome-mac", "headless_shell"],
	"windows": ["chrome-win", "headless_shell.exe"],
}
|
||||
|
||||
|
||||
def get_print(
|
||||
doctype=None,
|
||||
|
|
@ -144,3 +154,336 @@ def attach_print(
|
|||
file_name = cstr(file_name).replace(" ", "").replace("/", "-") + ext
|
||||
|
||||
return {"fname": file_name, "fcontent": content}
|
||||
|
||||
|
||||
def setup_chromium():
	"""Set up Chromium at the bench level and return the executable path.

	Ensures the headless-shell binary exists (downloading it when missing;
	the Chromium version is taken from common_site_config.json or a default).

	Raises:
		RuntimeError: if the executable could not be located or downloaded.
	"""
	try:
		executable = find_or_download_chromium_executable()
		click.echo(f"Chromium is already set up at {executable}")
	except Exception as e:
		click.echo(f"Failed to setup Chromium: {e}")
		# Chain the original exception so the root cause stays visible.
		raise RuntimeError(f"Failed to setup Chromium: {e}") from e
	return executable
|
||||
|
||||
|
||||
def find_or_download_chromium_executable():
	"""Return the path to the bench-level Chromium headless-shell executable.

	Looks for the binary under ``<bench>/chromium`` (see EXECUTABLE_PATHS)
	and downloads it when it is missing.

	Raises:
		RuntimeError: on unsupported platforms or when the download failed.
	"""
	import platform
	from pathlib import Path

	bench_path = frappe.utils.get_bench_path()
	chromium_dir = os.path.join(bench_path, "chromium")

	platform_name = platform.system().lower()

	if platform_name not in EXECUTABLE_PATHS:
		# Previously this only echoed and then crashed below with a
		# TypeError on joinpath(*None); fail loudly instead.
		raise RuntimeError(f"Unsupported platform: {platform_name}")

	executable_name = EXECUTABLE_PATHS[platform_name]

	# Construct the full path to the executable
	exec_path = Path(chromium_dir).joinpath(*executable_name)
	if not exec_path.exists():
		click.echo("Chromium is not available. downloading...")
		download_chromium()

	if not exec_path.exists():
		# Previously this echoed and returned a bogus path; raise so callers
		# don't try to launch a non-existent binary.
		raise RuntimeError("Error while downloading chrome")

	return str(exec_path)
|
||||
|
||||
|
||||
def download_chromium():
	"""Download and extract Chromium for the configured version at the bench level.

	Fetches the platform-appropriate headless-shell build, extracts it into
	``<bench>/chromium`` and normalizes the folder and binary names so they
	match EXECUTABLE_PATHS.

	Raises:
		RuntimeError: on download, extraction or archive-layout errors.
	"""
	import platform
	import shutil
	import zipfile

	import requests

	bench_path = frappe.utils.get_bench_path()
	chromium_dir = os.path.join(bench_path, "chromium")

	# Remove old Chromium directory if it exists
	if os.path.exists(chromium_dir):
		click.echo("Removing old Chromium directory...")
		shutil.rmtree(chromium_dir, ignore_errors=True)

	os.makedirs(chromium_dir, exist_ok=True)

	download_url = get_chromium_download_url()
	file_name = os.path.basename(download_url)
	zip_path = os.path.join(chromium_dir, file_name)

	try:
		click.echo(f"Downloading Chromium from {download_url}...")
		# playwright's requires a user agent
		headers = {"User-Agent": "Wget/1.21.1"}
		with requests.get(download_url, stream=True, timeout=(10, 60), headers=headers) as r:
			r.raise_for_status()  # Raise an error for bad status codes
			total_size = int(r.headers.get("content-length", 0))  # Get total file size
			bar = click.progressbar(length=total_size, label="Downloading Chromium")
			with open(zip_path, "wb") as f:
				for chunk in r.iter_content(chunk_size=65536):
					f.write(chunk)
					bar.update(len(chunk))

		click.echo("Extracting Chromium...")
		with zipfile.ZipFile(zip_path, "r") as zip_ref:
			zip_ref.extractall(chromium_dir)

		if os.path.exists(zip_path):
			os.remove(zip_path)

		# The archive contains a single top-level directory; rename it (and
		# the shell binary inside) to the names EXECUTABLE_PATHS expects.
		entries = os.listdir(chromium_dir)
		if not entries:
			raise RuntimeError("Chromium archive extraction produced no files.")
		extracted = entries[0]
		executable_path = EXECUTABLE_PATHS[platform.system().lower()]
		chrome_folder_name = executable_path[0]
		# Compute the target directory up-front: previously `renamed_dir` was
		# only assigned inside the rename branch, raising NameError when the
		# extracted folder already had the expected name.
		renamed_dir = os.path.join(chromium_dir, chrome_folder_name)

		if extracted != chrome_folder_name:
			extracted_dir = os.path.join(chromium_dir, extracted)
			if os.path.exists(extracted_dir):
				click.echo(f"Renaming {extracted_dir} to {renamed_dir}")
				os.rename(extracted_dir, renamed_dir)
			else:
				raise RuntimeError(f"Failed to rename extracted directory. Expected {chrome_folder_name}.")

		if os.path.exists(renamed_dir):
			executable_shell = os.path.join(renamed_dir, "chrome-headless-shell")
			target_shell = os.path.join(renamed_dir, "headless_shell")
			if os.path.exists(executable_shell):
				os.rename(executable_shell, target_shell)
			elif not os.path.exists(target_shell):
				# Some builds (e.g. Playwright's) already ship `headless_shell`;
				# only fail when neither name is present.
				raise RuntimeError("Failed to rename executable. Expected chrome-headless-shell.")
			# Make the `headless_shell` executable
			exec_path = os.path.join(renamed_dir, executable_path[1])
			make_chromium_executable(exec_path)

		click.echo(f"Chromium is ready to use at: {chromium_dir}")
	except requests.Timeout as e:
		click.echo("Download timed out. Check your internet connection.")
		raise RuntimeError("Download timed out.") from e
	except requests.ConnectionError as e:
		click.echo("Failed to connect to Chromium download server.")
		raise RuntimeError("Connection error.") from e
	except requests.RequestException as e:
		click.echo(f"Failed to download Chromium: {e}")
		raise RuntimeError(f"Failed to download Chromium: {e}") from e
	except zipfile.BadZipFile as e:
		click.echo(f"Failed to extract Chromium: {e}")
		raise RuntimeError(f"Failed to extract Chromium: {e}") from e
|
||||
|
||||
|
||||
def get_chromium_download_url():
	"""Build the download URL for the platform-appropriate headless-shell build.

	Google's chrome-for-testing builds are used where available; Playwright's
	Chromium builds serve as a fallback for Linux ARM64. Every component can
	be overridden through common_site_config.json.
	"""
	common_config = frappe.get_common_site_config()

	# Full-URL escape hatch. Avoid this unless it is going to run on a single
	# type of platform and you have the correct binary hosted.
	chrome_download_url = common_config.get("chromium_download_url", None)
	if chrome_download_url:
		return chrome_download_url

	# chrome-for-testing has no linux arm64 builds
	# (https://github.com/GoogleChromeLabs/chrome-for-testing/issues/1), so
	# Playwright's fallback builds are used for linux arm64.
	# TODO: also use the fallback builds for windows arm —
	# https://community.arm.com/arm-community-blogs/b/tools-software-ides-blog/posts/native-chromium-builds-windows-on-arm
	#
	# To find CHROME_VERSION and CHROME_FALLBACK_VERSION:
	# 1. Visit https://github.com/microsoft/playwright/actions/workflows/roll_browser_into_playwright.yml
	# 2. Open the latest job run.
	# 3. Navigate to the "Roll to New Browser Version" step.
	# 4. Look for a line like: Downloading Chromium 133.0.6943.16 (playwright build v1155)
	#    The first number is CHROME_VERSION, the second is CHROME_FALLBACK_VERSION.
	cft_paths = {
		"linux64": "%s/linux64/chrome-headless-shell-linux64.zip",
		"mac-arm64": "%s/mac-arm64/chrome-headless-shell-mac-arm64.zip",
		"mac-x64": "%s/mac-x64/chrome-headless-shell-mac-x64.zip",
		"win32": "%s/win32/chrome-headless-shell-win32.zip",
		"win64": "%s/win64/chrome-headless-shell-win64.zip",
	}
	playwright_paths = {
		"ubuntu20.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu22.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu24.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian11-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian12-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
	}

	platform_key = calculate_platform()

	# Defaults, each overridable via common_site_config.json (escape hatches).
	# make sure that you have all required flavours at correct urls
	version = common_config.get("chromium_version", "133.0.6943.35")
	playwright_build_version = common_config.get("playwright_chromium_version", "1157")
	base_url = common_config.get(
		"chromium_download_base_url", "https://storage.googleapis.com/chrome-for-testing-public/"
	)
	playwright_base_url = common_config.get(
		"playwright_chromium_download_base_url",
		"https://cdn.playwright.dev/dbazure/download/playwright/builds/chromium/",
	)

	if platform_key in cft_paths:
		path_template = cft_paths[platform_key]
	elif platform_key in playwright_paths:
		version = playwright_build_version
		base_url = playwright_base_url
		path_template = playwright_paths[platform_key]
	else:
		frappe.throw(
			f"No download path configured or Chromium download not available for platform: {platform_key}"
		)

	return f"{base_url}{path_template % version}"
|
||||
|
||||
|
||||
def make_chromium_executable(executable):
	"""Ensure the downloaded Chromium binary carries execute permissions."""
	if not os.path.exists(executable):
		raise RuntimeError(f"Chromium executable not found: {executable}.")

	# Nothing to do when the file is already executable.
	if os.access(executable, os.X_OK):
		click.echo(f"Chromium executable is already executable: {executable}")
		return

	click.echo(f"Making Chromium executable: {executable}")
	os.chmod(executable, 0o755)  # Set executable permissions
	click.echo(f"Chromium executable permissions set: {executable}")
|
||||
|
||||
|
||||
def calculate_platform():
	"""
	Determine the host platform and return it as a string.

	Includes logic for Linux ARM (distro/version specific), Linux x64,
	macOS (Intel and ARM), and Windows (32-bit and 64-bit).

	Returns:
		str: The detected platform string (e.g. 'linux64', 'mac-arm64'),
		or "<unknown>" when it cannot be determined.
	"""
	import platform

	system = platform.system().lower()
	arch = platform.machine().lower()

	# Handle Linux ARM-specific logic (Playwright's per-distro build matrix)
	if system == "linux" and arch == "aarch64":
		distro_info = get_linux_distribution_info()
		distro_id = distro_info.get("id", "")
		version = distro_info.get("version", "")
		try:
			# Guard against non-numeric version strings.
			major_version = int(version.split(".")[0]) if version else 0
		except ValueError:
			major_version = 0

		if distro_id == "ubuntu":
			if major_version < 20:
				return "ubuntu18.04-arm64"
			if major_version < 22:
				return "ubuntu20.04-arm64"
			if major_version < 24:
				return "ubuntu22.04-arm64"
			if major_version < 26:
				return "ubuntu24.04-arm64"
			return "<unknown>"

		if distro_id in ["debian", "raspbian"]:
			if major_version < 11:
				return "debian10-arm64"
			if major_version < 12:
				return "debian11-arm64"
			return "debian12-arm64"
		return "<unknown>"

	# Handle other platforms
	if system == "linux" and arch == "x86_64":
		return "linux64"
	if system == "darwin" and arch == "arm64":
		return "mac-arm64"
	if system == "darwin" and arch == "x86_64":
		return "mac-x64"
	if system == "windows" and arch == "x86":
		return "win32"
	# platform.machine() reports "AMD64" (not "x86_64") on 64-bit Windows,
	# which previously made "win64" unreachable.
	if system == "windows" and arch in ("x86_64", "amd64"):
		return "win64"

	return "<unknown>"
|
||||
|
||||
|
||||
def get_linux_distribution_info():
	# not tested
	"""Retrieve Linux distribution information using the `distro` library.

	Returns:
		dict: {"id": <lowercased distro id>, "version": <version string>};
		empty strings when the optional `distro` package is unavailable.
	"""
	try:
		import distro
	except ImportError:
		# The previous `if not distro:` guard was dead code — a missing
		# package raises ImportError at the import statement itself.
		return {"id": "", "version": ""}

	return {"id": distro.id().lower(), "version": distro.version()}
|
||||
|
||||
|
||||
def parse_float_and_unit(input_text, default_unit="px"):
|
||||
if isinstance(input_text, int | float):
|
||||
return {"value": input_text, "unit": default_unit}
|
||||
if not isinstance(input_text, str):
|
||||
return
|
||||
|
||||
number = float(re.search(r"[+-]?([0-9]*[.])?[0-9]+", input_text).group())
|
||||
valid_units = [r"px", r"mm", r"cm", r"in"]
|
||||
unit = [match.group() for rx in valid_units if (match := re.search(rx, input_text))]
|
||||
|
||||
return {"value": number, "unit": unit[0] if len(unit) == 1 else default_unit}
|
||||
|
||||
|
||||
def convert_uom(
	number: float,
	from_uom: Literal["px", "mm", "cm", "in"] = "px",
	to_uom: Literal["px", "mm", "cm", "in"] = "px",
	only_number: bool = False,
) -> float:
	"""Convert *number* between CSS length units (px, mm, cm, in).

	Args:
		number: value to convert.
		from_uom: unit of the input value.
		to_uom: unit to convert to.
		only_number: return the bare rounded number instead of "<value><unit>".

	Returns:
		The converted value rounded to 3 decimals — a number when
		*only_number* is true, otherwise a string suffixed with *to_uom*.
	"""
	# Pixel equivalents at CSS 96 dpi (1in = 96px, 1in = 25.4mm).
	unit_values = {
		"px": 1,
		"mm": 3.7795275591,
		"cm": 37.795275591,
		"in": 96,
	}
	# One factor replaces the original nested per-unit dicts (same math:
	# from_X["to_Y"] == unit_values[X] / unit_values[Y]). The old structure
	# wrapped three of the four dicts in 1-tuples but not `from_in`, so any
	# conversion *from* inches crashed with KeyError: 0 — fixed here.
	factor = unit_values[from_uom] / unit_values[to_uom]
	converted = round(number * factor, 3)
	if only_number:
		return converted
	return f"{converted}{to_uom}"
|
||||
|
|
|
|||
|
|
@ -90,6 +90,8 @@ dependencies = [
|
|||
"posthog~=5.0.0",
|
||||
"vobject~=0.9.9",
|
||||
"pycountry~=24.6.1",
|
||||
|
||||
"websockets"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue