Merge pull request #34191 from iamejaaz/chrome-pdf

feat: Chrome PDF generator
This commit is contained in:
Ejaaz Khan 2025-10-14 17:31:49 +05:30 committed by GitHub
commit a4ba3c1940
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1850 additions and 7 deletions

View file

@ -1027,6 +1027,13 @@ def list_sites(context: CliCtxObj, output_json=False):
click.echo("No sites found")
@click.command("setup-chrome")
def setup_chrome():
    """Set up Chromium for the "chrome" PDF generator.

    Thin CLI wrapper around ``frappe.utils.print_utils.setup_chromium``; the
    docstring doubles as the text shown by ``bench setup-chrome --help``.
    """
    # Imported inside the command so the import only happens when the command runs.
    from frappe.utils.print_utils import setup_chromium

    setup_chromium()
commands = [
build,
clear_cache,
@ -1059,4 +1066,5 @@ commands = [
add_to_email_queue,
rebuild_global_search,
list_sites,
setup_chrome,
]

View file

@ -89,7 +89,7 @@ on_logout = "frappe.core.doctype.session_default_settings.session_default_settin
# Hooks used while building PDFs. The header and footer hooks are looked up by
# name and called to render the header/footer HTML of a print; the body hook is
# presumably the counterpart for the main content (confirm in frappe.utils.pdf).
pdf_header_html = "frappe.utils.pdf.pdf_header_html"
pdf_body_html = "frappe.utils.pdf.pdf_body_html"
pdf_footer_html = "frappe.utils.pdf.pdf_footer_html"
# Alternative PDF backend. get_chrome_pdf returns None unless it is called with
# pdf_generator == "chrome", which signals that the default generator should be used.
pdf_generator = "frappe.utils.pdf.get_chrome_pdf"
# permissions
permission_query_conditions = {

View file

@ -268,7 +268,7 @@
"fieldname": "pdf_generator",
"fieldtype": "Select",
"label": "PDF Generator",
"options": "wkhtmltopdf"
"options": "wkhtmltopdf\nchrome"
},
{
"default": "DocType",
@ -292,7 +292,7 @@
"icon": "fa fa-print",
"idx": 1,
"links": [],
"modified": "2025-09-16 11:20:20.151669",
"modified": "2025-09-23 10:39:51.123539",
"modified_by": "Administrator",
"module": "Printing",
"name": "Print Format",

View file

@ -40,7 +40,7 @@ class PrintFormat(Document):
page_number: DF.Literal[
"Hide", "Top Left", "Top Center", "Top Right", "Bottom Left", "Bottom Center", "Bottom Right"
]
pdf_generator: DF.Literal["wkhtmltopdf"]
pdf_generator: DF.Literal["wkhtmltopdf", "chrome"]
print_format_builder: DF.Check
print_format_builder_beta: DF.Check
print_format_for: DF.Literal["DocType", "Report"]

View file

@ -680,11 +680,15 @@ frappe.ui.form.PrintView = class {
}
} else {
this.is_wkhtmltopdf_valid();
this.render_page("/api/method/frappe.utils.print_format.download_pdf?");
this.render_page(
"/api/method/frappe.utils.print_format.download_pdf?",
false,
print_format?.pdf_generator
);
}
}
render_page(method, printit = false) {
render_page(method, printit = false, pdf_generator = "wkhtmltopdf") {
let w = window.open(
frappe.urllib.get_full_url(
method +
@ -701,7 +705,9 @@ frappe.ui.form.PrintView = class {
encodeURIComponent(this.get_letterhead()) +
"&settings=" +
encodeURIComponent(JSON.stringify(this.additional_settings)) +
(this.lang_code ? "&_lang=" + this.lang_code : "")
(this.lang_code ? "&_lang=" + this.lang_code : "") +
"&pdf_generator=" +
encodeURIComponent(pdf_generator)
)
);
if (!w) {

View file

@ -0,0 +1,49 @@
<!DOCTYPE html>
<html lang="{{ lang }}" dir="{{ layout_direction }}">
<head>
	{#-
		Standalone page used to render a PDF header or footer in Chrome.
		`head`, `styles` and `content` are lists of tags taken from the print HTML.
		The lang/dir attributes are quoted so that an empty or undefined value
		still produces valid markup instead of swallowing the next attribute.
	-#}
	<meta charset="utf-8">
	{% for tag in head -%}
		{{ tag | string }}
	{%- endfor %}
	<style>
		body {
			margin: 0 !important;
			border: 0 !important;
			padding-top: 1mm !important;
		}
		.letter-head,
		.letter-head-footer {
			margin-top: -12mm !important;
		}
		@media print {
			/* padding is added to simulate old wkhtmltopdf format prints */
			.wrapper {
				box-sizing: border-box;
				padding: 1mm 0 1mm !important;
				page-break-after: always !important;
			}
			[document-status] {
				margin-bottom: 0 !important;
			}
			/* Don't show explicit links for <a> tags */
			a[href]:after {
				content: none;
			}
		}
	</style>
	{% for tag in styles -%}
		{{ tag | string }}
	{%- endfor %}
</head>
<body>
	<div class="print-format">
		<div class="wrapper">
			{% for tag in content -%}
				{{ tag | string }}
			{%- endfor %}
		</div>
	</div>
</body>
</html>

View file

@ -131,6 +131,37 @@ def get_pdf(html, options=None, output: PdfWriter | None = None):
return filedata
def measure_time(func):
    """Decorate ``func`` so that every call prints how long it took.

    The wrapper forwards all positional and keyword arguments unchanged and
    returns whatever ``func`` returns. The duration is printed even when
    ``func`` raises, and the exception is propagated untouched.

    ``functools.wraps`` keeps the wrapped function's name, docstring and
    signature visible to introspection.
    """
    import functools
    import time

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative or
        # jump when the system clock is adjusted.
        start_time = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start_time
            print(f"Function {func.__name__} took {elapsed:.4f} seconds")

    return wrapper
@measure_time
def get_chrome_pdf(print_format, html, options, output, pdf_generator=None):
    """Generate a PDF with headless Chrome when the "chrome" generator is selected.

    Args:
        print_format: Name of the Print Format being rendered.
        html: Complete print HTML.
        options: Dict of wkhtmltopdf-style options; ``None`` is treated as empty.
        output: Passed through to ``PDFTransformer.transform_pdf``.
        pdf_generator: Selected generator name.

    Returns:
        The merged PDF returned by ``PDFTransformer.transform_pdf``, or ``None``
        when ``pdf_generator`` is not ``"chrome"`` so the default generator is used.
    """
    if pdf_generator != "chrome":
        # Use the default pdf generator. Checked before the imports below so
        # the default path never loads (or fails on) the chrome-only modules.
        return None

    from frappe.utils.pdf_generator.browser import Browser
    from frappe.utils.pdf_generator.chrome_pdf_generator import ChromePDFGenerator
    from frappe.utils.pdf_generator.pdf_merge import PDFTransformer

    # Browser writes into the options dict, so it must be a real dict.
    if options is None:
        options = {}

    # scrubbing url to expand url is not required as we have set url.
    # also, planning to remove network requests anyway
    generator = ChromePDFGenerator()
    browser = Browser(generator, print_format, html, options)
    transformer = PDFTransformer(browser)
    # transforms and merges header, footer into body pdf and returns merged pdf
    return transformer.transform_pdf(output=output)
def get_file_data_from_writer(writer_obj):
# https://docs.python.org/3/library/io.html
stream = io.BytesIO()

View file

@ -0,0 +1,465 @@
from typing import ClassVar
from bs4 import BeautifulSoup
import frappe
from frappe.utils.print_utils import convert_uom, parse_float_and_unit
class Browser:
    """Render one print HTML into body / header / footer PDFs through Chromium.

    Constructing the object runs the whole pipeline over a CDP session. When the
    constructor returns, ``body_pdf`` is set, and ``header_pdf`` / ``footer_pdf``
    are set when the HTML contained ``#header-html`` / ``#footer-html``.
    """

    def __init__(self, generator, print_format, html, options):
        """Run the rendering pipeline.

        Args:
            generator: ChromePDFGenerator that owns the Chromium process.
            print_format: Name of the Print Format (used to detect print designer).
            html: Complete print HTML.
            options: Dict of wkhtmltopdf-style options; it is updated in place.
        """
        self.is_print_designer = frappe.get_cached_value("Print Format", print_format, "print_designer")
        self.browserID = frappe.utils.random_string(10)
        generator.add_browser(self.browserID)

        try:
            # sets soup from html
            self.set_html(html)
            # sets wkhtmltopdf options
            self.set_options(options)
            # start cdp connection and create browser context ( kind of like new window / incognito mode)
            self.open(generator)
            # opens header and footer pages and sets content ( not waiting for it to load)
            self.prepare_header_footer()
            # opens body page and sets content and waits for it to finish loading
            self.setup_body_page()
            # prepare options as per chrome for pdf
            self.prepare_options_for_pdf()
            # generate header and footer pages if they are not dynamic ( first, odd, even, last)
            self.update_header_footer_page_pd()
            # if header and footer are not dynamic start generating pdf for them (non-blocking)
            self.try_async_header_footer_pdf()
            # now wait for page to load as we need DOM to generate pdf
            self.body_page.wait_for_set_content()
            self.body_pdf = self.body_page.generate_pdf(raw=not self.header_page and not self.footer_page)
            self.body_page.close()
            self.update_header_footer_page()

            if self.header_page:
                self.header_pdf = self._finish_header_footer_pdf(self.header_page, self.is_header_dynamic)
            if self.footer_page:
                self.footer_pdf = self._finish_header_footer_pdf(self.footer_page, self.is_footer_dynamic)
        finally:
            # Always release the CDP session and unregister from the generator,
            # otherwise a failed render leaves the connection open and the
            # generator believes a browser is still active.
            try:
                if getattr(self, "session", None) is not None:
                    self.close()
            finally:
                generator.remove_browser(self.browserID)

    @staticmethod
    def _finish_header_footer_pdf(page, is_dynamic):
        """Return the PDF of a header/footer page and close the page.

        Static pages were already asked to print in try_async_header_footer_pdf,
        so their result is read from the stream; dynamic pages are printed now.
        """
        if is_dynamic:
            pdf = page.generate_pdf()
        else:
            pdf = page.get_pdf_from_stream(page.get_pdf_stream_id())
        page.close()
        return pdf

    def open(self, generator):
        """Connect to Chromium's DevTools websocket and create a browser context."""
        from frappe.utils.pdf_generator.cdp_connection import CDPSocketClient

        # checking because if we share browser across requests _devtools_url will
        # already be set for subsequent requests.
        if not generator._devtools_url:
            generator._set_devtools_url()

        # start the CDP websocket connection to browser
        self.session = CDPSocketClient(generator._devtools_url)
        self.session.connect()
        self.create_browser_context()

    def create_browser_context(self):
        """Create a browser context and store its id in ``browser_context_id``.

        Raises:
            RuntimeError: if Chromium answers with an error.
        """
        result, error = self.session.send("Target.createBrowserContext", {"disposeOnDetach": True})
        if error:
            frappe.log_error(title="Error creating browser context:", message=f"{error}")
            # Without a context no page can be opened; fail with the real cause
            # instead of a TypeError on the missing result.
            raise RuntimeError(f"Could not create browser context: {error}")
        self.browser_context_id = result["browserContextId"]

    def set_html(self, html):
        """Parse ``html`` with html5lib and keep the soup on ``self.soup``."""
        self.soup = BeautifulSoup(html, "html5lib")

    def set_options(self, options):
        """Store the options dict (kept by reference, later updated in place)."""
        self.options = options

    def new_page(self, page_type):
        """
        Create a new page in the browser inside the browser context.
        ----
        TODO: Implement Deterministic rendering for headless-chrome via DevTools Protocol ( waiting for macos support )
        https://docs.google.com/document/d/1PppegrpXhOzKKAuNlP6XOEnviXFGUiX2hop00Cxcv4o/edit?tab=t.0#bookmark=id.dukbomwxpb3j
        NOTE: In theory this will make it faster but more importantly use less cpu, ram etc.
        """
        from frappe.utils.pdf_generator.page import Page

        page = Page(self.session, self.browser_context_id, page_type)
        page.is_print_designer = self.is_print_designer
        return page

    def setup_body_page(self):
        """Open the body page, navigate it to the site url and set its content."""
        self.body_page = self.new_page("body")
        # NOTE(review): relies on frappe.request being available - confirm this is
        # never reached outside a web request.
        self.body_page.set_tab_url(frappe.request.host_url)
        self.body_page.wait_for_navigate()
        self.body_page.set_content(str(self.soup))

    def close_page(self, type):
        """Close the page stored as ``<type>_page`` (e.g. "body", "header", "footer")."""
        getattr(self, f"{type}_page").close()

    def is_page_no_used(self, soup):
        """Return True when ``soup`` contains an element with a page-number class."""
        page_number_classes = (
            "page",
            "frompage",
            "topage",
            "page_info_page",
            "page_info_frompage",
            "page_info_topage",
        )
        return any(soup.find(class_=class_name) for class_name in page_number_classes)

    def prepare_header_footer(self):
        """Open header/footer pages, load their content and measure their height.

        Also sets ``is_header_dynamic`` / ``is_footer_dynamic`` and removes the
        ``#header-html`` / ``#footer-html`` elements from the body soup.
        """
        # code is structured like this to improve performance by running commands
        # in chrome as soon as possible.
        soup = self.soup
        options = self.options

        # open header and footer pages
        self._open_header_footer_pages()

        # get tags to pass to header template.
        head = soup.find("head").contents
        styles = soup.find_all("style")

        # set header and footer content ( not waiting for it to load yet).
        if self.header_page:
            self.header_page.wait_for_navigate()
            self.header_page.set_content(
                self.get_rendered_header_footer(self.header_content, "header", head, styles, css=[])
            )
        if self.footer_page:
            self.footer_page.wait_for_navigate()
            self.footer_page.set_content(
                self.get_rendered_header_footer(self.footer_content, "footer", head, styles, css=[])
            )

        if self.header_page:
            self.header_page.wait_for_set_content()
            self.header_height = self.header_page.get_element_height()
            self.is_header_dynamic = self.is_page_no_used(self.header_content)
            del self.header_content
        else:
            # bad implicit setting of margin #backwards-compatibility
            options["margin-top"] = "15mm"

        if self.footer_page:
            self.footer_page.wait_for_set_content()
            self.footer_height = self.footer_page.get_element_height()
            self.is_footer_dynamic = self.is_page_no_used(self.footer_content)
            del self.footer_content
        else:
            # bad implicit setting of margin #backwards-compatibility
            options["margin-bottom"] = "15mm"

        # Remove instances of them from main content for render_template
        for html_id in ["header-html", "footer-html"]:
            for tag in soup.find_all(id=html_id):
                tag.extract()

    def try_async_header_footer_pdf(self):
        """Start printing static (non page-numbered) header/footer without waiting."""
        if self.header_page and not self.is_header_dynamic:
            self.header_page.generate_pdf(wait_for_pdf=False)
        if self.footer_page and not self.is_footer_dynamic:
            self.footer_page.generate_pdf(wait_for_pdf=False)

    def _get_converted_num(self, num_str, unit="px"):
        """Convert a value such as "15mm" to ``unit``; returns None if it cannot be parsed."""
        parsed = parse_float_and_unit(num_str)
        if parsed:
            return convert_uom(parsed["value"], parsed["unit"], unit, only_number=True)
        return None

    def _parse_pdf_options_from_html(self):
        """Copy page related settings found in the print format styles into the options."""
        from frappe.utils.pdf import get_print_format_styles

        attrs = (
            "margin-top",
            "margin-bottom",
            "margin-left",
            "margin-right",
            "page-size",
            "header-spacing",
            "orientation",
            "page-width",
            "page-height",
        )
        print_format_css = get_print_format_styles(self.soup)
        self.options.update({style.name: style.value for style in print_format_css if style.name in attrs})

    def _set_default_page_size(self):
        """Fill in the page size from the options, Print Settings or "A4"."""
        options = self.options
        pdf_page_size = (
            options.get("page-size") or frappe.db.get_single_value("Print Settings", "pdf_page_size") or "A4"
        )

        if pdf_page_size == "Custom":
            options["page-height"] = options.get("page-height") or frappe.db.get_single_value(
                "Print Settings", "pdf_page_height"
            )
            options["page-width"] = options.get("page-width") or frappe.db.get_single_value(
                "Print Settings", "pdf_page_width"
            )
        else:
            options["page-size"] = pdf_page_size

    def _margin_in_px(self, name):
        """Return the margin option ``name`` in px; a missing or empty value counts as 0."""
        raw = self.options.get(name)
        if not raw:
            return 0
        return self._get_converted_num(raw)

    def prepare_options_for_pdf(self):
        """Translate the wkhtmltopdf-style options into Chrome print options.

        The result is stored as ``options`` on the body, header and footer pages.
        Margins are applied by splitting the paper height between header, body
        and footer.

        Raises:
            frappe.ValidationError: if no usable page size can be determined.
        """
        self._parse_pdf_options_from_html()
        self._set_default_page_size()

        options = self.options

        updated_options = {
            "scale": 1,
            "printBackground": True,
            "transferMode": "ReturnAsStream",
            "marginTop": 0,
            "marginBottom": 0,
            "marginLeft": 0,
            "marginRight": 0,
            "landscape": options.get("orientation", "Portrait") == "Landscape",
            "preferCSSPageSize": False,
            "pageRanges": options.get("page-ranges", ""),
            # Experimental
            "generateTaggedPDF": options.get("generate-tagged-pdf", False),
            "generateOutline": options.get("generate-outline", False),
        }

        # bad implicit setting of margin #backwards-compatibility
        if not self.is_print_designer:
            if not options.get("margin-right"):
                options["margin-right"] = "15mm"
            if not options.get("margin-left"):
                options["margin-left"] = "15mm"

        if not options.get("page-height") or not options.get("page-width"):
            if not (page_size := self.options.get("page-size")):
                raise frappe.ValidationError("Page size is required")
            # _set_default_page_size spells this value "Custom", so compare
            # case-insensitively to report the specific error.
            if str(page_size).upper() == "CUSTOM":
                raise frappe.ValidationError("Custom page size requires page-height and page-width")
            size = PageSize.get(page_size)
            if not size:
                raise frappe.ValidationError("Invalid page size")
            options["page-height"] = convert_uom(size["height"], "mm", "px", only_number=True)
            options["page-width"] = convert_uom(size["width"], "mm", "px", only_number=True)

        # string sizes carry a unit; numbers are already in px
        if isinstance(options["page-height"], str):
            options["page-height"] = self._get_converted_num(options["page-height"])
        if isinstance(options["page-width"], str):
            options["page-width"] = self._get_converted_num(options["page-width"])

        updated_options["paperWidth"] = convert_uom(options["page-width"], "px", "in", only_number=True)

        if options.get("margin-left"):
            updated_options["marginLeft"] = convert_uom(
                self._get_converted_num(options["margin-left"]), "px", "in", only_number=True
            )
        if options.get("margin-right"):
            updated_options["marginRight"] = convert_uom(
                self._get_converted_num(options["margin-right"]), "px", "in", only_number=True
            )

        # make copy of options to update them in header, body, footer.
        self.body_page.options = updated_options.copy()
        if self.header_page:
            self.header_page.options = updated_options.copy()
        if self.footer_page:
            self.footer_page.options = updated_options.copy()

        margin_top = self._margin_in_px("margin-top")
        margin_bottom = self._margin_in_px("margin-bottom")

        header_with_spacing_top_margin = 0
        footer_with_bottom_margin = 0

        if self.header_page:
            header_with_top_margin = self.header_height + margin_top
            # header-spacing may come from the stylesheet as text (e.g. "5mm");
            # convert it so it can be added to the pixel values.
            header_spacing = options.get("header-spacing") or 0
            if isinstance(header_spacing, str):
                header_spacing = self._get_converted_num(header_spacing) or 0
            header_with_spacing_top_margin = header_with_top_margin + header_spacing
            self.header_page.options["paperHeight"] = (
                convert_uom(header_with_spacing_top_margin, "px", "in", only_number=True)
                if header_with_spacing_top_margin
                else 0
            )

        # the top margin belongs to the header page when there is one, else to the body
        margin_top = convert_uom(margin_top, "px", "in", only_number=True)
        if self.header_page:
            self.header_page.options["marginTop"] = margin_top
        else:
            self.body_page.options["marginTop"] = margin_top

        if self.footer_page:
            footer_height = self.footer_height
            self.footer_page.options["paperHeight"] = (
                convert_uom(footer_height, "px", "in", only_number=True) if footer_height else 0
            )
            footer_with_bottom_margin = self.footer_height + margin_bottom

        # same rule for the bottom margin and the footer page
        margin_bottom = convert_uom(margin_bottom, "px", "in", only_number=True)
        if self.footer_page:
            self.footer_page.options["marginBottom"] = margin_bottom
        else:
            self.body_page.options["marginBottom"] = margin_bottom

        body_height = options.get("page-height") - (
            header_with_spacing_top_margin + footer_with_bottom_margin
        )

        # matching scale for some old formats is 1.46 #backwards-compatibility
        # ( scale 1 is better in my opinion). If we face issues in custom formats
        # then only we should enable this.
        self.body_page.options["paperHeight"] = convert_uom(body_height, "px", "in", only_number=True)

    def get_rendered_header_footer(self, content, type, head, styles, css):
        """Render the header or footer HTML through the last registered hook.

        ``type`` is "header" or "footer"; ``content`` is detached from the soup.
        """
        from frappe.utils.pdf import toggle_visible_pdf

        html_id = f"{type}-html"
        content = content.extract()
        toggle_visible_pdf(content)

        id_map = {"header": "pdf_header_html", "footer": "pdf_footer_html"}
        hook_func = frappe.get_hooks(id_map.get(type))
        return frappe.call(
            hook_func[-1],
            soup=self.soup,
            head=head,
            content=content,
            styles=styles,
            html_id=html_id,
            css=css,
            path="templates/print_formats/chrome_pdf_header_footer.html",
        )

    def update_header_footer_page(self):
        """Clone dynamic header/footer once per body page and fill in page numbers."""
        if not self.header_page and not self.footer_page:
            return

        total_pages = len(self.body_pdf.pages)
        print_designer_flag = 1 if self.is_print_designer else 0

        # function is added to html from update_page_no.js
        if self.header_page and self.is_header_dynamic:
            selector = "#header-render-container" if self.is_print_designer else ".wrapper"
            self.header_page.evaluate(
                f"clone_and_update('{selector}', {total_pages}, {print_designer_flag}, 'Header', 1);",
                await_promise=True,
            )
        if self.footer_page and self.is_footer_dynamic:
            selector = "#footer-render-container" if self.is_print_designer else ".wrapper"
            self.footer_page.evaluate(
                f"clone_and_update('{selector}', {total_pages}, {print_designer_flag}, 'Footer', 1);",
                await_promise=True,
            )

    def update_header_footer_page_pd(self):
        """Prepare static header/footer of print designer formats (no page numbers)."""
        if not self.is_print_designer:
            return
        if not self.header_page and not self.footer_page:
            return

        # function is added to html from update_page_no.js
        if self.header_page and not self.is_header_dynamic:
            self.header_page.evaluate(
                "clone_and_update('#header-render-container', 0, 1, 'Header', 0);",
                await_promise=True,
            )
        if self.footer_page and not self.is_footer_dynamic:
            self.footer_page.evaluate(
                "clone_and_update('#footer-render-container', 0, 1, 'Footer', 0);",
                await_promise=True,
            )

    def _open_header_footer_pages(self):
        """Open a page for the header and/or footer when the HTML defines them."""
        self.header_page = None
        self.footer_page = None

        # open new page for header/footer if they exist.
        # It sends CDP command to the browser to open a new tab.
        if header_content := self.soup.find(id="header-html"):
            self.header_page = self.new_page("header")
            self.header_page.set_tab_url(frappe.request.host_url)
        if footer_content := self.soup.find(id="footer-html"):
            self.footer_page = self.new_page("footer")
            self.footer_page.set_tab_url(frappe.request.host_url)

        self.header_content = header_content
        self.footer_content = footer_content

    def close(self):
        """Disconnect the CDP session."""
        self.session.disconnect()
class PageSize:
    """Lookup table of named paper sizes, stored as (width, height) in millimetres."""

    page_sizes: ClassVar[dict[str, tuple[int, int]]] = {
        "A10": (26, 37),
        "A1": (594, 841),
        "A0": (841, 1189),
        "A3": (297, 420),
        "A2": (420, 594),
        "A5": (148, 210),
        "A4": (210, 297),
        "A7": (74, 105),
        "A6": (105, 148),
        "A9": (37, 52),
        "A8": (52, 74),
        "B10": (44, 31),
        "B1+": (1020, 720),
        "B4": (353, 250),
        "B5": (250, 176),
        "B6": (176, 125),
        "B7": (125, 88),
        "B0": (1414, 1000),
        "B1": (1000, 707),
        "B2": (707, 500),
        "B3": (500, 353),
        "B2+": (720, 520),
        "B8": (88, 62),
        "B9": (62, 44),
        "C10": (40, 28),
        "C9": (57, 40),
        "C8": (81, 57),
        "C3": (458, 324),
        "C2": (648, 458),
        "C1": (917, 648),
        "C0": (1297, 917),
        "C7": (114, 81),
        "C6": (162, 114),
        "C5": (229, 162),
        "C4": (324, 229),
        "Legal": (216, 356),
        "Junior Legal": (127, 203),
        "Letter": (216, 279),
        "Tabloid": (279, 432),
        "Ledger": (432, 279),
        "ANSI C": (432, 559),
        "ANSI A (letter)": (216, 279),
        "ANSI B (ledger & tabloid)": (279, 432),
        "ANSI E": (864, 1118),
        "ANSI D": (559, 864),
    }

    @classmethod
    def get(cls, name):
        """Return ``{"width": ..., "height": ...}`` for ``name``, or None if it is unknown."""
        dimensions = cls.page_sizes.get(name)
        if dimensions is None:
            return None
        return dict(zip(("width", "height"), dimensions))

View file

@ -0,0 +1,179 @@
import asyncio
import websockets
import frappe
class CDPSocketClient:
    """
    Manages WebSocket communications with Chrome DevTools Protocol.

    The client owns a private asyncio event loop and exposes a synchronous API:
    every public method drives that loop with ``run_until_complete``.
    ``disconnect`` closes both the websocket and the loop.
    """

    def __init__(self, websocket_url):
        self.websocket_url = websocket_url
        self.connection = None
        self.message_id = 0
        # Maps a message id (and a composite-key alias) to the future awaiting it.
        self.pending_messages = {}
        # Maps a CDP event name to a list of (callback, future, filters) tuples.
        self.listeners = {}
        self.listen_task = None
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def connect(self):
        """Open the WebSocket connection and start listening for messages."""
        if self.loop.is_closed():
            # disconnect() closes the loop; create a fresh one so the client
            # can still be connected again afterwards.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect())
        self.listen_task = self.loop.create_task(self._listen())

    async def _connect(self):
        try:
            self.connection = await websockets.connect(self.websocket_url)
        except Exception:
            frappe.log_error(title="Failed to connect to WebSocket:", message=f"{frappe.get_traceback()}")
            raise

    async def _listen(self):
        """Dispatch every incoming message until the connection ends or the task is cancelled."""
        try:
            async for message in self.connection:
                self._handle_message(frappe.json.loads(message))
        except Exception:
            frappe.log_error(title="WebSocket listening error:", message=f"{frappe.get_traceback()}")

    def _handle_message(self, response):
        """Resolve the future waiting for ``response`` and notify matching listeners."""
        method = response.get("method")
        params = response.get("params", {})
        session_id = response.get("sessionId")
        target_id = params.get("targetId")
        frame_id = params.get("frameId")
        message_id = response.get("id")
        composite_key = (method, session_id, target_id, frame_id)

        # Handle responses with `id`
        if message_id and message_id in self.pending_messages:
            future = self.pending_messages.pop(message_id)
            # _send stores the same future under a composite key as well. That
            # alias can never be derived from a response (responses carry no
            # method), so drop it by identity or it would stay forever.
            stale_keys = [key for key, pending in self.pending_messages.items() if pending is future]
            for key in stale_keys:
                del self.pending_messages[key]
            if not future.done():
                future.set_result(response)

        # Handle responses without `id` using a composite key
        elif method:
            if composite_key in self.pending_messages:
                future = self.pending_messages.pop(composite_key)
                # Setting a result twice raises InvalidStateError, which would
                # end the listen task.
                if not future.done():
                    future.set_result(response)

            # Iterate over a copy: callbacks may remove their own listener.
            for callback, future, filters in list(self.listeners.get(method, ())):
                # added not filters["key"] might cause cross talk between different sessions
                if (
                    (not session_id or not filters["sessionId"] or filters["sessionId"] == session_id)
                    and (not target_id or not filters["targetId"] or filters["targetId"] == target_id)
                    and (not frame_id or not filters["frameId"] or filters["frameId"] == frame_id)
                ):
                    callback(future, response)

    def disconnect(self):
        """Close the connection, cancel outstanding tasks and close the event loop.

        Calling it again after the loop has been closed is a no-op.
        """
        if self.loop.is_closed():
            return

        try:
            if self.listen_task:
                self.listen_task.cancel()
            self.loop.run_until_complete(self._disconnect())

            # Cancel all pending tasks before stopping the loop; leaving them
            # behind was degrading performance over time.
            pending_tasks = [task for task in asyncio.all_tasks(self.loop) if not task.done()]
            for task in pending_tasks:
                task.cancel()
                try:
                    self.loop.run_until_complete(task)  # Ensure tasks finish before loop stops
                except asyncio.CancelledError:
                    pass  # Ignore cancellation errors
        except Exception:
            frappe.log_error(title="Error while disconnecting:", message=f"{frappe.get_traceback()}")
            raise
        finally:
            self.listen_task = None
            # A new loop is created for every client, so it must be closed here
            # or its selector and self-pipe are leaked on every request.
            self.loop.close()
            # Same clean-up asyncio.run() performs: do not leave a closed loop
            # installed as the thread's current loop.
            asyncio.set_event_loop(None)

    async def _disconnect(self):
        try:
            # close() is safe to call on an already closed connection, so there
            # is no need to inspect `.closed` (which newer websockets releases
            # are believed to no longer expose).
            if self.connection is not None:
                await self.connection.close()
        except Exception:
            frappe.log_error(
                title="Error during WebSocket disconnection:", message=f"{frappe.get_traceback()}"
            )
        finally:
            self.connection = None

    def send(self, method, params=None, session_id=None, return_future=False):
        """Send a CDP command.

        Returns ``(result, error)`` once the response arrived, or - when
        ``return_future`` is true - a task that only sends the command and
        resolves to the response future without waiting for it.
        """
        if return_future:
            return asyncio.ensure_future(
                self._send(method, params, session_id, wait_future_fulfill=False), loop=self.loop
            )
        future = self.loop.run_until_complete(self._send(method, params, session_id))
        return self._destructure_response(future.result())

    async def _send(self, method, params=None, session_id=None, wait_future_fulfill=True):
        self.message_id += 1
        message_id = self.message_id
        message = {
            "id": message_id,
            "method": method,
            "params": params or {},
        }
        if session_id:
            message["sessionId"] = session_id

        if self.connection is None:
            raise RuntimeError("WebSocket connection is not open.")

        future = self.loop.create_future()
        self.pending_messages[message_id] = future

        # Dynamically create the composite key
        composite_key = (
            method,
            session_id,
            params.get("targetId") if params else None,
            params.get("frameId") if params else None,
        )
        if any(composite_key):
            self.pending_messages[composite_key] = future

        try:
            await self.connection.send(frappe.json.dumps(message))
        except Exception:
            # Nothing will ever answer a message that was not sent.
            self.pending_messages.pop(message_id, None)
            if self.pending_messages.get(composite_key) is future:
                del self.pending_messages[composite_key]
            raise

        if wait_future_fulfill:
            await future
        return future

    def _destructure_response(self, response):
        """Destructure the response to extract useful information."""
        return response.get("result", None), response.get("error", None)

    def start_listener(self, method, callback, session_id=None, target_id=None, frame_id=None):
        """Register a listener for a specific CDP event with optional filtering.

        Returns the ``(callback, future, filters)`` tuple, which is the handle
        expected by ``wait_for_event`` and ``remove_listener``.
        """
        registered = self.listeners.setdefault(method, [])
        future = self.loop.create_future()
        event = (callback, future, {"sessionId": session_id, "targetId": target_id, "frameId": frame_id})
        if event not in registered:
            registered.append(event)
        return event

    def wait_for_event(self, event, timeout=3):
        """Block until the listener's future is done; a timeout is logged, not raised."""
        if isinstance(event, tuple):
            event = event[1]
        try:
            self.loop.run_until_complete(asyncio.wait_for(event, timeout))
        except asyncio.TimeoutError:
            frappe.log_error(title="Timeout waiting for event", message=f"{frappe.get_traceback()}")

    def remove_listener(self, method, event):
        """Remove a listener for a specific CDP event."""
        self.listeners[method].remove(event)

View file

@ -0,0 +1,284 @@
import os
import platform
import subprocess
import time
from pathlib import Path
from typing import ClassVar
import requests
import frappe
from frappe import _
# TODO: close browser when worker is killed.
class ChromePDFGenerator:
EXECUTABLE_PATHS: ClassVar[dict[str, list[str]]] = {
"linux": ["chrome-linux", "headless_shell"],
"darwin": ["chrome-mac", "headless_shell"],
"windows": ["chrome-win", "headless_shell.exe"],
}
_instance = None
_browsers: ClassVar[list] = []
def add_browser(self, browser):
self._browsers.append(browser)
def remove_browser(self, browser):
self._browsers.remove(browser)
def __new__(cls):
# if instance or _chromium_process is not available create object else return current instance stored in cls._instance
if cls._instance is None or not cls._instance._chromium_process:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Initialize only once."""
if hasattr(self, "_initialized"): # Prevent multiple initializations
return
self._initialized = True # Mark as initialized
self._chromium_process = None
self._chromium_path = None
self._devtools_url = None
self._initialize_chromium()
def _initialize_chromium(self):
# ideally browser is initailized from before request hook.
# if _chromium_process is not available then initialize it.
if self._chromium_process:
return
# get site config and load chromium settings.
site_config = frappe.get_common_site_config()
# only when we want to chromium on separate docker / server ( not implemented/tested yet )
self.CHROMIUM_WEBSOCKET_URL = site_config.get("chromium_websocket_url", "")
if self.CHROMIUM_WEBSOCKET_URL:
frappe.warn("Using external chromium websocket url. Make sure it is accessible.")
self._devtools_url = self.CHROMIUM_WEBSOCKET_URL
return
# only when we want to use chromium from a specific path ( incase we don't have chromium in bench folder )
self.CHROMIUM_BINARY_PATH = site_config.get("chromium_binary_path", "")
"""
Number of allowed open websocket connections to chromium.
This number will basically define how many concurrent requests can be handled by one chromium instance.
#TODO: Implement/Modify logic to handle multiple chromium instance in one class / per worker. currently we are starting one chromium.
"""
self.CHROME_OPEN_CONNECTIONS = site_config.get("chromium_max_concurrent", 1)
# if we want to use persistent ( long running ) chromium for all sites.
# current approch starts chrome per worker process.
# TODO: Better Implement logic to support for persistent chrome proccess.
self.USE_PERSISTENT_CHROMIUM = site_config.get("use_persistent_chromium", False)
# time to wait for chromium to start and provide dev tools url used in _set_devtools_url.
self.START_TIMEOUT = site_config.get("chromium_start_timeout", 3)
self._chromium_path = (
self._find_chromium_executable() if not self.CHROMIUM_BINARY_PATH else self.CHROMIUM_BINARY_PATH
)
if self._verify_chromium_installation():
if not self._devtools_url:
self.start_chromium_process()
def _find_chromium_executable(self):
"""Finds the Chromium executable or raises an error if not found."""
bench_path = frappe.utils.get_bench_path()
"""Determine the path to the Chromium executable. chromium is downloaded by download_chromium in print_designer/install.py"""
chromium_dir = os.path.join(bench_path, "chromium")
if not os.path.exists(chromium_dir):
frappe.throw(_("Chromium is not downloaded. Please run the setup first."))
platform_name = platform.system().lower()
if platform_name not in ["linux", "darwin", "windows"]:
frappe.throw(f"Unsupported platform: {platform_name}")
executable_name = self.EXECUTABLE_PATHS.get(platform_name)
# Construct the full path to the executable
exec_path = Path(chromium_dir).joinpath(*executable_name)
if not exec_path.exists():
frappe.throw(
f"Chromium executable not found: {exec_path}. please run bench setup-new-pdf-backend"
)
return str(exec_path)
def _verify_chromium_installation(self):
"""Ensures Chromium is available and executable, raising clearer errors if not."""
if not os.path.exists(self._chromium_path):
frappe.throw(
f"Chromium not available at the specified path. Please check the path: {self._chromium_path}"
)
if not os.access(self._chromium_path, os.X_OK):
frappe.throw(f"Chromium not executable at {self._chromium_path}")
return True
def start_chromium_process(self, debug=False):
"""
Launches Chromium in headless mode with robust logging and error handling.
chrome switches
https://peter.sh/experiments/chromium-command-line-switches/
NOTE: dbus issue in docker
https://source.chromium.org/chromium/chromium/src/+/main:content/app/content_main.cc;l=229-241?q=DBUS_SESSION_BUS_ADDRESS&ss=chromium
"""
try:
if debug:
command_args = [
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", # path to locally installed chrome browser for debugging.
"--remote-debugging-port=0",
"--user-data-dir=/tmp/chromium-{}-user-data".format(
frappe.local.site + frappe.utils.random_string(10)
),
"--disable-gpu",
"--no-sandbox",
"--no-first-run",
"",
]
else:
command_args = [
self._chromium_path,
# 0 will automatically select a random open port from the ephemeral port range.
"--remote-debugging-port=0",
"--disable-gpu", # GPU is not available in production environment.
"--disable-field-trial-config",
"--disable-background-networking",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-back-forward-cache",
"--disable-breakpad",
"--disable-client-side-phishing-detection",
"--disable-component-extensions-with-background-pages",
"--disable-component-update",
"--no-default-browser-check",
"--disable-default-apps",
"--disable-dev-shm-usage",
"--disable-extensions",
"--disable-features=ImprovedCookieControls,LazyFrameLoading,GlobalMediaControls,DestroyProfileOnBrowserClose,MediaRouter,DialMediaRouteProvider,AcceptCHFrame,AutoExpandDetailsElement,CertificateTransparencyComponentUpdater,AvoidUnnecessaryBeforeUnloadCheckSync,Translate,HttpsUpgrades,PaintHolding,ThirdPartyStoragePartitioning,LensOverlay,PlzDedicatedWorker",
"--allow-pre-commit-input",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-renderer-backgrounding",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--password-store=basic",
"--use-mock-keychain",
"--no-service-autorun",
"--export-tagged-pdf",
"--disable-search-engine-choice-screen",
"--unsafely-disable-devtools-self-xss-warnings",
"--enable-use-zoom-for-dsf=false",
"--use-angle",
"--headless",
"--hide-scrollbars",
"--mute-audio",
"--blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4",
"--no-sandbox",
"--no-startup-window",
# related to HeadlessExperimental flag enable when Implement Deterministic rendering. check page class for more info.
# "--enable-surface-synchronization",
# "--run-all-compositor-stages-before-draw",
# "--disable-threaded-animation",
# "--disable-threaded-scrolling",
# "--disable-checker-imaging",
]
self._start_chromium_process(command_args)
except Exception as e:
frappe.log_error(f"Error starting Chromium: {e}")
frappe.throw(_("Could not start Chromium. Check logs for details."))
# Apply the decorator to monitor Chromium subprocess usage for development / debugging purposes.
# it will print and write usage data to a file ( defaults to chrome_process_usage.json).
# from print_designer.pdf_generator.monitor_subprocess import monitor_subprocess_usage
# @monitor_subprocess_usage(interval=0.1)
def _start_chromium_process(self, command_args):
    """Spawn the Chromium subprocess and remember it on the instance.

    Args:
        command_args: Full argument vector (executable first) handed to
            ``subprocess.Popen``.

    Returns:
        The started ``subprocess.Popen`` object; the same object is stored
        on ``self._chromium_process``. stdout and stderr are piped in text mode.
    """
    popen_options = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
    }
    if platform.system().lower() == "windows":
        # hide cmd window
        hidden_window = subprocess.STARTUPINFO()
        hidden_window.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        hidden_window.wShowWindow = subprocess.SW_HIDE
        popen_options["startupinfo"] = hidden_window

    self._chromium_process = subprocess.Popen(command_args, **popen_options)
    return self._chromium_process
def _set_devtools_url(self):
    """
    Monitor Chromium's stderr for the DevTools WebSocket URL and store it
    on ``self._devtools_url``.
    ----------------
    Other approach: if we choose the port using find_available_port we can avoid
    this entirely and use the fetch_devtools_url() method instead.
    NOTE: 1) in the current approach the output to stderr is pretty consistent.
          2) the other approach may seem more reliable but it was slower in testing.
    TODO:
        the final approach can be decided later after testing in production.

    Raises:
        TimeoutError: if no DevTools URL was read before ``self.START_TIMEOUT``
            seconds elapsed, or Chromium closed stderr without printing one.
            The Chromium process is terminated before raising.
    """
    stderr = self._chromium_process.stderr
    start_time = time.time()
    exited_early = False
    while time.time() - start_time < self.START_TIMEOUT:
        # Read a single line from stderr and check if it contains the DevTools URL.
        # Not using select() because it is not supported on Windows for non-socket file descriptors.
        # NOTE(review): readline() blocks, so the timeout is only re-checked between lines.
        line = stderr.readline()
        if not line:
            # readline() returns "" only at end-of-file: Chromium closed stderr
            # (it exited or crashed). Stop instead of spinning until the timeout.
            exited_early = True
            break
        # not sure if "DevTools listening on" is consistent in all chromium versions.
        if "DevTools listening on" in line:
            url_start = line.find("ws://")
            if url_start != -1:
                self._devtools_url = line[url_start:].strip()
                break

    if not self._devtools_url:
        self._chromium_process.terminate()
        if exited_early:
            raise TimeoutError("Chromium exited before reporting its DevTools URL.")
        raise TimeoutError("Chromium took too long to start.")
def _close_browser(self):
"""
Close the headless Chromium browser.

Logs a message and returns without closing anything while `self._browsers`
is truthy. Otherwise terminates the tracked Chromium process (when one is set),
resets `ChromePDFGenerator._instance`, `self._chromium_process` and
`self._devtools_url` to None, and logs that Chromium was closed.
"""
# Active browser instances still depend on the process, so leave it running.
if self._browsers:
frappe.log("Cannot close Chromium as there are active browser instances.")
return
if self._chromium_process:
# NOTE(review): terminate() is not followed by wait(); confirm the child is reaped elsewhere.
self._chromium_process.terminate()
# Drop the cached generator instance so it is not reused after shutdown.
ChromePDFGenerator._instance = None
self._chromium_process = None
self._devtools_url = None
frappe.log("Headless Chromium closed successfully.")
# Not used anywhere in the code; read _set_devtools_url for more info. Useful in case we
# want to take a different approach to fetch the DevTools URL.
def fetch_devtools_url(self, port, timeout=5):
    """Ask a running Chromium for its DevTools WebSocket URL over HTTP.

    Args:
        port: Remote debugging port Chromium listens on. A falsy value
            short-circuits to ``None``.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stuck Chromium cannot block the caller forever.

    Returns:
        The stripped ``webSocketDebuggerUrl`` string, or ``None`` when the port
        is missing, the request fails, or the response lacks that key.
        Failures are recorded with ``frappe.log_error``.
    """
    if not port:
        return None

    url = f"http://127.0.0.1:{port}/json/version"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
        response_data = response.json()
        return response_data["webSocketDebuggerUrl"].strip()
    except requests.ConnectionError:
        frappe.log_error(
            f"Failed to connect to the Chrome DevTools Protocol. Is Chrome running with --remote-debugging-port={port}"
        )
    except requests.RequestException as e:
        frappe.log_error(f"An error occurred: {e}")
    except KeyError as e:
        # The endpoint answered but without the expected field.
        frappe.log_error(f"An error occurred: {e}")
    return None

View file

@ -0,0 +1,356 @@
import base64
import time
import urllib
import frappe
"""
CDP commands documentation can be found here.
https://chromedevtools.github.io/devtools-protocol/
"""
class Page:
def __init__(self, session, browser_context_id, page_type):
    """Create a blank target in the given browser context and prepare it for printing.

    Creates the target, attaches to it (flattened session), enables the Page
    domain, resolves the main frame id, switches media emulation to "print"
    and copies the session cookie into the page.

    Args:
        session: CDP session object exposing ``send(method, params, ...)``
            that returns a ``(result, error)`` pair.
        browser_context_id: Browser context the new target is created in.
        page_type: Caller-defined label stored on ``self.type``.

    Raises:
        RuntimeError: if the target cannot be created or attached to.
    """
    self.session = session

    result, error = self.session.send(
        "Target.createTarget", {"url": "", "browserContextId": browser_context_id}
    )
    if error:
        frappe.log_error(title="Error creating new page:", message=f"{error}")
        # Without a target there is nothing to attach to; fail with a clear error
        # instead of crashing on the missing "targetId" below.
        raise RuntimeError(f"Error creating new page: {error}")
    self.target_id = result["targetId"]
    self.type = page_type

    result, error = self.session.send(
        "Target.attachToTarget", {"targetId": self.target_id, "flatten": True}
    )
    if error:
        raise RuntimeError(f"Error attaching to target: {error}")
    self.session_id = result["sessionId"]

    self.send("Page.enable")
    self.frame_id = None
    self.get_frame_id_on_demand()
    self.set_media_emulation("print")
    self.set_cookies()
# TODO: make send return a future and not wait for it by default.
def send(self, method, params=None, return_future=False):
    """Send a CDP command through the shared session, scoped to this page's session id.

    Args:
        method: CDP method name, e.g. ``"Page.enable"``.
        params: Command parameters; ``None`` is sent as an empty dict.
        return_future: Forwarded unchanged to ``session.send``.

    Returns:
        Whatever ``self.session.send`` returns.
    """
    payload = {} if params is None else params
    return self.session.send(method, payload, self.session_id, return_future)
def get_frame_id_on_demand(self):
    """Return the page's main frame id, fetching and caching it on first use.

    Returns:
        The cached ``self.frame_id``; when unset it is read from
        ``Page.getFrameTree`` and stored first.

    Raises:
        RuntimeError: if ``Page.getFrameTree`` reports an error. Any exception
        raised while fetching is logged via ``frappe.log_error`` and re-raised.
    """
    if not self.frame_id:
        try:
            tree_result, tree_error = self.send("Page.getFrameTree")
            if tree_error:
                raise RuntimeError(f"Error fetching frameId: {tree_error}")
            self.frame_id = tree_result["frameTree"]["frame"]["id"]
        except Exception:
            frappe.log_error(title="Error fetching frameId:", message=f"{frappe.get_traceback()}")
            raise
    return self.frame_id
def _ensure_frame_id(self):
    """Return ``self.frame_id``, resolving it via ``get_frame_id_on_demand`` when unset."""
    if self.frame_id:
        return self.frame_id
    self.get_frame_id_on_demand()
    return self.frame_id
def set_media_emulation(self, media_type: str = "print"):
    """Emulate the given CSS media type on this page and return the command's response."""
    emulation_params = {"media": media_type}
    return self.send("Emulation.setEmulatedMedia", emulation_params)
def set_cookies(self):
    """Copy the current Frappe session id into the page as an ``sid`` cookie.

    Does nothing unless there is a session with an ``sid`` and ``frappe.local``
    has a ``request``. The cookie domain is the host name without its port.

    Raises:
        RuntimeError: if enabling the Network domain, setting the cookie or
            disabling the Network domain reports an error.
    """
    in_request_with_session = frappe.session and frappe.session.sid and hasattr(frappe.local, "request")
    if not in_request_with_session:
        return

    host = frappe.utils.get_host_name().split(":", 1)[0]
    sid_cookie = {
        "name": "sid",
        "value": frappe.session.sid,
        "domain": host,
        "sameSite": "Strict",
    }

    # The Network domain is only enabled for the duration of the cookie write.
    steps = (
        (("Network.enable",), "Error enabling network"),
        (("Network.setCookie", sid_cookie), "Error setting cookie"),
        (("Network.disable",), "Error disabling network"),
    )
    for call_args, failure_prefix in steps:
        _result, error = self.send(*call_args)
        if error:
            raise RuntimeError(f"{failure_prefix}: {error}")
def intercept_request_and_fulfill(self, url_pattern):
    """Intercept requests matching ``url_pattern`` and prepare to answer the first with HTTP 200.

    Registers a ``Fetch.requestPaused`` listener and enables the Fetch domain
    for the pattern straight away.

    Returns:
        A zero-argument function that waits for the paused request, fulfills
        it with response code 200 and removes the listener.
    """

    def resolve_with_request_id(future, response):
        """Callback for when a request is paused (intercepted)."""
        params = response.get("params")
        if not params or not params.get("requestId"):
            return
        request_id = params["requestId"]
        if not future.done():
            future.set_result(request_id)

    # Start listening for requestPaused event
    listener = self.session.start_listener(
        "Fetch.requestPaused", resolve_with_request_id, self.session_id, self.target_id, self.frame_id
    )
    # Enable request interception for the specified URL pattern
    self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})

    def intercept_and_fulfill():
        self.session.wait_for_event(listener)
        fulfill_params = {"requestId": listener[1].result(), "responseCode": 200}
        self.session.send("Fetch.fulfillRequest", fulfill_params, return_future=True)
        self.session.remove_listener("Fetch.requestPaused", listener)

    return intercept_and_fulfill
def intercept_request_for_local_resources(self, url_pattern="*"):
"""
Intercept requests matching `url_pattern` and serve site-local files directly from disk.

Registers a `Fetch.requestPaused` listener and enables the Fetch domain for the pattern.
For paused requests whose URL starts with `frappe.request.host_url` and whose path starts
with "assets/" or "files/", the file is read with `frappe.read_file(..., as_base64=True)`
and, when content is returned, sent back via `Fetch.fulfillRequest`. The callback also
issues `Fetch.continueRequest` so other requests go to the network.

NOTE(review): the requested path is URL-decoded and passed to `frappe.read_file` without
normalisation; confirm that "../" segments (e.g. percent-encoded) cannot escape the
assets/files directories.
NOTE(review): `urllib.parse` is used while the file only does `import urllib`; confirm the
submodule is imported somewhere before this runs.
"""
# Shared between the branches of the callback to carry the paused request id.
data = {}
def on_request_paused_event(future, response):
"""Callback for when a request is paused (intercepted)."""
params = response.get("params")
if params and params.get("requestId"):
data["request_id"] = params["requestId"]
url = params["request"]["url"]
# Only URLs pointing at this site are candidates for being served from disk.
if url.startswith(frappe.request.host_url):
# Strip the host prefix and anything from "?v" onwards (presumably a cache-busting query).
path = url.replace(frappe.request.host_url, "").split("?v", 1)[0]
if path.startswith("assets/") or path.startswith("files/"):
path = urllib.parse.unquote(path)
# "files/" paths are resolved inside the site's public folder; "assets/" paths are read as-is.
if path.startswith("files/"):
path = frappe.utils.get_site_path("public", path)
content = frappe.read_file(path, as_base64=True)
response_headers = []
# write logic to handle all file types as required
if path.endswith(".svg"):
response_headers.append({"name": "Content-Type", "value": "image/svg+xml"})
if content:
self.session.send(
"Fetch.fulfillRequest",
{
"requestId": data["request_id"],
"responseCode": 200, # actually handle the response code from the request
"responseHeaders": response_headers,
"body": content,
},
return_future=True,
)
return
# Not served locally: let the browser continue with the original request.
self.session.send(
"Fetch.continueRequest",
{"requestId": data["request_id"]},
return_future=True,
)
# Start listening for requestPaused event
self.session.start_listener(
"Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
)
# Enable request interception for the specified URL pattern
self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})
def set_tab_url(self, url):
"""
Navigate to a URL and fulfill the request with status code 200.

The request to `url` is intercepted and answered with 200 rather than fetched. The method
starts the navigation without waiting for it and stores a `wait_for_navigate` callable on
the instance; that callable waits for the `Page.navigate` future (second argument 3,
presumably a timeout in seconds).
NOTE(review): `wait_start()` (waiting for the "load" lifecycle event) presumably runs inside
`wait_for_navigate`; confirm against the indented source.
"""
# Intercept and fulfill request with 200 status code
wait_and_fulfill = self.intercept_request_and_fulfill(url)
# Now, navigate after intercepting the request
# The load listener is armed before navigating so the "load" event cannot be missed.
wait_start = self.wait_for_load(wait_for="load")
page_navigate = self.send("Page.navigate", {"url": url}, return_future=True)
# Blocks until the navigation request has been paused and fulfilled.
wait_and_fulfill()
def wait_for_navigate():
self.session.wait_for_event(page_navigate, 3)
wait_start()
# Exposed so the caller decides when to block on the navigation finishing.
self.wait_for_navigate = wait_for_navigate
def evaluate(self, expression, await_promise=False, retries=3, retry_delay=0.5):
    """Evaluate a JavaScript expression in the page and return the CDP result.

    A failed evaluation is retried as a safeguard against rare transient
    failures; waiting for the network instead would be slower.

    Args:
        expression: JavaScript source passed to ``Runtime.evaluate``.
        await_promise: Forwarded as ``awaitPromise``.
        retries: Maximum number of additional attempts after the first failure.
        retry_delay: Seconds to sleep before each retry.

    Returns:
        The ``result`` part of the ``Runtime.evaluate`` response.

    Raises:
        RuntimeError: if the evaluation still reports an error after all
            retries. A successful retry no longer raises.
    """
    evaluate_params = {"expression": expression, "awaitPromise": await_promise}

    self.send("Runtime.enable")
    result, error = self.send("Runtime.evaluate", evaluate_params)

    attempts_left = retries
    while error and attempts_left > 0:
        print(f"Error evaluating expression: {error}. Retrying in {int(retry_delay * 1000)}ms")
        time.sleep(retry_delay)
        result, error = self.send("Runtime.evaluate", evaluate_params)
        attempts_left -= 1

    if error:
        raise RuntimeError(f"Error evaluating expression: {error}")

    self.send("Runtime.disable")
    return result
