Merge pull request #34522 from netchampfaris/sqlite-search-long-tasks
fix: resumable sqlite search indexing task for larger data
This commit is contained in: commit 78396c4334
3 changed files with 558 additions and 176 deletions
frappe/hooks.py
@@ -216,7 +216,6 @@ scheduler_events = {
 			"frappe.deferred_insert.save_to_db",
 			"frappe.automation.doctype.reminder.reminder.send_reminders",
 			"frappe.model.utils.link_count.update_link_count",
-			"frappe.search.sqlite_search.build_index_if_not_exists",
 			"frappe.utils.telemetry.pulse.client.send_queued_events",
 		],
 		# 10 minutes
@@ -227,6 +226,9 @@ scheduler_events = {
 		"30 * * * *": [],
 		# Daily but offset by 45 minutes
 		"45 0 * * *": [],
+		"0 */3 * * *": [
+			"frappe.search.sqlite_search.build_index_if_not_exists",
+		],
 	},
 	"all": [
 		"frappe.email.queue.flush",
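Net effect of the two hooks.py hunks: the indexing task leaves the high-frequency scheduler list and runs on its own three-hourly cron. A sketch of the resulting `scheduler_events` slice (only the keys visible in the hunks above; everything else elided):

```python
# Sketch of the relevant slice of frappe/hooks.py after this change
# (assumed shape; surrounding keys omitted).
scheduler_events = {
    "cron": {
        "0 */3 * * *": [
            # Every 3 hours: start, resume, or skip the index build.
            "frappe.search.sqlite_search.build_index_if_not_exists",
        ],
    },
}
```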
frappe/search/sqlite_search.py
@@ -291,88 +291,294 @@ class SQLiteSearch(ABC):
 			},
 		}

-	def build_index(self):
-		"""Build the complete search index from scratch using atomic replacement."""
+	def build_index(self, batch_size=1000, is_continuation=False):
+		"""Build the search index incrementally with progress tracking.
+
+		The job runs until completion or until killed by the queue timeout.
+		Progress is tracked in the database so builds can resume from where they left off.
+		"""
 		if not self.is_search_enabled():
 			return

-		# Use temporary database path for atomic replacement
-		temp_db_path = self._get_db_path(is_temp=True)
+		# Use temporary database path for atomic replacement (only for new index builds)
+		temp_db_path = None
 		original_db_path = self.db_path

-		# Remove temp file if it exists
-		if os.path.exists(temp_db_path):
-			os.unlink(temp_db_path)
-
-		# Temporarily switch to temp database for building
-		self.db_path = temp_db_path
+		# Check if this is a completely fresh build or a continuation
+		if not is_continuation and not self.index_exists():
+			# Fresh build - use temporary database for atomic replacement
+			temp_db_path = self._get_db_path(is_temp=True)
+
+			# Remove temp file if it exists
+			if os.path.exists(temp_db_path):
+				os.unlink(temp_db_path)
+
+			# Switch to temp database for building
+			self.db_path = temp_db_path
+		elif is_continuation:
+			# Check if we're continuing a fresh build (temp db exists)
+			potential_temp_path = self._get_db_path(is_temp=True)
+			if os.path.exists(potential_temp_path):
+				# Continue with temporary database from fresh build
+				temp_db_path = potential_temp_path
+				self.db_path = temp_db_path
+				print(f"Continuation: Using temporary database {temp_db_path}")
+			else:
+				print(f"Continuation: Using regular database {self.db_path}")
+				# If no temp db exists, we're continuing with regular database (already set)
+
+		if temp_db_path:
+			print(f"Working with temporary database: {self.db_path}")
+		else:
+			print(f"Working with regular database: {self.db_path}")

 		try:
-			self._update_progress("Setting up search tables", 0, 100, absolute=True)
-
-			# Setup tables in temp database
-			self._ensure_fts_table()
-
-			self._update_progress("Fetching records", 20, 100, absolute=True)
-
-			records = self.get_documents()
-			documents = []
-
-			self._update_progress("Preparing documents", 30, 100, absolute=True)
-
-			total_records = len(records)
-			for i, doc in enumerate(records):
-				document = self.prepare_document(doc)
-				if document:
-					documents.append(document)
-
-				# Update progress during document preparation
-				if i % 100 == 0:
-					progress = 30 + int((i / total_records) * 20)  # 30-50% range
-					self._update_progress("Preparing documents", progress, 100, absolute=True)
-
-			self._update_progress("Indexing documents", 50, 100, absolute=True)
-
-			self._index_documents(documents)
-
-			self._update_progress("Building spell correction vocabulary", 80, 100, absolute=True)
-
-			# Build vocabulary for spelling correction
-			self._build_vocabulary(documents)
-
-			# Atomic replacement: move temp database to final location
-			if os.path.exists(original_db_path):
-				os.unlink(original_db_path)
-			os.rename(temp_db_path, original_db_path)
+			# Setup tables if needed (for fresh builds or when temp DB was just created)
+			if not is_continuation or (temp_db_path and not self._tables_exist()):
+				self._update_progress("Setting up search tables", 0, 100, absolute=True)
+				self._ensure_fts_table()
+
+				# Clear existing index data for fresh build
+				if temp_db_path and not is_continuation:
+					self._with_connection(lambda cursor: cursor.execute("DELETE FROM search_fts"))
+
+			# Initialize progress tracking (only for completely fresh builds)
+			if not is_continuation:
+				self._initialize_index_progress()
+
+			# Get current progress
+			progress = self._get_index_progress()
+
+			# Check if indexing is already complete
+			if self._is_indexing_complete():
+				self._update_progress("Search index already complete", 100, 100, absolute=True)
+				return
+
+			# Process each doctype incrementally
+			total_doctypes = len(self.doc_configs)
+			processed_doctypes = 0
+
+			for doctype in self.doc_configs.keys():
+				doctype_progress = progress.get(doctype, {})
+
+				# Skip if doctype is already complete
+				if doctype_progress.get("is_complete"):
+					processed_doctypes += 1
+					continue
+
+				self._update_progress(
+					f"Indexing {doctype}",
+					20 + (processed_doctypes * 60 // total_doctypes),
+					100,
+					absolute=True,
+				)
+
+				# Process this doctype in batches
+				last_indexed_modified = doctype_progress.get("last_indexed_modified")
+				batch_count = 0
+
+				while True:
+					# Get batch of documents
+					docs = self.get_documents_paginated(
+						doctype, limit=batch_size, last_indexed_modified=last_indexed_modified
+					)
+
+					if not docs:
+						# No more documents for this doctype
+						self._mark_doctype_complete(doctype)
+						break
+
+					# Prepare and index documents
+					documents = []
+					for doc in docs:
+						document = self.prepare_document(doc)
+						if document:
+							documents.append(document)
+
+					if documents:
+						self._index_documents(documents)
+
+					# Update progress with last processed document's modification time
+					# Use hardcoded 'modified' field since it's reliable in all Frappe doctypes
+					last_doc_modified = docs[-1]["modified"]
+					last_doc_name = docs[-1]["name"]
+					self._update_index_progress(doctype, last_doc_name, last_doc_modified, len(documents))
+					last_indexed_modified = last_doc_modified
+
+					batch_count += 1
+
+					# Show progress based on total document counts across all doctypes
+					indexed_docs, total_docs = self._get_indexing_progress()
+					if total_docs > 0:
+						progress_percent = 20 + (indexed_docs * 60) // total_docs
+						self._update_progress(
+							f"Indexing {doctype} {indexed_docs}/{total_docs}",
+							progress_percent,
+							100,
+							absolute=True,
+						)
+
+				processed_doctypes += 1
+
+			# Check if all doctypes are indexed before building vocabulary
+			if not self._is_vocabulary_built_needed():
+				self._update_progress("All documents indexed, building vocabulary", 80, 100, absolute=True)
+
+				# Build vocabulary incrementally
+				self._build_vocabulary_incremental()
+				self._mark_vocabulary_built()
+
+			# Final atomic replacement if this was a fresh build
+			if temp_db_path and os.path.exists(temp_db_path):
+				if os.path.exists(original_db_path):
+					os.unlink(original_db_path)
+				os.rename(temp_db_path, original_db_path)

 			self._update_progress("Search index build complete", 100, 100, absolute=True)

 			# Print warning summary
 			self._print_warning_summary()

-		except Exception:
-			# Clean up temp file on error
-			if os.path.exists(temp_db_path):
-				os.unlink(temp_db_path)
+		except Exception as e:
+			# Log the error
+			frappe.log_error(
+				title="Search Index Build Error",
+				message=f"Error during search index build: {e}",
+			)
 			raise
 		finally:
 			# Restore original database path
-			self.db_path = original_db_path
+			if temp_db_path:
+				self.db_path = original_db_path

+	def _get_incomplete_count(self, where_clause):
+		"""Get count of incomplete records from search_index_progress table.
+
+		Args:
+			where_clause: SQL WHERE clause condition (without 'WHERE' keyword)
+
+		Returns:
+			int: Count of matching records, or -1 on error
+		"""
+		try:
+			result = self.sql(
+				f"""
+				SELECT COUNT(*) as incomplete_count
+				FROM search_index_progress
+				WHERE {where_clause}
+				""",
+				read_only=True,
+			)
+			return result[0]["incomplete_count"]
+		except sqlite3.Error:
+			return -1
+
+	def _is_vocabulary_built_needed(self):
+		"""Check if vocabulary still needs to be built."""
+		count = self._get_incomplete_count("is_complete = 0")
+		return count > 0 if count >= 0 else True
+
+	def _build_vocabulary_incremental(self):
+		"""Build vocabulary incrementally from indexed documents."""
+		import re
+
+		word_freq = defaultdict(int)
+		word_regex = re.compile(r"\w+")
+
+		# Get all indexed documents in batches to avoid memory issues
+		batch_size = 1000
+		offset = 0
+
+		while True:
+			try:
+				# Get batch of documents from FTS table
+				documents = self.sql(
+					f"""
+					SELECT title, content
+					FROM search_fts
+					LIMIT {batch_size} OFFSET {offset}
+					""",
+					read_only=True,
+				)
+
+				if not documents:
+					break
+
+				# Process this batch
+				for i, doc in enumerate(documents):
+					# Show progress for large document sets
+					if (offset + i) % 1000 == 0:
+						self._update_progress(
+							f"Processing vocabulary ({offset + i} docs)", 85, 100, absolute=True
+						)
+
+					# Process title and content together
+					combined_text = " ".join([(doc["title"] or "").lower(), (doc["content"] or "").lower()])
+
+					# Extract all words at once
+					words = word_regex.findall(combined_text)
+
+					for word in words:
+						if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha():
+							word_freq[word] += 1
+
+				offset += batch_size
+
+			except sqlite3.Error:
+				break
+
+		# Build vocabulary tables as before
+		if word_freq:
+			# Clear existing data
+			def clear_vocabulary(cursor):
+				cursor.execute("DELETE FROM search_vocabulary")
+				cursor.execute("DELETE FROM search_trigrams")
+
+			self._with_connection(clear_vocabulary)
+
+			# Prepare batch data
+			vocab_data = []
+			trigram_data = []
+			trigram_set = set()
+
+			for word, freq in word_freq.items():
+				vocab_data.append((word, freq, len(word)))
+
+				trigrams = self._generate_trigrams(word)
+				for trigram in trigrams:
+					trigram_key = (trigram, word)
+					if trigram_key not in trigram_set:
+						trigram_set.add(trigram_key)
+						trigram_data.append(trigram_key)
+
+			# Batch insert
+			def insert_vocabulary(cursor):
+				cursor.executemany(
+					"INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
+				)
+				cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
+
+			self._with_connection(insert_vocabulary)

 	# Status and Validation Methods

+	def _table_exists(self, table_name):
+		"""Check if a table exists in the database."""
+		try:
+			result = self.sql(
+				f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'", read_only=True
+			)
+			return bool(result)
+		except sqlite3.Error:
+			return False
+
 	def index_exists(self):
 		"""Check if FTS index exists."""
 		if not os.path.exists(self.db_path):
 			return False

-		try:
-			result = self.sql(
-				"SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True
-			)
-			return bool(result)
-		except sqlite3.Error:
-			return False
+		return self._table_exists("search_fts")

 	def drop_index(self):
 		"""Drop the search index by removing the database file."""
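The rewritten `build_index` reduces to a checkpointed loop: read per-doctype progress from `search_index_progress`, fetch batches past the last checkpoint, index them, and persist the new checkpoint after every batch. A minimal standalone sketch of that pattern (hypothetical names, not the actual class; the real logic also handles the temp-DB swap and percent progress updates):

```python
# Sketch of the resume pattern behind the new build_index.
def build(doctypes, get_batch, index_batch, checkpoint, batch_size=1000):
    for doctype in doctypes:
        if checkpoint.is_complete(doctype):
            continue
        last = checkpoint.last_modified(doctype)  # None on a fresh build
        while True:
            docs = get_batch(doctype, after=last, limit=batch_size)
            if not docs:
                checkpoint.mark_complete(doctype)
                break
            index_batch(docs)
            # Persisting after each batch is what makes a killed job resumable.
            last = docs[-1]["modified"]
            checkpoint.save(doctype, last)
```

Because the checkpoint is written inside the loop, a job killed by the queue timeout loses at most one batch of work; the next scheduled run continues from `last`.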
@@ -408,6 +614,161 @@ class SQLiteSearch(ABC):

 		return records

+	def get_documents_paginated(self, doctype, limit=1000, last_indexed_modified=None):
+		"""Get records for a specific doctype with pagination support."""
+		config = self.doc_configs.get(doctype)
+		if not config:
+			return []
+
+		filters = config.get("filters", {}).copy()
+
+		# Ensure 'modified' field is always included for progress tracking
+		fields = config["fields"].copy()
+		if "modified" not in fields:
+			fields.append("modified")
+
+		# Build query with proper ordering and pagination
+		# Order by modified field for reliable resume capability
+		query = frappe.qb.get_query(
+			doctype,
+			fields=fields,
+			filters=filters,
+			order_by="creation ASC, name ASC",  # Secondary sort by name for consistency
+			limit=limit,
+		)
+
+		# If resuming from a specific timestamp, filter by modification time
+		# This is more reliable than name-based filtering for VARCHAR names
+		if last_indexed_modified:
+			Table = frappe.qb.DocType(doctype)
+			query = query.where(Table.modified > last_indexed_modified)
+
+		docs = query.run(as_dict=True)
+
+		for doc in docs:
+			doc.doctype = doctype
+
+		return docs
+
+	def _get_index_progress(self):
+		"""Get current indexing progress for all doctypes."""
+		try:
+			result = self.sql(
+				"""
+				SELECT doctype, last_indexed_name, last_indexed_modified,
+					total_docs, indexed_docs, batch_size, is_complete,
+					started_at, updated_at, vocabulary_built
+				FROM search_index_progress
+				""",
+				read_only=True,
+			)
+
+			progress = {}
+			for row in result:
+				progress[row["doctype"]] = dict(row)
+
+			return progress
+		except sqlite3.Error:
+			return {}
+
+	def _initialize_index_progress(self):
+		"""Initialize progress tracking for all doctypes."""
+
+		def init_progress(cursor):
+			# Clear existing progress
+			cursor.execute("DELETE FROM search_index_progress")
+
+			# Initialize progress for each doctype
+			for doctype in self.doc_configs.keys():
+				# Get total count for this doctype
+				config = self.doc_configs[doctype]
+				total_count = frappe.qb.get_query(
+					doctype, filters=config.get("filters", {}), fields=[{"COUNT": "name", "as": "count"}]
+				).run(as_dict=True)[0]["count"]
+
+				cursor.execute(
+					"""
+					INSERT INTO search_index_progress
+					(doctype, total_docs, indexed_docs, batch_size, is_complete, started_at, updated_at, vocabulary_built, last_indexed_modified)
+					VALUES (?, ?, 0, 1000, 0, datetime('now'), datetime('now'), 0, 0)
+					""",
+					(doctype, total_count),
+				)
+
+		self._with_connection(init_progress)
+
+	def _update_index_progress(self, doctype, last_indexed_name, last_indexed_modified, indexed_count):
+		"""Update progress for a specific doctype."""
+
+		def update_progress(cursor):
+			cursor.execute(
+				"""
+				UPDATE search_index_progress
+				SET last_indexed_name = ?,
+					last_indexed_modified = ?,
+					indexed_docs = indexed_docs + ?,
+					updated_at = datetime('now')
+				WHERE doctype = ?
+				""",
+				(last_indexed_name, last_indexed_modified, indexed_count, doctype),
+			)

+		self._with_connection(update_progress)
+
+	def _mark_doctype_complete(self, doctype):
+		"""Mark a doctype as completely indexed."""
+
+		def mark_complete(cursor):
+			cursor.execute(
+				"""
+				UPDATE search_index_progress
+				SET is_complete = 1, updated_at = datetime('now')
+				WHERE doctype = ?
+				""",
+				(doctype,),
+			)
+
+		self._with_connection(mark_complete)
+
+	def _mark_vocabulary_built(self):
+		"""Mark vocabulary as built."""
+
+		def mark_built(cursor):
+			cursor.execute("""
+				UPDATE search_index_progress
+				SET vocabulary_built = 1, updated_at = datetime('now')
+			""")
+
+		self._with_connection(mark_built)
+
+	def _is_indexing_complete(self):
+		"""Check if all doctypes are completely indexed and vocabulary is built."""
+		count = self._get_incomplete_count("is_complete = 0 OR vocabulary_built = 0")
+		return count == 0 if count >= 0 else False
+
+	def _get_indexing_progress(self):
+		"""Get overall indexing progress across all doctypes."""
+		try:
+			result = self.sql(
+				"""
+				SELECT SUM(total_docs) as total_docs, SUM(indexed_docs) as indexed_docs
+				FROM search_index_progress
+				""",
+				read_only=True,
+			)
+
+			if result and result[0]:
+				total_docs = result[0]["total_docs"] or 0
+				indexed_docs = result[0]["indexed_docs"] or 0
+				return indexed_docs, total_docs
+			return 0, 0
+		except sqlite3.Error:
+			return 0, 0
+
+	def _tables_exist(self):
+		"""Check if the required tables exist in the current database."""
+		return self._table_exists("search_fts")
+
 	# Private Implementation Methods

 	def _execute_search_query(self, fts_query, title_only, filters):
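`get_documents_paginated` resumes by filtering on the last indexed `modified` timestamp rather than paging with OFFSET, so each batch stays cheap no matter how deep into the table the build is. A hedged sketch of that keyset-style query using `frappe.qb` (simplified; the real method goes through `frappe.qb.get_query` with the doctype's configured fields and filters):

```python
import frappe

# Keyset pagination sketch: seek past the checkpoint instead of OFFSET-paging.
def next_batch(doctype, last_modified=None, limit=1000):
    table = frappe.qb.DocType(doctype)
    query = (
        frappe.qb.from_(table)
        .select(table.name, table.modified)
        .orderby(table.modified)  # a stable order is what makes the seek valid
        .limit(limit)
    )
    if last_modified:
        # Strictly greater-than: rows sharing the checkpoint timestamp are
        # assumed already indexed.
        query = query.where(table.modified > last_modified)
    return query.run(as_dict=True)
```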
@@ -522,6 +883,7 @@ class SQLiteSearch(ABC):
 			ORDER BY bm25_score
 			LIMIT ?
 		"""
+		print(sql)
 		return self.sql(sql, params, read_only=True)

 	def _process_search_results(self, raw_results, query):
@@ -784,80 +1146,6 @@ class SQLiteSearch(ABC):
 		similarities.sort(key=lambda x: x[1], reverse=True)
 		return [word for word, score in similarities[:max_suggestions]]

-	def _build_vocabulary(self, documents):
-		"""Build vocabulary and trigram index from documents for spelling correction."""
-		import re
-
-		word_freq = defaultdict(int)
-		word_regex = re.compile(r"\w+")  # Compile regex once for efficiency
-
-		# Extract words from all documents in batches
-		for i, doc in enumerate(documents):
-			# Show progress for large document sets
-			if i % 1000 == 0:
-				progress = 80 + int((i / len(documents)) * 15)  # 80-95% range
-				self._update_progress(
-					f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True
-				)
-
-			# Process title and content together for efficiency
-			combined_text = " ".join(
-				[(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()]
-			)
-
-			# Extract all words at once with compiled regex
-			words = word_regex.findall(combined_text)
-
-			for word in words:
-				if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha():  # Filter out short words and non-alpha
-					word_freq[word] += 1
-
-		# Clear existing data in a single transaction
-		conn = self._get_connection()
-		try:
-			cursor = conn.cursor()
-			cursor.execute("DELETE FROM search_vocabulary")
-			cursor.execute("DELETE FROM search_trigrams")
-			conn.commit()
-		finally:
-			conn.close()
-
-		if not word_freq:
-			return
-
-		# Prepare batch data for vocabulary
-		vocab_data = []
-		trigram_data = []
-		trigram_set = set()  # Use set to avoid duplicate trigrams
-
-		for word, freq in word_freq.items():
-			vocab_data.append((word, freq, len(word)))
-
-			# Generate trigrams for this word
-			trigrams = self._generate_trigrams(word)
-			for trigram in trigrams:
-				trigram_key = (trigram, word)
-				if trigram_key not in trigram_set:
-					trigram_set.add(trigram_key)
-					trigram_data.append(trigram_key)
-
-		# Use batch inserts with a single transaction
-		conn = self._get_connection()
-		try:
-			cursor = conn.cursor()
-
-			# Batch insert vocabulary
-			cursor.executemany(
-				"INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
-			)
-
-			# Batch insert trigrams (duplicates already removed)
-			cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
-
-			conn.commit()
-		finally:
-			conn.close()
-
 	# Database and Infrastructure Methods

 	def _get_connection(self, read_only=False):
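Both the removed `_build_vocabulary` and its incremental replacement feed `search_trigrams` via `_generate_trigrams`. For readers unfamiliar with the technique, a common padded-trigram formulation looks like this (an assumption for illustration; the actual `_generate_trigrams` implementation is not shown in this diff):

```python
# Padded character trigrams, a standard basis for fuzzy word matching.
def generate_trigrams(word: str) -> set[str]:
    padded = f"  {word} "  # leading/trailing padding marks word boundaries
    return {padded[i : i + 3] for i in range(len(padded) - 2)}

# generate_trigrams("frappe")
# -> {'  f', ' fr', 'fra', 'rap', 'app', 'ppe', 'pe '}
```

Candidate corrections are then the vocabulary words sharing the most trigrams with the misspelled query term.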
@@ -886,6 +1174,26 @@ class SQLiteSearch(ABC):
 		if is_read:
 			cursor.execute("PRAGMA query_only = 1;")  # Read-only optimization

+	def _with_connection(self, callback, read_only=False):
+		"""Execute a callback with a managed database connection.
+
+		Args:
+			callback: Function that takes (cursor) and performs database operations
+			read_only: Whether the connection is read-only
+
+		Returns:
+			The return value of the callback, if any
+		"""
+		conn = self._get_connection(read_only=read_only)
+		try:
+			cursor = conn.cursor()
+			result = callback(cursor)
+			if not read_only:
+				conn.commit()
+			return result
+		finally:
+			conn.close()
+
 	def _ensure_fts_table(self):
 		"""Create FTS table and related tables if they don't exist."""
 		# Get schema from subclass
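`_with_connection` factors out the open/commit/close boilerplate the old code repeated around every write. A usage sketch (assumes `search` is an instance of a concrete `SQLiteSearch` subclass, and that the shown column exists in its FTS schema):

```python
# Write path: the callback receives a cursor; the helper commits and closes.
def delete_doc(cursor):
    # "name" is assumed to be one of the indexed columns (illustrative).
    cursor.execute("DELETE FROM search_fts WHERE name = ?", ("NOTE-0001",))
    return cursor.rowcount

deleted = search._with_connection(delete_doc)

# Read path: no commit is issued.
count = search._with_connection(
    lambda cursor: cursor.execute("SELECT COUNT(*) FROM search_fts").fetchone()[0],
    read_only=True,
)
```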
@@ -893,11 +1201,7 @@ class SQLiteSearch(ABC):
 		metadata_fields = self.schema["metadata_fields"]
 		tokenizer = self.schema["tokenizer"]

-		# Use a single transaction for all table creation operations
-		conn = self._get_connection()
-		try:
-			cursor = conn.cursor()
-
+		def create_tables(cursor):
 			# Create the FTS table with dynamic columns
 			cursor.execute(f"""
 				CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
@@ -925,14 +1229,34 @@ class SQLiteSearch(ABC):
 				)
 			""")

+			# Create the index progress tracking table
+			cursor.execute("""
+				CREATE TABLE IF NOT EXISTS search_index_progress (
+					id INTEGER PRIMARY KEY,
+					doctype TEXT,
+					last_indexed_name TEXT,
+					last_indexed_modified TEXT,
+					total_docs INTEGER DEFAULT 0,
+					indexed_docs INTEGER DEFAULT 0,
+					batch_size INTEGER DEFAULT 1000,
+					is_complete BOOLEAN DEFAULT 0,
+					started_at DATETIME,
+					updated_at DATETIME,
+					vocabulary_built BOOLEAN DEFAULT 0
+				)
+			""")
+
 			# Index for fast trigram lookups
 			cursor.execute("""
 				CREATE INDEX IF NOT EXISTS idx_trigram_lookup ON search_trigrams(trigram)
 			""")

-			conn.commit()
-		finally:
-			conn.close()
+			# Index for progress tracking lookups
+			cursor.execute("""
+				CREATE INDEX IF NOT EXISTS idx_progress_doctype ON search_index_progress(doctype)
+			""")
+
+		self._with_connection(create_tables)

 	def _index_documents(self, documents):
 		"""Bulk index documents into SQLite FTS."""
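Because progress lives in a plain table, a stalled or long-running build can be inspected directly. A sketch using the class's own `sql` wrapper (assumes an instance named `search`):

```python
# Read-only peek at build progress, per doctype.
rows = search.sql(
    """
    SELECT doctype, indexed_docs, total_docs, is_complete, vocabulary_built
    FROM search_index_progress
    ORDER BY doctype
    """,
    read_only=True,
)
for row in rows:
    print(
        f'{row["doctype"]}: {row["indexed_docs"]}/{row["total_docs"]}'
        f' complete={row["is_complete"]} vocab={row["vocabulary_built"]}'
    )
```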
@@ -955,10 +1279,8 @@ class SQLiteSearch(ABC):

 		# Process documents in chunks to prevent memory issues with large datasets
 		chunk_size = 1000
-		conn = self._get_connection()
-		try:
-			cursor = conn.cursor()
-
+
+		def index_chunks(cursor):
 			for i in range(0, len(documents), chunk_size):
 				chunk = documents[i : i + chunk_size]
 				doc_ids_to_delete = []
@@ -994,6 +1316,7 @@ class SQLiteSearch(ABC):
 					values.append(doc.get(field, ""))

+				doc_ids_to_delete.append(doc_id)

 				values_to_insert.append(tuple(values))

 			# Delete existing rows for these doc_ids first using a single statement
@@ -1006,9 +1329,7 @@ class SQLiteSearch(ABC):
 				if values_to_insert:
 					cursor.executemany(insert_sql, values_to_insert)

-			conn.commit()
-		finally:
-			conn.close()
+		self._with_connection(index_chunks)

 	def index_doc(self, doctype, docname):
 		"""Index a single document."""
@@ -1369,15 +1690,34 @@ class SQLiteSearch(ABC):


 def build_index_if_not_exists():
-	"""Build index if it doesn't exist."""
+	"""Build index if it doesn't exist or continue if temp DB exists.
+
+	Called by scheduler every 3 hours to continue incomplete builds.
+	"""
 	search_classes = get_search_classes()

 	for SearchClass in search_classes:
-		build_index(SearchClass, force=False)
+		search = SearchClass()
+		if not search.is_search_enabled():
+			continue
+
+		# Check if a temp DB exists (incomplete build from previous run)
+		temp_db_path = search._get_db_path(is_temp=True)
+		if os.path.exists(temp_db_path):
+			# Continue the incomplete build
+			print(f"{SearchClass.__name__}: Found temp DB, continuing build...")
+			build_index(SearchClass, force=True, is_continuation=True)
+		elif not search.index_exists():
+			# No index exists, start fresh build
+			print(f"{SearchClass.__name__}: No index exists, starting fresh build...")
+			build_index(SearchClass, force=False)


 def build_index(
-	SearchClass: type[SQLiteSearch] | None = None, search_class_path: str | None = None, force: bool = False
+	SearchClass: type[SQLiteSearch] | None = None,
+	search_class_path: str | None = None,
+	force: bool = False,
+	is_continuation: bool = False,
 ):
 	"""Build search index for SearchClass"""
 	if not SearchClass and not search_class_path:
@@ -1389,29 +1729,69 @@ def build_index(
 	search = SearchClass()
 	if not search.is_search_enabled():
 		return
-	if not search.index_exists() or force:
-		print(f"{SearchClass.__name__}: Index does not exist, building...")
-		search.build_index()
+
+	if search.index_exists() and not force:
+		return
+
+	# For continuation jobs, always proceed regardless of existing index
+	if is_continuation or force:
+		if is_continuation:
+			print(f"{SearchClass.__name__}: Continuing incremental index build...")
+		else:
+			print(f"{SearchClass.__name__}: Index does not exist or force=True, building...")
+		search.build_index(is_continuation=is_continuation)
+
+
+def _enqueue_index_job(search_class_path: str, is_continuation: bool = False):
+	"""Enqueue a search index build job.
+
+	Args:
+		search_class_path: Full path to the search class (e.g., 'module.ClassName')
+		is_continuation: Whether this is a continuation of an incomplete build
+	"""
+	job_id = f"{search_class_path}_continuation" if is_continuation else search_class_path
+	job_type = "continuation" if is_continuation else "fresh build"
+	print(f"Enqueuing {job_type} for {search_class_path}.build_index")
+
+	# timeout for 2 hours 10 minutes to account for job queue delays
+	timeout = 2 * 60 * 60 + 10 * 60
+
+	enqueue_kwargs = {
+		"queue": "long",
+		"job_id": job_id,
+		"deduplicate": True,
+		"search_class_path": search_class_path,
+		"force": True,
+		"is_continuation": is_continuation,
+		"timeout": timeout,
+	}
+
+	frappe.enqueue("frappe.search.sqlite_search.build_index", **enqueue_kwargs)


 def build_index_in_background():
-	"""Enqueue index building in background."""
+	"""Enqueue index building in background.
+
+	Called after migrate to start/continue index building.
+	"""
 	search_classes = get_search_classes()
 	for SearchClass in search_classes:
 		search = SearchClass()
 		if not search.is_search_enabled():
-			return
+			continue

 		search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}"
-		print(f"Enqueuing {search_class_path}.build_index")
-		frappe.enqueue(
-			"frappe.search.sqlite_search.build_index",
-			queue="long",
-			job_id=search_class_path,
-			deduplicate=True,
-			# build_index args
-			search_class_path=search_class_path,
-			force=True,
-		)
+
+		# Check if a temp DB exists (incomplete build from previous run)
+		temp_db_path = search._get_db_path(is_temp=True)
+		if os.path.exists(temp_db_path):
+			# Continue the incomplete build
+			_enqueue_index_job(search_class_path, is_continuation=True)
+		elif not search.index_exists():
+			# No index exists, start fresh build
+			_enqueue_index_job(search_class_path, is_continuation=False)
+		else:
+			print(f"Index for {search_class_path} already exists")


 def update_doc_index(doc: Document, method=None):
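End to end, `build_index_in_background` (after migrate) and `build_index_if_not_exists` (every 3 hours) converge on the same state machine: temp DB present means resume, no index means fresh build, otherwise do nothing. The distinct `job_id` plus `deduplicate=True` keeps overlapping scheduler runs from stacking continuation jobs. A sketch of the call `_enqueue_index_job` issues, with a hypothetical class path:

```python
import frappe

# What _enqueue_index_job boils down to for a continuation run
# ("myapp.search.MySearch" is a placeholder class path).
frappe.enqueue(
    "frappe.search.sqlite_search.build_index",
    queue="long",
    job_id="myapp.search.MySearch_continuation",
    deduplicate=True,  # an identical queued/running job suppresses this one
    timeout=2 * 60 * 60 + 10 * 60,  # 2h10m, matching the hunk above
    # kwargs forwarded to build_index:
    search_class_path="myapp.search.MySearch",
    force=True,
    is_continuation=True,
)
```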
frappe/tests/test_sqlite_search.py
@@ -483,6 +483,29 @@ class TestSQLiteSearchAPI(IntegrationTestCase):
 		disabled_search.build_index()  # Should not raise error but do nothing
 		self.assertFalse(disabled_search.index_exists())

+	@patch("frappe.enqueue")
+	def test_background_operations(self, mock_enqueue):
+		"""Test background job integration and module-level functions."""
+		from frappe.search.sqlite_search import build_index_in_background, get_search_classes
+
+		# Test getting search classes
+		with patch("frappe.get_hooks") as mock_get_hooks:
+			mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
+			classes = get_search_classes()
+			self.assertEqual(len(classes), 1)
+			self.assertEqual(classes[0], TestSQLiteSearch)
+
+		# Ensure index doesn't exist so build_index_in_background will enqueue a job
+		self.search.drop_index()
+
+		# Test background index building
+		with patch("frappe.get_hooks") as mock_get_hooks:
+			mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
+			build_index_in_background()
+
+		# Should have enqueued a background job since index doesn't exist
+		self.assertTrue(mock_enqueue.called)
+
 	def test_deduplication_on_reindex(self):
 		"""Test that re-indexing the same document does not create duplicates."""
 		self.search.build_index()
@@ -546,26 +569,3 @@ class TestSQLiteSearchAPI(IntegrationTestCase):

 		finally:
 			test_note.delete()
-
-	@patch("frappe.enqueue")
-	def test_background_operations(self, mock_enqueue):
-		"""Test background job integration and module-level functions."""
-		from frappe.search.sqlite_search import (
-			build_index_in_background,
-			get_search_classes,
-		)
-
-		# Test getting search classes
-		with patch("frappe.get_hooks") as mock_get_hooks:
-			mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
-			classes = get_search_classes()
-			self.assertEqual(len(classes), 1)
-			self.assertEqual(classes[0], TestSQLiteSearch)
-
-		# Test background index building
-		with patch("frappe.get_hooks") as mock_get_hooks:
-			mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
-			build_index_in_background()
-
-		# Should have enqueued a background job
-		self.assertTrue(mock_enqueue.called)