diff --git a/frappe/hooks.py b/frappe/hooks.py index a977f39529..b37e79b7d4 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -216,7 +216,6 @@ scheduler_events = { "frappe.deferred_insert.save_to_db", "frappe.automation.doctype.reminder.reminder.send_reminders", "frappe.model.utils.link_count.update_link_count", - "frappe.search.sqlite_search.build_index_if_not_exists", "frappe.utils.telemetry.pulse.client.send_queued_events", ], # 10 minutes @@ -227,6 +226,9 @@ scheduler_events = { "30 * * * *": [], # Daily but offset by 45 minutes "45 0 * * *": [], + "0 */3 * * *": [ + "frappe.search.sqlite_search.build_index_if_not_exists", + ], }, "all": [ "frappe.email.queue.flush", diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index a3ca394fab..2cd4b2078f 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -291,14 +291,15 @@ class SQLiteSearch(ABC): }, } - def build_index(self, batch_size=1000, max_runtime_minutes=1, is_continuation=False): - """Build the search index incrementally with progress tracking and time-based exit.""" + def build_index(self, batch_size=1000, is_continuation=False): + """Build the search index incrementally with progress tracking. + + The job runs until completion or until killed by the queue timeout. + Progress is tracked in the database so builds can resume from where they left off. + """ if not self.is_search_enabled(): return - start_time = time.time() - max_runtime = max_runtime_minutes * 60 - # Use temporary database path for atomic replacement (only for new index builds) temp_db_path = None original_db_path = self.db_path @@ -326,7 +327,6 @@ class SQLiteSearch(ABC): print(f"Continuation: Using regular database {self.db_path}") # If no temp db exists, we're continuing with regular database (already set) - # Debug: Show which database we're working with if temp_db_path: print(f"Working with temporary database: {self.db_path}") else: @@ -340,13 +340,7 @@ class SQLiteSearch(ABC): # Clear existing index data for fresh build if temp_db_path and not is_continuation: - conn = self._get_connection() - try: - cursor = conn.cursor() - cursor.execute("DELETE FROM search_fts") - conn.commit() - finally: - conn.close() + self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts")) # Initialize progress tracking (only for completely fresh builds) if not is_continuation: @@ -365,14 +359,6 @@ class SQLiteSearch(ABC): processed_doctypes = 0 for doctype in self.doc_configs.keys(): - # Check time limit - if time.time() - start_time > max_runtime: - self._update_progress( - "Time limit reached, queuing continuation job", 90, 100, absolute=True - ) - self._queue_continuation_job() - return - doctype_progress = progress.get(doctype, {}) # Skip if doctype is already complete @@ -392,17 +378,6 @@ class SQLiteSearch(ABC): batch_count = 0 while True: - # Check time limit before each batch - if time.time() - start_time > max_runtime: - self._update_progress( - "Time limit reached during doctype processing, queuing continuation", - 90, - 100, - absolute=True, - ) - self._queue_continuation_job() - return - # Get batch of documents docs = self.get_documents_paginated( doctype, limit=batch_size, last_indexed_modified=last_indexed_modified @@ -414,8 +389,6 @@ class SQLiteSearch(ABC): break # Prepare and index documents - print(f"preparing {len(docs)} documents {doctype}") - print(time.time() - start_time) documents = [] for doc in docs: document = self.prepare_document(doc) @@ -435,14 +408,21 @@ class SQLiteSearch(ABC): batch_count += 1 - # Show progress within doctype + # Show progress based on current doctype's document counts if batch_count % 5 == 0: # Update every 5 batches - current_progress = 20 + (processed_doctypes * 60 // total_doctypes) - self._update_progress( - f"Indexing {doctype} (batch {batch_count})", current_progress, 100, absolute=True - ) + indexed_docs, total_docs = self._get_doctype_progress(doctype) + if total_docs > 0: + progress_percent = ( + indexed_docs * 100 + ) // total_docs # 0-100% for current doctype + self._update_progress( + f"Indexing {doctype} - {indexed_docs:,}/{total_docs:,} documents ({progress_percent}%)", + progress_percent, + 100, + absolute=True, + ) - processed_doctypes += 1 + processed_doctypes += 1 # Check if all doctypes are indexed before building vocabulary if not self._is_vocabulary_built_needed(): @@ -463,31 +443,44 @@ class SQLiteSearch(ABC): # Print warning summary self._print_warning_summary() - except Exception: - # Clean up temp file on error - if temp_db_path and os.path.exists(temp_db_path): - os.unlink(temp_db_path) + except Exception as e: + # Log the error + frappe.log_error( + title="Search Index Build Error", + message=f"Error during search index build: {e}", + ) raise finally: # Restore original database path if temp_db_path: self.db_path = original_db_path - def _is_vocabulary_built_needed(self): - """Check if vocabulary still needs to be built.""" + def _get_incomplete_count(self, where_clause): + """Get count of incomplete records from search_index_progress table. + + Args: + where_clause: SQL WHERE clause condition (without 'WHERE' keyword) + + Returns: + int: Count of matching records, or -1 on error + """ try: result = self.sql( - """ + f""" SELECT COUNT(*) as incomplete_count FROM search_index_progress - WHERE is_complete = 0 + WHERE {where_clause} """, read_only=True, ) - - return result[0]["incomplete_count"] > 0 + return result[0]["incomplete_count"] except sqlite3.Error: - return True + return -1 + + def _is_vocabulary_built_needed(self): + """Check if vocabulary still needs to be built.""" + count = self._get_incomplete_count("is_complete = 0") + return count > 0 if count >= 0 else True def _build_vocabulary_incremental(self): """Build vocabulary incrementally from indexed documents.""" @@ -541,14 +534,11 @@ class SQLiteSearch(ABC): # Build vocabulary tables as before if word_freq: # Clear existing data - conn = self._get_connection() - try: - cursor = conn.cursor() + def clear_vocabulary(cursor): cursor.execute("DELETE FROM search_vocabulary") cursor.execute("DELETE FROM search_trigrams") - conn.commit() - finally: - conn.close() + + self._with_connection(clear_vocabulary) # Prepare batch data vocab_data = [] @@ -566,31 +556,32 @@ class SQLiteSearch(ABC): trigram_data.append(trigram_key) # Batch insert - conn = self._get_connection() - try: - cursor = conn.cursor() + def insert_vocabulary(cursor): cursor.executemany( "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data ) cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) - conn.commit() - finally: - conn.close() + + self._with_connection(insert_vocabulary) # Status and Validation Methods + def _table_exists(self, table_name): + """Check if a table exists in the database.""" + try: + result = self.sql( + f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'", read_only=True + ) + return bool(result) + except sqlite3.Error: + return False + def index_exists(self): """Check if FTS index exists.""" if not os.path.exists(self.db_path): return False - try: - result = self.sql( - "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True - ) - return bool(result) - except sqlite3.Error: - return False + return self._table_exists("search_fts") def drop_index(self): """Drop the search index by removing the database file.""" @@ -633,7 +624,6 @@ class SQLiteSearch(ABC): return [] filters = config.get("filters", {}).copy() - filters = config.get("filters", {}).copy() # Ensure 'modified' field is always included for progress tracking fields = config["fields"].copy() @@ -646,7 +636,7 @@ class SQLiteSearch(ABC): doctype, fields=fields, filters=filters, - order_by="modified ASC, name ASC", # Secondary sort by name for consistency + order_by="creation ASC, name ASC", # Secondary sort by name for consistency limit=limit, ) @@ -686,10 +676,8 @@ class SQLiteSearch(ABC): def _initialize_index_progress(self): """Initialize progress tracking for all doctypes.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + def init_progress(cursor): # Clear existing progress cursor.execute("DELETE FROM search_index_progress") @@ -710,15 +698,12 @@ class SQLiteSearch(ABC): (doctype, total_count), ) - conn.commit() - finally: - conn.close() + self._with_connection(init_progress) def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count): """Update progress for a specific doctype.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def update_progress(cursor): cursor.execute( """ UPDATE search_index_progress @@ -730,15 +715,13 @@ class SQLiteSearch(ABC): """, (last_indexed_name, last_indexed_modified, indexed_count, doctype), ) - conn.commit() - finally: - conn.close() + + self._with_connection(update_progress) def _mark_doctype_complete(self, doctype): """Mark a doctype as completely indexed.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def mark_complete(cursor): cursor.execute( """ UPDATE search_index_progress @@ -747,61 +730,49 @@ class SQLiteSearch(ABC): """, (doctype,), ) - conn.commit() - finally: - conn.close() + + self._with_connection(mark_complete) def _mark_vocabulary_built(self): """Mark vocabulary as built.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def mark_built(cursor): cursor.execute(""" UPDATE search_index_progress SET vocabulary_built = 1, updated_at = datetime('now') """) - conn.commit() - finally: - conn.close() + + self._with_connection(mark_built) def _is_indexing_complete(self): """Check if all doctypes are completely indexed and vocabulary is built.""" + count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0") + return count == 0 if count >= 0 else False + + def _get_doctype_progress(self, doctype): + """Get indexing progress for a specific doctype.""" try: result = self.sql( """ - SELECT COUNT(*) as incomplete_count + SELECT total_docs, indexed_docs FROM search_index_progress - WHERE is_complete = 0 OR vocabulary_built = 0 + WHERE doctype = ? """, + (doctype,), read_only=True, ) - return result[0]["incomplete_count"] == 0 + if result and result[0]: + total_docs = result[0]["total_docs"] or 0 + indexed_docs = result[0]["indexed_docs"] or 0 + return indexed_docs, total_docs + return 0, 0 except sqlite3.Error: - return False - - def _queue_continuation_job(self): - """Queue a continuation job to resume indexing.""" - search_class_path = f"{self.__class__.__module__}.{self.__class__.__name__}" - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=f"{search_class_path}_continuation", - deduplicate=True, - search_class_path=search_class_path, - force=True, - is_continuation=True, - ) + return 0, 0 def _tables_exist(self): """Check if the required tables exist in the current database.""" - try: - result = self.sql( - "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True - ) - return bool(result) - except sqlite3.Error: - return False + return self._table_exists("search_fts") # Private Implementation Methods @@ -917,6 +888,7 @@ class SQLiteSearch(ABC): ORDER BY bm25_score LIMIT ? """ + print(sql) return self.sql(sql, params, read_only=True) def _process_search_results(self, raw_results, query): @@ -1179,80 +1151,6 @@ class SQLiteSearch(ABC): similarities.sort(key=lambda x: x[1], reverse=True) return [word for word, score in similarities[:max_suggestions]] - def _build_vocabulary(self, documents): - """Build vocabulary and trigram index from documents for spelling correction.""" - import re - - word_freq = defaultdict(int) - word_regex = re.compile(r"\w+") # Compile regex once for efficiency - - # Extract words from all documents in batches - for i, doc in enumerate(documents): - # Show progress for large document sets - if i % 1000 == 0: - progress = 80 + int((i / len(documents)) * 15) # 80-95% range - self._update_progress( - f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True - ) - - # Process title and content together for efficiency - combined_text = " ".join( - [(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()] - ) - - # Extract all words at once with compiled regex - words = word_regex.findall(combined_text) - - for word in words: - if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha(): # Filter out short words and non-alpha - word_freq[word] += 1 - - # Clear existing data in a single transaction - conn = self._get_connection() - try: - cursor = conn.cursor() - cursor.execute("DELETE FROM search_vocabulary") - cursor.execute("DELETE FROM search_trigrams") - conn.commit() - finally: - conn.close() - - if not word_freq: - return - - # Prepare batch data for vocabulary - vocab_data = [] - trigram_data = [] - trigram_set = set() # Use set to avoid duplicate trigrams - - for word, freq in word_freq.items(): - vocab_data.append((word, freq, len(word))) - - # Generate trigrams for this word - trigrams = self._generate_trigrams(word) - for trigram in trigrams: - trigram_key = (trigram, word) - if trigram_key not in trigram_set: - trigram_set.add(trigram_key) - trigram_data.append(trigram_key) - - # Use batch inserts with a single transaction - conn = self._get_connection() - try: - cursor = conn.cursor() - - # Batch insert vocabulary - cursor.executemany( - "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data - ) - - # Batch insert trigrams (duplicates already removed) - cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) - - conn.commit() - finally: - conn.close() - # Database and Infrastructure Methods def _get_connection(self, read_only=False): @@ -1281,6 +1179,26 @@ class SQLiteSearch(ABC): if is_read: cursor.execute("PRAGMA query_only = 1;") # Read-only optimization + def _with_connection(self, callback, read_only=False): + """Execute a callback with a managed database connection. + + Args: + callback: Function that takes (cursor) and performs database operations + read_only: Whether the connection is read-only + + Returns: + The return value of the callback, if any + """ + conn = self._get_connection(read_only=read_only) + try: + cursor = conn.cursor() + result = callback(cursor) + if not read_only: + conn.commit() + return result + finally: + conn.close() + def _ensure_fts_table(self): """Create FTS table and related tables if they don't exist.""" # Get schema from subclass @@ -1288,11 +1206,7 @@ class SQLiteSearch(ABC): metadata_fields = self.schema["metadata_fields"] tokenizer = self.schema["tokenizer"] - # Use a single transaction for all table creation operations - conn = self._get_connection() - try: - cursor = conn.cursor() - + def create_tables(cursor): # Create the FTS table with dynamic columns cursor.execute(f""" CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5( @@ -1347,9 +1261,7 @@ class SQLiteSearch(ABC): CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype) """) - conn.commit() - finally: - conn.close() + self._with_connection(create_tables) def _index_documents(self, documents): """Bulk index documents into SQLite FTS.""" @@ -1372,10 +1284,8 @@ class SQLiteSearch(ABC): # Process documents in chunks to prevent memory issues with large datasets chunk_size = 1000 - conn = self._get_connection() - try: - cursor = conn.cursor() + def index_chunks(cursor): for i in range(0, len(documents), chunk_size): chunk = documents[i : i + chunk_size] doc_ids_to_delete = [] @@ -1411,6 +1321,7 @@ class SQLiteSearch(ABC): values.append(doc.get(field, "")) doc_ids_to_delete.append(doc_id) + values_to_insert.append(tuple(values)) # Delete existing rows for these doc_ids first using a single statement @@ -1423,9 +1334,7 @@ class SQLiteSearch(ABC): if values_to_insert: cursor.executemany(insert_sql, values_to_insert) - conn.commit() - finally: - conn.close() + self._with_connection(index_chunks) def index_doc(self, doctype, docname): """Index a single document.""" @@ -1786,11 +1695,27 @@ class SQLiteSearch(ABC): def build_index_if_not_exists(): - """Build index if it doesn't exist.""" + """Build index if it doesn't exist or continue if temp DB exists. + + Called by scheduler every 3 hours to continue incomplete builds. + """ search_classes = get_search_classes() for SearchClass in search_classes: - build_index(SearchClass, force=False) + search = SearchClass() + if not search.is_search_enabled(): + continue + + # Check if a temp DB exists (incomplete build from previous run) + temp_db_path = search._get_db_path(is_temp=True) + if os.path.exists(temp_db_path): + # Continue the incomplete build + print(f"{SearchClass.__name__}: Found temp DB, continuing build...") + build_index(SearchClass, force=True, is_continuation=True) + elif not search.index_exists(): + # No index exists, start fresh build + print(f"{SearchClass.__name__}: No index exists, starting fresh build...") + build_index(SearchClass, force=False) def build_index( @@ -1810,9 +1735,11 @@ def build_index( if not search.is_search_enabled(): return + if search.index_exists() and not force: + return + # For continuation jobs, always proceed regardless of existing index - # For fresh builds, only build if index doesn't exist or force=True - if is_continuation or not search.index_exists() or force: + if is_continuation or force: if is_continuation: print(f"{SearchClass.__name__}: Continuing incremental index build...") else: @@ -1820,8 +1747,38 @@ def build_index( search.build_index(is_continuation=is_continuation) +def _enqueue_index_job(search_class_path: str, is_continuation: bool = False): + """Enqueue a search index build job. + + Args: + search_class_path: Full path to the search class (e.g., 'module.ClassName') + is_continuation: Whether this is a continuation of an incomplete build + """ + job_id = f"{search_class_path}_continuation" if is_continuation else search_class_path + job_type = "continuation" if is_continuation else "fresh build" + print(f"Enqueuing {job_type} for {search_class_path}.build_index") + + # timeout for 2 hour 10 minutes to account for job queue delays + timeout = 2 * 60 * 60 + 10 * 60 + + enqueue_kwargs = { + "queue": "long", + "job_id": job_id, + "deduplicate": True, + "search_class_path": search_class_path, + "force": True, + "is_continuation": is_continuation, + "timeout": timeout, + } + + frappe.enqueue("frappe.search.sqlite_search.build_index", **enqueue_kwargs) + + def build_index_in_background(): - """Enqueue index building in background.""" + """Enqueue index building in background. + + Called after migrate to start/continue index building. + """ search_classes = get_search_classes() for SearchClass in search_classes: search = SearchClass() @@ -1830,46 +1787,16 @@ def build_index_in_background(): search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}" - # Check if indexing is already in progress or complete - if search.index_exists(): - try: - # Check if there are any incomplete progress records - progress = search._get_index_progress() - if progress and not search._is_indexing_complete(): - print(f"Enqueuing continuation for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=f"{search_class_path}_continuation", - deduplicate=True, - search_class_path=search_class_path, - force=True, - is_continuation=True, - ) - else: - print(f"Index for {search_class_path} is already complete") - except Exception: - # If we can't check progress, assume we need to rebuild - print(f"Enqueuing fresh build for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - search_class_path=search_class_path, - force=True, - ) - else: + # Check if a temp DB exists (incomplete build from previous run) + temp_db_path = search._get_db_path(is_temp=True) + if os.path.exists(temp_db_path): + # Continue the incomplete build + _enqueue_index_job(search_class_path, is_continuation=True) + elif not search.index_exists(): # No index exists, start fresh build - print(f"Enqueuing fresh build for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - search_class_path=search_class_path, - force=True, - ) + _enqueue_index_job(search_class_path, is_continuation=False) + else: + print(f"Index for {search_class_path} already exists") def update_doc_index(doc: Document, method=None): diff --git a/frappe/tests/test_sqlite_search.py b/frappe/tests/test_sqlite_search.py index 8f6bfcee99..691c9161dd 100644 --- a/frappe/tests/test_sqlite_search.py +++ b/frappe/tests/test_sqlite_search.py @@ -483,6 +483,29 @@ class TestSQLiteSearchAPI(IntegrationTestCase): disabled_search.build_index() # Should not raise error but do nothing self.assertFalse(disabled_search.index_exists()) + @patch("frappe.enqueue") + def test_background_operations(self, mock_enqueue): + """Test background job integration and module-level functions.""" + from frappe.search.sqlite_search import build_index_in_background, get_search_classes + + # Test getting search classes + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + classes = get_search_classes() + self.assertEqual(len(classes), 1) + self.assertEqual(classes[0], TestSQLiteSearch) + + # Ensure index doesn't exist so build_index_in_background will enqueue a job + self.search.drop_index() + + # Test background index building + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + build_index_in_background() + + # Should have enqueued a background job since index doesn't exist + self.assertTrue(mock_enqueue.called) + def test_deduplication_on_reindex(self): """Test that re-indexing the same document does not create duplicates.""" self.search.build_index() @@ -546,26 +569,3 @@ class TestSQLiteSearchAPI(IntegrationTestCase): finally: test_note.delete() - - @patch("frappe.enqueue") - def test_background_operations(self, mock_enqueue): - """Test background job integration and module-level functions.""" - from frappe.search.sqlite_search import ( - build_index_in_background, - get_search_classes, - ) - - # Test getting search classes - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - classes = get_search_classes() - self.assertEqual(len(classes), 1) - self.assertEqual(classes[0], TestSQLiteSearch) - - # Test background index building - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - build_index_in_background() - - # Should have enqueued a background job - self.assertTrue(mock_enqueue.called)