# Set wait_for to networkIdle if the PDF is not rendering correctly, e.g. when the header
# height comes out wrong because an external script changes elements.
# networkIdle is the most stable option but makes it a lot slower, so it is avoided for now;
# enable it if rendering is not stable.
def set_content(self, html, wait_for=None):
    """Load ``html`` into the page's main frame without blocking on the load.

    Args:
        html: Document markup passed to ``Page.setDocumentContent``.
        wait_for: Lifecycle event name(s) to wait for; a falsy value means
            ``["load", "DOMContentLoaded"]``.

    Side effects:
        Starts local-resource interception and stores the waiter returned by
        ``wait_for_load`` on ``self.wait_for_set_content``.
    """
    lifecycle_events = wait_for if wait_for else ["load", "DOMContentLoaded"]
    self.intercept_request_for_local_resources()
    # Arm the lifecycle listener before the content is set.
    load_waiter = self.wait_for_load(wait_for=lifecycle_events)
    document_params = {"frameId": self._ensure_frame_id(), "html": html}
    self.send("Page.setDocumentContent", document_params)
    self.wait_for_set_content = load_waiter
def wait_for_load(self, wait_for, timeout=60):
"""
Arm a listener for page lifecycle events and return a function that waits for them.

`wait_for` is a lifecycle event name (str) or a list of names. Lifecycle events are
enabled and a `Page.lifecycleEvent` listener is registered immediately. The listener
resolves its future once every requested event name has been seen.

Returns a zero-argument function. Per the visible statements it calls
`session.wait_for_event(event, timeout)` and then removes the listener.
NOTE(review): `timeout` is presumably in seconds; confirm against `session.wait_for_event`.
"""
self.send("Page.setLifecycleEventsEnabled", {"enabled": True})
# Maps each requested lifecycle event name to whether it has been observed yet.
status = {}
if isinstance(wait_for, str):
status[wait_for] = False
if isinstance(wait_for, list):
for event in wait_for:
status[event] = False
def on_lifecycle_event(future, response):
params = response.get("params", {})
if params.get("name") in status.keys():
status[params.get("name")] = True
# Resolve only once, when all requested events have fired.
if all(status.values()):
if not future.done():
future.set_result(response)
event = self.session.start_listener(
"Page.lifecycleEvent", on_lifecycle_event, self.session_id, self.target_id, self.frame_id
)
def start_wait():
self.session.wait_for_event(event, timeout)
self.session.remove_listener("Page.lifecycleEvent", event)
return start_wait
def get_element_height(self, selector="body"):
    """Return the box-model height of the first element matching a selector.

    Args:
        selector: CSS selector to measure. It is replaced by ``".wrapper"``
            whenever ``self.is_print_designer`` is falsy.

    Returns:
        ``model.height`` from ``DOM.getBoxModel`` for the matched node.

    Raises:
        RuntimeError: if fetching the document, querying the selector or
            reading the box model reports an error. ``DOM.disable`` is sent
            in every case.
    """
    try:
        target_selector = selector if self.is_print_designer else ".wrapper"
        self.send("DOM.enable")

        document, document_error = self.send("DOM.getDocument")
        if document_error:
            raise RuntimeError(f"Error getting document node: {document_error}")

        query_params = {"nodeId": document["root"]["nodeId"], "selector": target_selector}
        match, query_error = self.send("DOM.querySelector", query_params)
        if query_error:
            raise RuntimeError(f"Error querying selector: {query_error}")

        box, box_error = self.send("DOM.getBoxModel", {"nodeId": match["nodeId"]})
        if box_error:
            raise RuntimeError(f"Error getting computed style: {box_error}")
        height = box["model"]["height"]
    finally:
        self.send("DOM.disable")
    return height
