refactor: add queue for indexing docs in sqlite search (#37840)
This commit is contained in:
parent
a655bbdfa6
commit
5168c52ebe
3 changed files with 182 additions and 5 deletions
|
|
@ -210,6 +210,7 @@ scheduler_events = {
|
|||
# 5 minutes
|
||||
"0/5 * * * *": [
|
||||
"frappe.email.doctype.notification.notification.trigger_offset_alerts",
|
||||
"frappe.search.sqlite_search.index_docs_in_queue",
|
||||
],
|
||||
# 15 minutes
|
||||
"0/15 * * * *": [
|
||||
|
|
|
|||
|
|
@ -1202,6 +1202,7 @@ class SQLiteSearch(ABC):
|
|||
|
||||
def _set_pragmas(self, cursor, is_read=False):
|
||||
"""Set SQLite performance pragmas."""
|
||||
cursor.execute("PRAGMA busy_timeout = 5000;") # Wait up to 5 seconds if the database is locked
|
||||
cursor.execute("PRAGMA journal_mode = WAL;") # Write-Ahead Logging for concurrency
|
||||
cursor.execute("PRAGMA synchronous = NORMAL;") # Better performance vs FULL
|
||||
cursor.execute("PRAGMA cache_size = -8192;") # 8MB cache
|
||||
|
|
@ -1291,6 +1292,13 @@ class SQLiteSearch(ABC):
|
|||
CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype)
|
||||
""")
|
||||
|
||||
# Queue table for indexing docs later
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS search_index_queue (
|
||||
doc_id TEXT PRIMARY KEY
|
||||
)
|
||||
""")
|
||||
|
||||
self._with_connection(create_tables)
|
||||
|
||||
def _index_documents(self, documents):
|
||||
|
|
@ -1368,17 +1376,23 @@ class SQLiteSearch(ABC):
|
|||
|
||||
def index_doc(self, doctype, docname):
|
||||
"""Index a single document."""
|
||||
doc = frappe.get_doc(doctype, docname)
|
||||
self.raise_if_not_indexed()
|
||||
document = self.prepare_document(doc)
|
||||
if document:
|
||||
self._index_documents([document])
|
||||
# add the document in the queue table, handled later by a background job "index_docs_in_queue"
|
||||
if frappe.db.exists(doctype, docname):
|
||||
self._ensure_fts_table()
|
||||
self.add_to_queue(f"{doctype}:{docname}")
|
||||
|
||||
def add_to_queue(self, doc_id):
	"""Enqueue *doc_id* (``"<doctype>:<name>"``) for later background indexing.

	``INSERT OR IGNORE`` makes this idempotent: repeated calls for the
	same doc_id never create duplicate queue entries.
	"""
	query = "INSERT OR IGNORE INTO search_index_queue (doc_id) VALUES (?)"
	self.sql(query, (doc_id,), commit=True)
|
||||
|
||||
def remove_doc(self, doctype, docname):
	"""Remove a single document from the search index.

	Deletes the doc from the FTS table and also drops any pending entry
	for it from the indexing queue, so a deleted doc is never re-indexed.
	"""
	self.raise_if_not_indexed()
	identifier = f"{doctype}:{docname}"
	for table in ("search_fts", "search_index_queue"):
		self.sql(f"DELETE FROM {table} WHERE doc_id = ?", (identifier,), commit=True)
|
||||
|
||||
# Utility Methods
|
||||
|
||||
|
|
@ -1890,3 +1904,50 @@ def get_search_classes() -> list[type[SQLiteSearch]]:
|
|||
raise TypeError(f"Search class {search_class.__name__} must extend SQLiteSearch")
|
||||
|
||||
return search_classes
|
||||
|
||||
|
||||
# Called by scheduler every 5 minutes to process indexing queue
def index_docs_in_queue():
	"""Process documents in the indexing queue.

	For every registered search class with an enabled, existing index:
	drain up to 30 queued doc_ids, index the docs that still exist, log
	(and drop) the ones that fail, and delete the processed batch from
	the queue table.
	"""
	for search_class in get_search_classes():
		engine = search_class()
		if not (engine.is_search_enabled() and engine.index_exists()):
			continue

		def drain_queue(cursor, engine=engine):
			# Bounded batch so a single scheduler run stays short.
			rows = cursor.execute("SELECT doc_id FROM search_index_queue LIMIT 30").fetchall()
			pending_ids = [row["doc_id"] for row in rows]
			prepared = []
			failures = []

			for entry_id in pending_ids:
				try:
					doctype, name = entry_id.split(":", 1)
					payload = engine.prepare_document(frappe.get_doc(doctype, name))
					if payload:
						prepared.append(payload)
				except frappe.DoesNotExistError:
					# Doc was deleted before the queue was processed — skip it
					continue
				except Exception:
					failures.append(entry_id)

			if prepared:
				engine._index_documents(prepared)

			if failures:
				frappe.log_error(
					title="SQLite Search Index Queue Error",
					message=f"Failed to index documents {failures} from queue in {engine.__class__.__name__}",
				)

			# Cleanup processed doc_ids from the queue (failed ones too, so
			# a permanently broken doc cannot clog the queue forever)
			if pending_ids:
				marks = ",".join("?" for _ in pending_ids)
				cursor.execute(f"DELETE FROM search_index_queue WHERE doc_id IN ({marks})", pending_ids)

		engine._with_connection(drain_queue)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from typing import ClassVar
|
|||
from unittest.mock import patch
|
||||
|
||||
import frappe
|
||||
from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError
|
||||
from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError, index_docs_in_queue
|
||||
from frappe.tests import IntegrationTestCase
|
||||
|
||||
|
||||
|
|
@ -308,6 +308,91 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
# Should return empty results or minimal results
|
||||
self.assertLessEqual(len(results["results"]), 1)
|
||||
|
||||
def test_index_queue(self):
	"""index_doc should only enqueue the doc; index_docs_in_queue performs the actual indexing."""
	self.search.build_index()
	new_note = frappe.get_doc(
		{
			"doctype": "Note",
			"title": "Newly Added Document",
			"content": "This document was added after initial indexing",
		}
	)
	new_note.insert()

	try:
		self.search.index_doc("Note", new_note.name)
		# Fix: doc_id was assigned twice in the original; compute it once.
		doc_id = f"Note:{new_note.name}"
		conn = sqlite3.connect(self.search.db_path)
		try:
			cursor = conn.cursor()

			# should be present in the queue
			cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,))
			queue_count = cursor.fetchone()[0]
			self.assertEqual(
				queue_count, 1, "Document should be present in the index queue after index_doc call"
			)

			# should not be indexed yet
			cursor.execute("SELECT COUNT(*) FROM search_fts WHERE doc_id = ?", (doc_id,))
			db_count = cursor.fetchone()[0]
			self.assertEqual(
				db_count,
				0,
				"Document should not be indexed immediately after index_doc call, should be in queue instead",
			)

			with patch("frappe.get_hooks") as mock_get_hooks:
				mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
				index_docs_in_queue()

			# after processing the queue, it should be indexed and removed from the queue
			cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,))
			queue_count_after = cursor.fetchone()[0]
			self.assertEqual(
				queue_count_after, 0, "Document should be removed from index queue after processing"
			)

			cursor.execute("SELECT COUNT(*) FROM search_fts WHERE doc_id = ?", (doc_id,))
			db_count_after = cursor.fetchone()[0]
			self.assertEqual(db_count_after, 1, "Document should be indexed after processing the queue")
		finally:
			# Fix: the original only closed the connection on success,
			# leaking the sqlite handle whenever an assertion failed.
			conn.close()
	finally:
		new_note.delete()
