From 2a7e6a7793a7d3b605d0020b75666d30445c6edb Mon Sep 17 00:00:00 2001 From: Faris Ansari Date: Thu, 16 Oct 2025 02:19:13 +0530 Subject: [PATCH 1/3] refactor: resumable search index --- frappe/search/sqlite_search.py | 560 ++++++++++++++++++++++++++++++--- 1 file changed, 509 insertions(+), 51 deletions(-) diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index 2f9ffd9ee8..a3ca394fab 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -291,59 +291,172 @@ class SQLiteSearch(ABC): }, } - def build_index(self): - """Build the complete search index from scratch using atomic replacement.""" + def build_index(self, batch_size=1000, max_runtime_minutes=1, is_continuation=False): + """Build the search index incrementally with progress tracking and time-based exit.""" if not self.is_search_enabled(): return - # Use temporary database path for atomic replacement - temp_db_path = self._get_db_path(is_temp=True) + start_time = time.time() + max_runtime = max_runtime_minutes * 60 + + # Use temporary database path for atomic replacement (only for new index builds) + temp_db_path = None original_db_path = self.db_path - # Remove temp file if it exists - if os.path.exists(temp_db_path): - os.unlink(temp_db_path) + # Check if this is a completely fresh build or a continuation + if not is_continuation and not self.index_exists(): + # Fresh build - use temporary database for atomic replacement + temp_db_path = self._get_db_path(is_temp=True) - # Temporarily switch to temp database for building - self.db_path = temp_db_path + # Remove temp file if it exists + if os.path.exists(temp_db_path): + os.unlink(temp_db_path) + + # Switch to temp database for building + self.db_path = temp_db_path + elif is_continuation: + # Check if we're continuing a fresh build (temp db exists) + potential_temp_path = self._get_db_path(is_temp=True) + if os.path.exists(potential_temp_path): + # Continue with temporary database from fresh build + temp_db_path = potential_temp_path + self.db_path = temp_db_path + print(f"Continuation: Using temporary database {temp_db_path}") + else: + print(f"Continuation: Using regular database {self.db_path}") + # If no temp db exists, we're continuing with regular database (already set) + + # Debug: Show which database we're working with + if temp_db_path: + print(f"Working with temporary database: {self.db_path}") + else: + print(f"Working with regular database: {self.db_path}") try: - self._update_progress("Setting up search tables", 0, 100, absolute=True) + # Setup tables if needed (for fresh builds or when temp DB was just created) + if not is_continuation or (temp_db_path and not self._tables_exist()): + self._update_progress("Setting up search tables", 0, 100, absolute=True) + self._ensure_fts_table() - # Setup tables in temp database - self._ensure_fts_table() + # Clear existing index data for fresh build + if temp_db_path and not is_continuation: + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.execute("DELETE FROM search_fts") + conn.commit() + finally: + conn.close() - self._update_progress("Fetching records", 20, 100, absolute=True) + # Initialize progress tracking (only for completely fresh builds) + if not is_continuation: + self._initialize_index_progress() - records = self.get_documents() - documents = [] + # Get current progress + progress = self._get_index_progress() - self._update_progress("Preparing documents", 30, 100, absolute=True) + # Check if indexing is already complete + if self._is_indexing_complete(): + self._update_progress("Search index already complete", 100, 100, absolute=True) + return - total_records = len(records) - for i, doc in enumerate(records): - document = self.prepare_document(doc) - if document: - documents.append(document) + # Process each doctype incrementally + total_doctypes = len(self.doc_configs) + processed_doctypes = 0 - # Update progress during document preparation - if i % 100 == 0: - progress = 30 + int((i / total_records) * 20) # 30-50% range - self._update_progress("Preparing documents", progress, 100, absolute=True) + for doctype in self.doc_configs.keys(): + # Check time limit + if time.time() - start_time > max_runtime: + self._update_progress( + "Time limit reached, queuing continuation job", 90, 100, absolute=True + ) + self._queue_continuation_job() + return - self._update_progress("Indexing documents", 50, 100, absolute=True) + doctype_progress = progress.get(doctype, {}) - self._index_documents(documents) + # Skip if doctype is already complete + if doctype_progress.get("is_complete"): + processed_doctypes += 1 + continue - self._update_progress("Building spell correction vocabulary", 80, 100, absolute=True) + self._update_progress( + f"Indexing {doctype}", + 20 + (processed_doctypes * 60 // total_doctypes), + 100, + absolute=True, + ) - # Build vocabulary for spelling correction - self._build_vocabulary(documents) + # Process this doctype in batches + last_indexed_modified = doctype_progress.get("last_indexed_modified") + batch_count = 0 - # Atomic replacement: move temp database to final location - if os.path.exists(original_db_path): - os.unlink(original_db_path) - os.rename(temp_db_path, original_db_path) + while True: + # Check time limit before each batch + if time.time() - start_time > max_runtime: + self._update_progress( + "Time limit reached during doctype processing, queuing continuation", + 90, + 100, + absolute=True, + ) + self._queue_continuation_job() + return + + # Get batch of documents + docs = self.get_documents_paginated( + doctype, limit=batch_size, last_indexed_modified=last_indexed_modified + ) + + if not docs: + # No more documents for this doctype + self._mark_doctype_complete(doctype) + break + + # Prepare and index documents + print(f"preparing {len(docs)} documents {doctype}") + print(time.time() - start_time) + documents = [] + for doc in docs: + document = self.prepare_document(doc) + if document: + documents.append(document) + + if documents: + self._index_documents(documents) + + # Update progress with last processed document's modification time + # Use hardcoded 'modified' field since it's reliable in all Frappe doctypes + last_doc_modified = docs[-1]["modified"] + + last_doc_name = docs[-1]["name"] + self._update_index_progress(doctype, last_doc_name, last_doc_modified, len(documents)) + last_indexed_modified = last_doc_modified + + batch_count += 1 + + # Show progress within doctype + if batch_count % 5 == 0: # Update every 5 batches + current_progress = 20 + (processed_doctypes * 60 // total_doctypes) + self._update_progress( + f"Indexing {doctype} (batch {batch_count})", current_progress, 100, absolute=True + ) + + processed_doctypes += 1 + + # Check if all doctypes are indexed before building vocabulary + if not self._is_vocabulary_built_needed(): + self._update_progress("All documents indexed, building vocabulary", 80, 100, absolute=True) + + # Build vocabulary incrementally + self._build_vocabulary_incremental() + self._mark_vocabulary_built() + + # Final atomic replacement if this was a fresh build + if temp_db_path and os.path.exists(temp_db_path): + if os.path.exists(original_db_path): + os.unlink(original_db_path) + os.rename(temp_db_path, original_db_path) self._update_progress("Search index build complete", 100, 100, absolute=True) @@ -352,12 +465,117 @@ class SQLiteSearch(ABC): except Exception: # Clean up temp file on error - if os.path.exists(temp_db_path): + if temp_db_path and os.path.exists(temp_db_path): os.unlink(temp_db_path) raise finally: # Restore original database path - self.db_path = original_db_path + if temp_db_path: + self.db_path = original_db_path + + def _is_vocabulary_built_needed(self): + """Check if vocabulary still needs to be built.""" + try: + result = self.sql( + """ + SELECT COUNT(*) as incomplete_count + FROM search_index_progress + WHERE is_complete = 0 + """, + read_only=True, + ) + + return result[0]["incomplete_count"] > 0 + except sqlite3.Error: + return True + + def _build_vocabulary_incremental(self): + """Build vocabulary incrementally from indexed documents.""" + import re + + word_freq = defaultdict(int) + word_regex = re.compile(r"\w+") + + # Get all indexed documents in batches to avoid memory issues + batch_size = 1000 + offset = 0 + + while True: + try: + # Get batch of documents from FTS table + documents = self.sql( + f""" + SELECT title, content + FROM search_fts + LIMIT {batch_size} OFFSET {offset} + """, + read_only=True, + ) + + if not documents: + break + + # Process this batch + for i, doc in enumerate(documents): + # Show progress for large document sets + if (offset + i) % 1000 == 0: + self._update_progress( + f"Processing vocabulary ({offset + i} docs)", 85, 100, absolute=True + ) + + # Process title and content together + combined_text = " ".join([(doc["title"] or "").lower(), (doc["content"] or "").lower()]) + + # Extract all words at once + words = word_regex.findall(combined_text) + + for word in words: + if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha(): + word_freq[word] += 1 + + offset += batch_size + + except sqlite3.Error: + break + + # Build vocabulary tables as before + if word_freq: + # Clear existing data + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.execute("DELETE FROM search_vocabulary") + cursor.execute("DELETE FROM search_trigrams") + conn.commit() + finally: + conn.close() + + # Prepare batch data + vocab_data = [] + trigram_data = [] + trigram_set = set() + + for word, freq in word_freq.items(): + vocab_data.append((word, freq, len(word))) + + trigrams = self._generate_trigrams(word) + for trigram in trigrams: + trigram_key = (trigram, word) + if trigram_key not in trigram_set: + trigram_set.add(trigram_key) + trigram_data.append(trigram_key) + + # Batch insert + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.executemany( + "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data + ) + cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) + conn.commit() + finally: + conn.close() # Status and Validation Methods @@ -408,6 +626,183 @@ class SQLiteSearch(ABC): return records + def get_documents_paginated(self, doctype, limit=1000, last_indexed_modified=None): + """Get records for a specific doctype with pagination support.""" + config = self.doc_configs.get(doctype) + if not config: + return [] + + filters = config.get("filters", {}).copy() + filters = config.get("filters", {}).copy() + + # Ensure 'modified' field is always included for progress tracking + fields = config["fields"].copy() + if "modified" not in fields: + fields.append("modified") + + # Build query with proper ordering and pagination + # Order by modified field for reliable resume capability + query = frappe.qb.get_query( + doctype, + fields=fields, + filters=filters, + order_by="modified ASC, name ASC", # Secondary sort by name for consistency + limit=limit, + ) + + # If resuming from a specific timestamp, filter by modification time + # This is more reliable than name-based filtering for VARCHAR names + if last_indexed_modified: + Table = frappe.qb.DocType(doctype) + query = query.where(Table.modified > last_indexed_modified) + + docs = query.run(as_dict=True) + + for doc in docs: + doc.doctype = doctype + + return docs + + def _get_index_progress(self): + """Get current indexing progress for all doctypes.""" + try: + result = self.sql( + """ + SELECT doctype, last_indexed_name, last_indexed_modified, + total_docs, indexed_docs, batch_size, is_complete, + started_at, updated_at, vocabulary_built + FROM search_index_progress + """, + read_only=True, + ) + + progress = {} + for row in result: + progress[row["doctype"]] = dict(row) + + return progress + except sqlite3.Error: + return {} + + def _initialize_index_progress(self): + """Initialize progress tracking for all doctypes.""" + conn = self._get_connection() + try: + cursor = conn.cursor() + + # Clear existing progress + cursor.execute("DELETE FROM search_index_progress") + + # Initialize progress for each doctype + for doctype in self.doc_configs.keys(): + # Get total count for this doctype + config = self.doc_configs[doctype] + total_count = frappe.qb.get_query( + doctype, filters=config.get("filters", {}), fields=[{"COUNT": "name", "as": "count"}] + ).run(as_dict=True)[0]["count"] + + cursor.execute( + """ + INSERT INTO search_index_progress + (doctype, total_docs, indexed_docs, batch_size, is_complete, started_at, updated_at, vocabulary_built, last_indexed_modified) + VALUES (?, ?, 0, 1000, 0, datetime('now'), datetime('now'), 0, 0) + """, + (doctype, total_count), + ) + + conn.commit() + finally: + conn.close() + + def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count): + """Update progress for a specific doctype.""" + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.execute( + """ + UPDATE search_index_progress + SET last_indexed_name = ?, + last_indexed_modified = ?, + indexed_docs = indexed_docs + ?, + updated_at = datetime('now') + WHERE doctype = ? + """, + (last_indexed_name, last_indexed_modified, indexed_count, doctype), + ) + conn.commit() + finally: + conn.close() + + def _mark_doctype_complete(self, doctype): + """Mark a doctype as completely indexed.""" + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.execute( + """ + UPDATE search_index_progress + SET is_complete = 1, updated_at = datetime('now') + WHERE doctype = ? + """, + (doctype,), + ) + conn.commit() + finally: + conn.close() + + def _mark_vocabulary_built(self): + """Mark vocabulary as built.""" + conn = self._get_connection() + try: + cursor = conn.cursor() + cursor.execute(""" + UPDATE search_index_progress + SET vocabulary_built = 1, updated_at = datetime('now') + """) + conn.commit() + finally: + conn.close() + + def _is_indexing_complete(self): + """Check if all doctypes are completely indexed and vocabulary is built.""" + try: + result = self.sql( + """ + SELECT COUNT(*) as incomplete_count + FROM search_index_progress + WHERE is_complete = 0 OR vocabulary_built = 0 + """, + read_only=True, + ) + + return result[0]["incomplete_count"] == 0 + except sqlite3.Error: + return False + + def _queue_continuation_job(self): + """Queue a continuation job to resume indexing.""" + search_class_path = f"{self.__class__.__module__}.{self.__class__.__name__}" + frappe.enqueue( + "frappe.search.sqlite_search.build_index", + queue="long", + job_id=f"{search_class_path}_continuation", + deduplicate=True, + search_class_path=search_class_path, + force=True, + is_continuation=True, + ) + + def _tables_exist(self): + """Check if the required tables exist in the current database.""" + try: + result = self.sql( + "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True + ) + return bool(result) + except sqlite3.Error: + return False + # Private Implementation Methods def _execute_search_query(self, fts_query, title_only, filters): @@ -925,11 +1320,33 @@ class SQLiteSearch(ABC): ) """) + # Create the index progress tracking table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS search_index_progress ( + id INTEGER PRIMARY KEY, + doctype TEXT, + last_indexed_name TEXT, + last_indexed_modified TEXT, + total_docs INTEGER DEFAULT 0, + indexed_docs INTEGER DEFAULT 0, + batch_size INTEGER DEFAULT 1000, + is_complete BOOLEAN DEFAULT 0, + started_at DATETIME, + updated_at DATETIME, + vocabulary_built BOOLEAN DEFAULT 0 + ) + """) + # Index for fast trigram lookups cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_trigram_lookup ON search_trigrams(trigram) """) + # Index for progress tracking lookups + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype) + """) + conn.commit() finally: conn.close() @@ -1377,7 +1794,10 @@ def build_index_if_not_exists(): def build_index( - SearchClass: type[SQLiteSearch] | None = None, search_class_path: str | None = None, force: bool = False + SearchClass: type[SQLiteSearch] | None = None, + search_class_path: str | None = None, + force: bool = False, + is_continuation: bool = False, ): """Build search index for SearchClass""" if not SearchClass and not search_class_path: @@ -1389,9 +1809,15 @@ def build_index( search = SearchClass() if not search.is_search_enabled(): return - if not search.index_exists() or force: - print(f"{SearchClass.__name__}: Index does not exist, building...") - search.build_index() + + # For continuation jobs, always proceed regardless of existing index + # For fresh builds, only build if index doesn't exist or force=True + if is_continuation or not search.index_exists() or force: + if is_continuation: + print(f"{SearchClass.__name__}: Continuing incremental index build...") + else: + print(f"{SearchClass.__name__}: Index does not exist or force=True, building...") + search.build_index(is_continuation=is_continuation) def build_index_in_background(): @@ -1400,18 +1826,50 @@ def build_index_in_background(): for SearchClass in search_classes: search = SearchClass() if not search.is_search_enabled(): - return + continue + search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}" - print(f"Enqueuing {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - # build_index args - search_class_path=search_class_path, - force=True, - ) + + # Check if indexing is already in progress or complete + if search.index_exists(): + try: + # Check if there are any incomplete progress records + progress = search._get_index_progress() + if progress and not search._is_indexing_complete(): + print(f"Enqueuing continuation for {search_class_path}.build_index") + frappe.enqueue( + "frappe.search.sqlite_search.build_index", + queue="long", + job_id=f"{search_class_path}_continuation", + deduplicate=True, + search_class_path=search_class_path, + force=True, + is_continuation=True, + ) + else: + print(f"Index for {search_class_path} is already complete") + except Exception: + # If we can't check progress, assume we need to rebuild + print(f"Enqueuing fresh build for {search_class_path}.build_index") + frappe.enqueue( + "frappe.search.sqlite_search.build_index", + queue="long", + job_id=search_class_path, + deduplicate=True, + search_class_path=search_class_path, + force=True, + ) + else: + # No index exists, start fresh build + print(f"Enqueuing fresh build for {search_class_path}.build_index") + frappe.enqueue( + "frappe.search.sqlite_search.build_index", + queue="long", + job_id=search_class_path, + deduplicate=True, + search_class_path=search_class_path, + force=True, + ) def update_doc_index(doc: Document, method=None): From 6f88468d686c49645c0e154e7be32b65dfd98453 Mon Sep 17 00:00:00 2001 From: Ritvik Sardana Date: Fri, 6 Feb 2026 14:58:17 +0530 Subject: [PATCH 2/3] refactor: resumable indexing without max_runtime --- frappe/hooks.py | 4 +- frappe/search/sqlite_search.py | 431 ++++++++++++----------------- frappe/tests/test_sqlite_search.py | 46 +-- 3 files changed, 205 insertions(+), 276 deletions(-) diff --git a/frappe/hooks.py b/frappe/hooks.py index a977f39529..b37e79b7d4 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -216,7 +216,6 @@ scheduler_events = { "frappe.deferred_insert.save_to_db", "frappe.automation.doctype.reminder.reminder.send_reminders", "frappe.model.utils.link_count.update_link_count", - "frappe.search.sqlite_search.build_index_if_not_exists", "frappe.utils.telemetry.pulse.client.send_queued_events", ], # 10 minutes @@ -227,6 +226,9 @@ scheduler_events = { "30 * * * *": [], # Daily but offset by 45 minutes "45 0 * * *": [], + "0 */3 * * *": [ + "frappe.search.sqlite_search.build_index_if_not_exists", + ], }, "all": [ "frappe.email.queue.flush", diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index a3ca394fab..2cd4b2078f 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -291,14 +291,15 @@ class SQLiteSearch(ABC): }, } - def build_index(self, batch_size=1000, max_runtime_minutes=1, is_continuation=False): - """Build the search index incrementally with progress tracking and time-based exit.""" + def build_index(self, batch_size=1000, is_continuation=False): + """Build the search index incrementally with progress tracking. + + The job runs until completion or until killed by the queue timeout. + Progress is tracked in the database so builds can resume from where they left off. + """ if not self.is_search_enabled(): return - start_time = time.time() - max_runtime = max_runtime_minutes * 60 - # Use temporary database path for atomic replacement (only for new index builds) temp_db_path = None original_db_path = self.db_path @@ -326,7 +327,6 @@ class SQLiteSearch(ABC): print(f"Continuation: Using regular database {self.db_path}") # If no temp db exists, we're continuing with regular database (already set) - # Debug: Show which database we're working with if temp_db_path: print(f"Working with temporary database: {self.db_path}") else: @@ -340,13 +340,7 @@ class SQLiteSearch(ABC): # Clear existing index data for fresh build if temp_db_path and not is_continuation: - conn = self._get_connection() - try: - cursor = conn.cursor() - cursor.execute("DELETE FROM search_fts") - conn.commit() - finally: - conn.close() + self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts")) # Initialize progress tracking (only for completely fresh builds) if not is_continuation: @@ -365,14 +359,6 @@ class SQLiteSearch(ABC): processed_doctypes = 0 for doctype in self.doc_configs.keys(): - # Check time limit - if time.time() - start_time > max_runtime: - self._update_progress( - "Time limit reached, queuing continuation job", 90, 100, absolute=True - ) - self._queue_continuation_job() - return - doctype_progress = progress.get(doctype, {}) # Skip if doctype is already complete @@ -392,17 +378,6 @@ class SQLiteSearch(ABC): batch_count = 0 while True: - # Check time limit before each batch - if time.time() - start_time > max_runtime: - self._update_progress( - "Time limit reached during doctype processing, queuing continuation", - 90, - 100, - absolute=True, - ) - self._queue_continuation_job() - return - # Get batch of documents docs = self.get_documents_paginated( doctype, limit=batch_size, last_indexed_modified=last_indexed_modified @@ -414,8 +389,6 @@ class SQLiteSearch(ABC): break # Prepare and index documents - print(f"preparing {len(docs)} documents {doctype}") - print(time.time() - start_time) documents = [] for doc in docs: document = self.prepare_document(doc) @@ -435,14 +408,21 @@ class SQLiteSearch(ABC): batch_count += 1 - # Show progress within doctype + # Show progress based on current doctype's document counts if batch_count % 5 == 0: # Update every 5 batches - current_progress = 20 + (processed_doctypes * 60 // total_doctypes) - self._update_progress( - f"Indexing {doctype} (batch {batch_count})", current_progress, 100, absolute=True - ) + indexed_docs, total_docs = self._get_doctype_progress(doctype) + if total_docs > 0: + progress_percent = ( + indexed_docs * 100 + ) // total_docs # 0-100% for current doctype + self._update_progress( + f"Indexing {doctype} - {indexed_docs:,}/{total_docs:,} documents ({progress_percent}%)", + progress_percent, + 100, + absolute=True, + ) - processed_doctypes += 1 + processed_doctypes += 1 # Check if all doctypes are indexed before building vocabulary if not self._is_vocabulary_built_needed(): @@ -463,31 +443,44 @@ class SQLiteSearch(ABC): # Print warning summary self._print_warning_summary() - except Exception: - # Clean up temp file on error - if temp_db_path and os.path.exists(temp_db_path): - os.unlink(temp_db_path) + except Exception as e: + # Log the error + frappe.log_error( + title="Search Index Build Error", + message=f"Error during search index build: {e}", + ) raise finally: # Restore original database path if temp_db_path: self.db_path = original_db_path - def _is_vocabulary_built_needed(self): - """Check if vocabulary still needs to be built.""" + def _get_incomplete_count(self, where_clause): + """Get count of incomplete records from search_index_progress table. + + Args: + where_clause: SQL WHERE clause condition (without 'WHERE' keyword) + + Returns: + int: Count of matching records, or -1 on error + """ try: result = self.sql( - """ + f""" SELECT COUNT(*) as incomplete_count FROM search_index_progress - WHERE is_complete = 0 + WHERE {where_clause} """, read_only=True, ) - - return result[0]["incomplete_count"] > 0 + return result[0]["incomplete_count"] except sqlite3.Error: - return True + return -1 + + def _is_vocabulary_built_needed(self): + """Check if vocabulary still needs to be built.""" + count = self._get_incomplete_count("is_complete = 0") + return count > 0 if count >= 0 else True def _build_vocabulary_incremental(self): """Build vocabulary incrementally from indexed documents.""" @@ -541,14 +534,11 @@ class SQLiteSearch(ABC): # Build vocabulary tables as before if word_freq: # Clear existing data - conn = self._get_connection() - try: - cursor = conn.cursor() + def clear_vocabulary(cursor): cursor.execute("DELETE FROM search_vocabulary") cursor.execute("DELETE FROM search_trigrams") - conn.commit() - finally: - conn.close() + + self._with_connection(clear_vocabulary) # Prepare batch data vocab_data = [] @@ -566,31 +556,32 @@ class SQLiteSearch(ABC): trigram_data.append(trigram_key) # Batch insert - conn = self._get_connection() - try: - cursor = conn.cursor() + def insert_vocabulary(cursor): cursor.executemany( "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data ) cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) - conn.commit() - finally: - conn.close() + + self._with_connection(insert_vocabulary) # Status and Validation Methods + def _table_exists(self, table_name): + """Check if a table exists in the database.""" + try: + result = self.sql( + f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'", read_only=True + ) + return bool(result) + except sqlite3.Error: + return False + def index_exists(self): """Check if FTS index exists.""" if not os.path.exists(self.db_path): return False - try: - result = self.sql( - "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True - ) - return bool(result) - except sqlite3.Error: - return False + return self._table_exists("search_fts") def drop_index(self): """Drop the search index by removing the database file.""" @@ -633,7 +624,6 @@ class SQLiteSearch(ABC): return [] filters = config.get("filters", {}).copy() - filters = config.get("filters", {}).copy() # Ensure 'modified' field is always included for progress tracking fields = config["fields"].copy() @@ -646,7 +636,7 @@ class SQLiteSearch(ABC): doctype, fields=fields, filters=filters, - order_by="modified ASC, name ASC", # Secondary sort by name for consistency + order_by="creation ASC, name ASC", # Secondary sort by name for consistency limit=limit, ) @@ -686,10 +676,8 @@ class SQLiteSearch(ABC): def _initialize_index_progress(self): """Initialize progress tracking for all doctypes.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + def init_progress(cursor): # Clear existing progress cursor.execute("DELETE FROM search_index_progress") @@ -710,15 +698,12 @@ class SQLiteSearch(ABC): (doctype, total_count), ) - conn.commit() - finally: - conn.close() + self._with_connection(init_progress) def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count): """Update progress for a specific doctype.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def update_progress(cursor): cursor.execute( """ UPDATE search_index_progress @@ -730,15 +715,13 @@ class SQLiteSearch(ABC): """, (last_indexed_name, last_indexed_modified, indexed_count, doctype), ) - conn.commit() - finally: - conn.close() + + self._with_connection(update_progress) def _mark_doctype_complete(self, doctype): """Mark a doctype as completely indexed.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def mark_complete(cursor): cursor.execute( """ UPDATE search_index_progress @@ -747,61 +730,49 @@ class SQLiteSearch(ABC): """, (doctype,), ) - conn.commit() - finally: - conn.close() + + self._with_connection(mark_complete) def _mark_vocabulary_built(self): """Mark vocabulary as built.""" - conn = self._get_connection() - try: - cursor = conn.cursor() + + def mark_built(cursor): cursor.execute(""" UPDATE search_index_progress SET vocabulary_built = 1, updated_at = datetime('now') """) - conn.commit() - finally: - conn.close() + + self._with_connection(mark_built) def _is_indexing_complete(self): """Check if all doctypes are completely indexed and vocabulary is built.""" + count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0") + return count == 0 if count >= 0 else False + + def _get_doctype_progress(self, doctype): + """Get indexing progress for a specific doctype.""" try: result = self.sql( """ - SELECT COUNT(*) as incomplete_count + SELECT total_docs, indexed_docs FROM search_index_progress - WHERE is_complete = 0 OR vocabulary_built = 0 + WHERE doctype = ? """, + (doctype,), read_only=True, ) - return result[0]["incomplete_count"] == 0 + if result and result[0]: + total_docs = result[0]["total_docs"] or 0 + indexed_docs = result[0]["indexed_docs"] or 0 + return indexed_docs, total_docs + return 0, 0 except sqlite3.Error: - return False - - def _queue_continuation_job(self): - """Queue a continuation job to resume indexing.""" - search_class_path = f"{self.__class__.__module__}.{self.__class__.__name__}" - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=f"{search_class_path}_continuation", - deduplicate=True, - search_class_path=search_class_path, - force=True, - is_continuation=True, - ) + return 0, 0 def _tables_exist(self): """Check if the required tables exist in the current database.""" - try: - result = self.sql( - "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True - ) - return bool(result) - except sqlite3.Error: - return False + return self._table_exists("search_fts") # Private Implementation Methods @@ -917,6 +888,7 @@ class SQLiteSearch(ABC): ORDER BY bm25_score LIMIT ? """ + print(sql) return self.sql(sql, params, read_only=True) def _process_search_results(self, raw_results, query): @@ -1179,80 +1151,6 @@ class SQLiteSearch(ABC): similarities.sort(key=lambda x: x[1], reverse=True) return [word for word, score in similarities[:max_suggestions]] - def _build_vocabulary(self, documents): - """Build vocabulary and trigram index from documents for spelling correction.""" - import re - - word_freq = defaultdict(int) - word_regex = re.compile(r"\w+") # Compile regex once for efficiency - - # Extract words from all documents in batches - for i, doc in enumerate(documents): - # Show progress for large document sets - if i % 1000 == 0: - progress = 80 + int((i / len(documents)) * 15) # 80-95% range - self._update_progress( - f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True - ) - - # Process title and content together for efficiency - combined_text = " ".join( - [(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()] - ) - - # Extract all words at once with compiled regex - words = word_regex.findall(combined_text) - - for word in words: - if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha(): # Filter out short words and non-alpha - word_freq[word] += 1 - - # Clear existing data in a single transaction - conn = self._get_connection() - try: - cursor = conn.cursor() - cursor.execute("DELETE FROM search_vocabulary") - cursor.execute("DELETE FROM search_trigrams") - conn.commit() - finally: - conn.close() - - if not word_freq: - return - - # Prepare batch data for vocabulary - vocab_data = [] - trigram_data = [] - trigram_set = set() # Use set to avoid duplicate trigrams - - for word, freq in word_freq.items(): - vocab_data.append((word, freq, len(word))) - - # Generate trigrams for this word - trigrams = self._generate_trigrams(word) - for trigram in trigrams: - trigram_key = (trigram, word) - if trigram_key not in trigram_set: - trigram_set.add(trigram_key) - trigram_data.append(trigram_key) - - # Use batch inserts with a single transaction - conn = self._get_connection() - try: - cursor = conn.cursor() - - # Batch insert vocabulary - cursor.executemany( - "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data - ) - - # Batch insert trigrams (duplicates already removed) - cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data) - - conn.commit() - finally: - conn.close() - # Database and Infrastructure Methods def _get_connection(self, read_only=False): @@ -1281,6 +1179,26 @@ class SQLiteSearch(ABC): if is_read: cursor.execute("PRAGMA query_only = 1;") # Read-only optimization + def _with_connection(self, callback, read_only=False): + """Execute a callback with a managed database connection. + + Args: + callback: Function that takes (cursor) and performs database operations + read_only: Whether the connection is read-only + + Returns: + The return value of the callback, if any + """ + conn = self._get_connection(read_only=read_only) + try: + cursor = conn.cursor() + result = callback(cursor) + if not read_only: + conn.commit() + return result + finally: + conn.close() + def _ensure_fts_table(self): """Create FTS table and related tables if they don't exist.""" # Get schema from subclass @@ -1288,11 +1206,7 @@ class SQLiteSearch(ABC): metadata_fields = self.schema["metadata_fields"] tokenizer = self.schema["tokenizer"] - # Use a single transaction for all table creation operations - conn = self._get_connection() - try: - cursor = conn.cursor() - + def create_tables(cursor): # Create the FTS table with dynamic columns cursor.execute(f""" CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5( @@ -1347,9 +1261,7 @@ class SQLiteSearch(ABC): CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype) """) - conn.commit() - finally: - conn.close() + self._with_connection(create_tables) def _index_documents(self, documents): """Bulk index documents into SQLite FTS.""" @@ -1372,10 +1284,8 @@ class SQLiteSearch(ABC): # Process documents in chunks to prevent memory issues with large datasets chunk_size = 1000 - conn = self._get_connection() - try: - cursor = conn.cursor() + def index_chunks(cursor): for i in range(0, len(documents), chunk_size): chunk = documents[i : i + chunk_size] doc_ids_to_delete = [] @@ -1411,6 +1321,7 @@ class SQLiteSearch(ABC): values.append(doc.get(field, "")) doc_ids_to_delete.append(doc_id) + values_to_insert.append(tuple(values)) # Delete existing rows for these doc_ids first using a single statement @@ -1423,9 +1334,7 @@ class SQLiteSearch(ABC): if values_to_insert: cursor.executemany(insert_sql, values_to_insert) - conn.commit() - finally: - conn.close() + self._with_connection(index_chunks) def index_doc(self, doctype, docname): """Index a single document.""" @@ -1786,11 +1695,27 @@ class SQLiteSearch(ABC): def build_index_if_not_exists(): - """Build index if it doesn't exist.""" + """Build index if it doesn't exist or continue if temp DB exists. + + Called by scheduler every 3 hours to continue incomplete builds. + """ search_classes = get_search_classes() for SearchClass in search_classes: - build_index(SearchClass, force=False) + search = SearchClass() + if not search.is_search_enabled(): + continue + + # Check if a temp DB exists (incomplete build from previous run) + temp_db_path = search._get_db_path(is_temp=True) + if os.path.exists(temp_db_path): + # Continue the incomplete build + print(f"{SearchClass.__name__}: Found temp DB, continuing build...") + build_index(SearchClass, force=True, is_continuation=True) + elif not search.index_exists(): + # No index exists, start fresh build + print(f"{SearchClass.__name__}: No index exists, starting fresh build...") + build_index(SearchClass, force=False) def build_index( @@ -1810,9 +1735,11 @@ def build_index( if not search.is_search_enabled(): return + if search.index_exists() and not force: + return + # For continuation jobs, always proceed regardless of existing index - # For fresh builds, only build if index doesn't exist or force=True - if is_continuation or not search.index_exists() or force: + if is_continuation or force: if is_continuation: print(f"{SearchClass.__name__}: Continuing incremental index build...") else: @@ -1820,8 +1747,38 @@ def build_index( search.build_index(is_continuation=is_continuation) +def _enqueue_index_job(search_class_path: str, is_continuation: bool = False): + """Enqueue a search index build job. + + Args: + search_class_path: Full path to the search class (e.g., 'module.ClassName') + is_continuation: Whether this is a continuation of an incomplete build + """ + job_id = f"{search_class_path}_continuation" if is_continuation else search_class_path + job_type = "continuation" if is_continuation else "fresh build" + print(f"Enqueuing {job_type} for {search_class_path}.build_index") + + # timeout for 2 hour 10 minutes to account for job queue delays + timeout = 2 * 60 * 60 + 10 * 60 + + enqueue_kwargs = { + "queue": "long", + "job_id": job_id, + "deduplicate": True, + "search_class_path": search_class_path, + "force": True, + "is_continuation": is_continuation, + "timeout": timeout, + } + + frappe.enqueue("frappe.search.sqlite_search.build_index", **enqueue_kwargs) + + def build_index_in_background(): - """Enqueue index building in background.""" + """Enqueue index building in background. + + Called after migrate to start/continue index building. + """ search_classes = get_search_classes() for SearchClass in search_classes: search = SearchClass() @@ -1830,46 +1787,16 @@ def build_index_in_background(): search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}" - # Check if indexing is already in progress or complete - if search.index_exists(): - try: - # Check if there are any incomplete progress records - progress = search._get_index_progress() - if progress and not search._is_indexing_complete(): - print(f"Enqueuing continuation for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=f"{search_class_path}_continuation", - deduplicate=True, - search_class_path=search_class_path, - force=True, - is_continuation=True, - ) - else: - print(f"Index for {search_class_path} is already complete") - except Exception: - # If we can't check progress, assume we need to rebuild - print(f"Enqueuing fresh build for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - search_class_path=search_class_path, - force=True, - ) - else: + # Check if a temp DB exists (incomplete build from previous run) + temp_db_path = search._get_db_path(is_temp=True) + if os.path.exists(temp_db_path): + # Continue the incomplete build + _enqueue_index_job(search_class_path, is_continuation=True) + elif not search.index_exists(): # No index exists, start fresh build - print(f"Enqueuing fresh build for {search_class_path}.build_index") - frappe.enqueue( - "frappe.search.sqlite_search.build_index", - queue="long", - job_id=search_class_path, - deduplicate=True, - search_class_path=search_class_path, - force=True, - ) + _enqueue_index_job(search_class_path, is_continuation=False) + else: + print(f"Index for {search_class_path} already exists") def update_doc_index(doc: Document, method=None): diff --git a/frappe/tests/test_sqlite_search.py b/frappe/tests/test_sqlite_search.py index 8f6bfcee99..691c9161dd 100644 --- a/frappe/tests/test_sqlite_search.py +++ b/frappe/tests/test_sqlite_search.py @@ -483,6 +483,29 @@ class TestSQLiteSearchAPI(IntegrationTestCase): disabled_search.build_index() # Should not raise error but do nothing self.assertFalse(disabled_search.index_exists()) + @patch("frappe.enqueue") + def test_background_operations(self, mock_enqueue): + """Test background job integration and module-level functions.""" + from frappe.search.sqlite_search import build_index_in_background, get_search_classes + + # Test getting search classes + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + classes = get_search_classes() + self.assertEqual(len(classes), 1) + self.assertEqual(classes[0], TestSQLiteSearch) + + # Ensure index doesn't exist so build_index_in_background will enqueue a job + self.search.drop_index() + + # Test background index building + with patch("frappe.get_hooks") as mock_get_hooks: + mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] + build_index_in_background() + + # Should have enqueued a background job since index doesn't exist + self.assertTrue(mock_enqueue.called) + def test_deduplication_on_reindex(self): """Test that re-indexing the same document does not create duplicates.""" self.search.build_index() @@ -546,26 +569,3 @@ class TestSQLiteSearchAPI(IntegrationTestCase): finally: test_note.delete() - - @patch("frappe.enqueue") - def test_background_operations(self, mock_enqueue): - """Test background job integration and module-level functions.""" - from frappe.search.sqlite_search import ( - build_index_in_background, - get_search_classes, - ) - - # Test getting search classes - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - classes = get_search_classes() - self.assertEqual(len(classes), 1) - self.assertEqual(classes[0], TestSQLiteSearch) - - # Test background index building - with patch("frappe.get_hooks") as mock_get_hooks: - mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"] - build_index_in_background() - - # Should have enqueued a background job - self.assertTrue(mock_enqueue.called) From aef35985a3f1527a8e84a7b284bafb292b4f5f0e Mon Sep 17 00:00:00 2001 From: Ritvik Sardana Date: Sat, 7 Feb 2026 19:21:03 +0530 Subject: [PATCH 3/3] fix: doctype indexing progress --- frappe/search/sqlite_search.py | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py index 2cd4b2078f..061145462c 100644 --- a/frappe/search/sqlite_search.py +++ b/frappe/search/sqlite_search.py @@ -408,19 +408,16 @@ class SQLiteSearch(ABC): batch_count += 1 - # Show progress based on current doctype's document counts - if batch_count % 5 == 0: # Update every 5 batches - indexed_docs, total_docs = self._get_doctype_progress(doctype) - if total_docs > 0: - progress_percent = ( - indexed_docs * 100 - ) // total_docs # 0-100% for current doctype - self._update_progress( - f"Indexing {doctype} - {indexed_docs:,}/{total_docs:,} documents ({progress_percent}%)", - progress_percent, - 100, - absolute=True, - ) + # Show progress based on total document counts across all doctypes + indexed_docs, total_docs = self._get_indexing_progress() + if total_docs > 0: + progress_percent = 20 + (indexed_docs * 60) // total_docs + self._update_progress( + f"Indexing {doctype} {indexed_docs}/{total_docs}", + progress_percent, + 100, + absolute=True, + ) processed_doctypes += 1 @@ -749,16 +746,14 @@ class SQLiteSearch(ABC): count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0") return count == 0 if count >= 0 else False - def _get_doctype_progress(self, doctype): - """Get indexing progress for a specific doctype.""" + def _get_indexing_progress(self): + """Get overall indexing progress across all doctypes.""" try: result = self.sql( """ - SELECT total_docs, indexed_docs + SELECT SUM(total_docs) as total_docs, SUM(indexed_docs) as indexed_docs FROM search_index_progress - WHERE doctype = ? """, - (doctype,), read_only=True, )