def add_page_size_css(self):
    """Inject an ``@page`` rule carrying the paper size and margins from ``self.options``.

    Reads ``paperWidth``, ``paperHeight`` and the four ``margin*`` entries of
    ``self.options`` and writes them with an ``in`` suffix into a new
    stylesheet created in the page's main frame.

    Raises:
        RuntimeError: if enabling the DOM/CSS domains, creating the stylesheet
            or setting its text reports an error.
    """
    page_width = str(self.options["paperWidth"]) + "in"
    page_height = str(self.options["paperHeight"]) + "in"
    margin_left = str(self.options["marginLeft"]) + "in"
    margin_right = str(self.options["marginRight"]) + "in"
    margin_top = str(self.options["marginTop"]) + "in"
    margin_bottom = str(self.options["marginBottom"]) + "in"

    # Enable DOM and CSS agents
    for domain in ("DOM", "CSS"):
        _result, error = self.send(f"{domain}.enable")
        if error:
            raise RuntimeError(f"Error enabling {domain}: {error}")

    # Create a new stylesheet
    sheet, error = self.send("CSS.createStyleSheet", {"frameId": self._ensure_frame_id()})
    if error:
        raise RuntimeError(f"Error creating stylesheet: {error}")
    sheet_id = sheet["styleSheetId"]

    # Define the CSS rule for the page size
    page_rule = f"""
@page {{
size: {page_width} {page_height};
margin: {margin_top} {margin_right} {margin_bottom} {margin_left};
}}
"""

    # Apply the CSS rule to the created stylesheet
    _result, error = self.send("CSS.setStyleSheetText", {"styleSheetId": sheet_id, "text": page_rule})
    if error:
        raise RuntimeError(f"Error setting stylesheet text: {error}")

    self.send("CSS.disable")
    self.send("DOM.disable")
