#!/usr/bin/env python3
"""
Comprehensive Selenium test for LightRAG workflow.

Tests: server startup, login, document upload, indexing, and search.

The script is Windows-oriented: it launches the server through ``zrun.bat``
and uses ``netstat`` / ``taskkill`` to manage the server process.
"""

import json
import os
import subprocess
import sys
import threading
import time
from pathlib import Path

import requests
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Configuration
SERVER_PORT = 3015
SERVER_URL = f"http://localhost:{SERVER_PORT}"
# Credentials can be overridden from the environment so they do not have to
# live in source control; the defaults match the previous hard-coded values.
USERNAME = os.environ.get("LIGHTRAG_TEST_USERNAME", "jleu3482")
PASSWORD = os.environ.get("LIGHTRAG_TEST_PASSWORD", "jleu1212")
TEST_PDF = "test/ocr.pdf"  # Relative to workspace directory
WORKSPACE_DIR = "c:/aaWORK/railseek6"


def _find_listening_pids(netstat_output, port):
    """Return the set of PIDs listening on TCP *port*.

    Args:
        netstat_output: Text produced by ``netstat -ano``.
        port: TCP port number to look for in the local-address column.

    Returns:
        A set of positive integer PIDs; empty when nothing matches.
    """
    suffix = f":{port}"
    pids = set()
    for line in netstat_output.splitlines():
        fields = line.split()
        if len(fields) < 5 or fields[0].upper() != "TCP":
            continue
        local_addr, remote_addr, pid = fields[1], fields[2], fields[-1]
        if not local_addr.endswith(suffix):
            continue
        # A listening socket shows a wildcard remote port (":0"). Matching on
        # that instead of the "LISTENING" label keeps this locale-independent.
        if not remote_addr.endswith(":0"):
            continue
        if pid.isdigit() and int(pid) > 0:
            pids.add(int(pid))
    return pids


def _kill_server_on_port(port):
    """Best-effort kill of whatever process is listening on *port*.

    Only the listeners on the given port are killed (never this process),
    rather than every ``python.exe`` on the machine.
    """
    try:
        listing = subprocess.run(
            ["netstat", "-ano"],
            capture_output=True,
            text=True,
            errors="replace",
            check=False,
        )
    except OSError as exc:
        print(f"Could not inspect port {port}: {exc}")
        return

    for pid in _find_listening_pids(listing.stdout, port) - {os.getpid()}:
        print(f"Killing existing process {pid} on port {port}")
        try:
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(pid)],
                capture_output=True,
                check=False,
            )
        except OSError as exc:
            print(f"Could not kill process {pid}: {exc}")


def _stop_server(process):
    """Stop the server started by :func:`start_server`.

    Args:
        process: The ``subprocess.Popen`` handle of the launcher, or ``None``.
    """
    if process is not None and process.poll() is None:
        if os.name == "nt":
            # The launcher is a batch file; terminating only that process is
            # expected to leave the actual server child alive, so kill the tree.
            try:
                subprocess.run(
                    ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                    capture_output=True,
                    check=False,
                )
            except OSError as exc:
                print(f"Could not kill server process tree: {exc}")
        if process.poll() is None:
            process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    # Fallback for launchers that detach the server from the batch process.
    _kill_server_on_port(SERVER_PORT)


def start_server():
    """Start LightRAG server using zrun.bat.

    Returns:
        The ``subprocess.Popen`` handle once ``/health`` answers 200, or
        ``None`` if the batch file is missing or the server does not come up
        within 30 attempts.
    """
    print("Starting LightRAG server...")

    # Kill any existing server on our port (and nothing else).
    _kill_server_on_port(SERVER_PORT)

    # Start server in background
    bat_path = os.path.join(WORKSPACE_DIR, "zrun.bat")
    if not os.path.exists(bat_path):
        print(f"ERROR: zrun.bat not found at {bat_path}")
        return None

    # Output is discarded: nothing reads it, and an unread PIPE would
    # eventually fill up and block the server.
    process = subprocess.Popen(
        [bat_path],
        cwd=WORKSPACE_DIR,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )

    # Wait for server to start
    print("Waiting for server to start...")
    for attempt in range(1, 31):  # Up to 30 attempts, one second apart
        try:
            response = requests.get(f"{SERVER_URL}/health", timeout=5)
            if response.status_code == 200:
                print(f"Server started successfully (attempt {attempt})")
                return process
        except requests.RequestException:
            pass  # Server not accepting connections yet; retry.
        time.sleep(1)

    print("ERROR: Server failed to start within 30 seconds")
    _stop_server(process)
    return None


