diff --git a/frappe/commands/utils.py b/frappe/commands/utils.py
index bfbd1b1052..afafadcbaf 100644
--- a/frappe/commands/utils.py
+++ b/frappe/commands/utils.py
@@ -1027,6 +1027,13 @@ def list_sites(context: CliCtxObj, output_json=False):
click.echo("No sites found")
+@click.command("setup-chrome")
+def setup_chrome():
+ from frappe.utils.print_utils import setup_chromium
+
+ setup_chromium()
+
+
commands = [
build,
clear_cache,
@@ -1059,4 +1066,5 @@ commands = [
add_to_email_queue,
rebuild_global_search,
list_sites,
+ setup_chrome,
]
diff --git a/frappe/hooks.py b/frappe/hooks.py
index d4de0d0047..b8c8901320 100644
--- a/frappe/hooks.py
+++ b/frappe/hooks.py
@@ -89,7 +89,7 @@ on_logout = "frappe.core.doctype.session_default_settings.session_default_settin
pdf_header_html = "frappe.utils.pdf.pdf_header_html"
pdf_body_html = "frappe.utils.pdf.pdf_body_html"
pdf_footer_html = "frappe.utils.pdf.pdf_footer_html"
-
+pdf_generator = "frappe.utils.pdf.get_chrome_pdf"
# permissions
permission_query_conditions = {
diff --git a/frappe/printing/doctype/print_format/print_format.json b/frappe/printing/doctype/print_format/print_format.json
index 8af853fecc..2a7b3c0215 100644
--- a/frappe/printing/doctype/print_format/print_format.json
+++ b/frappe/printing/doctype/print_format/print_format.json
@@ -268,7 +268,7 @@
"fieldname": "pdf_generator",
"fieldtype": "Select",
"label": "PDF Generator",
- "options": "wkhtmltopdf"
+ "options": "wkhtmltopdf\nchrome"
},
{
"default": "DocType",
@@ -292,7 +292,7 @@
"icon": "fa fa-print",
"idx": 1,
"links": [],
- "modified": "2025-09-16 11:20:20.151669",
+ "modified": "2025-09-23 10:39:51.123539",
"modified_by": "Administrator",
"module": "Printing",
"name": "Print Format",
diff --git a/frappe/printing/doctype/print_format/print_format.py b/frappe/printing/doctype/print_format/print_format.py
index a5456b37bc..c122ec56bc 100644
--- a/frappe/printing/doctype/print_format/print_format.py
+++ b/frappe/printing/doctype/print_format/print_format.py
@@ -40,7 +40,7 @@ class PrintFormat(Document):
page_number: DF.Literal[
"Hide", "Top Left", "Top Center", "Top Right", "Bottom Left", "Bottom Center", "Bottom Right"
]
- pdf_generator: DF.Literal["wkhtmltopdf"]
+ pdf_generator: DF.Literal["wkhtmltopdf", "chrome"]
print_format_builder: DF.Check
print_format_builder_beta: DF.Check
print_format_for: DF.Literal["DocType", "Report"]
diff --git a/frappe/printing/page/print/print.js b/frappe/printing/page/print/print.js
index e8b05bd10a..3b464de241 100644
--- a/frappe/printing/page/print/print.js
+++ b/frappe/printing/page/print/print.js
@@ -680,11 +680,15 @@ frappe.ui.form.PrintView = class {
}
} else {
this.is_wkhtmltopdf_valid();
- this.render_page("/api/method/frappe.utils.print_format.download_pdf?");
+ this.render_page(
+ "/api/method/frappe.utils.print_format.download_pdf?",
+ false,
+ print_format?.pdf_generator
+ );
}
}
- render_page(method, printit = false) {
+ render_page(method, printit = false, pdf_generator = "wkhtmltopdf") {
let w = window.open(
frappe.urllib.get_full_url(
method +
@@ -701,7 +705,9 @@ frappe.ui.form.PrintView = class {
encodeURIComponent(this.get_letterhead()) +
"&settings=" +
encodeURIComponent(JSON.stringify(this.additional_settings)) +
- (this.lang_code ? "&_lang=" + this.lang_code : "")
+ (this.lang_code ? "&_lang=" + this.lang_code : "") +
+ "&pdf_generator=" +
+ encodeURIComponent(pdf_generator)
)
);
if (!w) {
diff --git a/frappe/templates/print_formats/chrome_pdf_header_footer.html b/frappe/templates/print_formats/chrome_pdf_header_footer.html
new file mode 100644
index 0000000000..276a2961dc
--- /dev/null
+++ b/frappe/templates/print_formats/chrome_pdf_header_footer.html
@@ -0,0 +1,49 @@
+
+
+
+
+ {% for tag in head -%}
+ {{ tag | string }}
+ {%- endfor %}
+
+
+ {% for tag in styles -%}
+ {{ tag | string }}
+ {%- endfor %}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/frappe/utils/pdf.py b/frappe/utils/pdf.py
index f32c5ba62c..9f40feffaa 100644
--- a/frappe/utils/pdf.py
+++ b/frappe/utils/pdf.py
@@ -131,6 +131,37 @@ def get_pdf(html, options=None, output: PdfWriter | None = None):
return filedata
+def measure_time(func):
+ import time
+
+ def wrapper(*args, **kwargs):
+ start_time = time.time()
+ result = func(*args, **kwargs)
+ end_time = time.time()
+ print(f"Function {func.__name__} took {end_time - start_time:.4f} seconds")
+ return result
+
+ return wrapper
+
+
+@measure_time
+def get_chrome_pdf(print_format, html, options, output, pdf_generator=None):
+ from frappe.utils.pdf_generator.browser import Browser
+ from frappe.utils.pdf_generator.chrome_pdf_generator import ChromePDFGenerator
+ from frappe.utils.pdf_generator.pdf_merge import PDFTransformer
+
+ if pdf_generator != "chrome":
+ # Use the default pdf generator
+ return
+ # scrubbing url to expand url is not required as we have set url.
+ # also, planning to remove network requests anyway 🤞
+ generator = ChromePDFGenerator()
+ browser = Browser(generator, print_format, html, options)
+ transformer = PDFTransformer(browser)
+ # transforms and merges header, footer into body pdf and returns merged pdf
+ return transformer.transform_pdf(output=output)
+
+
def get_file_data_from_writer(writer_obj):
# https://docs.python.org/3/library/io.html
stream = io.BytesIO()
diff --git a/frappe/utils/pdf_generator/browser.py b/frappe/utils/pdf_generator/browser.py
new file mode 100644
index 0000000000..01085d03d9
--- /dev/null
+++ b/frappe/utils/pdf_generator/browser.py
@@ -0,0 +1,465 @@
+from typing import ClassVar
+
+from bs4 import BeautifulSoup
+
+import frappe
+from frappe.utils.print_utils import convert_uom, parse_float_and_unit
+
+
+class Browser:
+ def __init__(self, generator, print_format, html, options):
+ self.is_print_designer = frappe.get_cached_value("Print Format", print_format, "print_designer")
+ self.browserID = frappe.utils.random_string(10)
+ generator.add_browser(self.browserID)
+ # sets soup from html
+ self.set_html(html)
+ # sets wkhtmltopdf options
+ self.set_options(options)
+ # start cdp connection and create browser context ( kind of like new window / incognito mode)
+ self.open(generator)
+ # opens header and footer pages and sets content ( not waiting for it to load)
+ self.prepare_header_footer()
+ # opens body page and sets content and waits for it to finish loading
+ self.setup_body_page()
+ # prepare options as per chrome for pdf
+ self.prepare_options_for_pdf()
+ # generate header and footer pages if they are not dynamic ( first, odd, even, last)
+ self.update_header_footer_page_pd()
+ # if header and footer are not dynamic start generating pdf for them (non-blocking)
+ self.try_async_header_footer_pdf()
+ # now wait for page to load as we need DOM to generate pdf
+ self.body_page.wait_for_set_content()
+ self.body_pdf = self.body_page.generate_pdf(raw=not self.header_page and not self.footer_page)
+ self.body_page.close()
+ self.update_header_footer_page()
+
+ if self.header_page:
+ if not self.is_header_dynamic:
+ self.header_pdf = self.header_page.get_pdf_from_stream(self.header_page.get_pdf_stream_id())
+ else:
+ self.header_pdf = self.header_page.generate_pdf()
+ self.header_page.close()
+
+ if self.footer_page:
+ if not self.is_footer_dynamic:
+ self.footer_pdf = self.footer_page.get_pdf_from_stream(self.footer_page.get_pdf_stream_id())
+ else:
+ self.footer_pdf = self.footer_page.generate_pdf()
+ self.footer_page.close()
+
+ self.close()
+
+ generator.remove_browser(self.browserID)
+
+ def open(self, generator):
+ from frappe.utils.pdf_generator.cdp_connection import CDPSocketClient
+
+ # checking because if we share the browser across requests, _devtools_url will already be set for subsequent requests.
+ if not generator._devtools_url:
+ generator._set_devtools_url()
+ # start the CDP websocket connection to browser
+ self.session = CDPSocketClient(generator._devtools_url)
+
+ self.session.connect()
+ self.create_browser_context()
+
+ def create_browser_context(self):
+ # create browser context
+ result, error = self.session.send("Target.createBrowserContext", {"disposeOnDetach": True})
+ if error:
+ frappe.log_error(title="Error creating browser context:", message=f"{error}")
+ self.browser_context_id = result["browserContextId"]
+
+ def set_html(self, html):
+ self.soup = BeautifulSoup(html, "html5lib")
+
+ def set_options(self, options):
+ self.options = options
+
+ def new_page(self, page_type):
+ """
+ # create a new page in the browser inside browser context
+ ----
+ TODO: Implement Deterministic rendering for headless-chrome via DevTools Protocol ( waiting for macos support )
+ https://docs.google.com/document/d/1PppegrpXhOzKKAuNlP6XOEnviXFGUiX2hop00Cxcv4o/edit?tab=t.0#bookmark=id.dukbomwxpb3j
+
+ NOTE: In theory this will make it faster but more importantly use less cpu, ram etc.
+ """
+
+ from frappe.utils.pdf_generator.page import Page
+
+ page = Page(self.session, self.browser_context_id, page_type)
+ page.is_print_designer = self.is_print_designer
+
+ return page
+
+ def setup_body_page(self):
+ self.body_page = self.new_page("body")
+ self.body_page.set_tab_url(frappe.request.host_url)
+ self.body_page.wait_for_navigate()
+ self.body_page.set_content(str(self.soup))
+
+ def close_page(self, type):
+ page = getattr(self, f"{type}_page")
+ page.close()
+
+ def is_page_no_used(self, soup):
+ # Check if any of the classes exist
+ classes_to_check = [
+ "page",
+ "frompage",
+ "topage",
+ "page_info_page",
+ "page_info_frompage",
+ "page_info_topage",
+ ]
+
+ # Loop through the classes to check
+ for class_name in classes_to_check:
+ if soup.find(class_=class_name): # Check if any element with the class is found
+ return True # Return True if class is found
+
+ return False
+
+ def prepare_header_footer(self):
+ # code is structured like this to improve performance by running commands in chrome as soon as possible.
+ soup = self.soup
+ options = self.options
+ # open header and footer pages
+ self._open_header_footer_pages()
+
+ # get tags to pass to header template.
+ head = soup.find("head").contents
+ styles = soup.find_all("style")
+
+ # set header and footer content ( not waiting for it to load yet).
+ if self.header_page:
+ self.header_page.wait_for_navigate()
+ self.header_page.set_content(
+ self.get_rendered_header_footer(self.header_content, "header", head, styles, css=[])
+ )
+
+ if self.footer_page:
+ self.footer_page.wait_for_navigate()
+ self.footer_page.set_content(
+ self.get_rendered_header_footer(self.footer_content, "footer", head, styles, css=[])
+ )
+ if self.header_page:
+ self.header_page.wait_for_set_content()
+ self.header_height = self.header_page.get_element_height()
+ self.is_header_dynamic = self.is_page_no_used(self.header_content)
+ del self.header_content
+ else:
+ # bad implicit setting of margin #backwards-compatibility
+ options["margin-top"] = "15mm"
+
+ if self.footer_page:
+ self.footer_page.wait_for_set_content()
+ self.footer_height = self.footer_page.get_element_height()
+ self.is_footer_dynamic = self.is_page_no_used(self.footer_content)
+ del self.footer_content
+ else:
+ # bad implicit setting of margin #backwards-compatibility
+ options["margin-bottom"] = "15mm"
+
+ # Remove instances of them from main content for render_template
+ for html_id in ["header-html", "footer-html"]:
+ for tag in soup.find_all(id=html_id):
+ tag.extract()
+
+ def try_async_header_footer_pdf(self):
+ if self.header_page and not self.is_header_dynamic:
+ self.header_page.generate_pdf(wait_for_pdf=False)
+ if self.footer_page and not self.is_footer_dynamic:
+ self.footer_page.generate_pdf(wait_for_pdf=False)
+
+ def _get_converted_num(self, num_str, unit="px"):
+ parsed = parse_float_and_unit(num_str)
+ if parsed:
+ return convert_uom(parsed["value"], parsed["unit"], unit, only_number=True)
+
+ def _parse_pdf_options_from_html(self):
+ from frappe.utils.pdf import get_print_format_styles
+
+ soup: BeautifulSoup = self.soup
+ options = {}
+ print_format_css = get_print_format_styles(soup)
+ attrs = (
+ "margin-top",
+ "margin-bottom",
+ "margin-left",
+ "margin-right",
+ "page-size",
+ "header-spacing",
+ "orientation",
+ "page-width",
+ "page-height",
+ )
+ options |= {style.name: style.value for style in print_format_css if style.name in attrs}
+ self.options.update(options)
+
+ def _set_default_page_size(self):
+ options = self.options
+ pdf_page_size = (
+ options.get("page-size") or frappe.db.get_single_value("Print Settings", "pdf_page_size") or "A4"
+ )
+
+ if pdf_page_size == "Custom":
+ options["page-height"] = options.get("page-height") or frappe.db.get_single_value(
+ "Print Settings", "pdf_page_height"
+ )
+ options["page-width"] = options.get("page-width") or frappe.db.get_single_value(
+ "Print Settings", "pdf_page_width"
+ )
+ else:
+ options["page-size"] = pdf_page_size
+
+ def prepare_options_for_pdf(self):
+ self._parse_pdf_options_from_html()
+ self._set_default_page_size()
+
+ options = self.options
+
+ updated_options = {
+ "scale": 1,
+ "printBackground": True,
+ "transferMode": "ReturnAsStream",
+ "marginTop": 0,
+ "marginBottom": 0,
+ "marginLeft": 0,
+ "marginRight": 0,
+ "landscape": options.get("orientation", "Portrait") == "Landscape",
+ "preferCSSPageSize": False,
+ "pageRanges": options.get("page-ranges", ""),
+ # Experimental
+ "generateTaggedPDF": options.get("generate-tagged-pdf", False),
+ "generateOutline": options.get("generate-outline", False),
+ }
+
+ # bad implicit setting of margin #backwards-compatibility
+ if not self.is_print_designer:
+ if not options.get("margin-right"):
+ options["margin-right"] = "15mm"
+
+ if not options.get("margin-left"):
+ options["margin-left"] = "15mm"
+
+ if not options.get("page-height") or not options.get("page-width"):
+ if not (page_size := self.options.get("page-size")):
+ raise frappe.ValidationError("Page size is required")
+ if page_size == "CUSTOM":
+ raise frappe.ValidationError("Custom page size requires page-height and page-width")
+ size = PageSize.get(page_size)
+ if not size:
+ raise frappe.ValidationError("Invalid page size")
+
+ options["page-height"] = convert_uom(size["height"], "mm", "px", only_number=True)
+ options["page-width"] = convert_uom(size["width"], "mm", "px", only_number=True)
+
+ if isinstance(options["page-height"], str):
+ options["page-height"] = self._get_converted_num(options["page-height"])
+
+ if isinstance(options["page-width"], str):
+ options["page-width"] = self._get_converted_num(options["page-width"])
+
+ updated_options["paperWidth"] = convert_uom(options["page-width"], "px", "in", only_number=True)
+
+ if options.get("margin-left"):
+ updated_options["marginLeft"] = convert_uom(
+ self._get_converted_num(options["margin-left"]), "px", "in", only_number=True
+ )
+
+ if options.get("margin-right"):
+ updated_options["marginRight"] = convert_uom(
+ self._get_converted_num(options["margin-right"]), "px", "in", only_number=True
+ )
+
+ # make copy of options to update them in header, body, footer.
+ self.body_page.options = updated_options.copy()
+ if self.header_page:
+ self.header_page.options = updated_options.copy()
+ if self.footer_page:
+ self.footer_page.options = updated_options.copy()
+
+ margin_top = self._get_converted_num(options.get("margin-top", 0))
+ margin_bottom = self._get_converted_num(options.get("margin-bottom", 0))
+
+ header_with_top_margin = 0
+ header_with_spacing_top_margin = 0
+ footer_with_bottom_margin = 0
+ footer_height = 0
+
+ if self.header_page:
+ header_with_top_margin = self.header_height + margin_top
+ header_spacing = options.get("header-spacing", 0)
+ header_with_spacing_top_margin = header_with_top_margin + header_spacing
+ self.header_page.options["paperHeight"] = (
+ convert_uom(header_with_spacing_top_margin, "px", "in", only_number=True)
+ if header_with_spacing_top_margin
+ else 0
+ )
+
+ margin_top = convert_uom(margin_top, "px", "in", only_number=True)
+
+ if self.header_page:
+ self.header_page.options["marginTop"] = margin_top
+ else:
+ self.body_page.options["marginTop"] = margin_top
+
+ if self.footer_page:
+ footer_height = self.footer_height
+ self.footer_page.options["paperHeight"] = (
+ convert_uom(footer_height, "px", "in", only_number=True) if footer_height else 0
+ )
+ footer_with_bottom_margin = self.footer_height + margin_bottom
+
+ margin_bottom = convert_uom(margin_bottom, "px", "in", only_number=True)
+
+ if self.footer_page:
+ self.footer_page.options["marginBottom"] = margin_bottom
+ else:
+ self.body_page.options["marginBottom"] = margin_bottom
+
+ body_height = options.get("page-height") - (
+ header_with_spacing_top_margin + footer_with_bottom_margin
+ )
+
+ """
+ matching scale for some old formats is 1.46 #backwards-compatibility ( scale 1 is better in my opinion)
+ If we face issues in custom formats then only we should enable this.
+ """
+
+ self.body_page.options["paperHeight"] = convert_uom(body_height, "px", "in", only_number=True)
+
+ def get_rendered_header_footer(self, content, type, head, styles, css):
+ from frappe.utils.pdf import toggle_visible_pdf
+
+ html_id = f"{type}-html"
+ content = content.extract()
+ toggle_visible_pdf(content)
+ id_map = {"header": "pdf_header_html", "footer": "pdf_footer_html"}
+ hook_func = frappe.get_hooks(id_map.get(type))
+ return frappe.call(
+ hook_func[-1],
+ soup=self.soup,
+ head=head,
+ content=content,
+ styles=styles,
+ html_id=html_id,
+ css=css,
+ path="templates/print_formats/chrome_pdf_header_footer.html",
+ )
+
+ def update_header_footer_page(self):
+ if not self.header_page and not self.footer_page:
+ return
+ total_pages = len(self.body_pdf.pages)
+ # function is added to html from update_page_no.js
+ if self.header_page:
+ if self.is_header_dynamic:
+ self.header_page.evaluate(
+ f"clone_and_update('{'#header-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Header', 1);",
+ await_promise=True,
+ )
+
+ if self.footer_page:
+ if self.is_footer_dynamic:
+ self.footer_page.evaluate(
+ f"clone_and_update('{'#footer-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Footer', 1);",
+ await_promise=True,
+ )
+
+ def update_header_footer_page_pd(self):
+ if not self.is_print_designer:
+ return
+ if not self.header_page and not self.footer_page:
+ return
+ # function is added to html from update_page_no.js
+ if self.header_page and not self.is_header_dynamic:
+ self.header_page.evaluate(
+ "clone_and_update('#header-render-container', 0, 1, 'Header', 0);",
+ await_promise=True,
+ )
+
+ if self.footer_page and not self.is_footer_dynamic:
+ self.footer_page.evaluate(
+ "clone_and_update('#footer-render-container', 0, 1, 'Footer', 0);",
+ await_promise=True,
+ )
+
+ def _open_header_footer_pages(self):
+ self.header_page = None
+ self.footer_page = None
+ # open new page for header/footer if they exist.
+ # It sends CDP command to the browser to open a new tab.
+ if header_content := self.soup.find(id="header-html"):
+ self.header_page = self.new_page("header")
+ self.header_page.set_tab_url(frappe.request.host_url)
+
+ if footer_content := self.soup.find(id="footer-html"):
+ self.footer_page = self.new_page("footer")
+ self.footer_page.set_tab_url(frappe.request.host_url)
+
+ self.header_content = header_content
+ self.footer_content = footer_content
+
+ def close(self):
+ self.session.disconnect()
+
+
+class PageSize:
+ page_sizes: ClassVar[dict[str, tuple[int, int]]] = {
+ "A10": (26, 37),
+ "A1": (594, 841),
+ "A0": (841, 1189),
+ "A3": (297, 420),
+ "A2": (420, 594),
+ "A5": (148, 210),
+ "A4": (210, 297),
+ "A7": (74, 105),
+ "A6": (105, 148),
+ "A9": (37, 52),
+ "A8": (52, 74),
+ "B10": (44, 31),
+ "B1+": (1020, 720),
+ "B4": (353, 250),
+ "B5": (250, 176),
+ "B6": (176, 125),
+ "B7": (125, 88),
+ "B0": (1414, 1000),
+ "B1": (1000, 707),
+ "B2": (707, 500),
+ "B3": (500, 353),
+ "B2+": (720, 520),
+ "B8": (88, 62),
+ "B9": (62, 44),
+ "C10": (40, 28),
+ "C9": (57, 40),
+ "C8": (81, 57),
+ "C3": (458, 324),
+ "C2": (648, 458),
+ "C1": (917, 648),
+ "C0": (1297, 917),
+ "C7": (114, 81),
+ "C6": (162, 114),
+ "C5": (229, 162),
+ "C4": (324, 229),
+ "Legal": (216, 356),
+ "Junior Legal": (127, 203),
+ "Letter": (216, 279),
+ "Tabloid": (279, 432),
+ "Ledger": (432, 279),
+ "ANSI C": (432, 559),
+ "ANSI A (letter)": (216, 279),
+ "ANSI B (ledger & tabloid)": (279, 432),
+ "ANSI E": (864, 1118),
+ "ANSI D": (559, 864),
+ }
+
+ @classmethod
+ def get(cls, name):
+ if name in cls.page_sizes:
+ width, height = cls.page_sizes[name]
+ return {"width": width, "height": height}
+ else:
+ return None # Return None if the page size is not found
diff --git a/frappe/utils/pdf_generator/cdp_connection.py b/frappe/utils/pdf_generator/cdp_connection.py
new file mode 100644
index 0000000000..3c0874aca4
--- /dev/null
+++ b/frappe/utils/pdf_generator/cdp_connection.py
@@ -0,0 +1,179 @@
+import asyncio
+
+import websockets
+
+import frappe
+
+
+class CDPSocketClient:
+ """
+ Manages WebSocket communications with Chrome DevTools Protocol.
+ Ensures robust error handling and consistent logging.
+ """
+
+ def __init__(self, websocket_url):
+ self.websocket_url = websocket_url
+ self.connection = None
+ self.message_id = 0
+ self.pending_messages = {}
+ self.listeners = {}
+ self.listen_task = None
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+
+ def connect(self):
+ """Open the WebSocket connection and start listening for messages."""
+ self.loop.run_until_complete(self._connect())
+ self.listen_task = self.loop.create_task(self._listen())
+
+ async def _connect(self):
+ try:
+ self.connection = await websockets.connect(self.websocket_url)
+ except Exception:
+ frappe.log_error(title="Failed to connect to WebSocket:", message=f"{frappe.get_traceback()}")
+ raise
+
+ async def _listen(self):
+ try:
+ async for message in self.connection:
+ self._handle_message(frappe.json.loads(message))
+ except Exception:
+ frappe.log_error(title="WebSocket listening error:", message=f"{frappe.get_traceback()}")
+
+ def _handle_message(self, response):
+ method = response.get("method")
+ params = response.get("params", {})
+ session_id = response.get("sessionId")
+ target_id = params.get("targetId")
+ frame_id = params.get("frameId")
+ message_id = response.get("id")
+
+ composite_key = (method, session_id, target_id, frame_id)
+
+ # Handle responses with `id`
+ if message_id and message_id in self.pending_messages:
+ future = self.pending_messages.pop(message_id)
+ if composite_key in self.pending_messages:
+ self.pending_messages.pop(composite_key)
+ future.set_result(response)
+
+ # Handle responses without `id` using a composite key
+ elif method:
+ if composite_key in self.pending_messages:
+ # print("matched using composite_key", composite_key)
+ future = self.pending_messages.pop(composite_key)
+ future.set_result(response)
+
+ if method in self.listeners:
+ for callback, future, filters in self.listeners[method]:
+ # NOTE: the `not filters[...]` fallbacks below might cause cross-talk between different sessions
+ if (
+ (not session_id or not filters["sessionId"] or filters["sessionId"] == session_id)
+ and (not target_id or not filters["targetId"] or filters["targetId"] == target_id)
+ and (not frame_id or not filters["frameId"] or filters["frameId"] == frame_id)
+ ):
+ callback(future, response)
+
+ def disconnect(self):
+ try:
+ if self.listen_task:
+ self.listen_task.cancel()
+ self.loop.run_until_complete(self._disconnect())
+ # Cancel all pending tasks before stopping the loop; leaving them uncancelled was degrading performance over time
+ pending_tasks = [task for task in asyncio.all_tasks(self.loop) if not task.done()]
+ for task in pending_tasks:
+ task.cancel()
+ try:
+ self.loop.run_until_complete(task) # Ensure tasks finish before loop stops
+ except asyncio.CancelledError:
+ pass # Ignore cancellation errors
+ except Exception:
+ frappe.log_error(title="Error while disconnecting:", message=f"{frappe.get_traceback()}")
+ raise
+
+ async def _disconnect(self):
+ try:
+ if self.connection and not self.connection.closed:
+ await self.connection.close()
+ self.connection = None
+ except Exception:
+ frappe.log_error(
+ title="Error during WebSocket disconnection:", message=f"{frappe.get_traceback()}"
+ )
+
+ def send(self, method, params=None, session_id=None, return_future=False):
+ if return_future:
+ return asyncio.ensure_future(
+ self._send(method, params, session_id, wait_future_fulfill=False), loop=self.loop
+ )
+ future = self.loop.run_until_complete(self._send(method, params, session_id))
+ return self._destructure_response(future.result())
+
+ async def _send(self, method, params=None, session_id=None, wait_future_fulfill=True):
+ self.message_id += 1
+ message_id = self.message_id
+ message = {
+ "id": message_id,
+ "method": method,
+ "params": params or {},
+ }
+
+ if session_id:
+ message["sessionId"] = session_id
+
+ if self.connection is None:
+ raise RuntimeError("WebSocket connection is not open.")
+
+ future = asyncio.Future()
+ self.pending_messages[message_id] = future
+
+ # Dynamically create the composite key
+ if any(
+ [
+ method,
+ session_id,
+ params.get("targetId") if params else None,
+ params.get("frameId") if params else None,
+ ]
+ ):
+ composite_key = (
+ method,
+ session_id,
+ params.get("targetId") if params else None,
+ params.get("frameId") if params else None,
+ )
+ self.pending_messages[composite_key] = future
+
+ await self.connection.send(frappe.json.dumps(message))
+ if wait_future_fulfill:
+ await future
+ return future
+
+ def _destructure_response(self, response):
+ """Destructure the response to extract useful information."""
+ result = response.get("result", None)
+ error = response.get("error", None)
+ return result, error
+
+ def start_listener(self, method, callback, session_id=None, target_id=None, frame_id=None):
+ """Register a listener for a specific CDP event with optional filtering."""
+ if method not in self.listeners:
+ self.listeners[method] = []
+ future = self.loop.create_future()
+
+ event = (callback, future, {"sessionId": session_id, "targetId": target_id, "frameId": frame_id})
+ if event not in self.listeners[method]:
+ self.listeners[method].append(event)
+ return event
+
+ def wait_for_event(self, event, timeout=3):
+ if type(event) is tuple:
+ event = event[1]
+ try:
+ self.loop.run_until_complete(asyncio.wait_for(event, timeout))
+ except asyncio.TimeoutError:
+ frappe.log_error(title="Timeout waiting for event", message=f"{frappe.get_traceback()}")
+
+ def remove_listener(self, method, event):
+ """Remove a listener for a specific CDP event."""
+ self.listeners[method].remove(event)
diff --git a/frappe/utils/pdf_generator/chrome_pdf_generator.py b/frappe/utils/pdf_generator/chrome_pdf_generator.py
new file mode 100644
index 0000000000..4f77c33a37
--- /dev/null
+++ b/frappe/utils/pdf_generator/chrome_pdf_generator.py
@@ -0,0 +1,284 @@
+import os
+import platform
+import subprocess
+import time
+from pathlib import Path
+from typing import ClassVar
+
+import requests
+
+import frappe
+from frappe import _
+
+# TODO: close browser when worker is killed.
+
+
+class ChromePDFGenerator:
+ EXECUTABLE_PATHS: ClassVar[dict[str, list[str]]] = {
+ "linux": ["chrome-linux", "headless_shell"],
+ "darwin": ["chrome-mac", "headless_shell"],
+ "windows": ["chrome-win", "headless_shell.exe"],
+ }
+
+ _instance = None
+
+ _browsers: ClassVar[list] = []
+
+ def add_browser(self, browser):
+ self._browsers.append(browser)
+
+ def remove_browser(self, browser):
+ self._browsers.remove(browser)
+
+ def __new__(cls):
+ # if no instance exists or _chromium_process is not available, create a new object; else return the current instance stored in cls._instance
+ if cls._instance is None or not cls._instance._chromium_process:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ def __init__(self):
+ """Initialize only once."""
+ if hasattr(self, "_initialized"): # Prevent multiple initializations
+ return
+ self._initialized = True # Mark as initialized
+
+ self._chromium_process = None
+ self._chromium_path = None
+ self._devtools_url = None
+ self._initialize_chromium()
+
+ def _initialize_chromium(self):
+ # ideally the browser is initialized from the before-request hook.
+ # if _chromium_process is not available then initialize it.
+ if self._chromium_process:
+ return
+ # get site config and load chromium settings.
+ site_config = frappe.get_common_site_config()
+
+ # only when we want to run chromium on a separate docker / server ( not implemented/tested yet )
+ self.CHROMIUM_WEBSOCKET_URL = site_config.get("chromium_websocket_url", "")
+ if self.CHROMIUM_WEBSOCKET_URL:
+ frappe.warn("Using external chromium websocket url. Make sure it is accessible.")
+ self._devtools_url = self.CHROMIUM_WEBSOCKET_URL
+ return
+
+ # only when we want to use chromium from a specific path ( in case we don't have chromium in the bench folder )
+ self.CHROMIUM_BINARY_PATH = site_config.get("chromium_binary_path", "")
+ """
+ Number of allowed open websocket connections to chromium.
+ This number will basically define how many concurrent requests can be handled by one chromium instance.
+ #TODO: Implement/Modify logic to handle multiple chromium instances in one class / per worker. Currently we are starting one chromium.
+ """
+ self.CHROME_OPEN_CONNECTIONS = site_config.get("chromium_max_concurrent", 1)
+ # if we want to use a persistent ( long running ) chromium for all sites.
+ # the current approach starts chrome per worker process.
+ # TODO: Implement better logic to support a persistent chrome process.
+ self.USE_PERSISTENT_CHROMIUM = site_config.get("use_persistent_chromium", False)
+ # time to wait for chromium to start and provide dev tools url used in _set_devtools_url.
+ self.START_TIMEOUT = site_config.get("chromium_start_timeout", 3)
+
+ self._chromium_path = (
+ self._find_chromium_executable() if not self.CHROMIUM_BINARY_PATH else self.CHROMIUM_BINARY_PATH
+ )
+ if self._verify_chromium_installation():
+ if not self._devtools_url:
+ self.start_chromium_process()
+
+ def _find_chromium_executable(self):
+ """Finds the Chromium executable or raises an error if not found."""
+ bench_path = frappe.utils.get_bench_path()
+ """Determine the path to the Chromium executable. chromium is downloaded by download_chromium in print_designer/install.py"""
+ chromium_dir = os.path.join(bench_path, "chromium")
+
+ if not os.path.exists(chromium_dir):
+ frappe.throw(_("Chromium is not downloaded. Please run the setup first."))
+
+ platform_name = platform.system().lower()
+
+ if platform_name not in ["linux", "darwin", "windows"]:
+ frappe.throw(f"Unsupported platform: {platform_name}")
+
+ executable_name = self.EXECUTABLE_PATHS.get(platform_name)
+
+ # Construct the full path to the executable
+ exec_path = Path(chromium_dir).joinpath(*executable_name)
+ if not exec_path.exists():
+ frappe.throw(
+ f"Chromium executable not found: {exec_path}. please run bench setup-new-pdf-backend"
+ )
+
+ return str(exec_path)
+
+ def _verify_chromium_installation(self):
+ """Ensures Chromium is available and executable, raising clearer errors if not."""
+ if not os.path.exists(self._chromium_path):
+ frappe.throw(
+ f"Chromium not available at the specified path. Please check the path: {self._chromium_path}"
+ )
+ if not os.access(self._chromium_path, os.X_OK):
+ frappe.throw(f"Chromium not executable at {self._chromium_path}")
+ return True
+
+ def start_chromium_process(self, debug=False):
+ """
+ Launches Chromium in headless mode with robust logging and error handling.
+ chrome switches
+ https://peter.sh/experiments/chromium-command-line-switches/
+
+ NOTE: dbus issue in docker
+ https://source.chromium.org/chromium/chromium/src/+/main:content/app/content_main.cc;l=229-241?q=DBUS_SESSION_BUS_ADDRESS&ss=chromium
+ """
+ try:
+ if debug:
+ command_args = [
+ "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", # path to locally installed chrome browser for debugging.
+ "--remote-debugging-port=0",
+ "--user-data-dir=/tmp/chromium-{}-user-data".format(
+ frappe.local.site + frappe.utils.random_string(10)
+ ),
+ "--disable-gpu",
+ "--no-sandbox",
+ "--no-first-run",
+ "",
+ ]
+ else:
+ command_args = [
+ self._chromium_path,
+ # 0 will automatically select a random open port from the ephemeral port range.
+ "--remote-debugging-port=0",
+ "--disable-gpu", # GPU is not available in production environment.
+ "--disable-field-trial-config",
+ "--disable-background-networking",
+ "--disable-background-timer-throttling",
+ "--disable-backgrounding-occluded-windows",
+ "--disable-back-forward-cache",
+ "--disable-breakpad",
+ "--disable-client-side-phishing-detection",
+ "--disable-component-extensions-with-background-pages",
+ "--disable-component-update",
+ "--no-default-browser-check",
+ "--disable-default-apps",
+ "--disable-dev-shm-usage",
+ "--disable-extensions",
+ "--disable-features=ImprovedCookieControls,LazyFrameLoading,GlobalMediaControls,DestroyProfileOnBrowserClose,MediaRouter,DialMediaRouteProvider,AcceptCHFrame,AutoExpandDetailsElement,CertificateTransparencyComponentUpdater,AvoidUnnecessaryBeforeUnloadCheckSync,Translate,HttpsUpgrades,PaintHolding,ThirdPartyStoragePartitioning,LensOverlay,PlzDedicatedWorker",
+ "--allow-pre-commit-input",
+ "--disable-hang-monitor",
+ "--disable-ipc-flooding-protection",
+ "--disable-popup-blocking",
+ "--disable-prompt-on-repost",
+ "--disable-renderer-backgrounding",
+ "--force-color-profile=srgb",
+ "--metrics-recording-only",
+ "--no-first-run",
+ "--password-store=basic",
+ "--use-mock-keychain",
+ "--no-service-autorun",
+ "--export-tagged-pdf",
+ "--disable-search-engine-choice-screen",
+ "--unsafely-disable-devtools-self-xss-warnings",
+ "--enable-use-zoom-for-dsf=false",
+ "--use-angle",
+ "--headless",
+ "--hide-scrollbars",
+ "--mute-audio",
+ "--blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4",
+ "--no-sandbox",
+ "--no-startup-window",
+ # related to the HeadlessExperimental flag; enable when implementing deterministic rendering. Check the Page class for more info.
+ # "--enable-surface-synchronization",
+ # "--run-all-compositor-stages-before-draw",
+ # "--disable-threaded-animation",
+ # "--disable-threaded-scrolling",
+ # "--disable-checker-imaging",
+ ]
+
+ self._start_chromium_process(command_args)
+
+ except Exception as e:
+ frappe.log_error(f"Error starting Chromium: {e}")
+ frappe.throw(_("Could not start Chromium. Check logs for details."))
+
+ # Apply the decorator to monitor Chromium subprocess usage for development / debugging purposes.
+ # it will print and write usage data to a file ( defaults to chrome_process_usage.json).
+ # from print_designer.pdf_generator.monitor_subprocess import monitor_subprocess_usage
+ # @monitor_subprocess_usage(interval=0.1)
+ def _start_chromium_process(self, command_args):
+ if platform.system().lower() == "windows":
+ # hide cmd window
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ startupinfo.wShowWindow = subprocess.SW_HIDE
+ self._chromium_process = subprocess.Popen(
+ command_args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ startupinfo=startupinfo,
+ text=True,
+ )
+ else:
+ self._chromium_process = subprocess.Popen(
+ command_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
+ )
+ return self._chromium_process
+
+ def _set_devtools_url(self):
+ """
+ Monitor Chromium's stderr for the DevTools WebSocket URL
+ ----------------
+ other approach: if we choose the port using find_available_port we can avoid this entirely and use the fetch_devtools_url() method instead.
+
+ NOTE: 1) in the current approach the output to stderr is pretty consistent.
+ 2) the other approach may seem reliable but it is slow compared to this in testing.
+
+ TODO:
+ the final approach can be decided later after testing in production.
+ """
+ stderr = self._chromium_process.stderr
+ start_time = time.time()
+
+ while time.time() - start_time < self.START_TIMEOUT:
+ # Read a single line from stderr and check if it contains the DevTools URL.
+ # Not using select() because it is not supported on Windows for non-socket file descriptors.
+ line = stderr.readline()
+ # not sure if "DevTools listening on" is consistent in all chromium versions.
+ if "DevTools listening on" in line:
+ url_start = line.find("ws://")
+ if url_start != -1:
+ self._devtools_url = line[url_start:].strip()
+ break
+
+ if not self._devtools_url:
+ self._chromium_process.terminate()
+ raise TimeoutError("Chromium took too long to start.")
+
+ def _close_browser(self):
+ """
+ Close the headless Chromium browser.
+ """
+ if self._browsers:
+ frappe.log("Cannot close Chromium as there are active browser instances.")
+ return
+ if self._chromium_process:
+ self._chromium_process.terminate()
+ ChromePDFGenerator._instance = None
+ self._chromium_process = None
+ self._devtools_url = None
+ frappe.log("Headless Chromium closed successfully.")
+
+ # not used anywhere in the code. read _set_devtools_url for more info. useful in case we want to take a different approach to fetch the devtools url.
+ def fetch_devtools_url(self, port):
+ if not port:
+ return None
+ url = f"http://127.0.0.1:{port}/json/version"
+ try:
+ response = requests.get(url)
+ response.raise_for_status() # Raise an exception for HTTP errors
+ response_data = response.json()
+ return response_data["webSocketDebuggerUrl"].strip()
+ except requests.ConnectionError:
+ frappe.log_error(
+ f"Failed to connect to the Chrome DevTools Protocol. Is Chrome running with --remote-debugging-port={port}"
+ )
+ except requests.RequestException as e:
+ frappe.log_error(f"An error occurred: {e}")
+ return None
diff --git a/frappe/utils/pdf_generator/page.py b/frappe/utils/pdf_generator/page.py
new file mode 100644
index 0000000000..f0e9a62304
--- /dev/null
+++ b/frappe/utils/pdf_generator/page.py
@@ -0,0 +1,356 @@
+import base64
+import time
+import urllib
+
+import frappe
+
+"""
+CDP commands documentation can be found here.
+https://chromedevtools.github.io/devtools-protocol/
+"""
+
+
+class Page:
+ def __init__(self, session, browser_context_id, page_type):
+ self.session = session
+ result, error = self.session.send(
+ "Target.createTarget", {"url": "", "browserContextId": browser_context_id}
+ )
+ if error:
+ frappe.log_error(title="Error creating new page:", message=f"{error}")
+
+ self.target_id = result["targetId"]
+ self.type = page_type
+ result, error = self.session.send(
+ "Target.attachToTarget", {"targetId": self.target_id, "flatten": True}
+ )
+ if error:
+ raise RuntimeError(f"Error attaching to target: {error}")
+ self.session_id = result["sessionId"]
+ self.send("Page.enable")
+ self.frame_id = None
+ self.get_frame_id_on_demand()
+ self.set_media_emulation("print")
+ self.set_cookies()
+
+ # TODO: make send return a future and don't wait for it by default.
+ def send(self, method, params=None, return_future=False):
+ if params is None:
+ params = {}
+ return self.session.send(method, params, self.session_id, return_future)
+
+ def get_frame_id_on_demand(self):
+ if self.frame_id:
+ return self.frame_id
+ try:
+ result, error = self.send("Page.getFrameTree")
+ if error:
+ raise RuntimeError(f"Error fetching frameId: {error}")
+ frame_tree = result["frameTree"]
+ frame = frame_tree["frame"]
+ self.frame_id = frame["id"]
+ return self.frame_id
+ except Exception:
+ frappe.log_error(title="Error fetching frameId:", message=f"{frappe.get_traceback()}")
+ raise
+
+ def _ensure_frame_id(self):
+ if not self.frame_id:
+ self.get_frame_id_on_demand()
+ return self.frame_id
+
+ def set_media_emulation(self, media_type: str = "print"):
+ """Set media emulation for the page."""
+ return self.send("Emulation.setEmulatedMedia", {"media": media_type})
+
+ def set_cookies(self):
+ if frappe.session and frappe.session.sid and hasattr(frappe.local, "request"):
+ domain = frappe.utils.get_host_name().split(":", 1)[0]
+ cookie = {
+ "name": "sid",
+ "value": frappe.session.sid,
+ "domain": domain,
+ "sameSite": "Strict",
+ }
+ _result, error = self.send("Network.enable")
+ if error:
+ raise RuntimeError(f"Error enabling network: {error}")
+ _result, error = self.send("Network.setCookie", cookie)
+ if error:
+ raise RuntimeError(f"Error setting cookie: {error}")
+ _result, error = self.send("Network.disable")
+ if error:
+ raise RuntimeError(f"Error disabling network: {error}")
+
+ def intercept_request_and_fulfill(self, url_pattern):
+ """Starts intercepting network requests for the given target_id and URL pattern."""
+ data = {}
+
+ def on_request_paused_event(future, response):
+ """Callback for when a request is paused (intercepted)."""
+ params = response.get("params")
+ if params and params.get("requestId"):
+ data["request_id"] = params["requestId"]
+ if not future.done():
+ future.set_result(data["request_id"])
+
+ # Start listening for requestPaused event
+ event = self.session.start_listener(
+ "Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
+ )
+
+ # Enable request interception for the specified URL pattern
+ self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})
+
+ def intercept_and_fulfill():
+ self.session.wait_for_event(event)
+ self.session.send(
+ "Fetch.fulfillRequest",
+ {"requestId": event[1].result(), "responseCode": 200},
+ return_future=True,
+ )
+ self.session.remove_listener("Fetch.requestPaused", event)
+
+ return intercept_and_fulfill
+
+ def intercept_request_for_local_resources(self, url_pattern="*"):
+ """Starts intercepting network requests for the given target_id and URL pattern."""
+ data = {}
+
+ def on_request_paused_event(future, response):
+ """Callback for when a request is paused (intercepted)."""
+ params = response.get("params")
+ if params and params.get("requestId"):
+ data["request_id"] = params["requestId"]
+ url = params["request"]["url"]
+
+ if url.startswith(frappe.request.host_url):
+ path = url.replace(frappe.request.host_url, "").split("?v", 1)[0]
+ if path.startswith("assets/") or path.startswith("files/"):
+ path = urllib.parse.unquote(path)
+ if path.startswith("files/"):
+ path = frappe.utils.get_site_path("public", path)
+ content = frappe.read_file(path, as_base64=True)
+ response_headers = []
+ # write logic to handle all file types as required
+ if path.endswith(".svg"):
+ response_headers.append({"name": "Content-Type", "value": "image/svg+xml"})
+ if content:
+ self.session.send(
+ "Fetch.fulfillRequest",
+ {
+ "requestId": data["request_id"],
+ "responseCode": 200, # TODO: actually handle the response code from the request
+ "responseHeaders": response_headers,
+ "body": content,
+ },
+ return_future=True,
+ )
+ return
+ self.session.send(
+ "Fetch.continueRequest",
+ {"requestId": data["request_id"]},
+ return_future=True,
+ )
+
+ # Start listening for requestPaused event
+ self.session.start_listener(
+ "Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
+ )
+
+ # Enable request interception for the specified URL pattern
+ self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})
+
+ def set_tab_url(self, url):
+ """Navigate to a URL and fulfill the request with status code 200."""
+
+ # Intercept and fulfill request with 200 status code
+ wait_and_fulfill = self.intercept_request_and_fulfill(url)
+ # Now, navigate after intercepting the request
+ wait_start = self.wait_for_load(wait_for="load")
+ page_navigate = self.send("Page.navigate", {"url": url}, return_future=True)
+ wait_and_fulfill()
+
+ def wait_for_navigate():
+ self.session.wait_for_event(page_navigate, 3)
+ wait_start()
+
+ self.wait_for_navigate = wait_for_navigate
+
+ def evaluate(self, expression, await_promise=False):
+ self.send("Runtime.enable")
+ result, error = self.send(
+ "Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
+ )
+ if error:
+ # retry on error every 500ms, up to 3 times (just a safeguard as I hit a few edge cases where it failed).
+ # waiting for network is still slower than this.
+ for _i in range(3):
+ print(f"Error evaluating expression: {error}. Retrying in 500ms")
+ time.sleep(0.5)
+ result, error = self.send(
+ "Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
+ )
+ if not error:
+ break
+ raise RuntimeError(f"Error evaluating expression: {error}")
+
+ self.send("Runtime.disable")
+ return result
+
+ # set wait_for to networkIdle if the pdf is not rendering correctly,
+ # or if you find the header height to be incorrect because some external script is changing elements.
+ # networkIdle is the most stable option but makes it a lot slower, so avoiding it for now. Enable if not stable.
+ def set_content(self, html, wait_for=None):
+ if not wait_for:
+ wait_for = ["load", "DOMContentLoaded"]
+ self.intercept_request_for_local_resources()
+ wait_start = self.wait_for_load(wait_for=wait_for)
+ self.send("Page.setDocumentContent", {"frameId": self._ensure_frame_id(), "html": html})
+ self.wait_for_set_content = wait_start
+
+ def wait_for_load(self, wait_for, timeout=60):
+ self.send("Page.setLifecycleEventsEnabled", {"enabled": True})
+ status = {}
+ if isinstance(wait_for, str):
+ status[wait_for] = False
+ if isinstance(wait_for, list):
+ for event in wait_for:
+ status[event] = False
+
+ def on_lifecycle_event(future, response):
+ params = response.get("params", {})
+ if params.get("name") in status.keys():
+ status[params.get("name")] = True
+ if all(status.values()):
+ if not future.done():
+ future.set_result(response)
+
+ event = self.session.start_listener(
+ "Page.lifecycleEvent", on_lifecycle_event, self.session_id, self.target_id, self.frame_id
+ )
+
+ def start_wait():
+ self.session.wait_for_event(event, timeout)
+ self.session.remove_listener("Page.lifecycleEvent", event)
+
+ return start_wait
+
+ def get_element_height(self, selector="body"):
+ try:
+ if not self.is_print_designer:
+ selector = ".wrapper"
+ self.send("DOM.enable")
+ doc_result, doc_error = self.send("DOM.getDocument")
+ if doc_error:
+ raise RuntimeError(f"Error getting document node: {doc_error}")
+ doc_node_id = doc_result["root"]["nodeId"]
+ result, error = self.send("DOM.querySelector", {"nodeId": doc_node_id, "selector": selector})
+ if error:
+ raise RuntimeError(f"Error querying selector: {error}")
+ node_id = result["nodeId"]
+ result, error = self.send("DOM.getBoxModel", {"nodeId": node_id})
+ if error:
+ raise RuntimeError(f"Error getting computed style: {error}")
+ height = result["model"]["height"]
+ finally:
+ self.send("DOM.disable")
+ return height
+
+ def add_page_size_css(self):
+ width = str(self.options["paperWidth"]) + "in"
+ height = str(self.options["paperHeight"]) + "in"
+ marginLeft = str(self.options["marginLeft"]) + "in"
+ marginRight = str(self.options["marginRight"]) + "in"
+ marginTop = str(self.options["marginTop"]) + "in"
+ marginBottom = str(self.options["marginBottom"]) + "in"
+
+ # Enable DOM and CSS agents
+ result, error = self.send("DOM.enable")
+ if error:
+ raise RuntimeError(f"Error enabling DOM: {error}")
+
+ result, error = self.send("CSS.enable")
+ if error:
+ raise RuntimeError(f"Error enabling CSS: {error}")
+
+ # Create a new stylesheet
+ result, error = self.send("CSS.createStyleSheet", {"frameId": self._ensure_frame_id()})
+ if error:
+ raise RuntimeError(f"Error creating stylesheet: {error}")
+
+ style_sheet_id = result["styleSheetId"]
+
+ # Define the CSS rule for the page size
+ css_rule = f"""
+ @page {{
+ size: {width} {height};
+ margin: {marginTop} {marginRight} {marginBottom} {marginLeft};
+ }}
+ """
+
+ # Apply the CSS rule to the created stylesheet
+ result, error = self.send("CSS.setStyleSheetText", {"styleSheetId": style_sheet_id, "text": css_rule})
+
+ if error:
+ raise RuntimeError(f"Error setting stylesheet text: {error}")
+
+ self.send("CSS.disable")
+ self.send("DOM.disable")
+
+ def generate_pdf(self, wait_for_pdf=True, raw=False):
+ self.add_page_size_css()
+ if not wait_for_pdf:
+ self.wait_for_pdf = self.send("Page.printToPDF", self.options, return_future=True)
+ return
+
+ result, error = self.send("Page.printToPDF", self.options)
+ if error:
+ raise RuntimeError(f"Error generating PDF: {error}")
+ if "stream" not in result:
+ raise ValueError("Stream handle not returned from Page.printToPDF")
+ return self.get_pdf_from_stream(result["stream"], raw)
+
+ def get_pdf_stream_id(self):
+ # wait for task to complete
+ self.session.wait_for_event(self.wait_for_pdf)
+ # wait for event to complete
+ task = self.wait_for_pdf.result()
+ future = task.result()
+ stream_id = future["result"]["stream"]
+ return stream_id
+
+ def get_pdf_from_stream(self, stream_id, raw=False):
+ from io import BytesIO
+
+ from pypdf import PdfReader
+
+ pdf_data = b""
+ offset = 0
+ while True:
+ chunk_result, error = self.send("IO.read", {"handle": stream_id, "offset": offset, "size": 4096})
+ if error:
+ raise RuntimeError(f"Error reading PDF chunk: {error}")
+ chunk_data = chunk_result["data"]
+ # we don't use base64Encode option but added check anyway as it is one of the valid options.
+ if chunk_result.get("base64Encoded", False):
+ chunk_data = base64.b64decode(chunk_data)
+ pdf_data += chunk_data
+ offset += len(chunk_data)
+ if chunk_result.get("eof", False):
+ break
+
+ _result, error = self.send("IO.close", {"handle": stream_id})
+ if error:
+ raise RuntimeError(f"Error closing PDF stream: {error}")
+
+ if raw:
+ return pdf_data
+
+ return PdfReader(BytesIO(pdf_data))
+
+ def close(self):
+ self.session.send("Fetch.disable")
+ _result, error = self.send("Target.closeTarget", {"targetId": self.target_id})
+ if error:
+ raise RuntimeError(f"Error closing target: {error}")
diff --git a/frappe/utils/pdf_generator/pdf_merge.py b/frappe/utils/pdf_generator/pdf_merge.py
new file mode 100644
index 0000000000..7081498ac6
--- /dev/null
+++ b/frappe/utils/pdf_generator/pdf_merge.py
@@ -0,0 +1,117 @@
+class PDFTransformer:
+ def __init__(self, browser):
+ self.browser = browser
+ self.body_pdf = browser.body_pdf
+ self.is_print_designer = browser.is_print_designer
+ self._set_header_pdf()
+ self._set_footer_pdf()
+ if not self.header_pdf and not self.footer_pdf:
+ return
+ self.no_of_pages = len(self.body_pdf.pages)
+ self.encrypt_password = self.browser.options.get("password", None)
+ # if not header / footer then return body pdf
+
+ def _set_header_pdf(self):
+ self.header_pdf = None
+ if hasattr(self.browser, "header_pdf"):
+ self.header_pdf = self.browser.header_pdf
+ self.is_header_dynamic = self.browser.is_header_dynamic
+
+ def _set_footer_pdf(self):
+ self.footer_pdf = None
+ if hasattr(self.browser, "footer_pdf"):
+ self.footer_pdf = self.browser.footer_pdf
+ self.is_footer_dynamic = self.browser.is_footer_dynamic
+
+ def transform_pdf(self, output=None):
+ from pypdf import PdfWriter
+
+ header = self.header_pdf
+ body = self.body_pdf
+ footer = self.footer_pdf
+
+ if not header and not footer:
+ return body
+
+ body_height = body.pages[0].mediabox.top
+ body_transform = header_height = footer_height = header_body_top = 0
+
+ if footer:
+ footer_height = footer.pages[0].mediabox.top
+ body_transform = footer_height
+
+ if header:
+ header_height = header.pages[0].mediabox.top
+ header_transform = body_height + footer_height
+ header_body_top = header_height + body_height + footer_height
+
+ if header and not self.is_header_dynamic:
+ for h in header.pages:
+ self._transform(h, header_body_top, header_transform)
+
+ for p in body.pages:
+ if header_body_top:
+ self._transform(p, header_body_top, body_transform)
+ if header:
+ if self.is_header_dynamic:
+ p.merge_page(
+ self._transform(header.pages[p.page_number], header_body_top, header_transform)
+ )
+ elif self.is_print_designer:
+ if p.page_number == 0:
+ p.merge_page(header.pages[0])
+ elif p.page_number == self.no_of_pages - 1:
+ p.merge_page(header.pages[3])
+ elif p.page_number % 2 == 0:
+ p.merge_page(header.pages[2])
+ else:
+ p.merge_page(header.pages[1])
+ else:
+ p.merge_page(header.pages[0])
+
+ if footer:
+ if self.is_footer_dynamic:
+ p.merge_page(footer.pages[p.page_number])
+ elif self.is_print_designer:
+ if p.page_number == 0:
+ p.merge_page(footer.pages[0])
+ elif p.page_number == self.no_of_pages - 1:
+ p.merge_page(footer.pages[3])
+ elif p.page_number % 2 == 0:
+ p.merge_page(footer.pages[2])
+ else:
+ p.merge_page(footer.pages[1])
+ else:
+ p.merge_page(footer.pages[0])
+
+ if output:
+ output.append_pages_from_reader(body)
+ return output
+
+ writer = PdfWriter()
+ writer.append_pages_from_reader(body)
+ if self.encrypt_password:
+ writer.encrypt(self.encrypt_password)
+
+ return self.get_file_data_from_writer(writer)
+
+ def _transform(self, page, page_top, ty):
+ from pypdf import PdfWriter, Transformation
+
+ transform = Transformation().translate(ty=ty)
+ page.mediabox.upper_right = (page.mediabox.right, page_top)
+ page.add_transformation(transform)
+ return page
+
+ def get_file_data_from_writer(self, writer_obj):
+ from io import BytesIO
+
+ # https://docs.python.org/3/library/io.html
+ stream = BytesIO()
+ writer_obj.write(stream)
+
+ # Change the stream position to start of the stream
+ stream.seek(0)
+
+ # Read up to size bytes from the object and return them
+ return stream.read()
diff --git a/frappe/utils/print_format.py b/frappe/utils/print_format.py
index 379c037e03..59a94be98c 100644
--- a/frappe/utils/print_format.py
+++ b/frappe/utils/print_format.py
@@ -229,6 +229,9 @@ def download_pdf(
letterhead=None,
pdf_generator: Literal["wkhtmltopdf", "chrome"] | None = None,
):
+ if pdf_generator is None:
+ pdf_generator = "wkhtmltopdf"
+
doc = doc or frappe.get_doc(doctype, name)
validate_print_permission(doc)
diff --git a/frappe/utils/print_utils.py b/frappe/utils/print_utils.py
index f32f38771a..6efa3a119c 100644
--- a/frappe/utils/print_utils.py
+++ b/frappe/utils/print_utils.py
@@ -1,8 +1,18 @@
+import os
+import re
from typing import Literal
+import click
+
import frappe
from frappe.utils.data import cint, cstr
+EXECUTABLE_PATHS = {
+ "linux": ["chrome-linux", "headless_shell"],
+ "darwin": ["chrome-mac", "headless_shell"],
+ "windows": ["chrome-win", "headless_shell.exe"],
+}
+
def get_print(
doctype=None,
@@ -144,3 +154,336 @@ def attach_print(
file_name = cstr(file_name).replace(" ", "").replace("/", "-") + ext
return {"fname": file_name, "fcontent": content}
+
+
def setup_chromium():
	"""Set up Chromium at the bench level, downloading it if necessary.

	Returns:
		str: Path to the Chromium headless-shell executable.

	Raises:
		RuntimeError: When the executable cannot be located or downloaded.
	"""
	try:
		executable = find_or_download_chromium_executable()
	except Exception as e:
		click.echo(f"Failed to setup Chromium: {e}")
		# Chain the original exception so the root cause stays visible.
		raise RuntimeError(f"Failed to setup Chromium: {e}") from e

	# Previous message claimed Chromium was "already set up" even when it was
	# freshly downloaded by the call above.
	click.echo(f"Chromium is available at {executable}")
	return executable
+
+
def find_or_download_chromium_executable():
	"""Return the path to the bench-level Chromium executable, downloading if missing.

	The executable lives at ``<bench>/chromium/<platform dir>/<binary>`` as
	described by ``EXECUTABLE_PATHS``.

	Returns:
		str: Absolute path to the headless-shell executable.

	Raises:
		RuntimeError: On unsupported platforms, or when the download did not
			produce the expected executable.
	"""
	import platform
	from pathlib import Path

	platform_name = platform.system().lower()
	executable_parts = EXECUTABLE_PATHS.get(platform_name)
	if executable_parts is None:
		# Previously this only echoed a warning and then crashed with a
		# TypeError on joinpath(*None); fail loudly instead.
		raise RuntimeError(f"Unsupported platform: {platform_name}")

	chromium_dir = os.path.join(frappe.utils.get_bench_path(), "chromium")
	exec_path = Path(chromium_dir).joinpath(*executable_parts)

	if not exec_path.exists():
		click.echo("Chromium is not available. downloading...")
		download_chromium()

	if not exec_path.exists():
		# Previously this echoed an error but still returned the bogus path.
		raise RuntimeError(f"Error while downloading chrome: executable not found at {exec_path}")

	return str(exec_path)
+
+
def download_chromium():
	"""Download and extract Chromium for this platform into ``<bench>/chromium``.

	Any pre-existing ``chromium`` directory is removed first. The archive is
	fetched from :func:`get_chromium_download_url`, extracted, the extracted
	directory and binary are normalised to the names expected by
	``EXECUTABLE_PATHS``, and the binary is made executable.

	Raises:
		RuntimeError: On download, extraction, or archive-layout failures.
	"""
	import platform
	import shutil
	import zipfile

	import requests

	bench_path = frappe.utils.get_bench_path()
	chromium_dir = os.path.join(bench_path, "chromium")

	# Start from a clean slate so stale versions never linger.
	if os.path.exists(chromium_dir):
		click.echo("Removing old Chromium directory...")
		shutil.rmtree(chromium_dir, ignore_errors=True)
	os.makedirs(chromium_dir, exist_ok=True)

	download_url = get_chromium_download_url()
	zip_path = os.path.join(chromium_dir, os.path.basename(download_url))

	try:
		click.echo(f"Downloading Chromium from {download_url}...")
		# playwright's CDN requires a user agent
		headers = {"User-Agent": "Wget/1.21.1"}
		with requests.get(download_url, stream=True, timeout=(10, 60), headers=headers) as r:
			r.raise_for_status()  # Raise an error for bad status codes
			total_size = int(r.headers.get("content-length", 0))
			# Use the progress bar as a context manager so it renders and
			# finishes correctly (the previous code never closed it).
			with (
				click.progressbar(length=total_size, label="Downloading Chromium") as bar,
				open(zip_path, "wb") as f,
			):
				for chunk in r.iter_content(chunk_size=65536):
					f.write(chunk)
					bar.update(len(chunk))

		click.echo("Extracting Chromium...")
		with zipfile.ZipFile(zip_path, "r") as zip_ref:
			zip_ref.extractall(chromium_dir)
		if os.path.exists(zip_path):
			os.remove(zip_path)

		# The archive contains a single top-level directory; normalise its
		# name to the one EXECUTABLE_PATHS expects for this platform.
		extracted = os.listdir(chromium_dir)[0]
		folder_name, executable_name = EXECUTABLE_PATHS[platform.system().lower()]
		target_dir = os.path.join(chromium_dir, folder_name)

		if extracted != folder_name:
			extracted_dir = os.path.join(chromium_dir, extracted)
			click.echo(f"Renaming {extracted_dir} to {target_dir}")
			os.rename(extracted_dir, target_dir)
		if not os.path.exists(target_dir):
			raise RuntimeError(f"Failed to rename extracted directory. Expected {folder_name}.")

		# Normalise the binary name ("chrome-headless-shell" -> platform name).
		# NOTE: previously this whole branch was skipped (and `renamed_dir`
		# left unbound) when the extracted directory already had the expected
		# name; it also renamed to a hard-coded "headless_shell", dropping the
		# ".exe" suffix on Windows - both fixed here.
		executable_shell = os.path.join(target_dir, "chrome-headless-shell")
		exec_path = os.path.join(target_dir, executable_name)
		if os.path.exists(executable_shell):
			os.rename(executable_shell, exec_path)
		elif not os.path.exists(exec_path):
			raise RuntimeError("Failed to rename executable. Expected chrome-headless-shell.")

		# Make the `headless_shell` executable
		make_chromium_executable(exec_path)

		click.echo(f"Chromium is ready to use at: {chromium_dir}")
	except requests.Timeout as e:
		click.echo("Download timed out. Check your internet connection.")
		raise RuntimeError("Download timed out.") from e
	except requests.ConnectionError as e:
		click.echo("Failed to connect to Chromium download server.")
		raise RuntimeError("Connection error.") from e
	except requests.RequestException as e:
		click.echo(f"Failed to download Chromium: {e}")
		raise RuntimeError(f"Failed to download Chromium: {e}") from e
	except zipfile.BadZipFile as e:
		click.echo(f"Failed to extract Chromium: {e}")
		raise RuntimeError(f"Failed to extract Chromium: {e}") from e
+
+
def get_chromium_download_url():
	"""Build the platform-specific download URL for a Chromium headless-shell build.

	Resolution order:
	1. ``chromium_download_url`` from common_site_config.json (full override).
	   Avoid this unless the bench runs on a single platform type and the
	   correct binary is hosted at that URL.
	2. Google's chrome-for-testing-public builds for mainstream platforms
	   (closest to the end-user Chrome experience).
	3. Playwright's fallback Chromium builds for Linux ARM64, which
	   chrome-for-testing does not publish
	   (https://github.com/GoogleChromeLabs/chrome-for-testing/issues/1).
	   TODO: also use the fallback builds for Windows ARM; see
	   https://community.arm.com/arm-community-blogs/b/tools-software-ides-blog/posts/native-chromium-builds-windows-on-arm

	To find the Chrome version and the Playwright fallback build number:
	1. Visit https://github.com/microsoft/playwright/actions/workflows/roll_browser_into_playwright.yml
	2. Open the latest job run, "Roll to New Browser Version" step.
	3. Find a log line like "Downloading Chromium 133.0.6943.16 (playwright build v1155)":
	   the first number is the Chrome version, the second the Playwright build.

	Raises:
		frappe.ValidationError: Via ``frappe.throw`` when no download is
			available for the detected platform.
	"""
	common_config = frappe.get_common_site_config()

	chrome_download_url = common_config.get("chromium_download_url", None)
	if chrome_download_url:
		return chrome_download_url

	download_path = {
		"linux64": "%s/linux64/chrome-headless-shell-linux64.zip",
		"mac-arm64": "%s/mac-arm64/chrome-headless-shell-mac-arm64.zip",
		"mac-x64": "%s/mac-x64/chrome-headless-shell-mac-x64.zip",
		"win32": "%s/win32/chrome-headless-shell-win32.zip",
		"win64": "%s/win64/chrome-headless-shell-win64.zip",
	}
	linux_arm_download_path = {
		"ubuntu20.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu22.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu24.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian11-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian12-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
	}

	platform_key = calculate_platform()

	version = "133.0.6943.35"
	playwright_build_version = "1157"

	base_url = "https://storage.googleapis.com/chrome-for-testing-public/"
	playwright_base_url = "https://cdn.playwright.dev/dbazure/download/playwright/builds/chromium/"

	# Escape hatches from common_site_config.json; when overriding the base
	# URLs, make sure all required flavours exist at the new locations.
	version = common_config.get("chromium_version", version)
	playwright_build_version = common_config.get("playwright_chromium_version", playwright_build_version)
	base_url = common_config.get("chromium_download_base_url", base_url)
	playwright_base_url = common_config.get("playwright_chromium_download_base_url", playwright_base_url)

	if platform_key in download_path:
		relative_path = download_path[platform_key]
	elif platform_key in linux_arm_download_path:
		# Playwright builds are addressed by build number, not Chrome version.
		version = playwright_build_version
		base_url = playwright_base_url
		relative_path = linux_arm_download_path[platform_key]
	else:
		frappe.throw(
			f"No download path configured or Chromium download not available for platform: {platform_key}"
		)

	return f"{base_url}{relative_path % version}"
+
+
def make_chromium_executable(executable):
	"""Ensure the Chromium binary at *executable* has execute permissions.

	Raises:
		RuntimeError: When no file exists at *executable*.
	"""
	if not os.path.exists(executable):
		raise RuntimeError(f"Chromium executable not found: {executable}.")

	# Nothing to do when the execute bit is already set.
	if os.access(executable, os.X_OK):
		click.echo(f"Chromium executable is already executable: {executable}")
		return

	click.echo(f"Making Chromium executable: {executable}")
	os.chmod(executable, 0o755)  # rwxr-xr-x
	click.echo(f"Chromium executable permissions set: {executable}")
+
+
def calculate_platform():
	"""
	Determines the host platform and returns it as a string.
	Includes logic for Linux ARM, Linux x64, macOS (Intel and ARM), and Windows (32-bit and 64-bit).

	Returns:
		str: The detected platform string (e.g., 'linux64', 'mac-arm64', etc.),
		or "" when the platform is unrecognised/unsupported.
	"""
	import platform

	system = platform.system().lower()
	arch = platform.machine().lower()

	# Handle Linux ARM-specific logic: pick the distro-specific Playwright build.
	if system == "linux" and arch == "aarch64":
		distro_info = get_linux_distribution_info()
		distro_id = distro_info.get("id", "")
		version = distro_info.get("version", "")
		major_version = int(version.split(".")[0]) if version else 0

		if distro_id == "ubuntu":
			if major_version < 20:
				return "ubuntu18.04-arm64"
			if major_version < 22:
				return "ubuntu20.04-arm64"
			if major_version < 24:
				return "ubuntu22.04-arm64"
			if major_version < 26:
				return "ubuntu24.04-arm64"
			return ""

		if distro_id in ["debian", "raspbian"]:
			if major_version < 11:
				return "debian10-arm64"
			if major_version < 12:
				return "debian11-arm64"
			return "debian12-arm64"
		return ""

	# Handle other platforms
	if system == "linux" and arch == "x86_64":
		return "linux64"
	if system == "darwin" and arch == "arm64":
		return "mac-arm64"
	if system == "darwin" and arch == "x86_64":
		return "mac-x64"
	# platform.machine() on Windows reports "AMD64" (and "x86"/"i686"
	# variants for 32-bit), never "x86_64", so the previous exact
	# comparison against "x86_64" could never match a 64-bit Windows host.
	if system == "windows" and arch in ("x86", "i386", "i686"):
		return "win32"
	if system == "windows" and arch in ("amd64", "x86_64"):
		return "win64"

	return ""
+
+
def get_linux_distribution_info():
	"""Retrieve Linux distribution information using the `distro` library.

	Returns:
		dict: ``{"id": <lowercased distro id>, "version": <version string>}``;
		empty strings when the `distro` package is not installed.
	"""
	# The previous `if not distro:` guard was dead code: a failed import
	# raises ImportError before the guard runs. Catch it explicitly instead.
	try:
		import distro
	except ImportError:
		return {"id": "", "version": ""}

	return {"id": distro.id().lower(), "version": distro.version()}
+
+
+def parse_float_and_unit(input_text, default_unit="px"):
+ if isinstance(input_text, int | float):
+ return {"value": input_text, "unit": default_unit}
+ if not isinstance(input_text, str):
+ return
+
+ number = float(re.search(r"[+-]?([0-9]*[.])?[0-9]+", input_text).group())
+ valid_units = [r"px", r"mm", r"cm", r"in"]
+ unit = [match.group() for rx in valid_units if (match := re.search(rx, input_text))]
+
+ return {"value": number, "unit": unit[0] if len(unit) == 1 else default_unit}
+
+
+def convert_uom(
+ number: float,
+ from_uom: Literal["px", "mm", "cm", "in"] = "px",
+ to_uom: Literal["px", "mm", "cm", "in"] = "px",
+ only_number: bool = False,
+) -> float:
+ unit_values = {
+ "px": 1,
+ "mm": 3.7795275591,
+ "cm": 37.795275591,
+ "in": 96,
+ }
+ from_px = (
+ {
+ "to_px": 1,
+ "to_mm": unit_values["px"] / unit_values["mm"],
+ "to_cm": unit_values["px"] / unit_values["cm"],
+ "to_in": unit_values["px"] / unit_values["in"],
+ },
+ )
+ from_mm = (
+ {
+ "to_mm": 1,
+ "to_px": unit_values["mm"] / unit_values["px"],
+ "to_cm": unit_values["mm"] / unit_values["cm"],
+ "to_in": unit_values["mm"] / unit_values["in"],
+ },
+ )
+ from_cm = (
+ {
+ "to_cm": 1,
+ "to_px": unit_values["cm"] / unit_values["px"],
+ "to_mm": unit_values["cm"] / unit_values["mm"],
+ "to_in": unit_values["cm"] / unit_values["in"],
+ },
+ )
+ from_in = {
+ "to_in": 1,
+ "to_px": unit_values["in"] / unit_values["px"],
+ "to_mm": unit_values["in"] / unit_values["mm"],
+ "to_cm": unit_values["in"] / unit_values["cm"],
+ }
+ converstion_factor = ({"from_px": from_px, "from_mm": from_mm, "from_cm": from_cm, "from_in": from_in},)
+ if only_number:
+ return round(number * converstion_factor[0][f"from_{from_uom}"][0][f"to_{to_uom}"], 3)
+ return f"{round(number * converstion_factor[0][f'from_{from_uom}'][0][f'to_{to_uom}'], 3)}{to_uom}"
diff --git a/pyproject.toml b/pyproject.toml
index 2984487c23..7ea327290f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -90,6 +90,8 @@ dependencies = [
"posthog~=5.0.0",
"vobject~=0.9.9",
"pycountry~=24.6.1",
+
+ "websockets"
]
[project.urls]