def generate_pdf(self, wait_for_pdf=True, raw=False):
    """Print the page to PDF, either synchronously or as a pending future.

    Args:
        wait_for_pdf: When true, wait for ``Page.printToPDF`` and return the
            PDF. When false, store the pending call on ``self.wait_for_pdf``
            and return ``None``.
        raw: Forwarded to ``get_pdf_from_stream``.

    Returns:
        The value of ``get_pdf_from_stream`` in the synchronous case, else ``None``.

    Raises:
        RuntimeError: if ``Page.printToPDF`` reports an error.
        ValueError: if the response carries no ``stream`` handle.
    """
    self.add_page_size_css()

    if wait_for_pdf:
        print_result, print_error = self.send("Page.printToPDF", self.options)
        if print_error:
            raise RuntimeError(f"Error generating PDF: {print_error}")
        if "stream" not in print_result:
            raise ValueError("Stream handle not returned from Page.printToPDF")
        return self.get_pdf_from_stream(print_result["stream"], raw)

    self.wait_for_pdf = self.send("Page.printToPDF", self.options, return_future=True)
    return None
def get_pdf_stream_id(self):
    """Wait for the pending ``Page.printToPDF`` call and return its stream handle.

    Expects ``self.wait_for_pdf`` to have been set by
    ``generate_pdf(wait_for_pdf=False)``.
    """
    pending = self.wait_for_pdf
    # wait for task to complete
    self.session.wait_for_event(pending)
    # The outer result is itself awaitable; its result holds the CDP response.
    response = pending.result().result()
    return response["result"]["stream"]
