#!/usr/bin/env python3
"""
Selenium OCR Performance Test

Tests OCR performance through the web UI to measure end-to-end performance
"""

import argparse
import json
import os
import re
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path

import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import (
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Page count in status text, e.g. "12 pages" or "1 Page". Case-insensitive
# because the XPath that selects candidate elements accepts both spellings.
_PAGE_COUNT_RE = re.compile(r"(\d+)\s+page", re.IGNORECASE)


class OCRPerformanceTester:
    """Selenium-based OCR performance tester"""

    def __init__(self, base_url="http://localhost:3015", headless=False):
        """
        Initialize the performance tester

        Args:
            base_url: Base URL of the LightRAG web UI
            headless: Run browser in headless mode
        """
        self.base_url = base_url
        self.headless = headless
        self.driver = None  # created by setup_driver()
        self.results = []
        self.test_start_time = None  # set when the suite starts

    def setup_driver(self):
        """Setup Chrome WebDriver with options"""
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--window-size=1920,1080")

        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.implicitly_wait(10)

    def login(self, username="admin", password="admin"):
        """
        Login to the web UI

        Args:
            username: Value typed into the "username" field
            password: Value typed into the "password" field

        Returns:
            bool: True when the dashboard heading appears, and also when a
            wait times out (treated as "already logged in or no auth
            required"); False on any other error.
        """
        print(f"🔐 Logging in to {self.base_url}")
        self.driver.get(f"{self.base_url}/login")

        try:
            # Wait for login form
            username_field = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.NAME, "username"))
            )
            password_field = self.driver.find_element(By.NAME, "password")
            login_button = self.driver.find_element(
                By.XPATH, "//button[contains(text(), 'Login')]"
            )

            # Fill credentials
            username_field.send_keys(username)
            password_field.send_keys(password)
            login_button.click()

            # Wait for login to complete
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h1[contains(text(), 'Dashboard')]")
                )
            )
            print("✅ Login successful")
            return True
        except TimeoutException:
            # Deliberate best-effort: a missing form/dashboard is not fatal.
            print("⚠️ Login timeout - assuming already logged in or no auth required")
            return True
        except Exception as e:
            print(f"❌ Login failed: {e}")
            return False

    def _extract_page_count(self):
        """
        Read a page count from the text of the current page.

        Returns:
            int | None: The number from the first element whose text matches
            "<digits> page", or None when nothing matches or the lookup fails.
        """
        try:
            candidates = self.driver.find_elements(
                By.XPATH,
                "//*[contains(text(), 'page') or contains(text(), 'Page')]",
            )
            for element in candidates:
                match = _PAGE_COUNT_RE.search(element.text)
                if match:
                    return int(match.group(1))
        except WebDriverException as e:
            # The page count is optional; report the problem but keep going.
            print(f" ⚠️ Could not read page count: {e}")
        return None

    def upload_document(self, file_path, document_type="pdf"):
        """
        Upload a document and measure upload/processing time

        Args:
            file_path: Path to document file
            document_type: Type of document (pdf, docx, etc.); only recorded
                in the returned metrics

        Returns:
            dict: Performance metrics. On failure the dict carries an
            "error" key and lacks the timings that were not reached.
            None is returned when file_path does not exist.
        """
        if not os.path.exists(file_path):
            print(f"❌ File not found: {file_path}")
            return None

        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)

        print(f"📤 Uploading {file_name} ({file_size:,} bytes)")

        metrics = {
            "file_name": file_name,
            "file_size": file_size,
            "document_type": document_type,
            "upload_start": time.time(),
        }

        try:
            # Navigate to upload page
            self.driver.get(f"{self.base_url}/upload")

            # Wait for upload form
            file_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, "//input[@type='file']"))
            )

            # Upload file
            file_input.send_keys(os.path.abspath(file_path))

            metrics["upload_end"] = time.time()
            metrics["upload_time"] = metrics["upload_end"] - metrics["upload_start"]
            print(f" 📊 Upload time: {metrics['upload_time']:.2f}s")

            # Processing is timed from here, i.e. it includes the wait for
            # the processing indicator to show up.
            processing_start = time.time()

            # Wait for processing status element
            try:
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located(
                        (
                            By.XPATH,
                            "//*[contains(text(), 'Processing') or "
                            "contains(text(), 'processing')]",
                        )
                    )
                )
            except TimeoutException:
                print("❌ Processing indicator not found")
                metrics["error"] = "Processing indicator not found"
                return metrics

            metrics["processing_start"] = processing_start

            # Wait for a success message (5 minute timeout for OCR)
            try:
                WebDriverWait(self.driver, 300).until(
                    EC.presence_of_element_located(
                        (
                            By.XPATH,
                            "//*[contains(text(), 'Success') or "
                            "contains(text(), 'success') or "
                            "contains(text(), 'Complete') or "
                            "contains(text(), 'complete')]",
                        )
                    )
                )
            except TimeoutException:
                print("❌ Processing timeout (5 minutes)")
                metrics["error"] = "Processing timeout"
                return metrics

            processing_end = time.time()
            metrics["processing_end"] = processing_end
            metrics["processing_time"] = processing_end - processing_start
            print(f" 📊 Processing time: {metrics['processing_time']:.2f}s")

            # Try to get OCR-specific metrics
            pages_processed = self._extract_page_count()
            if pages_processed is not None:
                metrics["pages_processed"] = pages_processed

            # Total time
            metrics["total_time"] = metrics["upload_time"] + metrics["processing_time"]
            print(f" 📊 Total time: {metrics['total_time']:.2f}s")

            return metrics

        except Exception as e:
            # Boundary handler: one failed upload must not abort the suite.
            print(f"❌ Upload failed: {e}")
            metrics["error"] = str(e)
            return metrics

    def search_document(self, query, expected_results=1):
        """
        Search for content in uploaded documents

        Args:
            query: Search query
            expected_results: Expected number of results (currently not
                checked against the number found)

        Returns:
            dict: Search performance metrics; carries an "error" key when
            the search could not be performed.
        """
        print(f"🔍 Searching for: '{query}'")

        metrics = {
            "query": query,
            "search_start": time.time(),
        }

        try:
            # Navigate to search page
            self.driver.get(f"{self.base_url}/search")

            # Find search input
            search_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        "//input[@type='search' or contains(@placeholder, 'Search')]",
                    )
                )
            )

            # Enter search query
            search_input.clear()
            search_input.send_keys(query)

            # Find and click search button
            search_button = self.driver.find_element(
                By.XPATH,
                "//button[contains(text(), 'Search') or contains(@class, 'search')]",
            )
            search_button.click()

            # NOTE: the clock stops right after the click, so search_time
            # covers page load and form submission but not result rendering.
            search_end = time.time()
            metrics["search_end"] = search_end
            metrics["search_time"] = search_end - metrics["search_start"]
            print(f" 📊 Search time: {metrics['search_time']:.2f}s")

            # Count results
            try:
                results = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located(
                        (
                            By.XPATH,
                            "//div[contains(@class, 'result') or "
                            "contains(@class, 'card')]",
                        )
                    )
                )
                metrics["results_found"] = len(results)
                print(f" 📊 Results found: {len(results)}")

                # Check if OCR content is in the first 3 results
                ocr_content_found = False
                for result in results[:3]:
                    result_text = result.text.lower()
                    if "ocr" in result_text or "scanned" in result_text:
                        ocr_content_found = True
                        break
                metrics["ocr_content_found"] = ocr_content_found

            except TimeoutException:
                metrics["results_found"] = 0
                print(" ⚠️ No results found or timeout")

            return metrics

        except Exception as e:
            # Boundary handler: one failed search must not abort the suite.
            print(f"❌ Search failed: {e}")
            metrics["error"] = str(e)
            return metrics

    def run_performance_test_suite(self, test_files):
        """
        Run comprehensive performance test suite

        Logs in, then for every existing file uploads it and runs a fixed
        set of search queries, and finally prints a summary.

        Args:
            test_files: List of test file paths

        Returns:
            list: All performance metrics (upload and search dicts mixed);
            empty when login fails.
        """
        print("🚀 STARTING OCR PERFORMANCE TEST SUITE")
        print("=" * 60)

        self.test_start_time = time.time()
        all_metrics = []

        # Login first
        if not self.login():
            print("❌ Cannot proceed without login")
            return all_metrics

        search_queries = ["text", "document", "content", "scanned"]

        # Test each file
        for file_path in test_files:
            if not os.path.exists(file_path):
                print(f"⚠️ Skipping missing file: {file_path}")
                continue

            file_name = os.path.basename(file_path)
            print(f"\n📄 TESTING FILE: {file_name}")
            print("-" * 40)

            # Record the real document type instead of always "pdf"; fall
            # back to upload_document's default when there is no extension.
            document_type = Path(file_path).suffix.lstrip(".").lower() or "pdf"

            # Upload and process
            upload_metrics = self.upload_document(file_path, document_type)
            if upload_metrics:
                all_metrics.append(upload_metrics)

            # Wait a bit between tests
            time.sleep(2)

            # Search for OCR content
            for query in search_queries:
                search_metrics = self.search_document(query)
                if search_metrics:
                    search_metrics["test_file"] = file_name
                    all_metrics.append(search_metrics)
                time.sleep(1)

        # Calculate summary statistics
        self.calculate_summary(all_metrics)

        return all_metrics

    @staticmethod
    def _print_timing_stats(label, values):
        """Print average/min/max of *values* under *label*; no-op if empty."""
        if not values:
            return
        average = sum(values) / len(values)
        print(
            f" {label}: Avg={average:.2f}s, "
            f"Min={min(values):.2f}s, Max={max(values):.2f}s"
        )

    def calculate_summary(self, all_metrics):
        """
        Calculate and display summary statistics

        Args:
            all_metrics: Mixed list of upload and search metric dicts; they
                are told apart by the "upload_time" / "search_time" keys.
        """
        print("\n" + "=" * 60)
        print("📊 PERFORMANCE TEST SUMMARY")
        print("=" * 60)

        upload_metrics = [m for m in all_metrics if "upload_time" in m]
        search_metrics = [m for m in all_metrics if "search_time" in m]

        if upload_metrics:
            print(f"\n📤 UPLOAD & PROCESSING ({len(upload_metrics)} tests):")

            # Failed uploads may lack the later timings, hence the filters.
            self._print_timing_stats(
                "Upload Time", [m["upload_time"] for m in upload_metrics]
            )
            self._print_timing_stats(
                "Processing Time",
                [m["processing_time"] for m in upload_metrics if "processing_time" in m],
            )
            self._print_timing_stats(
                "Total Time",
                [m["total_time"] for m in upload_metrics if "total_time" in m],
            )

        if search_metrics:
            print(f"\n🔍 SEARCH ({len(search_metrics)} tests):")

            self._print_timing_stats(
                "Search Time", [m["search_time"] for m in search_metrics]
            )

            ocr_found = [m for m in search_metrics if m.get("ocr_content_found", False)]
            print(f" OCR Content Found: {len(ocr_found)}/{len(search_metrics)} searches")

        # Overall test duration
        if self.test_start_time:
            total_duration = time.time() - self.test_start_time
            print(f"\n⏱️ TOTAL TEST DURATION: {total_duration:.2f}s")

    def save_results(self, results, output_file="ocr_performance_results.json"):
        """
        Save test results to JSON file, plus a CSV copy next to it

        Args:
            results: List of metric dicts
            output_file: Path of the JSON file; the CSV uses the same path
                with a ".csv" suffix
        """
        # Add timestamp
        results_data = {
            "timestamp": datetime.now().isoformat(),
            "base_url": self.base_url,
            "results": results,
        }

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results_data, f, indent=2, default=str)

        print(f"\n💾 Results saved to: {output_file}")

        # Also save as CSV for easier analysis. The name follows output_file
        # so runs with different outputs do not overwrite each other's CSV.
        json_path = Path(output_file)
        csv_path = json_path.with_suffix(".csv")
        if csv_path == json_path:
            # output_file already ends in ".csv": do not clobber the JSON.
            csv_path = json_path.with_name(json_path.name + ".csv")
        self.save_results_csv(results, str(csv_path))

    def save_results_csv(self, results, output_file="ocr_performance_results.csv"):
        """
        Save test results to CSV file

        Args:
            results: List of metric dicts; nothing is written when empty
            output_file: Path of the CSV file
        """
        if not results:
            return

        # Flatten results for CSV: nested containers become their str() form
        flat_results = [
            {
                key: str(value) if isinstance(value, (dict, list)) else value
                for key, value in result.items()
            }
            for result in results
        ]

        df = pd.DataFrame(flat_results)
        df.to_csv(output_file, index=False)
        print(f"📊 CSV results saved to: {output_file}")

    def cleanup(self):
        """Cleanup resources (safe to call more than once)"""
        if self.driver:
            self.driver.quit()
            self.driver = None  # a second cleanup() must not quit again
            print("\n🧹 Browser closed")


