seitime-frappe/frappe/tests/test_sqlite_search.py

571 lines
18 KiB
Python

import os
import sqlite3
import time
from typing import ClassVar
from unittest.mock import patch
import frappe
from frappe.search.sqlite_search import SQLiteSearch, SQLiteSearchIndexMissingError
from frappe.tests import IntegrationTestCase
class TestSQLiteSearch(SQLiteSearch):
"""Test implementation of SQLiteSearch for testing purposes."""
INDEX_NAME = "test_search.db"
INDEX_SCHEMA: ClassVar = {
"text_fields": ["title", "content"],
"metadata_fields": ["doctype", "name", "owner", "modified"],
"tokenizer": "unicode61 remove_diacritics 2",
}
INDEXABLE_DOCTYPES: ClassVar = {
"Note": {
"fields": ["name", "title", "content", "owner", {"modified": "creation"}],
},
"ToDo": {
"fields": ["name", {"title": "description"}, {"content": "description"}, "owner", "modified"],
},
"User": {
"fields": ["name", {"title": "full_name"}, {"content": "email"}, "name", "modified"],
"filters": {"enabled": 1},
},
}
def get_search_filters(self):
"""Return permission filters - for testing, allow all documents."""
if frappe.session.user == "Administrator":
return {}
# Simulate user-specific filtering
return {"owner": frappe.session.user}
class TestSQLiteSearchAPI(IntegrationTestCase):
"""Test suite for SQLiteSearch public API functionality."""
@classmethod
def setUpClass(cls):
super().setUpClass()
frappe.db.delete("Note")
frappe.db.delete("ToDo")
cls.search = TestSQLiteSearch()
# Clean up any existing test database
cls.search.drop_index()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# Clean up test database
cls.search.drop_index()
def setUp(self):
"""Set up test data for each test."""
super().setUp()
# Create test documents
self.test_notes = []
self.test_todos = []
# Create test notes with different content
note_data = [
{"title": "Python Programming Guide", "content": "Learn Python basics and advanced concepts"},
{"title": "Project Management Tips", "content": "How to manage software projects effectively"},
{"title": "Cooking Recipe Collection", "content": "Delicious recipes for home cooking"},
{
"title": "Machine Learning Tutorial",
"content": "Introduction to ML algorithms and Python implementation",
},
]
for data in note_data:
note = frappe.get_doc({"doctype": "Note", "title": data["title"], "content": data["content"]})
note.insert()
self.test_notes.append(note)
# Create test todos
todo_data = [
{"description": "Review Python code for search functionality"},
{"description": "Update project documentation"},
{"description": "Plan team meeting agenda"},
]
for data in todo_data:
todo = frappe.get_doc({"doctype": "ToDo", "description": data["description"], "status": "Open"})
todo.insert()
self.test_todos.append(todo)
def tearDown(self):
"""Clean up test data after each test."""
# Delete test documents
for note in self.test_notes:
try:
note.delete()
except Exception:
pass
for todo in self.test_todos:
try:
todo.delete()
except Exception:
pass
super().tearDown()
def test_index_lifecycle_and_status_methods(self):
"""Test index building, existence checking, and status validation."""
# Initially index should not exist
self.search.drop_index() # Ensure clean state
self.assertFalse(self.search.index_exists())
# Should raise error when trying to search without index
with self.assertRaises(SQLiteSearchIndexMissingError):
self.search.raise_if_not_indexed()
# Build index
self.search.build_index()
# Now index should exist
self.assertTrue(self.search.index_exists())
# Should not raise error now
try:
self.search.raise_if_not_indexed()
except SQLiteSearchIndexMissingError:
self.fail("raise_if_not_indexed() raised exception when index exists")
# Verify database file exists and has correct tables
self.assertTrue(os.path.exists(self.search.db_path))
conn = sqlite3.connect(self.search.db_path)
cursor = conn.cursor()
# Check if FTS table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'")
self.assertTrue(cursor.fetchone())
# Check if vocabulary tables exist
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_vocabulary'")
self.assertTrue(cursor.fetchone())
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='search_trigrams'")
self.assertTrue(cursor.fetchone())
conn.close()
# Test drop_index method
self.search.drop_index()
self.assertFalse(self.search.index_exists())
self.assertFalse(os.path.exists(self.search.db_path))
# Dropping non-existent index should not raise error
self.search.drop_index() # Should not raise error
def test_basic_search_functionality(self):
"""Test core search functionality with various query types."""
# Build index first
self.search.build_index()
# Test basic text search
results = self.search.search("Python")
self.assertGreater(len(results["results"]), 0)
self.assertIn("Python", results["results"][0]["title"] + results["results"][0]["content"])
# Verify result structure
result = results["results"][0]
required_fields = [
"id",
"title",
"content",
"doctype",
"name",
"score",
"original_rank",
"modified_rank",
]
for field in required_fields:
self.assertIn(field, result)
# Test case-insensitive search
results_lower = self.search.search("python")
results_upper = self.search.search("PYTHON")
self.assertEqual(len(results_lower["results"]), len(results_upper["results"]))
# Test partial word matching
results = self.search.search("prog") # Should match "Programming"
self.assertGreater(len(results["results"]), 0)
# Test multi-word search
results = self.search.search("Python programming")
self.assertGreater(len(results["results"]), 0)
# Test empty query
results = self.search.search("")
self.assertEqual(len(results["results"]), 0)
# Test title-only search
results = self.search.search("Python", title_only=True)
self.assertGreater(len(results["results"]), 0)
for result in results["results"]:
self.assertIn("Python", result["title"])
def test_search_filtering_and_permissions(self):
"""Test search filtering and permission-based result filtering."""
self.search.build_index()
# Test basic filtering by doctype
results = self.search.search("", filters={"doctype": "Note"})
for result in results["results"]:
self.assertEqual(result["doctype"], "Note")
# Test filtering with list values
results = self.search.search("", filters={"doctype": ["Note", "ToDo"]})
for result in results["results"]:
self.assertIn(result["doctype"], ["Note", "ToDo"])
# Test empty filter list (should return no results)
results = self.search.search("", filters={"doctype": []})
self.assertEqual(len(results["results"]), 0)
# Test permission filtering by switching users
original_user = frappe.session.user
try:
# Create a test user and switch to them
test_user_email = "test_search_user@example.com"
if not frappe.db.exists("User", test_user_email):
test_user = frappe.get_doc(
{
"doctype": "User",
"email": test_user_email,
"first_name": "Test",
"last_name": "User",
"enabled": 1,
}
)
test_user.insert()
frappe.set_user(test_user_email)
# Search should now filter by owner (based on our test implementation)
results = self.search.search("Python")
# Results should be limited based on permission filters
self.assertIsInstance(results["results"], list)
finally:
frappe.set_user(original_user)
def test_advanced_scoring_and_ranking(self):
"""Test scoring pipeline, ranking, and result ordering."""
self.search.build_index()
# Search for a term that appears in multiple documents
results = self.search.search("Python")
# Verify results are sorted by score (descending)
scores = [result["score"] for result in results["results"]]
self.assertEqual(scores, sorted(scores, reverse=True))
# Verify both original and modified rankings are present
for i, result in enumerate(results["results"]):
self.assertEqual(result["modified_rank"], i + 1)
self.assertIsInstance(result["original_rank"], int)
self.assertGreater(result["original_rank"], 0)
# Test title boost - documents with search term in title should rank higher
results = self.search.search("Programming")
title_match_found = False
for result in results["results"]:
if "Programming" in result["title"]:
title_match_found = True
# Title matches should have higher scores
self.assertGreater(result["score"], 1.0)
break
self.assertTrue(title_match_found, "No title matches found for scoring test")
# Test that BM25 score is included
for result in results["results"]:
self.assertIn("bm25_score", result)
self.assertIsInstance(result["bm25_score"], (int, float))
def test_spelling_correction_and_query_expansion(self):
"""Test spelling correction and query expansion functionality."""
self.search.build_index()
# Test with a misspelled word that should be corrected
results = self.search.search("Pythom") # Misspelled "Python"
# Check if corrections were applied
summary = results["summary"]
if summary.get("corrected_words"):
self.assertIsInstance(summary["corrected_words"], dict)
self.assertIsInstance(summary["corrected_query"], str)
# Even with misspelling, we should get some results due to correction
# (This might not always work depending on vocabulary, so we test gracefully)
self.assertIsInstance(results["results"], list)
# Test with a completely made-up word
results = self.search.search("xyzabc123nonexistent")
# Should return empty results or minimal results
self.assertLessEqual(len(results["results"]), 1)
def test_document_indexing_operations(self):
"""Test individual document indexing and removal operations."""
self.search.build_index()
# Create a new document after index is built
new_note = frappe.get_doc(
{
"doctype": "Note",
"title": "Newly Added Document",
"content": "This document was added after initial indexing",
}
)
new_note.insert()
try:
# Initially, the new document shouldn't be in search results
results = self.search.search("Newly Added Document")
initial_count = len(results["results"])
# Index the new document
self.search.index_doc("Note", new_note.name)
# Now it should be findable
results = self.search.search("Newly Added Document")
self.assertGreater(len(results["results"]), initial_count)
# Verify the document is in results
found = False
for result in results["results"]:
if result["name"] == new_note.name:
found = True
break
self.assertTrue(found, "Newly indexed document not found in search results")
# Remove the document from index
self.search.remove_doc("Note", new_note.name)
# Should not be findable anymore
results = self.search.search("Newly Added Document")
found = False
for result in results["results"]:
if result["name"] == new_note.name:
found = True
break
self.assertFalse(found, "Removed document still found in search results")
finally:
new_note.delete()
def test_search_result_summary_and_metadata(self):
"""Test search result summary and metadata information."""
self.search.build_index()
results = self.search.search("Python")
summary = results["summary"]
# Verify summary structure
required_summary_fields = [
"total_matches",
"filtered_matches",
"returned_matches",
"duration",
"title_only",
"applied_filters",
]
for field in required_summary_fields:
self.assertIn(field, summary)
# Verify summary values make sense
self.assertIsInstance(summary["duration"], (int, float))
self.assertGreater(summary["duration"], 0)
self.assertEqual(summary["total_matches"], summary["filtered_matches"])
self.assertEqual(summary["filtered_matches"], len(results["results"]))
self.assertFalse(summary["title_only"])
self.assertEqual(summary["applied_filters"], {})
# Test with filters applied
results = self.search.search("Python", filters={"doctype": "Note"})
summary = results["summary"]
self.assertEqual(summary["applied_filters"], {"doctype": "Note"})
# Test title-only search
results = self.search.search("Python", title_only=True)
summary = results["summary"]
self.assertTrue(summary["title_only"])
def test_configuration_and_schema_validation(self):
"""Test configuration validation and schema handling."""
# Test invalid configuration
class InvalidSearchClass(SQLiteSearch):
# Missing required INDEX_SCHEMA
INDEXABLE_DOCTYPES: ClassVar = {"Note": {"fields": ["name", "title"]}}
def get_search_filters(self):
return {}
with self.assertRaises(ValueError):
InvalidSearchClass()
# Test invalid doctype configuration
class InvalidDoctypeConfig(SQLiteSearch):
INDEX_SCHEMA: ClassVar = {"text_fields": ["title", "content"]}
INDEXABLE_DOCTYPES: ClassVar = {
"Note": {
# Missing 'fields' key
"title_field": "title"
}
}
def get_search_filters(self):
return {}
with self.assertRaises(ValueError):
InvalidDoctypeConfig()
def test_content_processing_and_html_handling(self):
"""Test content processing including HTML tag removal and text normalization."""
self.search.build_index()
# Create a note with HTML content
html_note = frappe.get_doc(
{
"doctype": "Note",
"title": "HTML Content Test",
"content": "<p>This is <strong>bold</strong> text with <a href='http://example.com'>links</a> and <br> line breaks.</p>",
}
)
html_note.insert()
try:
# Index the document
self.search.index_doc("Note", html_note.name)
# Search should find processed content
results = self.search.search("bold text links")
# Should find the document
found = False
for result in results["results"]:
if result["name"] == html_note.name:
found = True
# Content should be processed (HTML tags removed)
self.assertNotIn("<p>", result["content"])
self.assertNotIn("<strong>", result["content"])
self.assertIn("bold", result["content"])
self.assertNotIn(
"<a href='http://example.com'>", result["content"]
) # Links should be replaced
break
self.assertTrue(found, "HTML content document not found in search")
finally:
html_note.delete()
def test_search_disabled_state(self):
"""Test behavior when search is disabled."""
# Create a search class with search disabled
class DisabledSearch(TestSQLiteSearch):
def is_search_enabled(self):
return False
disabled_search = DisabledSearch()
disabled_search.drop_index() # Ensure clean state
# Should return empty results when disabled
results = disabled_search.search("Python")
self.assertEqual(len(results["results"]), 0)
# Build index should do nothing when disabled
disabled_search.build_index() # Should not raise error but do nothing
self.assertFalse(disabled_search.index_exists())
def test_deduplication_on_reindex(self):
"""Test that re-indexing the same document does not create duplicates."""
self.search.build_index()
# Create a test document
test_note = frappe.get_doc(
{
"doctype": "Note",
"title": "Deduplication Test Document",
"content": "This document tests deduplication functionality",
}
)
test_note.insert()
try:
# Index the document
self.search.index_doc("Note", test_note.name)
# Search for the document - should find exactly one result
results = self.search.search("Deduplication Test")
initial_count = len([r for r in results["results"] if r["name"] == test_note.name])
self.assertEqual(initial_count, 1, "Should find exactly one instance of the document")
# Re-index the same document multiple times
self.search.index_doc("Note", test_note.name)
self.search.index_doc("Note", test_note.name)
self.search.index_doc("Note", test_note.name)
# Search again - should still find exactly one result
results = self.search.search("Deduplication Test")
final_count = len([r for r in results["results"] if r["name"] == test_note.name])
self.assertEqual(final_count, 1, "Should still find exactly one instance after re-indexing")
# Update the document content and re-index
test_note.content = "Updated content for deduplication testing"
test_note.save()
self.search.index_doc("Note", test_note.name)
# Search with updated content - should find exactly one result with new content
results = self.search.search("Updated content deduplication")
updated_results = [r for r in results["results"] if r["name"] == test_note.name]
self.assertEqual(len(updated_results), 1, "Should find exactly one instance with updated content")
# Content may contain HTML markup from search highlighting, so check for words individually
self.assertIn("Updated", updated_results[0]["content"])
self.assertIn("content", updated_results[0]["content"])
# Rebuild entire index - should not create duplicates
self.search.build_index()
results = self.search.search("Deduplication Test")
rebuild_count = len([r for r in results["results"] if r["name"] == test_note.name])
self.assertEqual(rebuild_count, 1, "Should still find exactly one instance after full rebuild")
# Verify at database level - check raw count in FTS table
conn = sqlite3.connect(self.search.db_path)
cursor = conn.cursor()
doc_id = f"Note:{test_note.name}"
cursor.execute("SELECT COUNT(*) FROM search_fts WHERE doc_id = ?", (doc_id,))
db_count = cursor.fetchone()[0]
conn.close()
self.assertEqual(db_count, 1, "Database should contain exactly one entry for the document")
finally:
test_note.delete()
@patch("frappe.enqueue")
def test_background_operations(self, mock_enqueue):
"""Test background job integration and module-level functions."""
from frappe.search.sqlite_search import (
build_index_in_background,
get_search_classes,
)
# Test getting search classes
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
classes = get_search_classes()
self.assertEqual(len(classes), 1)
self.assertEqual(classes[0], TestSQLiteSearch)
# Test background index building
with patch("frappe.get_hooks") as mock_get_hooks:
mock_get_hooks.return_value = ["frappe.tests.test_sqlite_search.TestSQLiteSearch"]
build_index_in_background()
# Should have enqueued a background job
self.assertTrue(mock_enqueue.called)