def get_pdf_from_stream(self, stream_id, raw=False):
    """Read a CDP IO stream to the end, close it and return the PDF.

    Args:
        stream_id: Stream handle returned by ``Page.printToPDF``.
        raw: When true return the PDF as ``bytes``; otherwise wrap it in a
            ``pypdf.PdfReader``.

    Returns:
        ``bytes`` or ``pypdf.PdfReader`` depending on ``raw``.

    Raises:
        RuntimeError: if reading a chunk or closing the stream reports an error.
    """
    # Collect chunks and join once; repeated bytes concatenation is quadratic.
    chunks = []
    offset = 0
    while True:
        chunk_result, error = self.send("IO.read", {"handle": stream_id, "offset": offset, "size": 4096})
        if error:
            raise RuntimeError(f"Error reading PDF chunk: {error}")

        chunk_data = chunk_result["data"]
        # we don't use base64Encode option but added check anyway as it is one of the valid options.
        if chunk_result.get("base64Encoded", False):
            chunk_data = base64.b64decode(chunk_data)
        elif isinstance(chunk_data, str):
            # Plain-text chunks arrive as str and must become bytes before joining.
            chunk_data = chunk_data.encode("utf-8")

        chunks.append(chunk_data)
        offset += len(chunk_data)
        if chunk_result.get("eof", False):
            break

    _result, error = self.send("IO.close", {"handle": stream_id})
    if error:
        raise RuntimeError(f"Error closing PDF stream: {error}")

    pdf_data = b"".join(chunks)
    if raw:
        return pdf_data

    # Imported lazily so the raw path does not need pypdf.
    from io import BytesIO

    from pypdf import PdfReader

    return PdfReader(BytesIO(pdf_data))
