diff --git a/frappe/hooks.py b/frappe/hooks.py index 719d94d3c4..6ff518f357 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -210,6 +210,7 @@ scheduler_events = { # 5 minutes "0/5 * * * *": [ "frappe.email.doctype.notification.notification.trigger_offset_alerts", + "frappe.search.sqlite_search.index_docs_in_queue", ], # 15 minutes "0/15 * * * *": [ diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index b79d93908a..f3ff46f47f 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -1202,6 +1202,7 @@ class SQLiteSearch(ABC): def _set_pragmas(self, cursor, is_read=False): """Set SQLite performance pragmas.""" + cursor.execute("PRAGMA busy_timeout = 5000;") # Wait up to 5 seconds if the database is locked cursor.execute("PRAGMA journal_mode = WAL;") # Write-Ahead Logging for concurrency cursor.execute("PRAGMA synchronous = NORMAL;") # Better performance vs FULL cursor.execute("PRAGMA cache_size = -8192;") # 8MB cache @@ -1291,6 +1292,13 @@ class SQLiteSearch(ABC): CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype) """) + # Queue table for indexing docs later + cursor.execute(""" + CREATE TABLE IF NOT EXISTS search_index_queue ( + doc_id TEXT PRIMARY KEY + ) + """) + self._with_connection(create_tables) def _index_documents(self, documents): @@ -1368,17 +1376,23 @@ class SQLiteSearch(ABC): def index_doc(self, doctype, docname): """Index a single document.""" - doc = frappe.get_doc(doctype, docname) self.raise_if_not_indexed() - document = self.prepare_document(doc) - if document: - self._index_documents([document]) + # add the document in the queue table, handled later by a background job "index_docs_in_queue" + if frappe.db.exists(doctype, docname): + self._ensure_fts_table() + self.add_to_queue(f"{doctype}:{docname}") + + def add_to_queue(self, doc_id): + """Add a doc_id to the indexing queue.""" + # Using INSERT OR IGNORE to prevent duplicate entries in the queue + self.sql("INSERT OR IGNORE INTO search_index_queue (doc_id) VALUES (?)", (doc_id,), commit=True) def remove_doc(self, doctype, docname): """Remove a single document from the index.""" self.raise_if_not_indexed() doc_id = f"{doctype}:{docname}" self.sql("DELETE FROM search_fts WHERE doc_id = ?", (doc_id,), commit=True) + self.sql("DELETE FROM search_index_queue WHERE doc_id = ?", (doc_id,), commit=True) # Utility Methods @@ -1890,3 +1904,50 @@ def get_search_classes() -> list[type[SQLiteSearch]]: raise TypeError(f"Search class {search_class.__name__} must extend SQLiteSearch") return search_classes + + +# Called by scheduler every 5 minutes to process indexing queue +def index_docs_in_queue(): + """Process documents in the indexing queue.""" + search_classes = get_search_classes() + + for SearchClass in search_classes: + search = SearchClass() + if not (search.is_search_enabled() and search.index_exists()): + continue + + def process_queue(cursor): + rows = cursor.execute("SELECT doc_id FROM search_index_queue LIMIT 30").fetchall() + doc_ids = [row["doc_id"] for row in rows] + documents = [] + failed_doc_ids = [] + + for doc_id in doc_ids: + try: + doctype, name = doc_id.split(":", 1) + doc = frappe.get_doc(doctype, name) + document = search.prepare_document(doc) + if document: + documents.append(document) + except frappe.DoesNotExistError: + # Doc was deleted before the queue was processed — skip it + continue + except Exception: + failed_doc_ids.append(doc_id) + + if documents: + search._index_documents(documents) + + if failed_doc_ids: + frappe.log_error( + title="SQLite Search Index Queue Error", + message=f"Failed to index documents {failed_doc_ids} from queue in {search.__class__.__name__}", + ) + + # Cleanup processed doc_ids from the queue + if doc_ids: + placeholders_for_delete = ",".join(["?" for _ in doc_ids]) + delete_sql = f"DELETE FROM search_index_queue WHERE doc_id IN ({placeholders_for_delete})" + cursor.execute(delete_sql, doc_ids) + + search._with_connection(process_queue) diff --git a/frappe/tests/test_sqlite_search.py b/frappe/tests/test_sqlite_search.py index 691c9161dd..ed6e1ff400 100644 --- a/frappe/tests/test_sqlite_search.py +++ b/frappe/tests/test_sqlite_search.py @@ -5,7 +5,7 @@ from typing import ClassVar from unittest.mock import patch import frappe -from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError +from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError, index_docs_in_queue from frappe.tests import IntegrationTestCase @@ -308,6 +308,91 @@ class TestSQLiteSearchAPI(IntegrationTestCase): # Should return empty results or minimal results self.assertLessEqual(len(results["results"]), 1) + def test_index_queue(self): + self.search.build_index() + new_note = frappe.get_doc( + { + "doctype": "Note", + "title": "Newly Added Document", + "content": "This document was added after initial indexing", + } + ) + new_note.insert() + + try: + self.search.index_doc("Note", new_note.name) + doc_id = f"Note:{new_note.name}" + conn = sqlite3.connect(self.search.db_path) + cursor = conn.cursor() + doc_id = f"Note:{new_note.name}" + + # should be present in the queue + cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,)) + queue_count = cursor.fetchone()[0] + self.assertEqual( + queue_count, 1, "Document should be present in the index queue after index_doc call" + ) + + # should not be indexed yet + cursor.execute("SELECT COUNT(*) FROM search_fts WHERE doc_id = ?", (doc_id,)) + db_count = cursor.fetchone()[0] + self.assertEqual( + db_count, + 0, + "Document should not be indexed immediately after index_doc call, should be in queue instead", + ) + + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() + + # after processing the queue, it should be indexed and removed from the queue + cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,)) + queue_count_after = cursor.fetchone()[0] + self.assertEqual( + queue_count_after, 0, "Document should be removed from index queue after processing" + ) + + cursor.execute("SELECT COUNT(*) FROM search_fts WHERE doc_id = ?", (doc_id,)) + db_count_after = cursor.fetchone()[0] + self.assertEqual(db_count_after, 1, "Document should be indexed after processing the queue") + conn.close() + + finally: + new_note.delete() + + def test_index_queue_deduplication(self): + self.search.build_index() + new_note = frappe.get_doc( + { + "doctype": "Note", + "title": "Newly Added Document", + "content": "This document was added after initial indexing", + } + ) + new_note.insert() + + try: + self.search.index_doc("Note", new_note.name) + new_note.reload() + new_note.content = "Updated content" + new_note.save() + + doc_id = f"Note:{new_note.name}" + conn = sqlite3.connect(self.search.db_path) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,)) + queue_count = cursor.fetchall() + self.assertEqual( + len(queue_count), + 1, + "There should only be one entry in the index queue for the document, even after multiple index_doc calls", + ) + conn.close() + + finally: + new_note.delete() + def test_document_indexing_operations(self): """Test individual document indexing and removal operations.""" self.search.build_index() @@ -329,6 +414,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase): # Index the new document self.search.index_doc("Note", new_note.name) + # Need to process the queue to make it findable in search results + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() # Now it should be findable results = self.search.search("Newly Added Document") @@ -442,6 +531,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase): # Index the document self.search.index_doc("Note", html_note.name) + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() + # Search should find processed content results = self.search.search("bold text links") @@ -524,6 +617,17 @@ class TestSQLiteSearchAPI(IntegrationTestCase): # Index the document self.search.index_doc("Note", test_note.name) + # search results should be 0 as the document is in the queue and not indexed yet + results = self.search.search("Deduplication Test") + initial_count = len([r for r in results["results"] if r["name"] == test_note.name]) + self.assertEqual( + initial_count, 0, "Document should not be found in search results before processing the queue" + ) + + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() + # Search for the document - should find exactly one result results = self.search.search("Deduplication Test") initial_count = len([r for r in results["results"] if r["name"] == test_note.name]) @@ -534,6 +638,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase): self.search.index_doc("Note", test_note.name) self.search.index_doc("Note", test_note.name) + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() + # Search again - should still find exactly one result results = self.search.search("Deduplication Test") final_count = len([r for r in results["results"] if r["name"] == test_note.name]) @@ -543,6 +651,9 @@ class TestSQLiteSearchAPI(IntegrationTestCase): test_note.content = "Updated content for deduplication testing" test_note.save() self.search.index_doc("Note", test_note.name) + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() # Search with updated content - should find exactly one result with new content results = self.search.search("Updated content deduplication") @@ -554,6 +665,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase): # Rebuild entire index - should not create duplicates self.search.build_index() + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + index_docs_in_queue() + results = self.search.search("Deduplication Test") rebuild_count = len([r for r in results["results"] if r["name"] == test_note.name]) self.assertEqual(rebuild_count, 1, "Should still find exactly one instance after full rebuild")