Files
railseek6/selenium_ocr_performance_test.py

437 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Selenium OCR Performance Test
Tests OCR performance through the web UI to measure end-to-end performance
"""
import time
import os
import sys
import json
from datetime import datetime
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import pandas as pd
class OCRPerformanceTester:
    """Selenium-based OCR performance tester.

    Drives the web UI at ``base_url`` with a Chrome WebDriver, times document
    uploads/processing and searches, and collects one metrics dict per
    operation. Call ``setup_driver()`` before any method that touches the
    browser and ``cleanup()`` when finished.
    """

    def __init__(self, base_url="http://localhost:3015", headless=False):
        """
        Initialize the performance tester

        Args:
            base_url: Base URL of the LightRAG web UI
            headless: Run browser in headless mode
        """
        self.base_url = base_url
        self.headless = headless
        # WebDriver instance; created lazily by setup_driver().
        self.driver = None
        # Metrics collected by the most recent run_performance_test_suite().
        self.results = []
        # time.time() at which the current suite run started (None before a run).
        self.test_start_time = None

    def setup_driver(self):
        """Create the Chrome WebDriver and store it on ``self.driver``."""
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument("--headless")
        for flag in (
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--window-size=1920,1080",
        ):
            chrome_options.add_argument(flag)
        self.driver = webdriver.Chrome(options=chrome_options)
        # NOTE(review): Selenium's documentation advises against combining an
        # implicit wait with the explicit WebDriverWait calls used elsewhere in
        # this class; kept as-is so existing timing behaviour does not change.
        self.driver.implicitly_wait(10)

    def login(self, username="admin", password="admin"):
        """
        Login to the web UI

        Args:
            username: Value typed into the ``username`` form field
            password: Value typed into the ``password`` form field

        Returns:
            bool: True when the dashboard heading appeared, and also when a
            wait timed out (treated as "already logged in or no auth
            required"); False on any other exception.
        """
        print(f"🔐 Logging in to {self.base_url}")
        self.driver.get(f"{self.base_url}/login")
        try:
            # Wait for login form
            username_field = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.NAME, "username"))
            )
            password_field = self.driver.find_element(By.NAME, "password")
            login_button = self.driver.find_element(
                By.XPATH, "//button[contains(text(), 'Login')]"
            )
            # Fill credentials
            username_field.send_keys(username)
            password_field.send_keys(password)
            login_button.click()
            # Wait for login to complete
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h1[contains(text(), 'Dashboard')]")
                )
            )
            print("✅ Login successful")
            return True
        except TimeoutException:
            # Deliberately lenient: deployments without auth have no login form.
            print("⚠️ Login timeout - assuming already logged in or no auth required")
            return True
        except Exception as e:
            print(f"❌ Login failed: {e}")
            return False

    def upload_document(self, file_path, document_type="pdf"):
        """
        Upload a document and measure upload/processing time

        Args:
            file_path: Path to document file
            document_type: Type of document (pdf, docx, etc.); only recorded
                in the returned metrics

        Returns:
            dict: Performance metrics, or None when ``file_path`` does not
            exist. On failure the dict carries an ``"error"`` key and lacks
            the timings that were not reached.
        """
        if not os.path.exists(file_path):
            print(f"❌ File not found: {file_path}")
            return None

        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        print(f"📤 Uploading {file_name} ({file_size:,} bytes)")
        metrics = {
            "file_name": file_name,
            "file_size": file_size,
            "document_type": document_type,
            "upload_start": time.time(),
        }
        try:
            # Navigate to upload page
            self.driver.get(f"{self.base_url}/upload")
            # Wait for upload form
            file_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, "//input[@type='file']"))
            )
            # Upload file
            file_input.send_keys(os.path.abspath(file_path))
            metrics["upload_end"] = time.time()
            metrics["upload_time"] = metrics["upload_end"] - metrics["upload_start"]
            print(f" 📊 Upload time: {metrics['upload_time']:.2f}s")

            # Processing is timed from the moment the file was handed over.
            processing_start = time.time()
            try:
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.XPATH, "//*[contains(text(), 'Processing') or contains(text(), 'processing')]"))
                )
            except TimeoutException:
                print("❌ Processing indicator not found")
                metrics["error"] = "Processing indicator not found"
                return metrics
            metrics["processing_start"] = processing_start

            # Wait for completion (5 minute timeout for OCR)
            try:
                WebDriverWait(self.driver, 300).until(
                    EC.presence_of_element_located((By.XPATH, "//*[contains(text(), 'Success') or contains(text(), 'success') or contains(text(), 'Complete') or contains(text(), 'complete')]"))
                )
            except TimeoutException:
                print("❌ Processing timeout (5 minutes)")
                metrics["error"] = "Processing timeout"
                return metrics

            processing_end = time.time()
            metrics["processing_end"] = processing_end
            metrics["processing_time"] = processing_end - processing_start
            print(f" 📊 Processing time: {metrics['processing_time']:.2f}s")

            # Page count is optional extra information; absence is not an error.
            pages_processed = self._find_pages_processed()
            if pages_processed is not None:
                metrics["pages_processed"] = pages_processed

            # Total time
            metrics["total_time"] = metrics["upload_time"] + metrics["processing_time"]
            print(f" 📊 Total time: {metrics['total_time']:.2f}s")
            return metrics
        except Exception as e:
            print(f"❌ Upload failed: {e}")
            metrics["error"] = str(e)
            return metrics

    def _find_pages_processed(self):
        """Best-effort scrape of a page count from the current page.

        Returns:
            int or None: The first "<N> page(s)" count found in any element
            whose text mentions a page, or None when nothing parseable is
            present or the page could not be read.
        """
        try:
            page_elements = self.driver.find_elements(
                By.XPATH, "//*[contains(text(), 'page') or contains(text(), 'Page')]"
            )
            for element in page_elements:
                count = self._extract_page_count(element.text)
                if count is not None:
                    return count
        except Exception as e:
            # Still best-effort, but no longer silent and no longer swallowing
            # KeyboardInterrupt/SystemExit the way a bare ``except:`` did.
            print(f" ⚠️ Could not read page count: {e}")
        return None

    @staticmethod
    def _extract_page_count(text):
        """Return the integer N from the first "<N> page" in *text*, else None.

        Matching is case-insensitive so "12 Pages" is recognised as well as
        "12 pages".
        """
        import re

        match = re.search(r"(\d+)\s+page", text or "", re.IGNORECASE)
        return int(match.group(1)) if match else None

    def search_document(self, query, expected_results=1):
        """
        Search for content in uploaded documents

        Args:
            query: Search query
            expected_results: Expected number of results (accepted for
                interface compatibility; not used by the measurement)

        Returns:
            dict: Search performance metrics. ``search_time`` covers page
            load, submitting the query and waiting for result elements; when
            no result appears it therefore includes the full wait timeout and
            ``results_found`` is 0. On failure the dict has an ``"error"`` key.
        """
        print(f"🔍 Searching for: '{query}'")
        metrics = {
            "query": query,
            "search_start": time.time(),
        }
        try:
            # Navigate to search page
            self.driver.get(f"{self.base_url}/search")
            # Find search input
            search_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, "//input[@type='search' or contains(@placeholder, 'Search')]"))
            )
            # Enter search query
            search_input.clear()
            search_input.send_keys(query)
            # Find and click search button
            search_button = self.driver.find_element(By.XPATH, "//button[contains(text(), 'Search') or contains(@class, 'search')]")
            search_button.click()

            # Wait for results BEFORE stopping the clock; stopping it right
            # after the click would exclude the search itself from the timing.
            try:
                results = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((By.XPATH, "//div[contains(@class, 'result') or contains(@class, 'card')]"))
                )
            except TimeoutException:
                results = None

            metrics["search_end"] = time.time()
            metrics["search_time"] = metrics["search_end"] - metrics["search_start"]
            print(f" 📊 Search time: {metrics['search_time']:.2f}s")

            if results is None:
                metrics["results_found"] = 0
                print(" ⚠️ No results found or timeout")
                return metrics

            metrics["results_found"] = len(results)
            print(f" 📊 Results found: {len(results)}")
            # Check if OCR content is in results (first 3 results only)
            ocr_content_found = False
            for result in results[:3]:
                result_text = result.text.lower()
                if "ocr" in result_text or "scanned" in result_text:
                    ocr_content_found = True
                    break
            metrics["ocr_content_found"] = ocr_content_found
            return metrics
        except Exception as e:
            print(f"❌ Search failed: {e}")
            metrics["error"] = str(e)
            return metrics

    def run_performance_test_suite(self, test_files):
        """
        Run comprehensive performance test suite

        Logs in, then for every existing file uploads it and runs a fixed set
        of search queries, and finally prints a summary. The collected
        metrics are also kept on ``self.results``.

        Args:
            test_files: List of test file paths

        Returns:
            list: All performance metrics (empty when login fails)
        """
        print("🚀 STARTING OCR PERFORMANCE TEST SUITE")
        print("=" * 60)
        self.test_start_time = time.time()
        all_metrics = []
        # Same list object, so partial results survive an exception mid-run.
        self.results = all_metrics

        # Login first
        if not self.login():
            print("❌ Cannot proceed without login")
            return all_metrics

        search_queries = ["text", "document", "content", "scanned"]
        # Test each file
        for file_path in test_files:
            if not os.path.exists(file_path):
                print(f"⚠️ Skipping missing file: {file_path}")
                continue
            test_file = os.path.basename(file_path)
            print(f"\n📄 TESTING FILE: {test_file}")
            print("-" * 40)

            # Record the real document type instead of always reporting "pdf".
            document_type = Path(file_path).suffix.lstrip(".").lower() or "unknown"
            # Upload and process
            upload_metrics = self.upload_document(file_path, document_type)
            if upload_metrics:
                all_metrics.append(upload_metrics)
            # Wait a bit between tests
            time.sleep(2)

            # Search for OCR content
            for query in search_queries:
                search_metrics = self.search_document(query)
                if search_metrics:
                    search_metrics["test_file"] = test_file
                    all_metrics.append(search_metrics)
                time.sleep(1)

        # Calculate summary statistics
        self.calculate_summary(all_metrics)
        return all_metrics

    @staticmethod
    def _print_timing_stats(label, values):
        """Print average/min/max for *values* under *label*; no-op if empty."""
        if not values:
            return
        average = sum(values) / len(values)
        print(f" {label}: Avg={average:.2f}s, Min={min(values):.2f}s, Max={max(values):.2f}s")

    def calculate_summary(self, all_metrics):
        """Calculate and display summary statistics

        Args:
            all_metrics: Metrics dicts as produced by ``upload_document`` and
                ``search_document``; entries are classified by the presence
                of the ``upload_time`` / ``search_time`` keys.
        """
        print("\n" + "=" * 60)
        print("📊 PERFORMANCE TEST SUMMARY")
        print("=" * 60)

        upload_metrics = [m for m in all_metrics if "upload_time" in m]
        search_metrics = [m for m in all_metrics if "search_time" in m]

        if upload_metrics:
            print(f"\n📤 UPLOAD & PROCESSING ({len(upload_metrics)} tests):")
            # Failed uploads may lack the later timings, hence the key checks.
            for label, key in (
                ("Upload Time", "upload_time"),
                ("Processing Time", "processing_time"),
                ("Total Time", "total_time"),
            ):
                self._print_timing_stats(label, [m[key] for m in upload_metrics if key in m])

        if search_metrics:
            print(f"\n🔍 SEARCH ({len(search_metrics)} tests):")
            self._print_timing_stats("Search Time", [m["search_time"] for m in search_metrics])
            ocr_found = [m for m in search_metrics if m.get("ocr_content_found", False)]
            print(f" OCR Content Found: {len(ocr_found)}/{len(search_metrics)} searches")

        # Overall test duration
        if self.test_start_time:
            total_duration = time.time() - self.test_start_time
            print(f"\n⏱️ TOTAL TEST DURATION: {total_duration:.2f}s")

    def save_results(self, results, output_file="ocr_performance_results.json"):
        """Save test results to JSON file, plus a CSV copy next to it

        Args:
            results: List of metrics dicts
            output_file: Path of the JSON file. The CSV is written to the
                same path with a ``.csv`` suffix, so with the default name it
                is ``ocr_performance_results.csv``.
        """
        # Add timestamp
        results_data = {
            "timestamp": datetime.now().isoformat(),
            "base_url": self.base_url,
            "results": results,
        }
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results_data, f, indent=2, default=str)
        print(f"\n💾 Results saved to: {output_file}")

        # Also save as CSV for easier analysis, following the JSON location
        # rather than a fixed file name in the working directory.
        json_path = Path(output_file)
        csv_path = json_path.with_suffix(".csv")
        if csv_path == json_path:
            # output_file already ends in .csv; do not overwrite the JSON.
            csv_path = json_path.with_name(json_path.name + ".csv")
        self.save_results_csv(results, str(csv_path))

    def save_results_csv(self, results, output_file="ocr_performance_results.csv"):
        """Save test results to CSV file

        Nested dict/list values are stored as their ``str()`` form. Nothing
        is written when ``results`` is empty.
        """
        if not results:
            return
        # Flatten results for CSV
        flat_results = [
            {
                key: str(value) if isinstance(value, (dict, list)) else value
                for key, value in result.items()
            }
            for result in results
        ]
        df = pd.DataFrame(flat_results)
        df.to_csv(output_file, index=False)
        print(f"📊 CSV results saved to: {output_file}")

    def cleanup(self):
        """Cleanup resources (safe to call more than once)"""
        if self.driver:
            self.driver.quit()
            # Drop the reference so a second cleanup() does not quit twice.
            self.driver = None
            print("\n🧹 Browser closed")
def main(argv=None):
    """Main test execution

    Parses command-line options, filters the requested files down to those
    that exist, runs the performance suite and saves the results.

    Args:
        argv: Optional list of argument strings; when None, argparse reads
            ``sys.argv[1:]`` as before.

    Returns:
        int: Process exit status - 0 when results were collected and saved,
        1 when no test file exists, no results were collected, or the run
        raised an exception.
    """
    import argparse
    import traceback

    parser = argparse.ArgumentParser(description="Selenium OCR Performance Test")
    parser.add_argument("--url", default="http://localhost:3015", help="Base URL of LightRAG web UI")
    parser.add_argument("--headless", action="store_true", help="Run browser in headless mode")
    parser.add_argument("--files", nargs="+", help="Test files to upload")
    parser.add_argument("--output", default="ocr_performance_results.json", help="Output file for results")
    args = parser.parse_args(argv)

    # Default test files if none provided
    test_files = args.files or [
        "ocr.pdf",  # OCR test PDF
        "test_meaningful.pdf",  # Text-based PDF
        "test.docx",  # Word document
    ]

    # Filter to existing files, telling the user what was dropped instead of
    # discarding it silently.
    existing_files = []
    for candidate in test_files:
        if os.path.exists(candidate):
            existing_files.append(candidate)
        else:
            print(f"⚠️ Skipping missing file: {candidate}")
    if not existing_files:
        print("❌ No test files found. Please provide valid file paths.")
        return 1
    print(f"📁 Test files: {existing_files}")

    # Create tester instance
    tester = OCRPerformanceTester(base_url=args.url, headless=args.headless)
    exit_code = 0
    try:
        # Setup and run tests
        tester.setup_driver()
        results = tester.run_performance_test_suite(existing_files)
        # Save results
        if results:
            tester.save_results(results, args.output)
        else:
            print("❌ No results collected")
            exit_code = 1
    except Exception as e:
        print(f"❌ Test execution failed: {e}")
        traceback.print_exc()
        exit_code = 1
    finally:
        # Always close the browser, even after a failure.
        tester.cleanup()
    return exit_code


if __name__ == "__main__":
    # Propagate failure to the shell/CI instead of always exiting with 0.
    sys.exit(main())