|
||||
|
||||
def test_index_queue_deduplication(self):
	"""Repeated index_doc calls for the same doc must leave exactly one queue entry."""
	self.search.build_index()
	new_note = frappe.get_doc(
		{
			"doctype": "Note",
			"title": "Newly Added Document",
			"content": "This document was added after initial indexing",
		}
	)
	new_note.insert()

	try:
		self.search.index_doc("Note", new_note.name)
		new_note.reload()
		new_note.content = "Updated content"
		new_note.save()

		doc_id = f"Note:{new_note.name}"
		conn = sqlite3.connect(self.search.db_path)
		try:
			cursor = conn.cursor()
			cursor.execute("SELECT COUNT(*) FROM search_index_queue WHERE doc_id = ?", (doc_id,))
			# Fix: SELECT COUNT(*) always returns exactly one row, so the
			# original len(fetchall()) == 1 check passed vacuously and never
			# verified deduplication. Assert on the actual count value.
			queue_count = cursor.fetchone()[0]
			self.assertEqual(
				queue_count,
				1,
				"There should only be one entry in the index queue for the document, even after multiple index_doc calls",
			)
		finally:
			# Close even if the assertion fails (original leaked the handle).
			conn.close()
	finally:
		new_note.delete()
|
||||
|
||||
def test_document_indexing_operations(self):
|
||||
"""Test individual document indexing and removal operations."""
|
||||
self.search.build_index()
|
||||
|
|
@ -329,6 +414,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
|
||||
# Index the new document
|
||||
self.search.index_doc("Note", new_note.name)
|
||||
# Need to process the queue to make it findable in search results
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
# Now it should be findable
|
||||
results = self.search.search("Newly Added Document")
|
||||
|
|
@ -442,6 +531,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
# Index the document
|
||||
self.search.index_doc("Note", html_note.name)
|
||||
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
# Search should find processed content
|
||||
results = self.search.search("bold text links")
|
||||
|
||||
|
|
@ -524,6 +617,17 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
# Index the document
|
||||
self.search.index_doc("Note", test_note.name)
|
||||
|
||||
# search results should be 0 as the document is in the queue and not indexed yet
|
||||
results = self.search.search("Deduplication Test")
|
||||
initial_count = len([r for r in results["results"] if r["name"] == test_note.name])
|
||||
self.assertEqual(
|
||||
initial_count, 0, "Document should not be found in search results before processing the queue"
|
||||
)
|
||||
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
# Search for the document - should find exactly one result
|
||||
results = self.search.search("Deduplication Test")
|
||||
initial_count = len([r for r in results["results"] if r["name"] == test_note.name])
|
||||
|
|
@ -534,6 +638,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
self.search.index_doc("Note", test_note.name)
|
||||
self.search.index_doc("Note", test_note.name)
|
||||
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
# Search again - should still find exactly one result
|
||||
results = self.search.search("Deduplication Test")
|
||||
final_count = len([r for r in results["results"] if r["name"] == test_note.name])
|
||||
|
|
@ -543,6 +651,9 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
test_note.content = "Updated content for deduplication testing"
|
||||
test_note.save()
|
||||
self.search.index_doc("Note", test_note.name)
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
# Search with updated content - should find exactly one result with new content
|
||||
results = self.search.search("Updated content deduplication")
|
||||
|
|
@ -554,6 +665,10 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
|
|||
|
||||
# Rebuild entire index - should not create duplicates
|
||||
self.search.build_index()
|
||||
with patch("frappe.get_hooks") as mock_get_hooks:
|
||||
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
|
||||
index_docs_in_queue()
|
||||
|
||||
results = self.search.search("Deduplication Test")
|
||||
rebuild_count = len([r for r in results["results"] if r["name"] == test_note.name])
|
||||
self.assertEqual(rebuild_count, 1, "Should still find exactly one instance after full rebuild")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue