""" Complete OCR PDF Upload, Indexing, and Search Test for LightRAG Web UI Tests the entire workflow: upload ocr.pdf → indexing → search functionality """ import requests import time import json import os from pathlib import Path # Configuration BASE_URL = "http://localhost:3015" API_KEY = "jleu1212" USERNAME = "jleu3482" PASSWORD = "jleu1212" OCR_PDF_PATH = "ocr.pdf" TEST_QUERIES = [ "LightRAG", "OCR", "document processing", "text extraction" ] class LightRAGWebUITest: def __init__(self): self.session = requests.Session() self.base_url = BASE_URL self.api_key = API_KEY def test_health(self): """Test server health""" print("=== Testing Server Health ===") try: response = self.session.get(f"{self.base_url}/api/health") if response.status_code == 200: print("✅ Server is healthy") return True else: print(f"❌ Server health check failed: {response.status_code}") return False except Exception as e: print(f"❌ Server health check error: {e}") return False def login(self): """Login to web UI""" print("\n=== Testing Web UI Login ===") try: # Get login page response = self.session.get(f"{self.base_url}/webui/") if response.status_code == 200: print("✅ Web UI login page accessible") # Try to access authenticated endpoint headers = {"Authorization": f"Bearer {self.api_key}"} response = self.session.get( f"{self.base_url}/api/documents", headers=headers ) if response.status_code == 200: print("✅ API authentication working") return True else: print(f"⚠️ API auth returned {response.status_code}, but login page works") return True else: print(f"❌ Web UI login page failed: {response.status_code}") return False except Exception as e: print(f"❌ Web UI login error: {e}") return False def upload_ocr_pdf(self): """Upload ocr.pdf file""" print("\n=== Uploading OCR PDF ===") if not os.path.exists(OCR_PDF_PATH): print(f"❌ OCR PDF file not found: {OCR_PDF_PATH}") return False try: # Prepare file for upload files = { 'file': (os.path.basename(OCR_PDF_PATH), open(OCR_PDF_PATH, 'rb'), 'application/pdf') } headers = {"Authorization": f"Bearer {self.api_key}"} print(f"📤 Uploading {OCR_PDF_PATH}...") response = self.session.post( f"{self.base_url}/api/upload", files=files, headers=headers ) if response.status_code == 200: result = response.json() print(f"✅ Upload successful: {result}") return True else: print(f"❌ Upload failed: {response.status_code} - {response.text}") return False except Exception as e: print(f"❌ Upload error: {e}") return False def wait_for_indexing(self, timeout=120): """Wait for document indexing to complete""" print(f"\n=== Waiting for Indexing (max {timeout}s) ===") headers = {"Authorization": f"Bearer {self.api_key}"} start_time = time.time() while time.time() - start_time < timeout: try: # Check document status response = self.session.get( f"{self.base_url}/api/documents", headers=headers ) if response.status_code == 200: documents = response.json() if documents: doc = documents[0] # Check first document status = doc.get('status', 'unknown') print(f"📊 Document status: {status}") if status == 'indexed': print("✅ Document indexing completed!") return True elif status == 'error': print("❌ Document indexing failed!") return False else: print("📭 No documents found yet...") # Check processing queue response = self.session.get( f"{self.base_url}/api/queue", headers=headers ) if response.status_code == 200: queue_info = response.json() pending = queue_info.get('pending', 0) processing = queue_info.get('processing', 0) print(f"🔄 Queue: {pending} pending, {processing} processing") if pending == 0 and processing == 0: print("✅ Queue processing completed!") return True time.sleep(5) # Wait 5 seconds between checks except Exception as e: print(f"⚠️ Error checking indexing status: {e}") time.sleep(5) print("⏰ Indexing timeout reached") return False def test_search_queries(self): """Test search functionality with OCR content""" print("\n=== Testing Search Queries ===") headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } successful_searches = 0 for query in TEST_QUERIES: print(f"\n🔍 Testing query: '{query}'") try: payload = { "query": query, "top_k": 5 } response = self.session.post( f"{self.base_url}/api/search", json=payload, headers=headers ) if response.status_code == 200: results = response.json() if results and len(results) > 0: print(f"✅ Search successful: Found {len(results)} results") successful_searches += 1 # Show first result snippet first_result = results[0] content_preview = first_result.get('content', '')[:200] + "..." print(f" 📄 First result preview: {content_preview}") else: print(f"⚠️ Search returned no results for: '{query}'") else: print(f"❌ Search failed: {response.status_code} - {response.text}") except Exception as e: print(f"❌ Search error for '{query}': {e}") print(f"\n📊 Search Summary: {successful_searches}/{len(TEST_QUERIES)} queries successful") return successful_searches > 0 def check_database_storage(self): """Verify data is stored in all databases""" print("\n=== Checking Database Storage ===") headers = {"Authorization": f"Bearer {self.api_key}"} try: # Check vector storage (chunks) response = self.session.get( f"{self.base_url}/api/stats", headers=headers ) if response.status_code == 200: stats = response.json() print(f"📊 System Stats: {stats}") chunk_count = stats.get('chunk_count', 0) entity_count = stats.get('entity_count', 0) relationship_count = stats.get('relationship_count', 0) print(f" 📦 Chunks: {chunk_count}") print(f" 🏷️ Entities: {entity_count}") print(f" 🔗 Relationships: {relationship_count}") if chunk_count > 0: print("✅ Data stored in vector database") return True else: print("⚠️ No chunks found in vector database") return False else: print(f"❌ Could not get stats: {response.status_code}") return False except Exception as e: print(f"❌ Database check error: {e}") return False def run_complete_test(self): """Run the complete OCR PDF workflow test""" print("🚀 Starting Complete OCR PDF Web UI Workflow Test") print("=" * 60) test_results = {} # Step 1: Test server health test_results['health'] = self.test_health() # Step 2: Test login test_results['login'] = self.login() # Step 3: Upload OCR PDF test_results['upload'] = self.upload_ocr_pdf() # Step 4: Wait for indexing if test_results['upload']: test_results['indexing'] = self.wait_for_indexing() else: test_results['indexing'] = False # Step 5: Test search queries if test_results['indexing']: test_results['search'] = self.test_search_queries() else: test_results['search'] = False # Step 6: Check database storage test_results['storage'] = self.check_database_storage() # Summary print("\n" + "=" * 60) print("📋 TEST SUMMARY") print("=" * 60) for test_name, result in test_results.items(): status = "✅ PASS" if result else "❌ FAIL" print(f"{test_name.upper():<12} : {status}") overall_success = all(test_results.values()) if overall_success: print("\n🎉 ALL TESTS PASSED! OCR PDF workflow is working correctly.") print(" - Upload successful") print(" - Indexing completed") print(" - Search functionality working") print(" - Data stored in databases") else: print("\n⚠️ SOME TESTS FAILED. Check the logs above for details.") return overall_success def main(): """Main test execution""" test = LightRAGWebUITest() success = test.run_complete_test() # Exit with appropriate code exit(0 if success else 1) if __name__ == "__main__": main()