diff --git a/frappe/hooks.py b/frappe/hooks.py index ea4537ba0d..8d16696f41 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -154,6 +154,7 @@ doc_events = { "frappe.automation.doctype.assignment_rule.assignment_rule.update_due_date", "frappe.core.doctype.user_type.user_type.apply_permissions_for_non_standard_user_type", "frappe.core.doctype.permission_log.permission_log.make_perm_log", + "frappe.search.sqlite_search.update_doc_index", ], "after_rename": "frappe.desk.notifications.clear_doctype_notifications", "on_cancel": [ @@ -164,6 +165,7 @@ doc_events = { "on_trash": [ "frappe.desk.notifications.clear_doctype_notifications", "frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions", + "frappe.search.sqlite_search.delete_doc_index", ], "on_update_after_submit": [ "frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions", @@ -206,6 +208,7 @@ scheduler_events = { "frappe.deferred_insert.save_to_db", "frappe.automation.doctype.reminder.reminder.send_reminders", "frappe.model.utils.link_count.update_link_count", + "frappe.search.sqlite_search.build_index_if_not_exists", ], # 10 minutes "0/10 * * * *": [ @@ -278,7 +281,10 @@ setup_wizard_exception = [ ] before_migrate = ["frappe.core.doctype.patch_log.patch_log.before_migrate"] -after_migrate = ["frappe.website.doctype.website_theme.website_theme.after_migrate"] +after_migrate = [ + "frappe.website.doctype.website_theme.website_theme.after_migrate", + "frappe.search.sqlite_search.build_index_in_background", +] otp_methods = ["OTP App", "Email", "SMS"] diff --git a/frappe/search/__init__.py b/frappe/search/__init__.py index 81df8f4a80..959e0884a8 100644 --- a/frappe/search/__init__.py +++ b/frappe/search/__init__.py @@ -3,6 +3,7 @@ import frappe from frappe.search.full_text_search import FullTextSearch +from frappe.search.sqlite_search import SQLiteSearch from frappe.search.website_search import WebsiteSearch from frappe.utils import cint diff --git a/frappe/search/sqlite_search.md b/frappe/search/sqlite_search.md new file mode 100644 index 0000000000..2ff3227caa --- /dev/null +++ b/frappe/search/sqlite_search.md @@ -0,0 +1,470 @@ +# SQLite Search Framework + +SQLite Search is a full-text search framework for Frappe applications that provides advanced search capabilities using SQLite's FTS5 (Full-Text Search) engine. It offers features like spelling correction, time-based recency scoring, custom ranking, permission-aware filtering, and extensible scoring pipelines. + +## Table of Contents + +- [Quick Start](#quick-start) +- [How It Works](#how-it-works) +- [Configuration](#configuration) +- [Features & Customization](#features--customization) +- [API Reference](#api-reference) + +## Quick Start + +### 1. 
Create a Search Class + +Create a search implementation by extending `SQLiteSearch`: + +```python +# my_app/search.py +from frappe.search.sqlite_search import SQLiteSearch + +class MyAppSearch(SQLiteSearch): + # Database file name + INDEX_NAME = "my_app_search.db" + + # Define the search schema + INDEX_SCHEMA = { + "metadata_fields": ["project", "owner", "status"], + "tokenizer": "unicode61 remove_diacritics 2 tokenchars '-_'", + } + + # Define which doctypes to index and their field mappings + INDEXABLE_DOCTYPES = { + "Task": { + "fields": ["name", {"title": "subject"}, {"content": "description"}, "modified", "project", "owner", "status"], + }, + "Issue": { + "fields": ["name", "title", "description", {"modified": "last_updated"}, "project", "owner"], + "filters": {"status": ("!=", "Closed")}, # Only index non-closed issues + }, + } + + def get_search_filters(self): + """Return permission filters for current user""" + # Get projects accessible to current user + accessible_projects = frappe.get_all( + "Project", + filters={"owner": frappe.session.user}, + pluck="name" + ) + + if not accessible_projects: + return {"project": []} # No access + + return {"project": accessible_projects} +``` + +### 2. Register the Search Class + +Add your search class to hooks.py: + +```python +# my_app/hooks.py +sqlite_search = ['my_app.search.MyAppSearch'] +``` + +### 3. Create API Endpoint + +Create a whitelisted method to expose search functionality: + +```python +# my_app/api.py +import frappe +from my_app.search import MyAppSearch + +@frappe.whitelist() +def search(query, filters=None): + search = MyAppSearch() + result = search.search(query, filters=filters) + + return result +``` + +### 4. Build the Index + +Build the search index programmatically or via console: + +```python +from my_app.search import MyAppSearch +search = MyAppSearch() +search.build_index() +``` + +## How It Works + +### 1. Indexing Process + +#### Full Index Building + +When you call `build_index()`, the framework performs a complete index rebuild: + +1. **Database Preparation**: Creates a temporary SQLite database with FTS5 tables configured according to your schema +2. **Document Collection**: Queries all specified doctypes using the configured field mappings and filters +3. **Document Processing**: For each document: + - Extracts and maps fields according to `INDEXABLE_DOCTYPES` configuration + - Cleans HTML content using BeautifulSoup to extract plain text + - Applies custom document preparation logic if `prepare_document()` is overridden + - Validates required fields (title, content) are present +4. **Batch Insertion**: Inserts processed documents into the FTS5 index in batches for performance +5. **Vocabulary Building**: Constructs a spelling correction dictionary from all indexed text +6. **Atomic Replacement**: Replaces the existing index database with the new one atomically + +#### Individual Document Indexing + +For real-time updates using `index_doc()` or `remove_doc()`: + +1. **Single Document Processing**: Retrieves and processes one document using the same field mapping logic +2. **Incremental Update**: Updates the existing FTS5 index by inserting, updating, or deleting the specific document +3. **Vocabulary Update**: Updates the spelling dictionary with new terms from the document + +### 2. Search Process + +When a user performs a search using `search()`, the framework executes these steps: + +1. **Permission Filtering**: Calls `get_search_filters()` to determine what documents the current user can access +2. 
**Query Preprocessing**: + - Validates the search query is not empty + - Combines user-provided filters with permission filters +3. **Spelling Correction**: + - Analyzes query terms against the vocabulary dictionary + - Uses trigram similarity to suggest corrections for misspelled words + - Expands the original query with corrected terms +4. **FTS5 Query Execution**: + - Constructs an FTS5-compatible query string + - Executes the full-text search against the SQLite database + - Applies metadata filters (status, owner, project, etc.) + - Retrieves raw results with BM25 scores +5. **Results Processing**: + - **Custom Scoring**: Applies the scoring pipeline to calculate final relevance scores + - Base BM25 score processing + - Title matching boosts (exact and partial matches) + - Recency boosting based on document age + - Custom scoring functions (doctype-specific, priority-based, etc.) + - **Ranking**: Sorts results by final scores and assigns rank positions + - **Content Formatting**: Generates content snippets and highlights matching terms + +## Configuration + +### INDEX_SCHEMA + +Defines the structure of your search index: + +```python +INDEX_SCHEMA = { + # Text fields that will be searchable (defaults to ["title", "content"]) + "text_fields": ["title", "content"], + + # Metadata fields stored alongside text content for filtering + "metadata_fields": ["project", "owner", "status", "priority"], + + # FTS5 tokenizer configuration + "tokenizer": "unicode61 remove_diacritics 2 tokenchars '-_@.'" +} +``` + +### INDEXABLE_DOCTYPES + +Specifies which doctypes to index and how to map their fields: + +```python +INDEXABLE_DOCTYPES = { + "Task": { + # Field mapping + "fields": [ + "name", + {"title": "subject"}, # Maps subject field to title + {"content": "description"}, # Maps description field to content + {"modified": "creation"}, # Use creation instead of modified for recency boost + "project", + "owner" + ], + + # Optional filters to limit which records are indexed + "filters": { + "status": ("!=", "Cancelled"), + "docstatus": ("!=", 2) + } + } +} +``` + +### Field Mapping Rules + +- **String fields**: Direct mapping `"field_name"` +- **Aliased fields**: Dictionary mapping `{"schema_field": "doctype_field"}` +- **Required fields**: `title` and `content` fields must be present or explicitly mapped (e.g., `{"title": "subject"}`) +- **Auto-added fields**: `doctype` and `name` are automatically included +- **Modified field**: Added automatically if used in any doctype configuration. Used for recency boosting - if you want to use a different timestamp field (like `creation` or `last_updated`), map it to `modified` using `{"modified": "creation"}` + +## Features & Customization + +### Permission Filtering + +Implement `get_search_filters()` to control access: + +```python +def get_search_filters(self): + """Return filters based on user permissions""" + user = frappe.session.user + + if user == "Administrator": + return {} # No restrictions + + # Example: User can only see their own and public documents + return { + "owner": user, + "status": ["Active", "Published"] + } +``` + +### Custom Scoring + +Create custom scoring functions to influence search relevance: + +```python +class MyAppSearch(SQLiteSearch): + ... 
+ + @SQLiteSearch.scoring_function + def _get_priority_boost(self, row, query, query_words): + """Boost high-priority items""" + priority = row.get("priority", "Medium") + + if priority == "High": + return 1.5 + if priority == "Medium": + return 1.1 + return 1.0 +``` + +### Recency Boosting + +The framework automatically provides time-based recency boosting using the `modified` field: + +```python +# The modified field is used for calculating document age +# Recent documents get higher scores: +# - Last 24 hours: 1.8x boost +# - Last 7 days: 1.5x boost +# - Last 30 days: 1.2x boost +# - Last 90 days: 1.1x boost +# - Older documents: gradually decreasing boost + +# If your doctype uses a different timestamp field, map it to modified: +INDEXABLE_DOCTYPES = { + "GP Discussion": { + "fields": ["name", "title", "content", {"modified": "last_post_at"}, "project"], + }, + "Article": { + "fields": ["name", "title", "content", {"modified": "published_date"}, "category"], + } +} +``` + +### Document Preparation + +Override `prepare_document()` for custom document processing: + +```python +def prepare_document(self, doc): + """Custom document preparation""" + document = super().prepare_document(doc) + if not document: + return None + + # Add computed fields + if doc.doctype == "Task": + # Combine multiple fields into content + content_parts = [ + doc.description or "", + doc.notes or "", + "\n".join([comment.content for comment in doc.get("comments", [])]) + ] + document["content"] = "\n".join(filter(None, content_parts)) + + # set fields that might be stored in another table + document["category"] = get_category_for_task(doc) + + return document +``` + +### Spelling Correction + +The framework includes built-in spelling correction using trigram similarity: + +```python +# Spelling correction happens automatically +search_result = search.search("projetc managment") # Will find "project management" + +# Access correction information +print(search_result["summary"]["corrected_words"]) +# Output: {"projetc": "project", "managment": "management"} +``` + +### Content Processing + +HTML content is automatically cleaned and processed using BeautifulSoup: + +```python +# Complex HTML content like this: +html_content = """ +
+<div class="docs">
+    <p>Learn how to integrate with our <a href="https://example.com/api">REST API</a>.</p>
+    <p>Authenticate with <strong>Bearer tokens</strong>. <a href="/examples">See our code examples</a> for details.</p>
+    <table>
+        <tr><th>Method</th><td>POST</td></tr>
+    </table>
+</div>
+"""
+
+# is indexed as plain text similar to:
+# "Learn how to integrate with our REST API. Authenticate with
+#  Bearer tokens. See our code examples for details. Method POST"
+
+# The framework:
+# 1. Removes HTML tags (<div>, <p>, etc.)
+# 2. Strips out scripts, styles, and non-content elements
+# 3. Extracts link text while removing href URLs
+# 4. Normalizes whitespace and line breaks
+```
+
+### Title-Only Search
+
+Pass `title_only=True` to match the query against the indexed title field only:
+
+```python
+results = search.search("project update", title_only=True)
+```
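+
+Title-only searches still apply the same filters and scoring pipeline; the response summary reports `"title_only": True`.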
+
+### Advanced Filtering
+
+```python
+accessible_projects = ['PROJ001', 'PROJ002', ...]
+
+filters = {
+ "project": accessible_projects, # Multiple values (IN clause)
+ "owner": current_user, # Single value (= clause)
+}
+
+results = search.search("bug fix", filters=filters)
+```
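+
+User-supplied filters are merged with the permission filters returned by `get_search_filters()` and compiled into parameterized SQL conditions: a scalar value becomes `field = ?`, a list becomes `field IN (?, ...)`, and an empty list matches nothing.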
+
+### Automatic Index Handling
+
+The framework handles index building and maintenance automatically when you register your search class:
+
+```python
+# hooks.py
+sqlite_search = ['my_app.search.MyAppSearch']
+```
+
+**What the framework does automatically:**
+
+1. **Post-Migration Index Building**: Enqueues a background job to rebuild the search index after `bench migrate`
+2. **Periodic Index Verification**: Checks every 15 minutes that the index exists and rebuilds if missing
+3. **Real-time Document Updates**: Automatically calls `index_doc()` and `remove_doc()` on document lifecycle events (insert, update, delete) for all doctypes defined in your `INDEXABLE_DOCTYPES` (see the simplified sketch below)
+
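+Under the hood, the `on_update` hook behaves roughly like this (simplified from `frappe/search/sqlite_search.py`); a document is reindexed only when one of its indexed fields actually changed:
+
+```python
+def update_doc_index(doc, method=None):
+    for SearchClass in get_search_classes():
+        search = SearchClass()
+        if not (search.is_search_enabled() and search.index_exists()):
+            continue
+        config = search.doc_configs.get(doc.doctype)
+        if config and any(doc.has_value_changed(f) for f in config["fields"]):
+            search.index_doc(doc.doctype, doc.name)
+```
+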
+## Manual Index Handling
+
+If you prefer manual control over the indexing lifecycle, opt out of automatic index handling by not registering your search class in the `sqlite_search` hook, and wire up your own scheduler entries instead:
+
+```python
+# my_app/search.py
+import frappe
+
+
+def build_index():
+    """Build the index synchronously (run from a background worker)."""
+    MyAppSearch().build_index()
+
+
+def build_index_if_not_exists():
+    """Enqueue background index building if the index is missing."""
+    search = MyAppSearch()
+    if search.is_search_enabled() and not search.index_exists():
+        frappe.enqueue("my_app.search.build_index", queue="long")
+```
+
+```python
+# hooks.py
+scheduler_events = {
+    # Custom schedule (daily instead of the framework's 15-minute check)
+    "daily": ["my_app.search.build_index_if_not_exists"],
+}
+```
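+
+To force a full rebuild (for example from `bench console`), use the module-level helper:
+
+```python
+from frappe.search.sqlite_search import build_index
+from my_app.search import MyAppSearch
+
+build_index(MyAppSearch, force=True)  # rebuilds even if an index already exists
+```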
+
+## API Reference
+
+#### `search(query, title_only=False, filters=None)`
+Main search method that returns formatted results.
+
+**Parameters:**
+- `query` (str): Search query text
+- `title_only` (bool): Search only in title fields
+- `filters` (dict): Additional filters to apply
+
+**Returns:**
+```python
+{
+ "results": [
+ {
+ "doctype": "Task",
+ "name": "TASK-001",
+ "title": "Fix login bug",
+ "content": "User cannot login after password reset...",
+ "score": 0.85,
+ "original_rank": 3, # original bm25 rank
+ "rank": 1, # modified rank after custom scoring pipeline
+ # ... other metadata fields
+ }
+ ],
+ "summary": {
+ "duration": 0.023,
+ "total_matches": 15,
+ "returned_matches": 15,
+ "corrected_words": {"loggin": "login"},
+ "corrected_query": "Fix login bug",
+ "title_only": False,
+ "filtered_matches": 15,
+ "applied_filters": {"status": ["Open"]}
+ }
+}
+```
+
+#### `build_index()`
+Build the complete search index from scratch.
+
+#### `index_doc(doctype, docname)`
+Index a single document.
+
+#### `remove_doc(doctype, docname)`
+Remove a single document from the index.
+
+#### `is_search_enabled()`
+Check if search is enabled (override to add disable logic).
+
+#### `index_exists()`
+Check if the search index exists.
+
+#### `get_search_filters()`
+**Must be implemented by subclasses.** Return filters for the current user.
+
+**Returns:**
+```python
+{
+    "field_a": "value",           # Single value (= clause)
+    "field_b": ["val1", "val2"],  # Multiple values (IN clause)
+}
+```
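+
+Return an empty dict to apply no restrictions. Mapping a field to an empty list matches nothing (it compiles to a `1=0` condition), which is a convenient way to express "no access".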
+
+#### `scoring_function` (decorator)
+
+Use the `@SQLiteSearch.scoring_function` decorator to mark a method as a scoring function. Decorated methods are discovered automatically and appended to the scoring pipeline.
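+
+For example (assuming `status` is one of your `metadata_fields`):
+
+```python
+class MyAppSearch(SQLiteSearch):
+    @SQLiteSearch.scoring_function
+    def _status_boost(self, row, query, query_words):
+        """Slightly boost open documents."""
+        return 1.3 if row["status"] == "Open" else 1.0
+```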
diff --git a/frappe/search/sqlite_search.py b/frappe/search/sqlite_search.py
new file mode 100644
index 0000000000..433b2d3d24
--- /dev/null
+++ b/frappe/search/sqlite_search.py
@@ -0,0 +1,1419 @@
+# Copyright (c) 2025, Frappe Technologies Pvt. Ltd. and Contributors
+# MIT License. See license.txt
+
+import datetime
+import inspect
+import os
+import re
+import sqlite3
+import time
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from enum import Enum
+from typing import Any
+
+from bs4 import BeautifulSoup
+
+import frappe
+from frappe.model.document import Document
+from frappe.utils import update_progress_bar
+
+
+class WarningType(Enum):
+ """Warning types for search indexing."""
+
+ INVALID_DOCUMENT = "invalid_document"
+ MISSING_TEXT_FIELDS = "missing_text_fields"
+ MISSING_CONTENT_FIELD = "missing_content_field"
+ MISSING_TITLE_FIELD = "missing_title_field"
+ MISSING_DOCTYPE = "missing_doctype"
+ MISSING_NAME = "missing_name"
+ OTHER = "other"
+
+
+@dataclass
+class IndexWarning:
+ """Structured warning for search indexing."""
+
+ type: WarningType
+ message: str
+ doctype: str | None = None
+ docname: str | None = None
+ field: str | None = None
+ missing_fields: list | None = None
+
+ def __str__(self):
+ return self.message
+
+
+class SQLiteSearchIndexMissingError(Exception):
+ pass
+
+
+# Search Configuration Constants
+MAX_SEARCH_RESULTS = 100
+SNIPPET_LENGTH = 64
+MIN_WORD_LENGTH = 4
+MAX_EDIT_DISTANCE = 3
+MIN_SIMILARITY_THRESHOLD = 0.6
+MAX_SPELLING_SUGGESTIONS = 3
+SIMILARITY_TRIGRAM_WEIGHT = 0.7
+SIMILARITY_SEQUENCE_WEIGHT = 0.3
+FREQUENCY_BOOST_FACTOR = 1000
+MAX_FREQUENCY_BOOST = 1.2
+RECENCY_DECAY_RATE = 0.005 # Linear decay per day beyond 90 days
+MIN_RECENCY_BOOST = 0.5
+TITLE_EXACT_MATCH_BOOST = 5.0
+TITLE_PARTIAL_MATCH_BOOST = 2.0
+DISCUSSION_BOOST = 1.2
+COMMENT_BOOST = 1.0
+
+# Time-based recency categories for aggressive boosting
+RECENT_HOURS_BOOST = 1.8 # Documents from last 24 hours
+RECENT_WEEK_BOOST = 1.5 # Documents from last 7 days
+RECENT_MONTH_BOOST = 1.2 # Documents from last 30 days
+RECENT_QUARTER_BOOST = 1.1 # Documents from last 90 days
+
+
+class SQLiteSearch(ABC):
+ """
+ Abstract base class for SQLite FTS5-based full-text search for Frappe.
+
+ Provides full-text search with advanced features:
+ - Spelling correction using trigram similarity
+ - Time-based recency boost with categorical scoring
+ - Custom scoring with title matching and document type boosts
+ - Ranking tracking (original BM25 vs modified scores)
+ - Filtering by user-defined criteria
+ - Permission-aware search results via query-level filtering
+ """
+
+ @staticmethod
+ def scoring_function(func):
+ """
+ Decorator to mark methods as scoring functions that should be automatically
+ included in the scoring pipeline.
+
+ Usage:
+ @SQLiteSearch.scoring_function
+ def custom_boost(self, row, query, query_words):
+ return 1.5
+ """
+ func._is_scoring_function = True
+ return func
+
+ def __init__(self, db_name=None):
+ # Use class-level INDEX_NAME if db_name not provided
+ if db_name is None:
+ db_name = getattr(self, "INDEX_NAME", "search.db")
+
+ self.db_name = db_name
+ self.db_path = self._get_db_path()
+
+ # Validate required class attributes early
+ if not hasattr(self, "INDEX_SCHEMA"):
+ raise ValueError("INDEX_SCHEMA must be defined as a class-level variable")
+ if not hasattr(self, "INDEXABLE_DOCTYPES"):
+ raise ValueError("INDEXABLE_DOCTYPES must be defined as a class-level variable")
+
+ self.doc_configs = self._build_doc_configs()
+ self.warnings: list[IndexWarning] = [] # Collect warnings during indexing
+ self.schema = self._get_schema()
+ self._validate_config()
+
+ # Helper Methods for New API
+
+ def _parse_doctype_fields(self, doctype, config):
+ """Parse field definitions for a doctype to extract field names and mappings."""
+ if "fields" not in config:
+ raise ValueError(f"Missing 'fields' in configuration for doctype '{doctype}'")
+
+ parsed_fields = []
+ field_mappings = {}
+
+ for field_def in config["fields"]:
+ if isinstance(field_def, str):
+ parsed_fields.append(field_def)
+ elif isinstance(field_def, dict):
+ for schema_field, doctype_field in field_def.items():
+ parsed_fields.append(doctype_field)
+ field_mappings[schema_field] = doctype_field
+ else:
+ raise ValueError(f"Invalid field definition: {field_def}")
+
+ return parsed_fields, field_mappings
+
+ def _build_doc_configs(self):
+ """Build document configurations from class-level INDEXABLE_DOCTYPES."""
+ doc_configs = {}
+ for doctype, config in self.INDEXABLE_DOCTYPES.items():
+ parsed_fields, field_mappings = self._parse_doctype_fields(doctype, config)
+
+ # Determine content field
+ content_field = field_mappings.get("content")
+ if not content_field:
+ if "content" in parsed_fields:
+ content_field = "content"
+ else:
+ raise ValueError(
+ f"Content field must be present in fields list or explicitly mapped for '{doctype}'"
+ )
+
+ # Determine title field
+ title_field = field_mappings.get("title")
+ if not title_field and "title" in parsed_fields:
+ title_field = "title"
+
+ doc_configs[doctype] = {
+ "fields": parsed_fields,
+ "field_mappings": field_mappings,
+ "content_field": content_field,
+ "title_field": title_field,
+ "modified_field": field_mappings.get("modified", "modified"),
+ "filters": config.get("filters", {}),
+ }
+
+ return doc_configs
+
+ def _get_schema(self):
+ """Get the search index schema with automatic defaults."""
+ if not hasattr(self, "INDEX_SCHEMA"):
+ raise ValueError("INDEX_SCHEMA must be defined as a class-level variable")
+
+ schema = self.INDEX_SCHEMA.copy()
+
+ # Default text fields to title and content
+ schema.setdefault("text_fields", ["title", "content"])
+
+ # Default tokenizer
+ schema.setdefault("tokenizer", "unicode61 remove_diacritics 2")
+
+ # Automatically add required metadata fields
+ metadata_fields = schema.setdefault("metadata_fields", [])
+ required_fields = ["doctype", "name"]
+
+ for field in required_fields:
+ if field not in metadata_fields:
+ metadata_fields.append(field)
+
+ # Add 'modified' to metadata if it's used in the schema or any doctype config
+ is_modified_in_schema = "modified" in self.INDEX_SCHEMA.get("metadata_fields", [])
+ is_modified_in_doctypes = any(
+ "modified" in config.get("field_mappings", {}) or "modified" in config.get("fields", [])
+ for config in self.doc_configs.values()
+ )
+
+ if (is_modified_in_schema or is_modified_in_doctypes) and "modified" not in metadata_fields:
+ metadata_fields.append("modified")
+
+ schema["metadata_fields"] = metadata_fields
+
+ return schema
+
+ # Abstract Method - Must be implemented by subclasses
+
+ @abstractmethod
+ def get_search_filters(self):
+ """
+ Return filters to apply to search results.
+
+ Returns:
+ dict: Permission filters in format:
+ {
+ "field_name": value, # Single value: field = value
+ "field_name": [val1, val2] # List: field IN (val1, val2)
+ }
+ """
+ pass
+
+ # Public API Methods
+
+ def search(self, query, title_only=False, filters=None):
+ """
+ Main search method with advanced filtering support.
+
+ Args:
+ query (str): Search query text
+ title_only (bool): Whether to search only in titles
+ filters (dict): Optional filters by field names
+
+ Returns:
+ dict: Search results with summary statistics
+ """
+ if not self.is_search_enabled():
+ return self._empty_search_result(title_only, filters)
+
+ self.raise_if_not_indexed()
+
+ if not query:
+ return self._empty_search_result(title_only, filters)
+
+ start_time = time.time()
+
+ # Prepare filters if provided
+ filters = filters or {}
+
+ # Get permission filters from subclass
+ permission_filters = self.get_search_filters()
+
+ # Combine user filters with permission filters
+ all_filters = {**filters, **permission_filters}
+
+ # Prepare FTS5 query with spelling correction
+ expanded_query, corrections = self._expand_query_with_corrections(query)
+ fts_query = self._prepare_fts_query(expanded_query)
+
+ try:
+ raw_results = self._execute_search_query(fts_query, title_only, all_filters)
+ total_matches = len(raw_results)
+ except sqlite3.Error as e:
+ frappe.log_error(f"Search query failed: {e}")
+ raw_results = []
+ total_matches = 0
+
+ # Process results
+ processed_results = self._process_search_results(raw_results, query)
+
+ duration = time.time() - start_time
+
+ return {
+ "results": processed_results,
+ "summary": {
+ "duration": round(duration, 3),
+ "total_matches": total_matches,
+ "returned_matches": total_matches,
+ "corrected_words": corrections,
+ "corrected_query": expanded_query if corrections else None,
+ "title_only": title_only,
+ "filtered_matches": len(processed_results),
+ "applied_filters": filters,
+ },
+ }
+
+ def build_index(self):
+ """Build the complete search index from scratch using atomic replacement."""
+ if not self.is_search_enabled():
+ return
+
+ # Use temporary database path for atomic replacement
+ temp_db_path = self._get_db_path(is_temp=True)
+ original_db_path = self.db_path
+
+ # Remove temp file if it exists
+ if os.path.exists(temp_db_path):
+ os.unlink(temp_db_path)
+
+ # Temporarily switch to temp database for building
+ self.db_path = temp_db_path
+
+ try:
+ self._update_progress("Setting up search tables", 0, 100, absolute=True)
+
+ # Setup tables in temp database
+ self._ensure_fts_table()
+
+ self._update_progress("Fetching records", 20, 100, absolute=True)
+
+ records = self.get_documents()
+ documents = []
+
+ self._update_progress("Preparing documents", 30, 100, absolute=True)
+
+ total_records = len(records)
+ for i, doc in enumerate(records):
+ document = self.prepare_document(doc)
+ if document:
+ documents.append(document)
+
+ # Update progress during document preparation
+ if i % 100 == 0:
+ progress = 30 + int((i / total_records) * 20) # 30-50% range
+ self._update_progress("Preparing documents", progress, 100, absolute=True)
+
+ self._update_progress("Indexing documents", 50, 100, absolute=True)
+
+ self._index_documents(documents)
+
+ self._update_progress("Building spell correction vocabulary", 80, 100, absolute=True)
+
+ # Build vocabulary for spelling correction
+ self._build_vocabulary(documents)
+
+ # Atomic replacement: move temp database to final location
+ if os.path.exists(original_db_path):
+ os.unlink(original_db_path)
+ os.rename(temp_db_path, original_db_path)
+
+ self._update_progress("Search index build complete", 100, 100, absolute=True)
+
+ # Print warning summary
+ self._print_warning_summary()
+
+ except Exception:
+ # Clean up temp file on error
+ if os.path.exists(temp_db_path):
+ os.unlink(temp_db_path)
+ raise
+ finally:
+ # Restore original database path
+ self.db_path = original_db_path
+
+ # Status and Validation Methods
+
+ def index_exists(self):
+ """Check if FTS index exists."""
+ if not os.path.exists(self.db_path):
+ return False
+
+ try:
+ result = self.sql(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'", read_only=True
+ )
+ return bool(result)
+ except sqlite3.Error:
+ return False
+
+ def drop_index(self):
+ """Drop the search index by removing the database file."""
+ if os.path.exists(self.db_path):
+ try:
+ os.unlink(self.db_path)
+ except OSError as e:
+ frappe.log_error(f"Failed to remove search index file {self.db_path}: {e}")
+ raise
+
+ def is_search_enabled(self):
+ """Override this to enable/disable search"""
+ return True
+
+ def raise_if_not_indexed(self):
+ """Raise exception if search index doesn't exist."""
+ if not self.index_exists():
+ raise SQLiteSearchIndexMissingError("Search index does not exist. Please build the index first.")
+
+ def get_documents(self):
+ """Get all records to be indexed."""
+ records = []
+ for doctype, config in self.doc_configs.items():
+ docs = frappe.qb.get_query(
+ doctype, fields=config["fields"], filters=config.get("filters", {})
+ ).run(as_dict=True)
+
+ for doc in docs:
+ doc.doctype = doctype
+ if config["modified_field"] != "modified":
+ doc.modified = getattr(doc, config["modified_field"], None) or doc.modified
+ records.append(doc)
+
+ return records
+
+ # Private Implementation Methods
+
+ def _execute_search_query(self, fts_query, title_only, filters):
+ """Execute the FTS search query with optional filters."""
+ # Build filter conditions
+ filter_conditions = []
+ filter_params = []
+
+ if filters:
+ # Build filter conditions dynamically
+ for field, values in filters.items():
+ if not values and isinstance(values, list):
+ # If filter is an empty list, it should not match any documents.
+ filter_conditions.append("1=0")
+ continue
+
+ if not values: # Skip empty filters
+ continue
+
+ if isinstance(values, list):
+ if len(values) == 1:
+ filter_conditions.append(f"{field} = ?")
+ filter_params.append(values[0])
+ else:
+ placeholders = ",".join(["?" for _ in values])
+ filter_conditions.append(f"{field} IN ({placeholders})")
+ filter_params.extend(values)
+ else:
+ filter_conditions.append(f"{field} = ?")
+ filter_params.append(values)
+
+ # Combine filter conditions with AND
+ filter_clause = ""
+ if filter_conditions:
+ filter_clause = "AND " + " AND ".join(filter_conditions)
+
+ # Get schema to build dynamic SELECT fields
+ text_fields = self.schema["text_fields"]
+ metadata_fields = self.schema["metadata_fields"]
+
+ # Build SELECT clause with all fields
+ select_fields = []
+
+ # Add title highlighting
+ title_field = "title" if "title" in text_fields else text_fields[0] if text_fields else "doc_id"
+ title_column_index = self._get_text_field_column_index(title_field)
+ if title_column_index is not None:
+ select_fields.append(f"highlight(search_fts, {title_column_index}, '', '') as title")
+ else:
+ select_fields.append(f"{title_field} as title")
+
+ # Add content snippet or highlighting
+ if not title_only and "content" in text_fields:
+ content_index = self._get_text_field_column_index("content")
+ select_fields.append(
+ f"snippet(search_fts, {content_index}, '', '', '...', ?) as content"
+ )
+ elif "content" in text_fields:
+ select_fields.append("content")
+
+ # Add all other fields
+ for field in metadata_fields:
+ if field != "doc_id": # Already handled above
+ select_fields.append(field)
+
+ # Add scoring fields
+ select_fields.extend(["bm25(search_fts) as bm25_score", f"{title_field} as original_title"])
+
+ select_clause = ",\n ".join(select_fields)
+
+        if title_only:
+            # FTS5 expects the table name on the left of MATCH, so restrict the
+            # search to the title column using FTS5 column-filter syntax instead
+            # of a second MATCH constraint.
+            title_query = f"{title_field}: ({fts_query})"
+            sql = f"""
+                SELECT
+                    doc_id,
+                    {select_clause}
+                FROM search_fts
+                WHERE search_fts MATCH ?
+                {filter_clause}
+                ORDER BY bm25_score
+                LIMIT ?
+            """
+            return self.sql(sql, (title_query, *filter_params, MAX_SEARCH_RESULTS), read_only=True)
+ else:
+ params = []
+ if "content" in text_fields:
+ params.append(SNIPPET_LENGTH)
+ params.extend([fts_query, *filter_params, MAX_SEARCH_RESULTS])
+
+ sql = f"""
+ SELECT
+ doc_id,
+ {select_clause}
+ FROM search_fts
+ WHERE search_fts MATCH ?
+ {filter_clause}
+ ORDER BY bm25_score
+ LIMIT ?
+ """
+ return self.sql(sql, params, read_only=True)
+
+ def _process_search_results(self, raw_results, query):
+ """Process search results with scoring."""
+ processed_results = []
+ query_words = query.split()
+
+ # Get schema configuration
+ text_fields = self.schema["text_fields"]
+ metadata_fields = self.schema["metadata_fields"]
+
+ # 1-based ranking
+ for original_rank, row in enumerate(raw_results, 1):
+ # Apply advanced heuristics scoring
+ score = self._calculate_advanced_score(row, query, query_words)
+
+ # Build result dynamically based on schema
+ result = {
+ "id": row["doc_id"],
+ "score": score,
+ "original_rank": original_rank,
+ "bm25_score": row["bm25_score"],
+ }
+
+ # Add text fields
+ for field in text_fields:
+ result[field] = row[field] if field in row.keys() else ""
+
+ # Add metadata fields
+ for field in metadata_fields:
+ if field == "owner":
+ # Map owner to author for backward compatibility
+ result["author"] = row["owner"] if "owner" in row.keys() else ""
+ else:
+ result[field] = row[field] if field in row.keys() else None
+
+ processed_results.append(result)
+
+ # Sort by custom score (descending - higher is better)
+ processed_results.sort(key=lambda x: x["score"], reverse=True)
+
+ # Add modified ranking after custom scoring
+ for i, result in enumerate(processed_results):
+ result["modified_rank"] = i + 1
+
+ return processed_results
+
+ def get_scoring_pipeline(self):
+ """
+ Return the scoring pipeline, a list of methods to calculate the final score.
+ Each method in the list should accept either (row, query) or (row, query, query_words)
+ and return a float. The final score is the product of all values returned by the pipeline methods.
+ Subclasses can override this to customize the scoring logic.
+ """
+ pipeline = [
+ self._get_base_score,
+ self._get_title_boost,
+ ]
+
+ # Only add recency boost if modified is available in the schema
+ if "modified" in self.schema["metadata_fields"]:
+ pipeline.append(self._get_recency_boost)
+
+ # Automatically discover and add decorated scoring functions
+ for attr_name in dir(self):
+ attr = getattr(self, attr_name)
+ if callable(attr) and hasattr(attr, "_is_scoring_function"):
+ pipeline.append(attr)
+
+ return pipeline
+
+ def _calculate_advanced_score(self, row, query, query_words):
+ """
+ Calculate the final score by executing the scoring pipeline.
+ The final score is the product of all scores returned by the pipeline methods.
+ """
+ pipeline = self.get_scoring_pipeline()
+ final_score = 1.0
+
+ for scoring_method in pipeline:
+ # Check method signature to determine how to call it
+ sig = inspect.signature(scoring_method)
+ params = list(sig.parameters.keys())
+
+ # Skip 'self' parameter
+ if params and params[0] == "self":
+ params = params[1:]
+
+ # Call method based on its signature
+ if len(params) >= 3 or "query_words" in params:
+ # Method accepts query_words parameter
+ final_score *= scoring_method(row, query, query_words)
+ else:
+ # Method only accepts row and query
+ final_score *= scoring_method(row, query)
+
+ return final_score
+
+ def _get_base_score(self, row, query):
+ """Calculate the base score from BM25."""
+ bm25_score = abs(row["bm25_score"]) if row["bm25_score"] is not None else 0
+ return 1.0 / (1.0 + bm25_score) if bm25_score > 0 else 0.5
+
+ def _get_title_boost(self, row, query, query_words):
+ """Calculate the title matching boost."""
+ original_title = (row["original_title"] or "").lower()
+ query_lower = query.lower()
+
+ if query_lower in original_title:
+ return TITLE_EXACT_MATCH_BOOST
+ if any(word.lower() in original_title for word in query_words):
+ return TITLE_PARTIAL_MATCH_BOOST
+ return 1.0
+
+ def _get_recency_boost(self, row, query):
+ """Calculate the time-based recency boost."""
+ # Return neutral boost if modified is not available
+ if "modified" not in row or row["modified"] is None:
+ return 1.0
+
+ current_time = time.time()
+ doc_timestamp = row["modified"]
+ hours_old = (current_time - doc_timestamp) / 3600
+ days_old = hours_old / 24
+
+ if hours_old <= 24:
+ return RECENT_HOURS_BOOST
+ if days_old <= 7:
+ return RECENT_WEEK_BOOST
+ if days_old <= 30:
+ return RECENT_MONTH_BOOST
+ if days_old <= 90:
+ return RECENT_QUARTER_BOOST
+
+ # Older documents get linear decay
+ days_beyond_90 = days_old - 90
+ return max(MIN_RECENCY_BOOST, RECENT_QUARTER_BOOST - (days_beyond_90 * RECENCY_DECAY_RATE))
+
+ def _get_text_field_column_index(self, field_name):
+ """Get the 1-based column index of a text field in the FTS table."""
+ try:
+ # FTS table columns are doc_id, then text_fields...
+ # So index is 1 (for doc_id) + index in text_fields list
+ return 1 + self.schema["text_fields"].index(field_name)
+ except ValueError:
+ return None
+
+ # Spelling Correction Methods
+
+ def _expand_query_with_corrections(self, query):
+ """Expand query with spelling corrections."""
+ words = query.strip().split()
+ expanded_terms = []
+ corrections = {}
+
+ for word in words:
+ similar_words = self._find_similar_words(word)
+ if similar_words and similar_words[0] != word:
+ # Replace the misspelled word with the corrected word
+ corrected_word = similar_words[0]
+ expanded_terms.append(corrected_word)
+ corrections[word] = corrected_word
+ else:
+ expanded_terms.append(word)
+
+ expanded_query = " ".join(expanded_terms)
+ return expanded_query, corrections if corrections else None
+
+ def _find_similar_words(
+ self, word, max_suggestions=MAX_SPELLING_SUGGESTIONS, min_similarity=MIN_SIMILARITY_THRESHOLD
+ ):
+ """Find similar words using indexed trigram similarity - much faster!"""
+ import difflib
+
+ word = word.lower()
+ if len(word) < MIN_WORD_LENGTH:
+ return []
+
+ word_trigrams = self._generate_trigrams(word)
+ word_length = len(word)
+
+ try:
+ # Find candidate words that share trigrams (MUCH faster than checking all words)
+ placeholders = ",".join("?" * len(word_trigrams))
+ candidates = self.sql(
+ f"""
+ SELECT t.word, v.frequency, v.length, COUNT(*) as shared_trigrams
+ FROM search_trigrams t
+ JOIN search_vocabulary v ON t.word = v.word
+ WHERE t.trigram IN ({placeholders})
+ AND ABS(v.length - ?) <= ? -- Length filter for efficiency
+ GROUP BY t.word, v.frequency, v.length
+ HAVING shared_trigrams >= 1 -- Must share at least 1 trigram
+ ORDER BY shared_trigrams DESC, v.frequency DESC
+ """,
+ (*word_trigrams, word_length, MAX_EDIT_DISTANCE),
+ read_only=True,
+ )
+ except sqlite3.Error:
+ return []
+
+ similarities = []
+ word_trigram_set = set(word_trigrams)
+
+ for candidate_word, freq, candidate_length, _ in candidates:
+ # Quick length-based filter
+ if abs(candidate_length - word_length) > MAX_EDIT_DISTANCE:
+ continue
+
+ candidate_trigrams = set(self._generate_trigrams(candidate_word))
+
+ # Jaccard similarity for trigrams
+ intersection = len(word_trigram_set & candidate_trigrams)
+ union = len(word_trigram_set | candidate_trigrams)
+ trigram_similarity = intersection / union if union > 0 else 0
+
+ # Skip if trigram similarity is too low
+ if trigram_similarity < 0.3:
+ continue
+
+ # Sequence similarity for additional accuracy (only for promising candidates)
+ seq_similarity = difflib.SequenceMatcher(None, word, candidate_word).ratio()
+
+ # Combined similarity with frequency boost
+ combined_similarity = (
+ trigram_similarity * SIMILARITY_TRIGRAM_WEIGHT + seq_similarity * SIMILARITY_SEQUENCE_WEIGHT
+ )
+ frequency_boost = min(
+ MAX_FREQUENCY_BOOST, 1.0 + (freq / FREQUENCY_BOOST_FACTOR)
+ ) # Slight boost for common words
+ final_score = combined_similarity * frequency_boost
+
+ if final_score >= min_similarity:
+ similarities.append((candidate_word, final_score))
+
+ # Sort by similarity and return top suggestions
+ similarities.sort(key=lambda x: x[1], reverse=True)
+ return [word for word, score in similarities[:max_suggestions]]
+
+ def _build_vocabulary(self, documents):
+ """Build vocabulary and trigram index from documents for spelling correction."""
+ word_freq = defaultdict(int)
+ word_regex = re.compile(r"\w+") # Compile regex once for efficiency
+
+ # Extract words from all documents in batches
+ for i, doc in enumerate(documents):
+ # Show progress for large document sets
+ if i % 1000 == 0:
+ progress = 80 + int((i / len(documents)) * 15) # 80-95% range
+ self._update_progress(
+ f"Processing vocabulary ({i}/{len(documents)})", progress, 100, absolute=True
+ )
+
+ # Process title and content together for efficiency
+ combined_text = " ".join(
+ [(doc.get("title", "") or "").lower(), (doc.get("content", "") or "").lower()]
+ )
+
+ # Extract all words at once with compiled regex
+ words = word_regex.findall(combined_text)
+
+ for word in words:
+ if len(word) > MIN_WORD_LENGTH - 1 and word.isalpha(): # Filter out short words and non-alpha
+ word_freq[word] += 1
+
+ # Clear existing data in a single transaction
+ conn = self._get_connection()
+ try:
+ cursor = conn.cursor()
+ cursor.execute("DELETE FROM search_vocabulary")
+ cursor.execute("DELETE FROM search_trigrams")
+ conn.commit()
+ finally:
+ conn.close()
+
+ if not word_freq:
+ return
+
+ # Prepare batch data for vocabulary
+ vocab_data = []
+ trigram_data = []
+ trigram_set = set() # Use set to avoid duplicate trigrams
+
+ for word, freq in word_freq.items():
+ vocab_data.append((word, freq, len(word)))
+
+ # Generate trigrams for this word
+ trigrams = self._generate_trigrams(word)
+ for trigram in trigrams:
+ trigram_key = (trigram, word)
+ if trigram_key not in trigram_set:
+ trigram_set.add(trigram_key)
+ trigram_data.append(trigram_key)
+
+ # Use batch inserts with a single transaction
+ conn = self._get_connection()
+ try:
+ cursor = conn.cursor()
+
+ # Batch insert vocabulary
+ cursor.executemany(
+ "INSERT INTO search_vocabulary (word, frequency, length) VALUES (?, ?, ?)", vocab_data
+ )
+
+ # Batch insert trigrams (duplicates already removed)
+ cursor.executemany("INSERT INTO search_trigrams (trigram, word) VALUES (?, ?)", trigram_data)
+
+ conn.commit()
+ finally:
+ conn.close()
+
+ # Database and Infrastructure Methods
+
+ def _get_connection(self, read_only=False):
+ """Get SQLite connection with FTS5 support and performance optimizations."""
+ try:
+ conn = sqlite3.connect(self.db_path)
+ conn.row_factory = sqlite3.Row
+
+ # Apply performance optimizations
+ cursor = conn.cursor()
+ self._set_pragmas(cursor, read_only)
+
+ # Test the connection
+ cursor.execute("SELECT 1")
+ return conn
+ except sqlite3.Error as e:
+ frappe.log_error(f"Failed to connect to search database: {e}")
+ raise SQLiteSearchIndexMissingError(f"Search database connection failed: {e}") from e
+
+ def _set_pragmas(self, cursor, is_read=False):
+ """Set SQLite performance pragmas."""
+ cursor.execute("PRAGMA journal_mode = WAL;") # Write-Ahead Logging for concurrency
+ cursor.execute("PRAGMA synchronous = NORMAL;") # Better performance vs FULL
+ cursor.execute("PRAGMA cache_size = -8192;") # 8MB cache
+ cursor.execute("PRAGMA temp_store = MEMORY;") # Memory temp storage
+ if is_read:
+ cursor.execute("PRAGMA query_only = 1;") # Read-only optimization
+
+ def _ensure_fts_table(self):
+ """Create FTS table and related tables if they don't exist."""
+ # Get schema from subclass
+ text_fields = self.schema["text_fields"]
+ metadata_fields = self.schema["metadata_fields"]
+ tokenizer = self.schema["tokenizer"]
+
+ # Use a single transaction for all table creation operations
+ conn = self._get_connection()
+ try:
+ cursor = conn.cursor()
+
+ # Create the FTS table with dynamic columns
+ cursor.execute(f"""
+ CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
+ doc_id UNINDEXED,
+ {", ".join([f"{field}" for field in text_fields])},
+ {", ".join([f"{field} UNINDEXED" for field in metadata_fields])},
+ tokenize="{tokenizer}"
+ )
+ """)
+
+ # Create the vocabulary and trigram tables
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS search_vocabulary (
+ word TEXT PRIMARY KEY,
+ frequency INTEGER DEFAULT 1,
+ length INTEGER
+ )
+ """)
+
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS search_trigrams (
+ trigram TEXT,
+ word TEXT,
+ PRIMARY KEY (trigram, word)
+ )
+ """)
+
+ # Index for fast trigram lookups
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS idx_trigram_lookup ON search_trigrams(trigram)
+ """)
+
+ conn.commit()
+ finally:
+ conn.close()
+
+ def _index_documents(self, documents):
+ """Bulk index documents into SQLite FTS."""
+ if not documents:
+ return
+
+ # Get schema configuration to build dynamic insert SQL
+ text_fields = self.schema["text_fields"]
+ metadata_fields = self.schema["metadata_fields"]
+
+ # Always add doc_id as first field (required for FTS)
+ all_fields = ["doc_id", *text_fields, *metadata_fields]
+ placeholders = ",".join(["?" for _ in all_fields])
+ field_names = ",".join(all_fields)
+
+ insert_sql = f"""
+ INSERT INTO search_fts ({field_names})
+ VALUES ({placeholders})
+ """
+
+ # Process documents in chunks to prevent memory issues with large datasets
+ chunk_size = 1000
+ conn = self._get_connection()
+ try:
+ cursor = conn.cursor()
+
+ for i in range(0, len(documents), chunk_size):
+ chunk = documents[i : i + chunk_size]
+ values_to_insert = []
+
+ for doc in chunk:
+ # Validate document has required fields
+ if not doc.get("doctype") or not doc.get("name"):
+ self._warn_invalid_document(doc, "missing doctype/name")
+ continue
+
+ # Validate text fields are present
+ missing_text_fields = []
+ for field in text_fields:
+ if field not in doc or doc[field] is None:
+ missing_text_fields.append(field)
+
+ if missing_text_fields:
+ self._warn_missing_text_fields(
+ doc.get("doctype", ""), doc.get("name", ""), missing_text_fields
+ )
+ continue
+
+ # Build values tuple dynamically based on schema
+ values = []
+ for field in all_fields:
+ # Build doc_id automatically from doctype:name
+ if field == "doc_id":
+ doc_id = doc.get("id") or f"{doc.get('doctype', '')}:{doc.get('name', '')}"
+ values.append(doc_id)
+ else:
+ values.append(doc.get(field, ""))
+
+ values_to_insert.append(tuple(values))
+
+ # Insert the chunk
+ if values_to_insert:
+ cursor.executemany(insert_sql, values_to_insert)
+
+ conn.commit()
+ finally:
+ conn.close()
+
+    def index_doc(self, doctype, docname):
+        """Index a single document, replacing any existing entry for it."""
+        self.raise_if_not_indexed()
+        doc = frappe.get_doc(doctype, docname)
+        document = self.prepare_document(doc)
+        if document:
+            # Remove any stale row first so repeated updates don't accumulate duplicates
+            self.sql("DELETE FROM search_fts WHERE doc_id = ?", (f"{doctype}:{docname}",), commit=True)
+            self._index_documents([document])
+
+ def remove_doc(self, doctype, docname):
+ """Remove a single document from the index."""
+ self.raise_if_not_indexed()
+ doc_id = f"{doctype}:{docname}"
+ self.sql("DELETE FROM search_fts WHERE doc_id = ?", (doc_id,), commit=True)
+
+ # Utility Methods
+
+ def _update_progress(self, message, progress, total=100, absolute=True):
+ """Update progress bar only if not running in a web request context or tests."""
+ if not hasattr(frappe.local, "request") and not frappe.flags.in_test:
+ update_progress_bar(message, progress, total, absolute=absolute)
+
+ def _validate_config(self):
+ """Validate document configuration at startup."""
+ metadata_fields = self.schema["metadata_fields"]
+
+ for doctype, config in self.doc_configs.items():
+ # Validate that all specified fields are present in the 'fields' list
+ fields_to_check = ["content_field", "title_field"]
+ if "modified" in metadata_fields:
+ fields_to_check.append("modified_field")
+
+ for field_key in fields_to_check:
+ field_value = config.get(field_key)
+ if field_value and field_value not in config["fields"]:
+ raise ValueError(
+ f"{field_key.replace('_', ' ').title()} '{field_value}' not found in 'fields' list for Doctype '{doctype}'"
+ )
+
+ def _empty_search_result(self, title_only=False, filters=None):
+ """Return empty search result structure."""
+ return {
+ "results": [],
+ "summary": {
+ "total_matches": 0,
+ "filtered_matches": 0,
+ "duration": 0,
+ "returned_matches": 0,
+ "corrected_words": None,
+ "corrected_query": None,
+ "title_only": title_only,
+ "applied_filters": filters or {},
+ },
+ }
+
+ def _get_db_path(self, is_temp=False):
+ """Get the path for the SQLite FTS database."""
+ site_path = frappe.get_site_path()
+ db_path = os.path.join(site_path, self.db_name)
+ if is_temp:
+ return db_path.replace(".db", ".temp.db")
+ return db_path
+
+ def _prepare_fts_query(self, query):
+ """Prepare query for FTS5 with proper escaping and operators."""
+ query = query.strip()
+ if not query:
+ return ""
+
+ # Simple query - split into terms and add wildcards for partial matching
+ terms = query.split()
+ fts_terms = []
+
+ for term in terms:
+ # Escape special FTS5 characters
+ term = term.replace('"', '""')
+ # Add wildcard for prefix matching
+ if len(term) > MIN_WORD_LENGTH - 1:
+ fts_terms.append(f'"{term}"*')
+ else:
+ fts_terms.append(f'"{term}"')
+
+ return " ".join(fts_terms)
+
+ def sql(self, query, params=None, read_only=False, commit=False):
+ """Execute a SQL query on the search database."""
+ conn = self._get_connection(read_only=read_only)
+ try:
+ cursor = conn.cursor()
+ cursor.execute(query, params or [])
+
+ if read_only:
+ return cursor.fetchall()
+
+ if commit:
+ conn.commit()
+
+ # For write operations, we might not need to return anything,
+ # but returning the cursor could be useful for getting rowcount, etc.
+ return cursor
+ finally:
+ conn.close()
+
+ def prepare_document(self, doc):
+ """Prepare a document for indexing by validating and transforming it."""
+ is_valid, config = self._validate_document_for_indexing(doc)
+ if not is_valid:
+ return None
+
+ document = {
+ "id": f"{doc.doctype}:{doc.name}",
+ "doctype": doc.doctype,
+ "name": doc.name,
+ }
+
+ self._add_text_fields_to_document(document, doc, config)
+ self._add_metadata_fields_to_document(document, doc, config)
+
+ return document
+
+ def _validate_document_for_indexing(self, doc):
+ """Run all validation checks for a document before indexing."""
+ if not hasattr(doc, "doctype") or not doc.doctype:
+ self._warn_missing_doctype(doc)
+ return False, None
+
+ if not hasattr(doc, "name") or not doc.name:
+ self._warn_missing_name(doc.doctype)
+ return False, None
+
+ config = self.doc_configs.get(doc.doctype)
+ if not config:
+ return False, None
+
+ text_fields = self.schema["text_fields"]
+
+ # Validate title field
+ if "title" in text_fields:
+ title_field = config.get("title_field")
+ if title_field and (not hasattr(doc, title_field) or getattr(doc, title_field, None) is None):
+ self._warn_missing_title_field(doc.doctype, doc.name, title_field)
+ return False, None
+
+ # Validate content field
+ if "content" in text_fields:
+ content_field = config["content_field"]
+ if not hasattr(doc, content_field) or getattr(doc, content_field, None) is None:
+ self._warn_missing_content_field(doc.doctype, doc.name, content_field)
+ return False, None
+
+ return True, config
+
+ def _add_text_fields_to_document(self, document, doc, config):
+ """Populate text fields in the document for indexing."""
+ text_fields = self.schema["text_fields"]
+ title_field = config.get("title_field")
+ content_field = config["content_field"]
+
+ for field in text_fields:
+ if field == "title":
+ if title_field:
+ raw_title = getattr(doc, title_field, "") or ""
+ document["title"] = self._process_content(raw_title)
+ else:
+ document["title"] = "" # No title field configured
+ elif field == "content":
+ raw_content = getattr(doc, content_field, "") or ""
+ document["content"] = self._process_content(raw_content)
+ else:
+ # Handle other custom text fields
+ raw_text = getattr(doc, field, "")
+ document[field] = self._process_content(raw_text)
+
+ def _add_metadata_fields_to_document(self, document, doc, config):
+ """Populate metadata fields in the document for indexing."""
+ metadata_fields = self.schema["metadata_fields"]
+
+ for field in metadata_fields:
+ if field in document: # Skip already populated fields (id, doctype, name)
+ continue
+
+ if field == "modified":
+ modified_field = config["modified_field"]
+ modified_value = getattr(doc, modified_field, None)
+ if modified_value:
+ if not isinstance(modified_value, datetime.datetime):
+ modified_value = frappe.utils.get_datetime(modified_value)
+ document["modified"] = modified_value.timestamp()
+ continue
+
+ # Handle other metadata fields with potential mapping
+ field_mappings = config.get("field_mappings", {})
+ actual_field = field_mappings.get(field, field)
+ value = getattr(doc, actual_field, None)
+
+ # Convert Mock objects to strings to avoid database errors
+ if value is not None and hasattr(value, "_mock_name"):
+ value = str(value)
+
+ document[field] = value
+
+ def _process_content(self, content):
+ """Process content to remove HTML tags, links, and images for better indexing quality."""
+ if not content:
+ return ""
+
+ # Convert to string in case it's a Mock object or other type
+ content = str(content)
+
+ soup = BeautifulSoup(content, "html.parser")
+
+ # Extract text content from links before removing HTML tags
+ for link in soup.find_all("a"):
+ link_text = link.get_text().strip()
+ if link_text:
+ link.replace_with(link_text)
+ else:
+ link.replace_with("[link]")
+
+ text = soup.get_text(separator=" ").strip() # remove tags
+ text = re.sub(r"https?://[^\s]+", "[link]", text) # replace standalone links
+ text = re.sub(r"\s+", " ", text).strip() # normalize whitespace
+ return text
+
+ def _generate_trigrams(self, word):
+ """Generate trigrams for a word for fuzzy matching."""
+ word = f" {word.lower()} " # Add padding
+ return [word[i : i + 3] for i in range(len(word) - 2)]
+
+ def _print_warning_summary(self):
+ """Print a summary of warnings collected during indexing."""
+ if not self.warnings:
+ return
+
+ print("\n" + "=" * 60)
+ print("SEARCH INDEX BUILD WARNINGS")
+ print("=" * 60)
+
+ # Group warnings by type
+ warning_groups: dict[WarningType, list[IndexWarning]] = {}
+ for warning in self.warnings:
+ warning_groups.setdefault(warning.type, []).append(warning)
+
+ # Define display names for warning types
+ type_display_names = {
+ WarningType.INVALID_DOCUMENT: "Invalid Documents",
+ WarningType.MISSING_TEXT_FIELDS: "Missing Text Fields",
+ WarningType.MISSING_CONTENT_FIELD: "Missing Content Field",
+ WarningType.MISSING_TITLE_FIELD: "Missing Title Field",
+ WarningType.MISSING_DOCTYPE: "Missing Document Type",
+ WarningType.MISSING_NAME: "Missing Document Name",
+ WarningType.OTHER: "Other Issues",
+ }
+
+ # Print grouped warnings
+ for warning_type, warnings in warning_groups.items():
+ display_name = type_display_names.get(warning_type, warning_type.value.title())
+ print(f"\n{display_name} ({len(warnings)} warnings):")
+ print("-" * 50)
+
+ for warning in warnings[:5]: # Show first 5 warnings of each type
+ print(f" • {warning.message}")
+
+ if len(warnings) > 5:
+ print(f" ... and {len(warnings) - 5} more")
+
+ print(f"\nTotal warnings: {len(self.warnings)}")
+ print("=" * 60)
+
+ # Warning helper methods (utility functions)
+
+ def _add_warning(self, warning_type: WarningType, message: str, **kwargs):
+ """Add a structured warning to the warnings list."""
+ warning = IndexWarning(type=warning_type, message=message, **kwargs)
+ self.warnings.append(warning)
+
+ def _warn_invalid_document(self, doc: dict, reason: str):
+ """Add warning for invalid document."""
+ self._add_warning(
+ WarningType.INVALID_DOCUMENT,
+ f"Skipping document with {reason}: {doc}",
+ doctype=doc.get("doctype"),
+ docname=doc.get("name"),
+ )
+
+ def _warn_missing_text_fields(self, doctype: str, docname: str, missing_fields: list):
+ """Add warning for missing text fields."""
+ self._add_warning(
+ WarningType.MISSING_TEXT_FIELDS,
+ f"Document {doctype}:{docname} missing text fields: {missing_fields}",
+ doctype=doctype,
+ docname=docname,
+ missing_fields=missing_fields,
+ )
+
+ def _warn_missing_content_field(self, doctype: str, docname: str, field: str):
+ """Add warning for missing content field."""
+ self._add_warning(
+ WarningType.MISSING_CONTENT_FIELD,
+ f"Document {doctype}:{docname} missing content field '{field}'",
+ doctype=doctype,
+ docname=docname,
+ field=field,
+ )
+
+ def _warn_missing_title_field(self, doctype: str, docname: str, field: str):
+ """Add warning for missing title field."""
+ self._add_warning(
+ WarningType.MISSING_TITLE_FIELD,
+ f"Document {doctype}:{docname} missing title field '{field}'",
+ doctype=doctype,
+ docname=docname,
+ field=field,
+ )
+
+ def _warn_missing_doctype(self, doc: Any):
+ """Add warning for missing doctype."""
+ self._add_warning(
+ WarningType.MISSING_DOCTYPE,
+ f"Document missing doctype: {doc}",
+ docname=getattr(doc, "name", None),
+ )
+
+ def _warn_missing_name(self, doctype: str):
+ """Add warning for missing name."""
+ self._add_warning(WarningType.MISSING_NAME, f"Document missing name: {doctype}", doctype=doctype)
+
+ def get_warning_statistics(self) -> dict[str, Any]:
+ """Get warning statistics for programmatic use."""
+ if not self.warnings:
+ return {"total": 0, "by_type": {}}
+
+ stats = {"total": len(self.warnings), "by_type": {}}
+
+ for warning in self.warnings:
+ warning_type = warning.type.value
+ if warning_type not in stats["by_type"]:
+ stats["by_type"][warning_type] = {"count": 0, "examples": []}
+
+ stats["by_type"][warning_type]["count"] += 1
+
+ # Keep a few examples
+ if len(stats["by_type"][warning_type]["examples"]) < 3:
+ stats["by_type"][warning_type]["examples"].append(
+ {
+ "message": warning.message,
+ "doctype": warning.doctype,
+ "docname": warning.docname,
+ "field": warning.field,
+ "missing_fields": warning.missing_fields,
+ }
+ )
+
+ return stats
+
+
+# Module-level Functions for background tasks
+
+
+def build_index_if_not_exists():
+ """Build index if it doesn't exist."""
+ search_classes = get_search_classes()
+
+ for SearchClass in search_classes:
+ build_index(SearchClass, force=False)
+
+
+def build_index(
+ SearchClass: type[SQLiteSearch] | None = None, search_class_path: str | None = None, force: bool = False
+):
+ """Build search index for SearchClass"""
+ if not SearchClass and not search_class_path:
+ raise ValueError("Either SearchClass or search_class_path must be provided")
+
+ if search_class_path:
+ SearchClass = frappe.get_attr(search_class_path)
+
+ search = SearchClass()
+ if not search.is_search_enabled():
+ return
+ if not search.index_exists() or force:
+ print(f"{SearchClass.__name__}: Index does not exist, building...")
+ search.build_index()
+
+
+def build_index_in_background():
+ """Enqueue index building in background."""
+ search_classes = get_search_classes()
+ for SearchClass in search_classes:
+ search = SearchClass()
+ if not search.is_search_enabled():
+ continue
+ search_class_path = f"{SearchClass.__module__}.{SearchClass.__name__}"
+ print(f"Enqueuing {search_class_path}.build_index")
+ frappe.enqueue(
+ "frappe.search.sqlite_search.build_index",
+ queue="long",
+ job_id=search_class_path,
+ deduplicate=True,
+ # build_index args
+ search_class_path=search_class_path,
+ force=True,
+ )
+
+
+def update_doc_index(doc: Document, method=None):
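+ """doc_events hook: re-index a document when any of its indexed fields change."""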
+ search_classes = get_search_classes()
+
+ for SearchClass in search_classes:
+ search = SearchClass()
+
+ if not (search.is_search_enabled() and search.index_exists()):
+ continue
+
+ config = search.doc_configs.get(doc.doctype)
+ if not config:
+ continue
+
+ fields = config.get("fields", [])
+ if not fields:
+ continue
+
+ # Field entries may be single-entry mappings like {"title": "subject"};
+ # in that case the mapped value is the source fieldname on the document.
+ field_names = [next(iter(field.values())) if isinstance(field, dict) else field for field in fields]
+ if any(doc.has_value_changed(field) for field in field_names):
+ print(f"{search.__class__.__name__}: updating index for {doc.doctype}:{doc.name}")
+ search.index_doc(doc.doctype, doc.name)
+
+
+def delete_doc_index(doc: Document, method=None):
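+ """doc_events hook (on_trash): remove a deleted document from all registered search indexes."""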
+ search_classes = get_search_classes()
+
+ for SearchClass in search_classes:
+ search = SearchClass()
+
+ if not (search.is_search_enabled() and search.index_exists()):
+ continue
+
+ if doc.doctype in search.doc_configs:
+ print(f"{search.__class__.__name__}: removing {doc.doctype}:{doc.name} from index")
+ search.remove_doc(doc.doctype, doc.name)
+
+
+def get_search_classes() -> list[type[SQLiteSearch]]:
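+ """Resolve all search classes registered via the `sqlite_search` hook."""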
+ module_paths = frappe.get_hooks("sqlite_search")
+ search_classes = [frappe.get_attr(path) for path in module_paths]
+
+ for search_class in search_classes:
+ # validate if search classes extend from SQLiteSearch
+ if not issubclass(search_class, SQLiteSearch):
+ raise TypeError(f"Search class {search_class.__name__} must extend SQLiteSearch")
+
+ return search_classes
diff --git a/frappe/tests/test_sqlite_search.py b/frappe/tests/test_sqlite_search.py
new file mode 100644
index 0000000000..be88901ebc
--- /dev/null
+++ b/frappe/tests/test_sqlite_search.py
@@ -0,0 +1,505 @@
+import os
+import sqlite3
+import time
+from typing import ClassVar
+from unittest.mock import patch
+
+import frappe
+from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError
+from frappe.tests import IntegrationTestCase
+
+
+class TestSQLiteSearch(SQLiteSearch):
+ """Test implementation of SQLiteSearch for testing purposes."""
+
+ INDEX_NAME = "test_search.db"
+
+ INDEX_SCHEMA: ClassVar = {
+ "text_fields": ["title", "content"],
+ "metadata_fields": ["doctype", "name", "owner", "modified"],
+ "tokenizer": "unicode61 remove_diacritics 2",
+ }
+
+ INDEXABLE_DOCTYPES: ClassVar = {
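+ # Dict entries map an index field to a source fieldname, e.g. {"title": "description"}
+ # indexes the document's "description" field as the search result title.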
+ "Note": {
+ "fields": ["name", "title", "content", "owner", {"modified": "creation"}],
+ },
+ "ToDo": {
+ "fields": ["name", {"title": "description"}, {"content": "description"}, "owner", "modified"],
+ },
+ "User": {
+ "fields": ["name", {"title": "full_name"}, {"content": "email"}, "name", "modified"],
+ "filters": {"enabled": 1},
+ },
+ }
+
+ def get_search_filters(self):
+ """Return permission filters - for testing, allow all documents."""
+ if frappe.session.user == "Administrator":
+ return {}
+ # Simulate user-specific filtering
+ return {"owner": frappe.session.user}
+
+
+class TestSQLiteSearchAPI(IntegrationTestCase):
+ """Test suite for SQLiteSearch public API functionality."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.search = TestSQLiteSearch()
+ # Clean up any existing test database
+ cls.search.drop_index()
+
+ @classmethod
+ def tearDownClass(cls):
+ super().tearDownClass()
+ # Clean up test database
+ cls.search.drop_index()
+
+ def setUp(self):
+ """Set up test data for each test."""
+ super().setUp()
+ # Create test documents
+ self.test_notes = []
+ self.test_todos = []
+
+ # Create test notes with different content
+ note_data = [
+ {"title": "Python Programming Guide", "content": "Learn Python basics and advanced concepts"},
+ {"title": "Project Management Tips", "content": "How to manage software projects effectively"},
+ {"title": "Cooking Recipe Collection", "content": "Delicious recipes for home cooking"},
+ {
+ "title": "Machine Learning Tutorial",
+ "content": "Introduction to ML algorithms and Python implementation",
+ },
+ ]
+
+ for data in note_data:
+ note = frappe.get_doc({"doctype": "Note", "title": data["title"], "content": data["content"]})
+ note.insert()
+ self.test_notes.append(note)
+
+ # Create test todos
+ todo_data = [
+ {"description": "Review Python code for search functionality"},
+ {"description": "Update project documentation"},
+ {"description": "Plan team meeting agenda"},
+ ]
+
+ for data in todo_data:
+ todo = frappe.get_doc({"doctype": "ToDo", "description": data["description"], "status": "Open"})
+ todo.insert()
+ self.test_todos.append(todo)
+
+ def tearDown(self):
+ """Clean up test data after each test."""
+ # Delete test documents
+ for note in self.test_notes:
+ try:
+ note.delete()
+ except Exception:
+ pass
+
+ for todo in self.test_todos:
+ try:
+ todo.delete()
+ except Exception:
+ pass
+
+ super().tearDown()
+
+ def test_index_lifecycle_and_status_methods(self):
+ """Test index building, existence checking, and status validation."""
+ # Initially index should not exist
+ self.search.drop_index() # Ensure clean state
+ self.assertFalse(self.search.index_exists())
+
+ # Should raise error when trying to search without index
+ with self.assertRaises(SQLiteSearchIndexMissingError):
+ self.search.raise_if_not_indexed()
+
+ # Build index
+ self.search.build_index()
+
+ # Now index should exist
+ self.assertTrue(self.search.index_exists())
+
+ # Should not raise error now
+ try:
+ self.search.raise_if_not_indexed()
+ except SQLiteSearchIndexMissingError:
+ self.fail("raise_if_not_indexed() raised exception when index exists")
+
+ # Verify database file exists and has correct tables
+ self.assertTrue(os.path.exists(self.search.db_path))
+
+ conn = sqlite3.connect(self.search.db_path)
+ cursor = conn.cursor()
+
+ # Check if FTS table exists
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'")
+ self.assertTrue(cursor.fetchone())
+
+ # Check if vocabulary tables exist
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_vocabulary'")
+ self.assertTrue(cursor.fetchone())
+
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_trigrams'")
+ self.assertTrue(cursor.fetchone())
+
+ conn.close()
+
+ # Test drop_index method
+ self.search.drop_index()
+ self.assertFalse(self.search.index_exists())
+ self.assertFalse(os.path.exists(self.search.db_path))
+
+ # Dropping non-existent index should not raise error
+ self.search.drop_index() # Should not raise error
+
+ def test_basic_search_functionality(self):
+ """Test core search functionality with various query types."""
+ # Build index first
+ self.search.build_index()
+
+ # Test basic text search
+ results = self.search.search("Python")
+ self.assertGreater(len(results["results"]), 0)
+ self.assertIn("Python", results["results"][0]["title"] + results["results"][0]["content"])
+
+ # Verify result structure
+ result = results["results"][0]
+ required_fields = [
+ "id",
+ "title",
+ "content",
+ "doctype",
+ "name",
+ "score",
+ "original_rank",
+ "modified_rank",
+ ]
+ for field in required_fields:
+ self.assertIn(field, result)
+
+ # Test case-insensitive search
+ results_lower = self.search.search("python")
+ results_upper = self.search.search("PYTHON")
+ self.assertEqual(len(results_lower["results"]), len(results_upper["results"]))
+
+ # Test partial word matching
+ results = self.search.search("prog") # Should match "Programming"
+ self.assertGreater(len(results["results"]), 0)
+
+ # Test multi-word search
+ results = self.search.search("Python programming")
+ self.assertGreater(len(results["results"]), 0)
+
+ # Test empty query
+ results = self.search.search("")
+ self.assertEqual(len(results["results"]), 0)
+
+ # Test title-only search
+ results = self.search.search("Python", title_only=True)
+ self.assertGreater(len(results["results"]), 0)
+ for result in results["results"]:
+ self.assertIn("Python", result["title"])
+
+ def test_search_filtering_and_permissions(self):
+ """Test search filtering and permission-based result filtering."""
+ self.search.build_index()
+
+ # Test basic filtering by doctype
+ results = self.search.search("", filters={"doctype": "Note"})
+ for result in results["results"]:
+ self.assertEqual(result["doctype"], "Note")
+
+ # Test filtering with list values
+ results = self.search.search("", filters={"doctype": ["Note", "ToDo"]})
+ for result in results["results"]:
+ self.assertIn(result["doctype"], ["Note", "ToDo"])
+
+ # Test empty filter list (should return no results)
+ results = self.search.search("", filters={"doctype": []})
+ self.assertEqual(len(results["results"]), 0)
+
+ # Test permission filtering by switching users
+ original_user = frappe.session.user
+ try:
+ # Create a test user and switch to them
+ test_user_email = "test_search_user@example.com"
+ if not frappe.db.exists("User", test_user_email):
+ test_user = frappe.get_doc(
+ {
+ "doctype": "User",
+ "email": test_user_email,
+ "first_name": "Test",
+ "last_name": "User",
+ "enabled": 1,
+ }
+ )
+ test_user.insert()
+
+ frappe.set_user(test_user_email)
+
+ # Search should now filter by owner (based on our test implementation)
+ results = self.search.search("Python")
+ # Results should be limited based on permission filters
+ self.assertIsInstance(results["results"], list)
+
+ finally:
+ frappe.set_user(original_user)
+
+ def test_advanced_scoring_and_ranking(self):
+ """Test scoring pipeline, ranking, and result ordering."""
+ self.search.build_index()
+
+ # Search for a term that appears in multiple documents
+ results = self.search.search("Python")
+
+ # Verify results are sorted by score (descending)
+ scores = [result["score"] for result in results["results"]]
+ self.assertEqual(scores, sorted(scores, reverse=True))
+
+ # Verify both original and modified rankings are present
+ for i, result in enumerate(results["results"]):
+ self.assertEqual(result["modified_rank"], i + 1)
+ self.assertIsInstance(result["original_rank"], int)
+ self.assertGreater(result["original_rank"], 0)
+
+ # Test title boost - documents with search term in title should rank higher
+ results = self.search.search("Programming")
+ title_match_found = False
+ for result in results["results"]:
+ if "Programming" in result["title"]:
+ title_match_found = True
+ # Title matches should have higher scores
+ self.assertGreater(result["score"], 1.0)
+ break
+ self.assertTrue(title_match_found, "No title matches found for scoring test")
+
+ # Test that BM25 score is included
+ for result in results["results"]:
+ self.assertIn("bm25_score", result)
+ self.assertIsInstance(result["bm25_score"], (int, float))
+
+ def test_spelling_correction_and_query_expansion(self):
+ """Test spelling correction and query expansion functionality."""
+ self.search.build_index()
+
+ # Test with a misspelled word that should be corrected
+ results = self.search.search("Pythom") # Misspelled "Python"
+
+ # Check if corrections were applied
+ summary = results["summary"]
+ if summary.get("corrected_words"):
+ self.assertIsInstance(summary["corrected_words"], dict)
+ self.assertIsInstance(summary["corrected_query"], str)
+
+ # Even with misspelling, we should get some results due to correction
+ # (This might not always work depending on vocabulary, so we test gracefully)
+ self.assertIsInstance(results["results"], list)
+
+ # Test with a completely made-up word
+ results = self.search.search("xyzabc123nonexistent")
+ # Should return empty results or minimal results
+ self.assertLessEqual(len(results["results"]), 1)
+
+ def test_document_indexing_operations(self):
+ """Test individual document indexing and removal operations."""
+ self.search.build_index()
+
+ # Create a new document after index is built
+ new_note = frappe.get_doc(
+ {
+ "doctype": "Note",
+ "title": "Newly Added Document",
+ "content": "This document was added after initial indexing",
+ }
+ )
+ new_note.insert()
+
+ try:
+ # Initially, the new document shouldn't be in search results
+ results = self.search.search("Newly Added Document")
+ initial_count = len(results["results"])
+
+ # Index the new document
+ self.search.index_doc("Note", new_note.name)
+
+ # Now it should be findable
+ results = self.search.search("Newly Added Document")
+ self.assertGreater(len(results["results"]), initial_count)
+
+ # Verify the document is in results
+ found = False
+ for result in results["results"]:
+ if result["name"] == new_note.name:
+ found = True
+ break
+ self.assertTrue(found, "Newly indexed document not found in search results")
+
+ # Remove the document from index
+ self.search.remove_doc("Note", new_note.name)
+
+ # Should not be findable anymore
+ results = self.search.search("Newly Added Document")
+ found = False
+ for result in results["results"]:
+ if result["name"] == new_note.name:
+ found = True
+ break
+ self.assertFalse(found, "Removed document still found in search results")
+
+ finally:
+ new_note.delete()
+
+ def test_search_result_summary_and_metadata(self):
+ """Test search result summary and metadata information."""
+ self.search.build_index()
+
+ results = self.search.search("Python")
+ summary = results["summary"]
+
+ # Verify summary structure
+ required_summary_fields = [
+ "total_matches",
+ "filtered_matches",
+ "returned_matches",
+ "duration",
+ "title_only",
+ "applied_filters",
+ ]
+ for field in required_summary_fields:
+ self.assertIn(field, summary)
+
+ # Verify summary values make sense
+ self.assertIsInstance(summary["duration"], (int, float))
+ self.assertGreater(summary["duration"], 0)
+ self.assertEqual(summary["total_matches"], summary["filtered_matches"])
+ self.assertEqual(summary["filtered_matches"], len(results["results"]))
+ self.assertFalse(summary["title_only"])
+ self.assertEqual(summary["applied_filters"], {})
+
+ # Test with filters applied
+ results = self.search.search("Python", filters={"doctype": "Note"})
+ summary = results["summary"]
+ self.assertEqual(summary["applied_filters"], {"doctype": "Note"})
+
+ # Test title-only search
+ results = self.search.search("Python", title_only=True)
+ summary = results["summary"]
+ self.assertTrue(summary["title_only"])
+
+ def test_configuration_and_schema_validation(self):
+ """Test configuration validation and schema handling."""
+
+ # Test invalid configuration
+ class InvalidSearchClass(SQLiteSearch):
+ # Missing required INDEX_SCHEMA
+ INDEXABLE_DOCTYPES: ClassVar = {"Note": {"fields": ["name", "title"]}}
+
+ def get_search_filters(self):
+ return {}
+
+ with self.assertRaises(ValueError):
+ InvalidSearchClass()
+
+ # Test invalid doctype configuration
+ class InvalidDoctypeConfig(SQLiteSearch):
+ INDEX_SCHEMA: ClassVar = {"text_fields": ["title", "content"]}
+ INDEXABLE_DOCTYPES: ClassVar = {
+ "Note": {
+ # Missing 'fields' key
+ "title_field": "title"
+ }
+ }
+
+ def get_search_filters(self):
+ return {}
+
+ with self.assertRaises(ValueError):
+ InvalidDoctypeConfig()
+
+ def test_content_processing_and_html_handling(self):
+ """Test content processing including HTML tag removal and text normalization."""
+ self.search.build_index()
+
+ # Create a note with HTML content
+ html_note = frappe.get_doc(
+ {
+ "doctype": "Note",
+ "title": "HTML Content Test",
+ "content": "This is bold text with links and
line breaks.
",
+ }
+ )
+ html_note.insert()
+
+ try:
+ # Index the document
+ self.search.index_doc("Note", html_note.name)
+
+ # Search should find processed content
+ results = self.search.search("bold text links")
+
+ # Should find the document
+ found = False
+ for result in results["results"]:
+ if result["name"] == html_note.name:
+ found = True
+ # Content should be processed (HTML tags removed)
+ self.assertNotIn("