def check_server_health():
    """Check if server is healthy.

    Returns:
        ``True`` when ``/health`` answers 200 with a JSON body, else ``False``.
    """
    try:
        response = requests.get(f"{SERVER_URL}/health", timeout=10)
        if response.status_code == 200:
            data = response.json()
            print(f"Server health: {data.get('status', 'unknown')}")
            print(f"Auth mode: {data.get('auth_mode', 'unknown')}")
            print(f"LLM binding: {data.get('configuration', {}).get('llm_binding', 'unknown')}")
            return True
        print(f"Health check returned status {response.status_code}")
    except Exception as e:
        print(f"Health check failed: {e}")
    return False


def selenium_login(driver):
    """Login using Selenium WebDriver.

    Args:
        driver: An initialized Selenium WebDriver.

    Returns:
        ``True`` if the login appears to have succeeded, or if no login form
        is present but the browser is on the web UI; ``False`` otherwise.
    """
    print("Logging in via web UI...")

    # Go to login page
    driver.get(f"{SERVER_URL}/webui")
    time.sleep(2)

    # Check if login form exists
    try:
        username_field = driver.find_element(By.NAME, "username")
        password_field = driver.find_element(By.NAME, "password")
        login_button = driver.find_element(By.XPATH, "//button[contains(text(), 'Login')]")
    except NoSuchElementException:
        print("Login form not found - may already be logged in or auth disabled")
        # Check if we're already on main page
        if "webui" in driver.current_url:
            print("Already on webui page")
            return True
        return False

    # Fill credentials
    username_field.clear()
    username_field.send_keys(USERNAME)
    password_field.clear()
    password_field.send_keys(PASSWORD)
    login_button.click()

    # Wait for login to complete
    time.sleep(3)

    # Success is inferred from the URL no longer mentioning "login".
    if "login" not in driver.current_url.lower():
        print("Login successful")
        return True
    print("Login may have failed")
    return False


def upload_document(driver):
    """Upload test PDF document.

    Args:
        driver: An initialized (and logged-in) Selenium WebDriver.

    Returns:
        ``True`` if the file was submitted through the web form; when the form
        cannot be found, the result of :func:`check_document_status`;
        ``False`` if the test PDF does not exist.
    """
    print("Uploading document...")

    # Navigate to upload page
    driver.get(f"{SERVER_URL}/webui")
    time.sleep(2)

    try:
        # Try to find file input
        file_input = driver.find_element(By.XPATH, "//input[@type='file']")

        # Resolve the test PDF, falling back to the workspace root.
        pdf_path = os.path.join(WORKSPACE_DIR, TEST_PDF)
        if not os.path.exists(pdf_path):
            print(f"ERROR: Test PDF not found at {pdf_path}")
            pdf_path = os.path.join(WORKSPACE_DIR, "ocr.pdf")
            if not os.path.exists(pdf_path):
                print(f"ERROR: Test PDF not found at {pdf_path} either")
                return False

        print(f"Uploading PDF: {pdf_path}")
        file_input.send_keys(pdf_path)

        # Look for upload button and click it
        upload_button = driver.find_element(
            By.XPATH,
            "//button[contains(text(), 'Upload') or contains(text(), 'upload')]",
        )
        upload_button.click()

        # Wait for upload to complete
        time.sleep(5)

        # A visible success message is informational only; its absence is
        # not treated as a failure.
        try:
            success_elem = driver.find_element(
                By.XPATH,
                "//*[contains(text(), 'success') or contains(text(), 'Success') or contains(text(), 'uploaded')]",
            )
            print(f"Upload success message: {success_elem.text[:100]}")
        except NoSuchElementException:
            print("No success message found, but upload may have completed")
        return True

    except NoSuchElementException as e:
        print(f"Upload form not found: {e}")
        # Try alternative approach - check if document was already uploaded
        return check_document_status()


def check_document_status():
    """Check document status via API.

    Returns:
        ``True`` if ``/api/documents`` answers 200 with a non-empty payload,
        else ``False``.
    """
    print("Checking document status via API...")

    try:
        # Get list of documents
        response = requests.get(f"{SERVER_URL}/api/documents", timeout=10)
        if response.status_code == 200:
            documents = response.json()
            print(f"Found {len(documents)} documents")
            for doc in documents[:5]:  # Show first 5
                print(f"  - {doc.get('filename', 'unknown')}: {doc.get('status', 'unknown')}")
            return len(documents) > 0
    except Exception as e:
        print(f"Error checking document status: {e}")

    return False


def test_search():
    """Test search functionality.

    Returns:
        ``True`` as soon as one query's response contains ``llm_response``;
        ``False`` if none of the queries does.
    """
    print("Testing search...")

    # Test simple search query
    test_queries = ["railway", "train", "station", "transport"]

    for query in test_queries:
        try:
            response = requests.post(
                f"{SERVER_URL}/api/query",
                json={"query": query, "top_k": 5},
                timeout=30,
            )

            if response.status_code != 200:
                print(f"Search failed for '{query}': {response.status_code}")
                continue

            results = response.json()
            print(f"Search for '{query}': {len(results.get('results', []))} results")

            # Check if deepseek API was used (should be in response)
            if "llm_response" in results:
                print("  LLM response present (DeepSeek API used)")
                return True
            print("  No LLM response in results")
        except Exception as e:
            print(f"Search error for '{query}': {e}")

    return False


def check_indexing_components():
    """Check if indexing components are being used.

    Scans the server log for component keywords.

    Returns:
        ``True`` if at least one component keyword appears in the log;
        ``False`` if none appears or the log is missing or unreadable.
    """
    print("Checking indexing components...")

    # Check server logs for evidence of components
    log_file = os.path.join(WORKSPACE_DIR, "LightRAG-main", "logs", "lightrag.log")

    if os.path.exists(log_file):
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                # Lower-case once instead of once per keyword lookup.
                log_text = f.read().lower()

            components = {
                "openclip": "openclip" in log_text,
                "paddleocr": "paddleocr" in log_text or "ocr" in log_text,
                "spacy": "spacy" in log_text or "entity" in log_text,
                "deepseek": "deepseek" in log_text,
            }

            print("Indexing components found in logs:")
            for component, found in components.items():
                print(f"  - {component}: {'YES' if found else 'NO'}")

            return any(components.values())
        except Exception as e:
            print(f"Error reading log file: {e}")

    print("Log file not found or unreadable")
    return False


def test_endpoints():
    """Test various API endpoints.

    Returns:
        ``True`` only if every endpoint answers with status 200 or 201.
    """
    print("Testing API endpoints...")

    endpoints = [
        ("/health", "GET"),
        ("/auth-status", "GET"),
        ("/api/documents", "GET"),
        ("/api/workspaces", "GET"),
    ]

    all_working = True
    for endpoint, method in endpoints:
        try:
            if method == "GET":
                response = requests.get(f"{SERVER_URL}{endpoint}", timeout=10)
            else:
                response = requests.post(f"{SERVER_URL}{endpoint}", timeout=10)

            if response.status_code in (200, 201):
                print(f"✓ {endpoint}: {response.status_code}")
            else:
                print(f"✗ {endpoint}: {response.status_code}")
                all_working = False
        except Exception as e:
            print(f"✗ {endpoint}: ERROR - {e}")
            all_working = False

    return all_working


def _create_webdriver():
    """Return a headless Chrome driver, falling back to Firefox, else ``None``."""
    print("Setting up Selenium WebDriver...")
    try:
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')  # Run in headless mode
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=options)
        driver.implicitly_wait(10)
        return driver
    except Exception as e:
        print(f"ERROR: Could not start WebDriver: {e}")
        print("Trying Firefox...")

    try:
        options = webdriver.FirefoxOptions()
        options.add_argument('--headless')
        driver = webdriver.Firefox(options=options)
        driver.implicitly_wait(10)
        return driver
    except Exception as e2:
        print(f"ERROR: Could not start any WebDriver: {e2}")
        print("Skipping Selenium tests, using API only")
        return None


def _run_test_steps():
    """Run steps 3-8 against a running server and return the results dict."""
    # Step 3: Test endpoints (run once; the result is reused in the report)
    endpoints_ok = test_endpoints()
    if not endpoints_ok:
        print("WARNING: Some endpoints not working")

    # Step 4: Setup Selenium
    driver = _create_webdriver()

    test_results = {
        "server_started": True,
        "health_check": True,
        "endpoints_tested": endpoints_ok,
        "selenium_login": False,
        "document_upload": False,
        "search_works": False,
        "indexing_components": False,
    }

    if driver:
        try:
            # Step 5: Selenium login
            test_results["selenium_login"] = selenium_login(driver)

            # Step 6: Upload document
            if test_results["selenium_login"]:
                test_results["document_upload"] = upload_document(driver)

                # Wait for indexing
                print("Waiting for indexing to complete (30 seconds)...")
                time.sleep(30)

            # Steps 7 and 8 go through the log file and the HTTP API, not the
            # browser session, so they run regardless of the login outcome
            # (same as the API-only path below).
            test_results["indexing_components"] = check_indexing_components()
            test_results["search_works"] = test_search()
        except Exception as e:
            print(f"ERROR in Selenium tests: {e}")
        finally:
            # Always release the browser, even when a step raised.
            driver.quit()
    else:
        # Without Selenium, try API-based tests
        print("Running API-only tests...")
        test_results["document_upload"] = check_document_status()
        test_results["indexing_components"] = check_indexing_components()
        test_results["search_works"] = test_search()

    return test_results


def _report_results(test_results):
    """Print the result table, write an error log on failure, return overall pass."""
    print("\n" + "=" * 60)
    print("TEST RESULTS")
    print("=" * 60)

    for test_name, result in test_results.items():
        print(f"{test_name}: {'PASS' if result else 'FAIL'}")

    print("\n" + "=" * 60)
    if all(test_results.values()):
        print("SUCCESS: All tests passed!")
        return True

    print("FAILURE: Some tests failed")

    # Generate error log (the password is deliberately not included)
    error_log = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "test_results": test_results,
        "server_url": SERVER_URL,
        "username": USERNAME,
        "test_pdf": TEST_PDF,
    }

    log_file = "lightrag_test_error_log.json"
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(error_log, f, indent=2)

    print(f"Error log saved to: {log_file}")
    return False


def main():
    """Main test function.

    Returns:
        ``True`` if every test step passed, ``False`` otherwise.
    """
    print("=" * 60)
    print("LightRAG Comprehensive Selenium Test")
    print("=" * 60)

    # Change to workspace directory
    try:
        os.chdir(WORKSPACE_DIR)
    except OSError as exc:
        print(f"FAILED: Could not change to workspace directory {WORKSPACE_DIR}: {exc}")
        return False

    # Step 1: Start server
    server_process = start_server()
    if not server_process:
        print("FAILED: Could not start server")
        return False

    try:
        # Give server time to fully initialize
        time.sleep(5)

        # Step 2: Check server health
        if not check_server_health():
            print("FAILED: Server health check failed")
            return False

        # Steps 3-8
        test_results = _run_test_steps()
    finally:
        # Step 9: Cleanup - runs on success, early return and exceptions alike
        print("Cleaning up...")
        _stop_server(server_process)

    # Step 10: Report results
    return _report_results(test_results)


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)