refactor: resumable indexing without max_runtime

Ritvik Sardana 2026-02-06 14:58:17 +05:30
parent 2a7e6a7793
commit 6f88468d68
3 changed files with 205 additions and 276 deletions

frappe/hooks.py

@@ -216,7 +216,6 @@ scheduler_events = {
"frappe.deferred_insert.save_to_db",
"frappe.automation.doctype.reminder.reminder.send_reminders",
"frappe.model.utils.link_count.update_link_count",
"frappe.search.sqlite_search.build_index_if_not_exists",
"frappe.utils.telemetry.pulse.client.send_queued_events",
],
# 10 minutes
@@ -227,6 +226,9 @@ scheduler_events = {
"30 * * * *": [],
# Daily but offset by 45 minutes
"45 0 * * *": [],
"0 */3 * * *": [
"frappe.search.sqlite_search.build_index_if_not_exists",
],
},
"all": [
"frappe.email.queue.flush",

frappe/search/sqlite_search.py

@@ -291,14 +291,15 @@ class SQLiteSearch(ABC):
},
}
def build_index(self, batch_size=1000, max_runtime_minutes=1, is_continuation=False):
"""Build the search index incrementally with progress tracking and time-based exit."""
def build_index(self, batch_size=1000, is_continuation=False):
"""Build the search index incrementally with progress tracking.
The job runs until completion or until killed by the queue timeout.
Progress is tracked in the database so builds can resume from where they left off.
"""
if not self.is_search_enabled():
return
start_time = time.time()
max_runtime = max_runtime_minutes * 60
# Use temporary database path for atomic replacement (only for new index builds)
temp_db_path = None
original_db_path = self.db_path
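
The control flow the new docstring implies, condensed into a sketch built from helpers that appear later in this diff (error handling and the temp-database bookkeeping omitted):

    for doctype in self.doc_configs:
        # resume point persisted in search_index_progress
        last_modified = progress.get(doctype, {}).get("last_indexed_modified")
        while True:
            docs = self.get_documents_paginated(
                doctype, limit=batch_size, last_indexed_modified=last_modified
            )
            if not docs:
                break
            self._index_documents([self.prepare_document(d) for d in docs])
            last_modified = docs[-1]["modified"]
            self._update_index_progress(doctype, docs[-1]["name"], last_modified, len(docs))
        self._mark_doctype_complete(doctype)
    # No time check anywhere: if the queue timeout kills the job mid-loop,
    # the next run picks up from the stored checkpoints instead of starting over.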
@@ -326,7 +327,6 @@ class SQLiteSearch(ABC):
print(f"Continuation: Using regular database {self.db_path}")
# If no temp db exists, we're continuing with regular database (already set)
# Debug: Show which database we're working with
if temp_db_path:
print(f"Working with temporary database: {self.db_path}")
else:
@@ -340,13 +340,7 @@ class SQLiteSearch(ABC):
# Clear existing index data for fresh build
if temp_db_path and not is_continuation:
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM search_fts")
conn.commit()
finally:
conn.close()
self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts"))
# Initialize progress tracking (only for completely fresh builds)
if not is_continuation:
@@ -365,14 +359,6 @@ class SQLiteSearch(ABC):
processed_doctypes = 0
for doctype in self.doc_configs.keys():
# Check time limit
if time.time() - start_time > max_runtime:
self._update_progress(
"Time limit reached, queuing continuation job", 90, 100, absolute=True
)
self._queue_continuation_job()
return
doctype_progress = progress.get(doctype, {})
# Skip if doctype is already complete
@@ -392,17 +378,6 @@ class SQLiteSearch(ABC):
batch_count = 0
while True:
# Check time limit before each batch
if time.time() - start_time > max_runtime:
self._update_progress(
"Time limit reached during doctype processing, queuing continuation",
90,
100,
absolute=True,
)
self._queue_continuation_job()
return
# Get batch of documents
docs = self.get_documents_paginated(
doctype, limit=batch_size, last_indexed_modified=last_indexed_modified
@@ -414,8 +389,6 @@ class SQLiteSearch(ABC):
break
# Prepare and index documents
print(f"preparing {len(docs)} documents {doctype}")
print(time.time() - start_time)
documents = []
for doc in docs:
document = self.prepare_document(doc)
@@ -435,14 +408,21 @@ class SQLiteSearch(ABC):
batch_count += 1
# Show progress within doctype
# Show progress based on current doctype's document counts
if batch_count % 5 == 0: # Update every 5 batches
current_progress = 20 + (processed_doctypes * 60 // total_doctypes)
self._update_progress(
f"Indexing {doctype} (batch {batch_count})", current_progress, 100, absolute=True
)
indexed_docs, total_docs = self._get_doctype_progress(doctype)
if total_docs > 0:
progress_percent = (
indexed_docs * 100
) // total_docs # 0-100% for current doctype
self._update_progress(
f"Indexing {doctype} - {indexed_docs:,}/{total_docs:,} documents ({progress_percent}%)",
progress_percent,
100,
absolute=True,
)
processed_doctypes += 1
processed_doctypes += 1
# Check if all doctypes are indexed before building vocabulary
if not self._is_vocabulary_built_needed():
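
Worked example for the progress math above: with indexed_docs = 2,500 and total_docs = 10,000, (2500 * 100) // 10000 = 25, so the message reads "Indexing ToDo - 2,500/10,000 documents (25%)" (doctype name illustrative).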
@@ -463,31 +443,44 @@ class SQLiteSearch(ABC):
# Print warning summary
self._print_warning_summary()
except Exception:
# Clean up temp file on error
if temp_db_path and os.path.exists(temp_db_path):
os.unlink(temp_db_path)
except Exception as e:
# Log the error
frappe.log_error(
title="Search Index Build Error",
message=f"Error during search index build: {e}",
)
raise
finally:
# Restore original database path
if temp_db_path:
self.db_path = original_db_path
def _is_vocabulary_built_needed(self):
"""Check if vocabulary still needs to be built."""
def _get_incomplete_count(self, where_clause):
"""Get count of incomplete records from search_index_progress table.
Args:
where_clause: SQL WHERE clause condition (without 'WHERE' keyword)
Returns:
int: Count of matching records, or -1 on error
"""
try:
result = self.sql(
"""
f"""
SELECT COUNT(*) as incomplete_count
FROM search_index_progress
WHERE is_complete = 0
WHERE {where_clause}
""",
read_only=True,
)
return result[0]["incomplete_count"] > 0
return result[0]["incomplete_count"]
except sqlite3.Error:
return True
return -1
def _is_vocabulary_built_needed(self):
"""Check if vocabulary still needs to be built."""
count = self._get_incomplete_count("is_complete = 0")
return count > 0 if count >= 0 else True
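
Note how the -1 error sentinel maps to each caller's safe default: _is_vocabulary_built_needed falls back to True (assume the vocabulary still needs building), while _is_indexing_complete below falls back to False (assume indexing is not done).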
def _build_vocabulary_incremental(self):
"""Build vocabulary incrementally from indexed documents."""
@@ -541,14 +534,11 @@ class SQLiteSearch(ABC):
# Build vocabulary tables as before
if word_freq:
# Clear existing data
conn = self._get_connection()
try:
cursor = conn.cursor()
def clear_vocabulary(cursor):
cursor.execute("DELETE FROM search_vocabulary")
cursor.execute("DELETE FROM search_trigrams")
conn.commit()
finally:
conn.close()
self._with_connection(clear_vocabulary)
# Prepare batch data
vocab_data = []
@@ -566,31 +556,32 @@ class SQLiteSearch(ABC):
trigram_data.append(trigram_key)
# Batch insert
conn = self._get_connection()
try:
cursor = conn.cursor()
def insert_vocabulary(cursor):
cursor.executemany(
"INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
)
cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
conn.commit()
finally:
conn.close()
self._with_connection(insert_vocabulary)
# Status and Validation Methods
def _table_exists(self, table_name):
"""Check if a table exists in the database."""
try:
result = self.sql(
f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'", read_only=True
)
return bool(result)
except sqlite3.Error:
return False
def index_exists(self):
"""Check if FTS index exists."""
if not os.path.exists(self.db_path):
return False
try:
result = self.sql(
"SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True
)
return bool(result)
except sqlite3.Error:
return False
return self._table_exists("search_fts")
def drop_index(self):
"""Drop the search index by removing the database file."""
@@ -633,7 +624,6 @@ class SQLiteSearch(ABC):
return []
filters = config.get("filters", {}).copy()
filters = config.get("filters", {}).copy()
# Ensure 'modified' field is always included for progress tracking
fields = config["fields"].copy()
@@ -646,7 +636,7 @@ class SQLiteSearch(ABC):
doctype,
fields=fields,
filters=filters,
order_by="modified ASC, name ASC", # Secondary sort by name for consistency
order_by="creation ASC, name ASC", # Secondary sort by name for consistency
limit=limit,
)
@@ -686,10 +676,8 @@ class SQLiteSearch(ABC):
def _initialize_index_progress(self):
"""Initialize progress tracking for all doctypes."""
conn = self._get_connection()
try:
cursor = conn.cursor()
def init_progress(cursor):
# Clear existing progress
cursor.execute("DELETE FROM search_index_progress")
@@ -710,15 +698,12 @@ class SQLiteSearch(ABC):
(doctype, total_count),
)
conn.commit()
finally:
conn.close()
self._with_connection(init_progress)
def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count):
"""Update progress for a specific doctype."""
conn = self._get_connection()
try:
cursor = conn.cursor()
def update_progress(cursor):
cursor.execute(
"""
UPDATE search_index_progress
@@ -730,15 +715,13 @@ class SQLiteSearch(ABC):
""",
(last_indexed_name, last_indexed_modified, indexed_count, doctype),
)
conn.commit()
finally:
conn.close()
self._with_connection(update_progress)
def _mark_doctype_complete(self, doctype):
"""Mark a doctype as completely indexed."""
conn = self._get_connection()
try:
cursor = conn.cursor()
def mark_complete(cursor):
cursor.execute(
"""
UPDATE search_index_progress
@@ -747,61 +730,49 @@ class SQLiteSearch(ABC):
""",
(doctype,),
)
conn.commit()
finally:
conn.close()
self._with_connection(mark_complete)
def _mark_vocabulary_built(self):
"""Mark vocabulary as built."""
conn = self._get_connection()
try:
cursor = conn.cursor()
def mark_built(cursor):
cursor.execute("""
UPDATE search_index_progress
SET vocabulary_built = 1, updated_at = datetime('now')
""")
conn.commit()
finally:
conn.close()
self._with_connection(mark_built)
def _is_indexing_complete(self):
"""Check if all doctypes are completely indexed and vocabulary is built."""
count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0")
return count == 0 if count >= 0 else False
def _get_doctype_progress(self, doctype):
"""Get indexing progress for a specific doctype."""
try:
result = self.sql(
"""
SELECT COUNT(*) as incomplete_count
SELECT total_docs, indexed_docs
FROM search_index_progress
WHERE is_complete = 0 OR vocabulary_built = 0
WHERE doctype = ?
""",
(doctype,),
read_only=True,
)
return result[0]["incomplete_count"] == 0
if result and result[0]:
total_docs = result[0]["total_docs"] or 0
indexed_docs = result[0]["indexed_docs"] or 0
return indexed_docs, total_docs
return 0, 0
except sqlite3.Error:
return False
def _queue_continuation_job(self):
"""Queue a continuation job to resume indexing."""
search_class_path = f"{self.__class__.__module__}.{self.__class__.__name__}"
frappe.enqueue(
"frappe.search.sqlite_search.build_index",
queue="long",
job_id=f"{search_class_path}_continuation",
deduplicate=True,
search_class_path=search_class_path,
force=True,
is_continuation=True,
)
return 0, 0
def _tables_exist(self):
"""Check if the required tables exist in the current database."""
try:
result = self.sql(
"SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True
)
return bool(result)
except sqlite3.Error:
return False
return self._table_exists("search_fts")
# Private Implementation Methods
@@ -917,6 +888,7 @@ class SQLiteSearch(ABC):
ORDER BY bm25_score
LIMIT ?
"""
print(sql)
return self.sql(sql, params, read_only=True)
def _process_search_results(self, raw_results, query):
@@ -1179,80 +1151,6 @@ class SQLiteSearch(ABC):
similarities.sort(key=lambda x: x[1], reverse=True)
return [word for word, score in similarities[:max_suggestions]]
def _build_vocabulary(self, documents):
"""Build vocabulary and trigram index from documents for spelling correction."""
import re
word_freq = defaultdict(int)
word_regex = re.compile(r"\w+") # Compile regex once for efficiency
# Extract words from all documents in batches
for i, doc in enumerate(documents):
# Show progress for large document sets
if i % 1000 == 0:
progress = 80 + int((i / len(documents)) * 15) # 80-95% range
self._update_progress(
f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True
)
# Process title and content together for efficiency
combined_text = " ".join(
[(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()]
)
# Extract all words at once with compiled regex
words = word_regex.findall(combined_text)
for word in words:
if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha(): # Filter out short words and non-alpha
word_freq[word] += 1
# Clear existing data in a single transaction
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM search_vocabulary")
cursor.execute("DELETE FROM search_trigrams")
conn.commit()
finally:
conn.close()
if not word_freq:
return
# Prepare batch data for vocabulary
vocab_data = []
trigram_data = []
trigram_set = set() # Use set to avoid duplicate trigrams
for word, freq in word_freq.items():
vocab_data.append((word, freq, len(word)))
# Generate trigrams for this word
trigrams = self._generate_trigrams(word)
for trigram in trigrams:
trigram_key = (trigram, word)
if trigram_key not in trigram_set:
trigram_set.add(trigram_key)
trigram_data.append(trigram_key)
# Use batch inserts with a single transaction
conn = self._get_connection()
try:
cursor = conn.cursor()
# Batch insert vocabulary
cursor.executemany(
"INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
)
# Batch insert trigrams (duplicates already removed)
cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
conn.commit()
finally:
conn.close()
# Database and Infrastructure Methods
def _get_connection(self, read_only=False):
@@ -1281,6 +1179,26 @@ class SQLiteSearch(ABC):
if is_read:
cursor.execute("PRAGMA query_only = 1;") # Read-only optimization
def _with_connection(self, callback, read_only=False):
"""Execute a callback with a managed database connection.
Args:
callback: Function that takes (cursor) and performs database operations
read_only: Whether the connection is read-only
Returns:
The return value of the callback, if any
"""
conn = self._get_connection(read_only=read_only)
try:
cursor = conn.cursor()
result = callback(cursor)
if not read_only:
conn.commit()
return result
finally:
conn.close()
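
Write call sites collapse to a single callback, as in the FTS reset earlier in this commit:

    self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts"))

Multi-statement work passes a named function instead (see clear_vocabulary and insert_vocabulary above); read paths continue to go through self.sql(..., read_only=True).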
def _ensure_fts_table(self):
"""Create FTS table and related tables if they don't exist."""
# Get schema from subclass
@@ -1288,11 +1206,7 @@ class SQLiteSearch(ABC):
metadata_fields = self.schema["metadata_fields"]
tokenizer = self.schema["tokenizer"]
# Use a single transaction for all table creation operations
conn = self._get_connection()
try:
cursor = conn.cursor()
def create_tables(cursor):
# Create the FTS table with dynamic columns
cursor.execute(f"""
CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
@@ -1347,9 +1261,7 @@ class SQLiteSearch(ABC):
CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype)
""")
conn.commit()
finally:
conn.close()
self._with_connection(create_tables)
def _index_documents(self, documents):
"""Bulk index documents into SQLite FTS."""
@@ -1372,10 +1284,8 @@ class SQLiteSearch(ABC):
# Process documents in chunks to prevent memory issues with large datasets
chunk_size = 1000
conn = self._get_connection()
try:
cursor = conn.cursor()
def index_chunks(cursor):
for i in range(0, len(documents), chunk_size):
chunk = documents[i : i + chunk_size]
doc_ids_to_delete = []
@@ -1411,6 +1321,7 @@ class SQLiteSearch(ABC):
values.append(doc.get(field, ""))
doc_ids_to_delete.append(doc_id)
values_to_insert.append(tuple(values))
# Delete existing rows for these doc_ids first using a single statement
@@ -1423,9 +1334,7 @@ class SQLiteSearch(ABC):
if values_to_insert:
cursor.executemany(insert_sql, values_to_insert)
conn.commit()
finally:
conn.close()
self._with_connection(index_chunks)
def index_doc(self, doctype, docname):
"""Index a single document."""
@@ -1786,11 +1695,27 @@ class SQLiteSearch(ABC):
def build_index_if_not_exists():
"""Build index if it doesn't exist."""
"""Build index if it doesn't exist or continue if temp DB exists.
Called by scheduler every 3 hours to continue incomplete builds.
"""
search_classes = get_search_classes()
for SearchClass in search_classes:
build_index(SearchClass, force=False)
search = SearchClass()
if not search.is_search_enabled():
continue
# Check if a temp DB exists (incomplete build from previous run)
temp_db_path = search._get_db_path(is_temp=True)
if os.path.exists(temp_db_path):
# Continue the incomplete build
print(f"{SearchClass.__name__}: Found temp DB, continuing build...")
build_index(SearchClass, force=True, is_continuation=True)
elif not search.index_exists():
# No index exists, start fresh build
print(f"{SearchClass.__name__}: No index exists, starting fresh build...")
build_index(SearchClass, force=False)
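
Net effect: on each 3-hourly tick the hook makes a three-way decision per search class. A leftover temp DB means an interrupted build, so it resumes with force=True and is_continuation=True; no index at all starts a fresh build; an existing, complete index is left untouched.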
def build_index(
@@ -1810,9 +1735,11 @@ def build_index(
if not search.is_search_enabled():
return
if search.index_exists() and not force:
return
# For continuation jobs, always proceed regardless of existing index
# For fresh builds, only build if index doesn't exist or force=True
if is_continuation or not search.index_exists() or force:
if is_continuation or force:
if is_continuation:
print(f"{SearchClass.__name__}: Continuing incremental index build...")
else:
@@ -1820,8 +1747,38 @@ def build_index(
search.build_index(is_continuation=is_continuation)
def _enqueue_index_job(search_class_path: str, is_continuation: bool = False):
"""Enqueue a search index build job.
Args:
search_class_path: Full path to the search class (e.g., 'module.ClassName')
is_continuation: Whether this is a continuation of an incomplete build
"""
job_id = f"{search_class_path}_continuation" if is_continuation else search_class_path
job_type = "continuation" if is_continuation else "fresh build"
print(f"Enqueuing {job_type} for {search_class_path}.build_index")
# timeout of 2 hours 10 minutes (7800 seconds) to account for job queue delays
timeout = 2 * 60 * 60 + 10 * 60
enqueue_kwargs = {
"queue": "long",
"job_id": job_id,
"deduplicate": True,
"search_class_path": search_class_path,
"force": True,
"is_continuation": is_continuation,
"timeout": timeout,
}
frappe.enqueue("frappe.search.sqlite_search.build_index", **enqueue_kwargs)
def build_index_in_background():
"""Enqueue index building in background."""
"""Enqueue index building in background.
Called after migrate to start/continue index building.
"""
search_classes = get_search_classes()
for SearchClass in search_classes:
search = SearchClass()
@@ -1830,46 +1787,16 @@ def build_index_in_background():
search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}"
# Check if indexing is already in progress or complete
if search.index_exists():
try:
# Check if there are any incomplete progress records
progress = search._get_index_progress()
if progress and not search._is_indexing_complete():
print(f"Enqueuing continuation for {search_class_path}.build_index")
frappe.enqueue(
"frappe.search.sqlite_search.build_index",
queue="long",
job_id=f"{search_class_path}_continuation",
deduplicate=True,
search_class_path=search_class_path,
force=True,
is_continuation=True,
)
else:
print(f"Index for {search_class_path} is already complete")
except Exception:
# If we can't check progress, assume we need to rebuild
print(f"Enqueuing fresh build for {search_class_path}.build_index")
frappe.enqueue(
"frappe.search.sqlite_search.build_index",
queue="long",
job_id=search_class_path,
deduplicate=True,
search_class_path=search_class_path,
force=True,
)
else:
# Check if a temp DB exists (incomplete build from previous run)
temp_db_path = search._get_db_path(is_temp=True)
if os.path.exists(temp_db_path):
# Continue the incomplete build
_enqueue_index_job(search_class_path, is_continuation=True)
elif not search.index_exists():
# No index exists, start fresh build
print(f"Enqueuing fresh build for {search_class_path}.build_index")
frappe.enqueue(
"frappe.search.sqlite_search.build_index",
queue="long",
job_id=search_class_path,
deduplicate=True,
search_class_path=search_class_path,
force=True,
)
_enqueue_index_job(search_class_path, is_continuation=False)
else:
print(f"Index for {search_class_path} already exists")
def update_doc_index(doc: Document, method=None):

frappe/tests/test_sqlite_search.py

@@ -483,6 +483,29 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
disabled_search.build_index() # Should not raise error but do nothing
self.assertFalse(disabled_search.index_exists())
@patch("frappe.enqueue")
def test_background_operations(self, mock_enqueue):
"""Test background job integration and module-level functions."""
from frappe.search.sqlite_search import build_index_in_background, get_search_classes
# Test getting search classes
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
classes = get_search_classes()
self.assertEqual(len(classes), 1)
self.assertEqual(classes[0], TestSQLiteSearch)
# Ensure index doesn't exist so build_index_in_background will enqueue a job
self.search.drop_index()
# Test background index building
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
build_index_in_background()
# Should have enqueued a background job since index doesn't exist
self.assertTrue(mock_enqueue.called)
def test_deduplication_on_reindex(self):
"""Test that re-indexing the same document does not create duplicates."""
self.search.build_index()
@@ -546,26 +569,3 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
finally:
test_note.delete()
@patch("frappe.enqueue")
def test_background_operations(self, mock_enqueue):
"""Test background job integration and module-level functions."""
from frappe.search.sqlite_search import (
build_index_in_background,
get_search_classes,
)
# Test getting search classes
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
classes = get_search_classes()
self.assertEqual(len(classes), 1)
self.assertEqual(classes[0], TestSQLiteSearch)
# Test background index building
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
build_index_in_background()
# Should have enqueued a background job
self.assertTrue(mock_enqueue.called)