def main():
    """
    Main test execution

    Returns:
        int: Process exit code - 0 when results were collected and saved,
        1 when no test files exist, no results were collected, or the run
        raised.
    """
    parser = argparse.ArgumentParser(description="Selenium OCR Performance Test")
    parser.add_argument("--url", default="http://localhost:3015",
                        help="Base URL of LightRAG web UI")
    parser.add_argument("--headless", action="store_true",
                        help="Run browser in headless mode")
    parser.add_argument("--files", nargs="+",
                        help="Test files to upload")
    parser.add_argument("--output", default="ocr_performance_results.json",
                        help="Output file for results")

    args = parser.parse_args()

    # Default test files if none provided
    test_files = args.files or [
        "ocr.pdf",  # OCR test PDF
        "test_meaningful.pdf",  # Text-based PDF
        "test.docx",  # Word document
    ]

    # Filter to existing files
    existing_files = [f for f in test_files if os.path.exists(f)]

    if not existing_files:
        print("❌ No test files found. Please provide valid file paths.")
        return 1

    print(f"📁 Test files: {existing_files}")

    # Create tester instance
    tester = OCRPerformanceTester(base_url=args.url, headless=args.headless)
    exit_code = 0

    try:
        # Setup and run tests
        tester.setup_driver()
        results = tester.run_performance_test_suite(existing_files)

        # Save results
        if results:
            tester.save_results(results, args.output)
        else:
            print("❌ No results collected")
            exit_code = 1

    except Exception as e:
        print(f"❌ Test execution failed: {e}")
        traceback.print_exc()
        exit_code = 1

    finally:
        tester.cleanup()

    return exit_code


if __name__ == "__main__":
    sys.exit(main())