diff --git a/frappe/hooks.py b/frappe/hooks.py index a977f39529..b37e79b7d4 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -216,7 +216,6 @@ scheduler_events = { "frappe.deferred_insert.save_to_db", "frappe.automation.doctype.reminder.reminder.send_reminders", "frappe.model.utils.link_count.update_link_count", - "frappe.search.sqlite_search.build_index_if_not_exists", "frappe.utils.telemetry.pulse.client.send_queued_events", ], # 10 minutes @@ -227,6 +226,9 @@ scheduler_events = { "30 * * * *": [], # Daily but offset by 45 minutes "45 0 * * *": [], + "0 */3 * * *": [ + "frappe.search.sqlite_search.build_index_if_not_exists", + ], }, "all": [ "frappe.email.queue.flush", diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index 2f9ffd9ee8..061145462c 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -291,88 +291,294 @@ class SQLiteSearch(ABC): }, } - def build_index(self): - """Build the complete search index from scratch using atomic replacement.""" + def build_index(self, batch_size=1000, is_continuation=False): + """Build the search index incrementally with progress tracking. + + The job runs until completion or until killed by the queue timeout. + Progress is tracked in the database so builds can resume from where they left off. + """ if not self.is_search_enabled(): return - # Use temporary database path for atomic replacement - temp_db_path = self._get_db_path(is_temp=True) + # Use temporary database path for atomic replacement (only for new index builds) + temp_db_path = None original_db_path = self.db_path - # Remove temp file if it exists - if os.path.exists(temp_db_path): - os.unlink(temp_db_path) + # Check if this is a completely fresh build or a continuation + if not is_continuation and not self.index_exists(): + # Fresh build - use temporary database for atomic replacement + temp_db_path = self._get_db_path(is_temp=True) - # Temporarily switch to temp database for building - self.db_path = temp_db_path + # Remove temp file if it exists + if os.path.exists(temp_db_path): + os.unlink(temp_db_path) + + # Switch to temp database for building + self.db_path = temp_db_path + elif is_continuation: + # Check if we're continuing a fresh build (temp db exists) + potential_temp_path = self._get_db_path(is_temp=True) + if os.path.exists(potential_temp_path): + # Continue with temporary database from fresh build + temp_db_path = potential_temp_path + self.db_path = temp_db_path + print(f"Continuation: Using temporary database {temp_db_path}") + else: + print(f"Continuation: Using regular database {self.db_path}") + # If no temp db exists, we're continuing with regular database (already set) + + if temp_db_path: + print(f"Working with temporary database: {self.db_path}") + else: + print(f"Working with regular database: {self.db_path}") try: - self._update_progress("Setting up search tables", 0, 100, absolute=True) + # Setup tables if needed (for fresh builds or when temp DB was just created) + if not is_continuation or (temp_db_path and not self._tables_exist()): + self._update_progress("Setting up search tables", 0, 100, absolute=True) + self._ensure_fts_table() - # Setup tables in temp database - self._ensure_fts_table() + # Clear existing index data for fresh build + if temp_db_path and not is_continuation: + self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts")) - self._update_progress("Fetching records", 20, 100, absolute=True) + # Initialize progress tracking (only for completely fresh builds) + if 
not is_continuation:
+				self._initialize_index_progress()

-		records = self.get_documents()
-		documents = []
+			# Get current progress
+			progress = self._get_index_progress()

-		self._update_progress("Preparing documents", 30, 100, absolute=True)
+			# Check if indexing is already complete
+			if self._is_indexing_complete():
+				self._update_progress("Search index already complete", 100, 100, absolute=True)
+				return

-		total_records = len(records)
-		for i, doc in enumerate(records):
-			document = self.prepare_document(doc)
-			if document:
-				documents.append(document)
+			# Process each doctype incrementally
+			total_doctypes = len(self.doc_configs)
+			processed_doctypes = 0

-			# Update progress during document preparation
-			if i % 100 == 0:
-				progress = 30 + int((i / total_records) * 20)  # 30-50% range
-				self._update_progress("Preparing documents", progress, 100, absolute=True)
+			for doctype in self.doc_configs.keys():
+				doctype_progress = progress.get(doctype, {})

-		self._update_progress("Indexing documents", 50, 100, absolute=True)
+				# Skip if doctype is already complete
+				if doctype_progress.get("is_complete"):
+					processed_doctypes += 1
+					continue

-		self._index_documents(documents)
+				self._update_progress(
+					f"Indexing {doctype}",
+					20 + (processed_doctypes * 60 // total_doctypes),
+					100,
+					absolute=True,
+				)

-		self._update_progress("Building spell correction vocabulary", 80, 100, absolute=True)
+				# Process this doctype in batches, resuming from the saved cursor
+				last_indexed_modified = doctype_progress.get("last_indexed_modified")

-		# Build vocabulary for spelling correction
-		self._build_vocabulary(documents)
+				while True:
+					# Get batch of documents
+					docs = self.get_documents_paginated(
+						doctype, limit=batch_size, last_indexed_modified=last_indexed_modified
+					)

-		# Atomic replacement: move temp database to final location
-		if os.path.exists(original_db_path):
-			os.unlink(original_db_path)
-		os.rename(temp_db_path, original_db_path)
+					if not docs:
+						# No more documents for this doctype
+						self._mark_doctype_complete(doctype)
+						break
+
+					# Prepare and index documents
+					documents = []
+					for doc in docs:
+						document = self.prepare_document(doc)
+						if document:
+							documents.append(document)
+
+					if documents:
+						self._index_documents(documents)
+
+					# Record the batch boundary using the standard 'modified' field,
+					# which exists on every Frappe doctype
+					last_doc_modified = docs[-1]["modified"]
+					last_doc_name = docs[-1]["name"]
+					self._update_index_progress(doctype, last_doc_name, last_doc_modified, len(documents))
+					last_indexed_modified = last_doc_modified
+
+					# Show progress based on total document counts across all doctypes
+					indexed_docs, total_docs = self._get_indexing_progress()
+					if total_docs > 0:
+						progress_percent = 20 + (indexed_docs * 60) // total_docs
+						self._update_progress(
+							f"Indexing {doctype} {indexed_docs}/{total_docs}",
+							progress_percent,
+							100,
+							absolute=True,
+						)
+
+				processed_doctypes += 1
+
+			# Build the vocabulary only once every doctype has been fully indexed
+			if not self._has_incomplete_doctypes():
+				self._update_progress("All documents indexed, building vocabulary", 80, 100, absolute=True)
+
+				# Build vocabulary incrementally
+				self._build_vocabulary_incremental()
+				self._mark_vocabulary_built()
+
+			# Final atomic replacement if this was a fresh build
+			if temp_db_path and os.path.exists(temp_db_path):
+				if os.path.exists(original_db_path):
+					os.unlink(original_db_path)
+				os.rename(temp_db_path, original_db_path)
self._update_progress("Search index build complete", 100, 100, absolute=True)

 			# Print warning summary
 			self._print_warning_summary()

-		except Exception:
-			# Clean up temp file on error
-			if os.path.exists(temp_db_path):
-				os.unlink(temp_db_path)
+		except Exception as e:
+			# Log the error; the temp database is kept so the next run can resume
+			frappe.log_error(
+				title="Search Index Build Error",
+				message=f"Error during search index build: {e}",
+			)
 			raise
 		finally:
 			# Restore original database path
-			self.db_path = original_db_path
+			if temp_db_path:
+				self.db_path = original_db_path
+
+	def _get_incomplete_count(self, where_clause):
+		"""Get count of incomplete records from search_index_progress table.
+
+		Args:
+			where_clause: SQL WHERE clause condition (without 'WHERE' keyword)
+
+		Returns:
+			int: Count of matching records, or -1 on error
+		"""
+		try:
+			result = self.sql(
+				f"""
+				SELECT COUNT(*) as incomplete_count
+				FROM search_index_progress
+				WHERE {where_clause}
+				""",
+				read_only=True,
+			)
+			return result[0]["incomplete_count"]
+		except sqlite3.Error:
+			return -1
+
+	def _has_incomplete_doctypes(self):
+		"""Return True if any doctype still has documents left to index."""
+		count = self._get_incomplete_count("is_complete = 0")
+		return count > 0 if count >= 0 else True
+
+	def _build_vocabulary_incremental(self):
+		"""Build vocabulary incrementally from indexed documents."""
+		import re
+
+		word_freq = defaultdict(int)
+		word_regex = re.compile(r"\w+")
+
+		# Get all indexed documents in batches to avoid memory issues
+		batch_size = 1000
+		offset = 0
+
+		while True:
+			try:
+				# Get batch of documents from FTS table
+				documents = self.sql(
+					f"""
+					SELECT title, content
+					FROM search_fts
+					LIMIT {batch_size} OFFSET {offset}
+					""",
+					read_only=True,
+				)
+
+				if not documents:
+					break
+
+				# Process this batch
+				for i, doc in enumerate(documents):
+					# Show progress for large document sets
+					if (offset + i) % 1000 == 0:
+						self._update_progress(
+							f"Processing vocabulary ({offset + i} docs)", 85, 100, absolute=True
+						)
+
+					# Process title and content together
+					combined_text = " ".join([(doc["title"] or "").lower(), (doc["content"] or "").lower()])
+
+					# Extract all words at once
+					words = word_regex.findall(combined_text)
+
+					for word in words:
+						if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha():
+							word_freq[word] += 1
+
+				offset += batch_size
+
+			except sqlite3.Error:
+				# Stop on read errors and build the vocabulary from whatever was collected
+				break
+
+		# Build vocabulary and trigram tables
+		if word_freq:
+			# Clear existing data
+			def clear_vocabulary(cursor):
+				cursor.execute("DELETE FROM search_vocabulary")
+				cursor.execute("DELETE FROM search_trigrams")
+
+			self._with_connection(clear_vocabulary)
+
+			# Prepare batch data
+			vocab_data = []
+			trigram_data = []
+			trigram_set = set()
+
+			for word, freq in word_freq.items():
+				vocab_data.append((word, freq, len(word)))
+
+				trigrams = self._generate_trigrams(word)
+				for trigram in trigrams:
+					trigram_key = (trigram, word)
+					if trigram_key not in trigram_set:
+						trigram_set.add(trigram_key)
+						trigram_data.append(trigram_key)
+
+			# Batch insert
+			def insert_vocabulary(cursor):
+				cursor.executemany(
+					"INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
+				)
+				cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
+
+			self._with_connection(insert_vocabulary)

 	# Status and Validation Methods

+	def _table_exists(self, table_name):
+		"""Check if a table exists in the database."""
+		try:
+			result = self.sql(
+				f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'", read_only=True
+			)
+			return 
bool(result)
+		except sqlite3.Error:
+			return False
+
 	def index_exists(self):
 		"""Check if FTS index exists."""
 		if not os.path.exists(self.db_path):
 			return False

-		try:
-			result = self.sql(
-				"SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True
-			)
-			return bool(result)
-		except sqlite3.Error:
-			return False
+		return self._table_exists("search_fts")

 	def drop_index(self):
 		"""Drop the search index by removing the database file."""
@@ -408,6 +614,161 @@ class SQLiteSearch(ABC):

 		return records

+	def get_documents_paginated(self, doctype, limit=1000, last_indexed_modified=None):
+		"""Get records for a specific doctype with pagination support."""
+		config = self.doc_configs.get(doctype)
+		if not config:
+			return []
+
+		filters = config.get("filters", {}).copy()
+
+		# Ensure 'modified' field is always included for progress tracking
+		fields = config["fields"].copy()
+		if "modified" not in fields:
+			fields.append("modified")
+
+		# Order by the same 'modified' field the resume filter uses so that
+		# pagination reliably picks up where the last batch stopped
+		query = frappe.qb.get_query(
+			doctype,
+			fields=fields,
+			filters=filters,
+			order_by="modified ASC, name ASC",  # Secondary sort by name for consistency
+			limit=limit,
+		)
+
+		# If resuming from a specific timestamp, filter by modification time
+		# This is more reliable than name-based filtering for VARCHAR names
+		if last_indexed_modified:
+			Table = frappe.qb.DocType(doctype)
+			query = query.where(Table.modified > last_indexed_modified)
+
+		docs = query.run(as_dict=True)
+
+		for doc in docs:
+			doc.doctype = doctype
+
+		return docs
+
+	def _get_index_progress(self):
+		"""Get current indexing progress for all doctypes."""
+		try:
+			result = self.sql(
+				"""
+				SELECT doctype, last_indexed_name, last_indexed_modified,
+					total_docs, indexed_docs, batch_size, is_complete,
+					started_at, updated_at, vocabulary_built
+				FROM search_index_progress
+				""",
+				read_only=True,
+			)
+
+			progress = {}
+			for row in result:
+				progress[row["doctype"]] = dict(row)
+
+			return progress
+		except sqlite3.Error:
+			return {}
+
+	def _initialize_index_progress(self):
+		"""Initialize progress tracking for all doctypes."""
+
+		def init_progress(cursor):
+			# Clear existing progress
+			cursor.execute("DELETE FROM search_index_progress")
+
+			# Initialize progress for each doctype
+			for doctype in self.doc_configs.keys():
+				# Get total count for this doctype
+				config = self.doc_configs[doctype]
+				total_count = frappe.qb.get_query(
+					doctype, filters=config.get("filters", {}), fields=[{"COUNT": "name", "as": "count"}]
+				).run(as_dict=True)[0]["count"]
+
+				cursor.execute(
+					"""
+					INSERT INTO search_index_progress
+					(doctype, total_docs, indexed_docs, batch_size, is_complete, started_at, updated_at, vocabulary_built, last_indexed_modified)
+					VALUES (?, ?, 0, 1000, 0, datetime('now'), datetime('now'), 0, 0)
+					""",
+					(doctype, total_count),
+				)

+		self._with_connection(init_progress)
+
+	def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count):
+		"""Update progress for a specific doctype."""
+
+		def update_progress(cursor):
+			cursor.execute(
+				"""
+				UPDATE search_index_progress
+				SET last_indexed_name = ?,
+					last_indexed_modified = ?,
+					indexed_docs = indexed_docs + ?,
+					updated_at = datetime('now')
+				WHERE doctype = ?
+				""",
+				(last_indexed_name, last_indexed_modified, indexed_count, doctype),
+			)
+
+		self._with_connection(update_progress)
+
+	def _mark_doctype_complete(self, doctype):
+		"""Mark a doctype as completely indexed."""
+
+		def mark_complete(cursor):
+			cursor.execute(
+				"""
+				UPDATE search_index_progress
+				SET is_complete = 1, updated_at = datetime('now')
+				WHERE doctype = ?
+				""",
+				(doctype,),
+			)
+
+		self._with_connection(mark_complete)
+
+	def _mark_vocabulary_built(self):
+		"""Mark vocabulary as built."""
+
+		def mark_built(cursor):
+			cursor.execute("""
+				UPDATE search_index_progress
+				SET vocabulary_built = 1, updated_at = datetime('now')
+			""")
+
+		self._with_connection(mark_built)
+
+	def _is_indexing_complete(self):
+		"""Check if all doctypes are completely indexed and vocabulary is built."""
+		count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0")
+		return count == 0 if count >= 0 else False
+
+	def _get_indexing_progress(self):
+		"""Get overall indexing progress across all doctypes."""
+		try:
+			result = self.sql(
+				"""
+				SELECT SUM(total_docs) as total_docs, SUM(indexed_docs) as indexed_docs
+				FROM search_index_progress
+				""",
+				read_only=True,
+			)
+
+			if result and result[0]:
+				total_docs = result[0]["total_docs"] or 0
+				indexed_docs = result[0]["indexed_docs"] or 0
+				return indexed_docs, total_docs
+			return 0, 0
+		except sqlite3.Error:
+			return 0, 0
+
+	def _tables_exist(self):
+		"""Check if the required tables exist in the current database."""
+		return self._table_exists("search_fts")
+
 	# Private Implementation Methods

 	def _execute_search_query(self, fts_query, title_only, filters):
@@ -784,80 +1146,6 @@ class SQLiteSearch(ABC):
 		similarities.sort(key=lambda x: x[1], reverse=True)
 		return [word for word, score in similarities[:max_suggestions]]

-	def _build_vocabulary(self, documents):
-		"""Build vocabulary and trigram index from documents for spelling correction."""
-		import re
-
-		word_freq = defaultdict(int)
-		word_regex = re.compile(r"\w+")  # Compile regex once for efficiency
-
-		# Extract words from all documents in batches
-		for i, doc in enumerate(documents):
-			# Show progress for large document sets
-			if i % 1000 == 0:
-				progress = 80 + int((i / len(documents)) * 15)  # 80-95% range
-				self._update_progress(
-					f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True
-				)
-
-			# Process title and content together for efficiency
-			combined_text = " ".join(
-				[(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()]
-			)
-
-			# Extract all words at once with compiled regex
-			words = word_regex.findall(combined_text)
-
-			for word in words:
-				if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha():  # Filter out short words and non-alpha
-					word_freq[word] += 1
-
-		# Clear existing data in a single transaction
-		conn = self._get_connection()
-		try:
-			cursor = conn.cursor()
-			cursor.execute("DELETE FROM search_vocabulary")
-			cursor.execute("DELETE FROM search_trigrams")
-			conn.commit()
-		finally:
-			conn.close()
-
-		if not word_freq:
-			return
-
-		# Prepare batch data for vocabulary
-		vocab_data = []
-		trigram_data = []
-		trigram_set = set()  # Use set to avoid duplicate trigrams
-
-		for word, freq in word_freq.items():
-			vocab_data.append((word, freq, len(word)))
-
-			# Generate trigrams for this word
-			trigrams = 
self._generate_trigrams(word) - for trigram in trigrams: - trigram_key = (trigram, word) - if trigram_key not in trigram_set: - trigram_set.add(trigram_key) - trigram_data.append(trigram_key) - - # Use batch inserts with a single transaction - conn = self._get_connection() - try: - cursor = conn.cursor() - - # Batch insert vocabulary - cursor.executemany( - "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data - ) - - # Batch insert trigrams (duplicates already removed) - cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) - - conn.commit() - finally: - conn.close() - # Database and Infrastructure Methods def _get_connection(self, read_only=False): @@ -886,6 +1174,26 @@ class SQLiteSearch(ABC): if is_read: cursor.execute("PRAGMA query_only = 1;") # Read-only optimization + def _with_connection(self, callback, read_only=False): + """Execute a callback with a managed database connection. + + Args: + callback: Function that takes (cursor) and performs database operations + read_only: Whether the connection is read-only + + Returns: + The return value of the callback, if any + """ + conn = self._get_connection(read_only=read_only) + try: + cursor = conn.cursor() + result = callback(cursor) + if not read_only: + conn.commit() + return result + finally: + conn.close() + def _ensure_fts_table(self): """Create FTS table and related tables if they don't exist.""" # Get schema from subclass @@ -893,11 +1201,7 @@ class SQLiteSearch(ABC): metadata_fields = self.schema["metadata_fields"] tokenizer = self.schema["tokenizer"] - # Use a single transaction for all table creation operations - conn = self._get_connection() - try: - cursor = conn.cursor() - + def create_tables(cursor): # Create the FTS table with dynamic columns cursor.execute(f""" CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5( @@ -925,14 +1229,34 @@ class SQLiteSearch(ABC): ) """) + # Create the index progress tracking table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS search_index_progress ( + id INTEGER PRIMARY KEY, + doctype TEXT, + last_indexed_name TEXT, + last_indexed_modified TEXT, + total_docs INTEGER DEFAULT 0, + indexed_docs INTEGER DEFAULT 0, + batch_size INTEGER DEFAULT 1000, + is_complete BOOLEAN DEFAULT 0, + started_at DATETIME, + updated_at DATETIME, + vocabulary_built BOOLEAN DEFAULT 0 + ) + """) + # Index for fast trigram lookups cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_trigram_lookup ON search_trigrams(trigram) """) - conn.commit() - finally: - conn.close() + # Index for progress tracking lookups + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype) + """) + + self._with_connection(create_tables) def _index_documents(self, documents): """Bulk index documents into SQLite FTS.""" @@ -955,10 +1279,8 @@ class SQLiteSearch(ABC): # Process documents in chunks to prevent memory issues with large datasets chunk_size = 1000 - conn = self._get_connection() - try: - cursor = conn.cursor() + def index_chunks(cursor): for i in range(0, len(documents), chunk_size): chunk = documents[i : i + chunk_size] doc_ids_to_delete = [] @@ -994,6 +1316,7 @@ class SQLiteSearch(ABC): values.append(doc.get(field, "")) doc_ids_to_delete.append(doc_id) + values_to_insert.append(tuple(values)) # Delete existing rows for these doc_ids first using a single statement @@ -1006,9 +1329,7 @@ class SQLiteSearch(ABC): if values_to_insert: cursor.executemany(insert_sql, values_to_insert) - conn.commit() - finally: - 
conn.close()
+		self._with_connection(index_chunks)

 	def index_doc(self, doctype, docname):
 		"""Index a single document."""
@@ -1369,15 +1690,34 @@ class SQLiteSearch(ABC):


 def build_index_if_not_exists():
-	"""Build index if it doesn't exist."""
+	"""Build the index if it doesn't exist, or continue an incomplete build if a temp DB exists.
+
+	Called by the scheduler every 3 hours so interrupted builds eventually finish.
+	"""
 	search_classes = get_search_classes()
 	for SearchClass in search_classes:
-		build_index(SearchClass, force=False)
+		search = SearchClass()
+		if not search.is_search_enabled():
+			continue
+
+		# Check if a temp DB exists (incomplete build from a previous run)
+		temp_db_path = search._get_db_path(is_temp=True)
+		if os.path.exists(temp_db_path):
+			# Continue the incomplete build
+			print(f"{SearchClass.__name__}: Found temp DB, continuing build...")
+			build_index(SearchClass, force=True, is_continuation=True)
+		elif not search.index_exists():
+			# No index exists, start fresh build
+			print(f"{SearchClass.__name__}: No index exists, starting fresh build...")
+			build_index(SearchClass, force=False)


 def build_index(
-	SearchClass: type[SQLiteSearch] | None = None, search_class_path: str | None = None, force: bool = False
+	SearchClass: type[SQLiteSearch] | None = None,
+	search_class_path: str | None = None,
+	force: bool = False,
+	is_continuation: bool = False,
 ):
 	"""Build search index for SearchClass"""
 	if not SearchClass and not search_class_path:
@@ -1389,29 +1729,69 @@ def build_index(
 	search = SearchClass()
 	if not search.is_search_enabled():
 		return
-	if not search.index_exists() or force:
-		print(f"{SearchClass.__name__}: Index does not exist, building...")
-		search.build_index()
+
+	if search.index_exists() and not force:
+		return
+
+	# Reaching here means the index is missing, force=True, or this is a continuation
+	if is_continuation:
+		print(f"{SearchClass.__name__}: Continuing incremental index build...")
+	else:
+		print(f"{SearchClass.__name__}: Index does not exist or force=True, building...")
+	search.build_index(is_continuation=is_continuation)
+
+
+def _enqueue_index_job(search_class_path: str, is_continuation: bool = False):
+	"""Enqueue a search index build job.
+
+	Args:
+		search_class_path: Full path to the search class (e.g., 'module.ClassName')
+		is_continuation: Whether this is a continuation of an incomplete build
+	"""
+	job_id = f"{search_class_path}_continuation" if is_continuation else search_class_path
+	job_type = "continuation" if is_continuation else "fresh build"
+	print(f"Enqueuing {job_type} for {search_class_path}.build_index")

+	# Timeout of 2 hours 10 minutes, leaving headroom for job queue delays
+	timeout = 2 * 60 * 60 + 10 * 60
+
+	enqueue_kwargs = {
+		"queue": "long",
+		"job_id": job_id,
+		"deduplicate": True,
+		"search_class_path": search_class_path,
+		"force": True,
+		"is_continuation": is_continuation,
+		"timeout": timeout,
+	}
+
+	frappe.enqueue("frappe.search.sqlite_search.build_index", **enqueue_kwargs)


 def build_index_in_background():
-	"""Enqueue index building in background."""
+	"""Enqueue index building in background.
+
+	Called after migrate to start/continue index building.
+ """ search_classes = get_search_classes() for SearchClass in search_classes: search = SearchClass() if not search.is_search_enabled(): - return + continue + search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}" - print(f"Enqueuing {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - # build_index args - search_class_path=search_class_path, - force=True, - ) + + # Check if a temp DB exists (incomplete build from previous run) + temp_db_path = search._get_db_path(is_temp=True) + if os.path.exists(temp_db_path): + # Continue the incomplete build + _enqueue_index_job(search_class_path, is_continuation=True) + elif not search.index_exists(): + # No index exists, start fresh build + _enqueue_index_job(search_class_path, is_continuation=False) + else: + print(f"Index for {search_class_path} already exists") def update_doc_index(doc: Document, method=None): diff --git a/frappe/tests/test_sqlite_search.py b/frappe/tests/test_sqlite_search.py index 8f6bfcee99..691c9161dd 100644 --- a/frappe/tests/test_sqlite_search.py +++ b/frappe/tests/test_sqlite_search.py @@ -483,6 +483,29 @@ class TestSQLiteSearchAPI(IntegrationTestCase): disabled_search.build_index() # Should not raise error but do nothing self.assertFalse(disabled_search.index_exists()) + @patch("frappe.enqueue") + def test_background_operations(self, mock_enqueue): + """Test background job integration and module-level functions.""" + from frappe.search.sqlite_search import build_index_in_background, get_search_classes + + # Test getting search classes + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + classes = get_search_classes() + self.assertEqual(len(classes), 1) + self.assertEqual(classes[0], TestSQLiteSearch) + + # Ensure index doesn't exist so build_index_in_background will enqueue a job + self.search.drop_index() + + # Test background index building + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + build_index_in_background() + + # Should have enqueued a background job since index doesn't exist + self.assertTrue(mock_enqueue.called) + def test_deduplication_on_reindex(self): """Test that re-indexing the same document does not create duplicates.""" self.search.build_index() @@ -546,26 +569,3 @@ class TestSQLiteSearchAPI(IntegrationTestCase): finally: test_note.delete() - - @patch("frappe.enqueue") - def test_background_operations(self, mock_enqueue): - """Test background job integration and module-level functions.""" - from frappe.search.sqlite_search import ( - build_index_in_background, - get_search_classes, - ) - - # Test getting search classes - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - classes = get_search_classes() - self.assertEqual(len(classes), 1) - self.assertEqual(classes[0], TestSQLiteSearch) - - # Test background index building - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - build_index_in_background() - - # Should have enqueued a background job - self.assertTrue(mock_enqueue.called)
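Reviewer note: the resume mechanism in this patch is keyset pagination on `modified`, with the cursor persisted in the `search_index_progress` table. Below is a minimal, self-contained sketch of that pattern using plain `sqlite3` with hypothetical `source_docs` and `index_progress` tables (not the Frappe API); it illustrates why `get_documents_paginated` must order by the same column the `last_indexed_modified` filter uses.

```python
import sqlite3

# Minimal sketch of resumable keyset pagination, assuming hypothetical
# `source_docs` and `index_progress` tables (not the Frappe schema).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(
    """
    CREATE TABLE source_docs (name TEXT, modified TEXT);
    CREATE TABLE index_progress (doctype TEXT PRIMARY KEY, last_modified TEXT);
    INSERT INTO source_docs VALUES
        ('DOC-1', '2024-01-01 10:00:00.000001'),
        ('DOC-2', '2024-01-02 10:00:00.000001'),
        ('DOC-3', '2024-01-03 10:00:00.000001');
    """
)


def fetch_batch(cursor_value, limit=2):
    # Order by the SAME column the cursor filters on; mixing them (e.g.
    # ordering by creation while filtering on modified) skips or repeats
    # rows when the build resumes.
    if cursor_value:
        return conn.execute(
            "SELECT name, modified FROM source_docs"
            " WHERE modified > ? ORDER BY modified, name LIMIT ?",
            (cursor_value, limit),
        ).fetchall()
    return conn.execute(
        "SELECT name, modified FROM source_docs ORDER BY modified, name LIMIT ?",
        (limit,),
    ).fetchall()


def build(doctype="Note"):
    row = conn.execute(
        "SELECT last_modified FROM index_progress WHERE doctype = ?", (doctype,)
    ).fetchone()
    cursor_value = row["last_modified"] if row else None

    while True:
        batch = fetch_batch(cursor_value)
        if not batch:
            break
        for doc in batch:
            print("indexing", doc["name"])  # stand-in for _index_documents()
        # Persist the cursor so a killed job resumes from here on the next run
        cursor_value = batch[-1]["modified"]
        conn.execute(
            "INSERT INTO index_progress (doctype, last_modified) VALUES (?, ?)"
            " ON CONFLICT(doctype) DO UPDATE SET last_modified = excluded.last_modified",
            (doctype, cursor_value),
        )
        conn.commit()


build()
```

One caveat that applies to both the sketch and the patch: a strict `modified > ?` cursor skips rows that share the boundary timestamp exactly. Frappe's `modified` values carry sub-second precision, which should make such collisions rare, but it is worth keeping in mind for doctypes that are bulk-updated in a single transaction.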