def close(self):
    """Disable request interception and close this page's target.

    Raises:
        RuntimeError: if ``Target.closeTarget`` reports an error.
    """
    self.session.send("Fetch.disable")
    _result, close_error = self.send("Target.closeTarget", {"targetId": self.target_id})
    if not close_error:
        return
    raise RuntimeError(f"Error closing target: {close_error}")

View file

@ -0,0 +1,117 @@
class PDFTransformer:
def __init__(self, browser):
    """Collect the body, header and footer PDFs from ``browser``.

    ``no_of_pages`` and ``encrypt_password`` are only set when a header or
    footer PDF exists; without either, ``transform_pdf`` just returns the
    body PDF and never needs them.
    """
    self.browser = browser
    self.body_pdf = browser.body_pdf
    self.is_print_designer = browser.is_print_designer
    self._set_header_pdf()
    self._set_footer_pdf()

    if self.header_pdf or self.footer_pdf:
        self.no_of_pages = len(self.body_pdf.pages)
        self.encrypt_password = self.browser.options.get("password", None)
def _set_header_pdf(self):
    """Copy the header PDF and its dynamic flag from the browser, if it has a header."""
    self.header_pdf = None
    if not hasattr(self.browser, "header_pdf"):
        return
    self.header_pdf = self.browser.header_pdf
    self.is_header_dynamic = self.browser.is_header_dynamic
def _set_footer_pdf(self):
    """Copy the footer PDF and its dynamic flag from the browser, if it has a footer."""
    self.footer_pdf = None
    if not hasattr(self.browser, "footer_pdf"):
        return
    self.footer_pdf = self.browser.footer_pdf
    self.is_footer_dynamic = self.browser.is_footer_dynamic
def transform_pdf(self, output=None):
"""
Merge the header and footer PDFs onto every page of the body PDF.

Returns the body PDF object unchanged when there is neither header nor footer.
Otherwise, when `output` is given, the body pages are appended to it via
`append_pages_from_reader` and `output` is returned (no encryption is applied on this path).
Without `output` the pages go into a new `PdfWriter`, which is encrypted when
`self.encrypt_password` is set, and the serialized bytes are returned.
"""
from pypdf import PdfWriter
header = self.header_pdf
body = self.body_pdf
footer = self.footer_pdf
if not header and not footer:
return body
# Heights are taken from the top of the first page's mediabox of each PDF.
body_height = body.pages[0].mediabox.top
body_transform = header_height = footer_height = header_body_top = 0
if footer:
footer_height = footer.pages[0].mediabox.top
# The body is shifted up by the footer height.
body_transform = footer_height
if header:
header_height = header.pages[0].mediabox.top
# The header is shifted up by body height plus footer height.
header_transform = body_height + footer_height
# Height of the combined page: header + body + footer.
header_body_top = header_height + body_height + footer_height
# Static header pages are transformed once up front; dynamic ones are transformed per body page below.
if header and not self.is_header_dynamic:
for h in header.pages:
self._transform(h, header_body_top, header_transform)
for p in body.pages:
if header_body_top:
self._transform(p, header_body_top, body_transform)
if header:
if self.is_header_dynamic:
# Dynamic header: the header page with the same index as the body page is used.
p.merge_page(
self._transform(header.pages[p.page_number], header_body_top, header_transform)
)
elif self.is_print_designer:
# Print designer header PDF: page 0 is used for the first body page, page 3 for the
# last, page 2 for other even page_number values and page 1 for odd ones.
if p.page_number == 0:
p.merge_page(header.pages[0])
elif p.page_number == self.no_of_pages - 1:
p.merge_page(header.pages[3])
elif p.page_number % 2 == 0:
p.merge_page(header.pages[2])
else:
p.merge_page(header.pages[1])
else:
p.merge_page(header.pages[0])
if footer:
if self.is_footer_dynamic:
p.merge_page(footer.pages[p.page_number])
elif self.is_print_designer:
# Same page selection as for the print designer header.
if p.page_number == 0:
p.merge_page(footer.pages[0])
elif p.page_number == self.no_of_pages - 1:
p.merge_page(footer.pages[3])
elif p.page_number % 2 == 0:
p.merge_page(footer.pages[2])
else:
p.merge_page(footer.pages[1])
else:
p.merge_page(footer.pages[0])
if output:
output.append_pages_from_reader(body)
return output
writer = PdfWriter()
writer.append_pages_from_reader(body)
if self.encrypt_password:
writer.encrypt(self.encrypt_password)
return self.get_file_data_from_writer(writer)
def _transform(self, page, page_top, ty):
    """Resize a page's mediabox to ``page_top`` and shift its content vertically by ``ty``.

    Args:
        page: pypdf page object; modified in place.
        page_top: New y coordinate of the mediabox's upper-right corner.
        ty: Vertical translation applied to the page content.

    Returns:
        The same ``page`` object.
    """
    from pypdf import PdfWriter, Transformation

    page.mediabox.upper_right = (page.mediabox.right, page_top)
    page.add_transformation(Transformation().translate(ty=ty))
    return page
def get_file_data_from_writer(self, writer_obj):
    """Serialize a PDF writer into bytes.

    Args:
        writer_obj: Object with a ``write(stream)`` method, e.g. ``pypdf.PdfWriter``.

    Returns:
        Everything the writer wrote, as ``bytes``.
    """
    from io import BytesIO

    # https://docs.python.org/3/library/io.html
    buffer = BytesIO()
    writer_obj.write(buffer)
    # getvalue() returns the whole buffer regardless of the current position.
    return buffer.getvalue()

View file

@ -229,6 +229,9 @@ def download_pdf(
letterhead=None,
pdf_generator: Literal["wkhtmltopdf", "chrome"] | None = None,
):
if pdf_generator is None:
pdf_generator = "wkhtmltopdf"
doc = doc or frappe.get_doc(doctype, name)
validate_print_permission(doc)

View file

@ -1,8 +1,18 @@
import os
import re
from typing import Literal
import click
import frappe
from frappe.utils.data import cint, cstr
# Path components of the Chromium headless-shell binary, keyed by `platform.system().lower()`.
# Each value is [directory name, executable file name], joined onto the bench-level
# "chromium" directory by the setup helpers in this module.
EXECUTABLE_PATHS = {
"linux": ["chrome-linux", "headless_shell"],
"darwin": ["chrome-mac", "headless_shell"],
"windows": ["chrome-win", "headless_shell.exe"],
}
def get_print(
doctype=None,
@ -144,3 +154,336 @@ def attach_print(
file_name = cstr(file_name).replace(" ", "").replace("/", "-") + ext
return {"fname": file_name, "fcontent": content}
def setup_chromium():
    """Setup Chromium at the bench level.

    Locates the Chromium executable, downloading it when it is missing, and
    echoes where it lives.

    Returns:
        Path of the Chromium executable as a string.

    Raises:
        RuntimeError: wrapping any failure raised while locating or downloading.
    """
    try:
        chromium_path = find_or_download_chromium_executable()
        click.echo(f"Chromium is already set up at {chromium_path}")
    except Exception as e:
        failure = f"Failed to setup Chromium: {e}"
        click.echo(failure)
        raise RuntimeError(failure)
    return chromium_path
def find_or_download_chromium_executable():
    """Find the Chromium executable, downloading Chromium first when it is missing.

    The executable is expected under ``<bench>/chromium`` at the
    platform-specific location listed in ``EXECUTABLE_PATHS``.

    Returns:
        Path to the executable as a string.

    Raises:
        RuntimeError: if the platform is unsupported, or the executable is
            still missing after the download.
    """
    import platform
    from pathlib import Path

    chromium_dir = os.path.join(frappe.utils.get_bench_path(), "chromium")

    platform_name = platform.system().lower()
    executable_name = EXECUTABLE_PATHS.get(platform_name)
    if executable_name is None:
        # Previously this only echoed and then crashed on joinpath(*None).
        click.echo(f"Unsupported platform: {platform_name}")
        raise RuntimeError(f"Unsupported platform: {platform_name}")

    # Construct the full path to the executable
    exec_path = Path(chromium_dir).joinpath(*executable_name)
    if not exec_path.exists():
        click.echo("Chromium is not available. downloading...")
        download_chromium()

        if not exec_path.exists():
            # Do not hand back a path that does not exist; callers would only
            # fail later when trying to launch it.
            click.echo("Error while downloading chrome")
            raise RuntimeError(f"Chromium executable not found after download: {exec_path}")

    return str(exec_path)
def download_chromium():
    """Download and extract Chromium for the specific version at the bench level.

    Replaces ``<bench>/chromium`` with a fresh download, then normalises the
    extracted layout to the directory and executable names listed in
    ``EXECUTABLE_PATHS`` and marks the executable as runnable.

    Raises:
        RuntimeError: on download, extraction or layout problems.
    """
    import platform
    import shutil
    import zipfile

    import requests

    chromium_dir = os.path.join(frappe.utils.get_bench_path(), "chromium")

    # Remove old Chromium directory if it exists
    if os.path.exists(chromium_dir):
        click.echo("Removing old Chromium directory...")
        shutil.rmtree(chromium_dir, ignore_errors=True)

    os.makedirs(chromium_dir, exist_ok=True)

    download_url = get_chromium_download_url()
    zip_path = os.path.join(chromium_dir, os.path.basename(download_url))

    try:
        click.echo(f"Downloading Chromium from {download_url}...")
        # playwright's CDN requires a user agent
        headers = {"User-Agent": "Wget/1.21.1"}
        with requests.get(download_url, stream=True, timeout=(10, 60), headers=headers) as r:
            r.raise_for_status()  # Raise an error for bad status codes
            total_size = int(r.headers.get("content-length", 0))  # Get total file size
            # The context manager makes sure the progress bar is rendered and finished.
            with click.progressbar(length=total_size, label="Downloading Chromium") as bar:
                with open(zip_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=65536):
                        f.write(chunk)
                        bar.update(len(chunk))

        click.echo("Extracting Chromium...")
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(chromium_dir)

        if os.path.exists(zip_path):
            os.remove(zip_path)

        # There should be only one directory
        extracted_entries = os.listdir(chromium_dir)
        if not extracted_entries:
            raise RuntimeError("Failed to extract Chromium: archive produced no files.")
        extracted = extracted_entries[0]

        chrome_folder_name, executable_file = EXECUTABLE_PATHS[platform.system().lower()]
        # Bound unconditionally: builds that already use the expected folder
        # name still need the executable handling below.
        renamed_dir = os.path.join(chromium_dir, chrome_folder_name)

        # Ensure the correct directory is renamed
        if extracted != chrome_folder_name:
            extracted_dir = os.path.join(chromium_dir, extracted)
            if os.path.exists(extracted_dir):
                click.echo(f"Renaming {extracted_dir} to {renamed_dir}")
                os.rename(extracted_dir, renamed_dir)
            else:
                raise RuntimeError(f"Failed to rename extracted directory. Expected {chrome_folder_name}.")

        exec_path = os.path.join(renamed_dir, executable_file)
        if not os.path.exists(exec_path):
            # Use the target's own extension so Windows looks for "chrome-headless-shell.exe".
            extension = os.path.splitext(executable_file)[1]
            executable_shell = os.path.join(renamed_dir, "chrome-headless-shell" + extension)
            if os.path.exists(executable_shell):
                os.rename(executable_shell, exec_path)
            else:
                raise RuntimeError("Failed to rename executable. Expected chrome-headless-shell.")

        # Make the `headless_shell` executable
        make_chromium_executable(exec_path)

        click.echo(f"Chromium is ready to use at: {chromium_dir}")
    except requests.Timeout as e:
        click.echo("Download timed out. Check your internet connection.")
        raise RuntimeError("Download timed out.") from e
    except requests.ConnectionError as e:
        click.echo("Failed to connect to Chromium download server.")
        raise RuntimeError("Connection error.") from e
    except requests.RequestException as e:
        click.echo(f"Failed to download Chromium: {e}")
        raise RuntimeError(f"Failed to download Chromium: {e}") from e
    except zipfile.BadZipFile as e:
        click.echo(f"Failed to extract Chromium: {e}")
        raise RuntimeError(f"Failed to extract Chromium: {e}") from e
def get_chromium_download_url():
    """Return the URL of the Chromium headless-shell archive for this host.

    A ``chromium_download_url`` entry in the common site config wins outright.
    Otherwise the URL is built from a base URL, a version and a
    platform-specific relative path; base URLs and versions can be overridden
    from the common site config as an escape hatch.

    Raises:
        Via ``frappe.throw`` when no download path exists for the detected platform.
    """
    site_config = frappe.get_common_site_config()

    # Avoid this unless it is going to run on a single type of platform and you have the correct binary hosted.
    explicit_url = site_config.get("chromium_download_url", None)
    if explicit_url:
        return explicit_url

    # We are going to use chrome-for-testing builds but unfortunately it doesn't have linux arm64
    # https://github.com/GoogleChromeLabs/chrome-for-testing/issues/1
    # so we will use playwright's fallback builds for linux arm64.
    # TODO: we will also use the fallback builds for windows arm
    # https://community.arm.com/arm-community-blogs/b/tools-software-ides-blog/posts/native-chromium-builds-windows-on-arm
    #
    # To find the CHROME_VERSION and CHROME_FALLBACK_VERSION, follow these steps:
    # 1. Visit the GitHub Actions page for Playwright:
    #    https://github.com/microsoft/playwright/actions/workflows/roll_browser_into_playwright.yml
    # 2. Open the latest job run.
    # 3. Navigate to the "Roll to New Browser Version" step.
    # 4. In the logs, look for a line similar to:
    #    Downloading Chromium 133.0.6943.16 (playwright build v1155)
    #    Here, the first number (e.g., 133.0.6943.16) is the CHROME_VERSION, and the second number
    #    (e.g., 1155) is the CHROME_FALLBACK_VERSION.

    # Using Google's chrome-for-testing-public builds for most platforms. (close to end user experience)
    chrome_for_testing_paths = {
        "linux64": "%s/linux64/chrome-headless-shell-linux64.zip",
        "mac-arm64": "%s/mac-arm64/chrome-headless-shell-mac-arm64.zip",
        "mac-x64": "%s/mac-x64/chrome-headless-shell-mac-x64.zip",
        "win32": "%s/win32/chrome-headless-shell-win32.zip",
        "win64": "%s/win64/chrome-headless-shell-win64.zip",
    }
    # For Linux ARM64, we use Playwright's Chromium builds due to the lack of official support.
    playwright_paths = dict.fromkeys(
        (
            "ubuntu20.04-arm64",
            "ubuntu22.04-arm64",
            "ubuntu24.04-arm64",
            "debian11-arm64",
            "debian12-arm64",
        ),
        "%s/chromium-headless-shell-linux-arm64.zip",
    )

    platform_key = calculate_platform()

    # Defaults, overwritten with values from common_site_config.json ( escape hatch )
    chrome_version = site_config.get("chromium_version", "133.0.6943.35")
    playwright_version = site_config.get("playwright_chromium_version", "1157")
    # make sure that you have all required flavours at correct urls
    chrome_base_url = site_config.get(
        "chromium_download_base_url", "https://storage.googleapis.com/chrome-for-testing-public/"
    )
    playwright_base_url = site_config.get(
        "playwright_chromium_download_base_url",
        "https://cdn.playwright.dev/dbazure/download/playwright/builds/chromium/",
    )

    if platform_key in chrome_for_testing_paths:
        return f"{chrome_base_url}{chrome_for_testing_paths[platform_key] % chrome_version}"
    if platform_key in playwright_paths:
        return f"{playwright_base_url}{playwright_paths[platform_key] % playwright_version}"

    frappe.throw(
        f"No download path configured or Chromium download not available for platform: {platform_key}"
    )
def make_chromium_executable(executable):
    """Make the Chromium executable.

    Args:
        executable: Path to the Chromium binary.

    Raises:
        RuntimeError: if the path does not exist.
    """
    if not os.path.exists(executable):
        raise RuntimeError(f"Chromium executable not found: {executable}.")

    # check if the file is executable
    if os.access(executable, os.X_OK):
        click.echo(f"Chromium executable is already executable: {executable}")
        return

    click.echo(f"Making Chromium executable: {executable}")
    os.chmod(executable, 0o755)  # Set executable permissions
    click.echo(f"Chromium executable permissions set: {executable}")
def calculate_platform(system=None, arch=None, distro_info=None):
    """
    Determines the host platform and returns it as a string.
    Includes logic for Linux ARM, Linux x64, macOS (Intel and ARM), and Windows (32-bit and 64-bit).

    Args:
        system: Optional override for ``platform.system()`` (case-insensitive).
        arch: Optional override for ``platform.machine()`` (case-insensitive).
        distro_info: Optional ``{"id": ..., "version": ...}`` dict used for
            Linux ARM instead of calling ``get_linux_distribution_info()``.

    Returns:
        str: The detected platform string (e.g., 'linux64', 'mac-arm64', etc.),
        or '<unknown>' when the combination is not recognised.
    """
    import platform

    system = (system or platform.system()).lower()
    arch = (arch or platform.machine()).lower()
    # Windows reports 64-bit Intel/AMD machines as "AMD64"; fold it into "x86_64".
    if arch == "amd64":
        arch = "x86_64"

    # Handle Linux ARM-specific logic
    if system == "linux" and arch in ("aarch64", "arm64"):
        if distro_info is None:
            distro_info = get_linux_distribution_info()
        distro_id = distro_info.get("id", "")
        version = distro_info.get("version", "")
        try:
            major_version = int(version.split(".")[0]) if version else 0
        except ValueError:
            # Non-numeric versions are treated like a missing version.
            major_version = 0

        if distro_id == "ubuntu":
            if major_version < 20:
                return "ubuntu18.04-arm64"
            if major_version < 22:
                return "ubuntu20.04-arm64"
            if major_version < 24:
                return "ubuntu22.04-arm64"
            if major_version < 26:
                return "ubuntu24.04-arm64"
            return "<unknown>"
        if distro_id in ["debian", "raspbian"]:
            if major_version < 11:
                return "debian10-arm64"
            if major_version < 12:
                return "debian11-arm64"
            return "debian12-arm64"
        return "<unknown>"

    # Handle other platforms
    known_platforms = {
        ("linux", "x86_64"): "linux64",
        ("darwin", "arm64"): "mac-arm64",
        ("darwin", "x86_64"): "mac-x64",
        ("windows", "x86"): "win32",
        ("windows", "x86_64"): "win64",
    }
    return known_platforms.get((system, arch), "<unknown>")
def get_linux_distribution_info():
    # not tested
    """Retrieve Linux distribution information using the `distro` library.

    Returns:
        dict with ``id`` (lower-cased distribution id) and ``version``; both
        are empty strings when the `distro` package is not installed.
    """
    try:
        import distro
    except ImportError:
        # A module object is always truthy, so a missing package has to be
        # detected here rather than with `if not distro`.
        return {"id": "", "version": ""}

    return {"id": distro.id().lower(), "version": distro.version()}
def parse_float_and_unit(input_text, default_unit="px"):
if isinstance(input_text, int | float):
return {"value": input_text, "unit": default_unit}
if not isinstance(input_text, str):
return
number = float(re.search(r"[+-]?([0-9]*[.])?[0-9]+", input_text).group())
valid_units = [r"px", r"mm", r"cm", r"in"]
unit = [match.group() for rx in valid_units if (match := re.search(rx, input_text))]
return {"value": number, "unit": unit[0] if len(unit) == 1 else default_unit}
def convert_uom(
number: float,
from_uom: Literal["px", "mm", "cm", "in"] = "px",
to_uom: Literal["px", "mm", "cm", "in"] = "px",
only_number: bool = False,
) -> float:
unit_values = {
"px": 1,
"mm": 3.7795275591,
"cm": 37.795275591,
"in": 96,
}
from_px = (
{
"to_px": 1,
"to_mm": unit_values["px"] / unit_values["mm"],
"to_cm": unit_values["px"] / unit_values["cm"],
"to_in": unit_values["px"] / unit_values["in"],
},
)
from_mm = (
{
"to_mm": 1,
"to_px": unit_values["mm"] / unit_values["px"],
"to_cm": unit_values["mm"] / unit_values["cm"],
"to_in": unit_values["mm"] / unit_values["in"],
},
)
from_cm = (
{
"to_cm": 1,
"to_px": unit_values["cm"] / unit_values["px"],
"to_mm": unit_values["cm"] / unit_values["mm"],
"to_in": unit_values["cm"] / unit_values["in"],
},
)
from_in = {
"to_in": 1,
"to_px": unit_values["in"] / unit_values["px"],
"to_mm": unit_values["in"] / unit_values["mm"],
"to_cm": unit_values["in"] / unit_values["cm"],
}
converstion_factor = ({"from_px": from_px, "from_mm": from_mm, "from_cm": from_cm, "from_in": from_in},)
if only_number:
return round(number * converstion_factor[0][f"from_{from_uom}"][0][f"to_{to_uom}"], 3)
return f"{round(number * converstion_factor[0][f'from_{from_uom}'][0][f'to_{to_uom}'], 3)}{to_uom}"

View file

@ -90,6 +90,8 @@ dependencies = [
"posthog~=5.0.0",
"vobject~=0.9.9",
"pycountry~=24.6.1",
"websockets"
]
[project.urls]