Files
railseek6/test_entity_extraction_workaround.py

219 lines
6.4 KiB
Python

import requests
import json
import time
import os
# Connection settings shared by the request helpers below.
BASE_URL = "http://localhost:3015"  # root URL of the server under test
HEADERS = {"Content-Type": "application/json"}  # base headers copied per request
def get_auth_token():
    """Log in to the server and return a bearer token.

    Posts form-encoded credentials to ``{BASE_URL}/login``. The username and
    password default to the values this script has always used, but can be
    overridden with the ``LIGHTRAG_USERNAME`` and ``LIGHTRAG_PASSWORD``
    environment variables so credentials do not have to be edited in source.

    Returns:
        The ``access_token`` field of the JSON response, or ``None`` when the
        server answers with a non-200 status, the field is absent, or the
        request raises.
    """
    credentials = {
        "username": os.environ.get("LIGHTRAG_USERNAME", "jleu3482"),
        "password": os.environ.get("LIGHTRAG_PASSWORD", "jleu1212"),
    }
    try:
        # requests has no default timeout; without one an unresponsive
        # server would block this script forever.
        response = requests.post(
            f"{BASE_URL}/login",
            data=credentials,
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Login failed: {response.text}")
            return None
        return response.json().get('access_token')
    except Exception as e:
        # Deliberately broad: any failure here just means "no token".
        print(f"Login error: {e}")
        return None
def test_health():
    """Check that the server answers its health endpoint.

    Obtains a token via ``get_auth_token()`` and sends an authenticated GET
    to ``{BASE_URL}/health``.

    Returns:
        ``True`` when the endpoint answers with HTTP 200; ``False`` when no
        token could be obtained, the status is anything else, or the request
        raises.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return False

        headers = HEADERS.copy()
        headers["Authorization"] = f"Bearer {auth_token}"

        # A bounded wait keeps a hung server from stalling the health probe.
        response = requests.get(f"{BASE_URL}/health", headers=headers, timeout=30)
        print(f"Health check: {response.status_code}")

        if response.status_code == 200:
            print("Server is healthy")
            return True

        print(f"Health check failed: {response.text}")
        return False
    except Exception as e:
        # Deliberately broad: any failure means the server is not usable.
        print(f"Health check error: {e}")
        return False
def upload_ocr_pdf(file_path="ocr.pdf"):
    """Upload a PDF to the server's document upload endpoint.

    Args:
        file_path: Path of the PDF to upload. Defaults to ``"ocr.pdf"`` in
            the current working directory, which is what this script has
            always used.

    Returns:
        The ``track_id`` field of the JSON response on HTTP 200, or ``None``
        when the file does not exist, no token could be obtained, the server
        answers with another status, or the request raises.
    """
    if not os.path.exists(file_path):
        print(f"File {file_path} not found")
        return None

    try:
        auth_token = get_auth_token()
        if not auth_token:
            return None

        # Send only the Authorization header: requests must generate the
        # multipart Content-Type (with its boundary) itself, so the JSON
        # Content-Type from HEADERS must not be passed here.
        upload_headers = {"Authorization": f"Bearer {auth_token}"}
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f, 'application/pdf')}
            # Generous but finite timeout so a stalled upload cannot hang forever.
            response = requests.post(
                f"{BASE_URL}/documents/upload",
                files=files,
                headers=upload_headers,
                timeout=300,
            )

        print(f"Upload response: {response.status_code}")
        if response.status_code != 200:
            print(f"Upload failed: {response.text}")
            return None

        result = response.json()
        print(f"Upload successful: {result}")
        return result.get('track_id')
    except Exception as e:
        # Deliberately broad: any failure simply means "nothing uploaded".
        print(f"Upload error: {e}")
        return None
def _flatten_documents(payload):
    """Return the document records (dicts) contained in a ``/documents`` payload."""
    if isinstance(payload, dict):
        # NOTE(review): some LightRAG server versions appear to answer with
        # {"statuses": {<status>: [<doc>, ...]}} instead of a bare list --
        # confirm against this server. Both shapes are accepted here.
        groups = payload.get('statuses', payload)
        if isinstance(groups, dict):
            return [
                doc
                for group in groups.values()
                if isinstance(group, list)
                for doc in group
                if isinstance(doc, dict)
            ]
        payload = groups
    if isinstance(payload, list):
        return [doc for doc in payload if isinstance(doc, dict)]
    return []


def check_document_status(track_id):
    """Look up the processing status of an uploaded document.

    Fetches ``{BASE_URL}/documents`` and searches the returned records for
    one whose ``track_id`` equals the given value, printing its status, file
    path and chunk count when found.

    Args:
        track_id: Identifier returned by the upload endpoint.

    Returns:
        The matching record's ``status`` value, or ``None`` when no token
        could be obtained, the server answers with a non-200 status, no
        record matches, or the request raises.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return None

        headers = HEADERS.copy()
        headers["Authorization"] = f"Bearer {auth_token}"

        # Bounded wait: this is called in a polling loop and must not hang.
        response = requests.get(
            f"{BASE_URL}/documents",
            headers=headers,
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Status check failed: {response.text}")
            return None

        docs = _flatten_documents(response.json())
        print(f"Total documents: {len(docs)}")

        # Find our document
        for doc in docs:
            if doc.get('track_id') == track_id:
                print(f"Document status: {doc.get('status')}")
                print(f"File path: {doc.get('file_path')}")
                print(f"Chunks count: {doc.get('chunks_count')}")
                return doc.get('status')

        print("Document not found in status list")
        return None
    except Exception as e:
        # Deliberately broad: any failure is reported as "status unknown".
        print(f"Status check error: {e}")
        return None
def test_search(query):
    """Run one hybrid-mode query against the server and print the result.

    Posts ``{"query": query, "mode": "hybrid", "top_k": 5}`` as JSON to
    ``{BASE_URL}/query`` and prints the ``response`` field plus the first
    100 characters of each entry in ``sources``.

    Args:
        query: The query text to send.

    Returns:
        ``True`` when the server answers with HTTP 200 and the result could
        be printed; ``False`` when no token could be obtained, the status is
        anything else, or the request raises.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return False

        headers = HEADERS.copy()
        headers["Authorization"] = f"Bearer {auth_token}"

        payload = {
            "query": query,
            "mode": "hybrid",
            "top_k": 5
        }

        # Generous but finite timeout so a stalled query cannot hang forever.
        response = requests.post(
            f"{BASE_URL}/query",
            json=payload,
            headers=headers,
            timeout=300,
        )
        if response.status_code != 200:
            print(f"Search failed: {response.text}")
            return False

        result = response.json()
        # Treat a missing or null "sources" as an empty list so a successful
        # search is not reported as a failure.
        sources = result.get('sources') or []

        print(f"Search successful for query: '{query}'")
        print(f"Response: {result.get('response', 'No response')}")
        print(f"Sources: {len(sources)}")

        # Print sources
        for i, source in enumerate(sources):
            # A null "content" is printed as empty text rather than raising.
            content = source.get('content') or ''
            print(f"Source {i+1}: {content[:100]}...")
        return True
    except Exception as e:
        # Deliberately broad: any failure counts as a failed search.
        print(f"Search error: {e}")
        return False
def main():
    """Run the end-to-end check: health, upload, wait for processing, search.

    Steps:
        1. Abort if ``test_health()`` fails.
        2. Upload the PDF via ``upload_ocr_pdf()``; abort without a track id.
        3. Poll ``check_document_status()`` up to 30 times, 10 seconds apart,
           until the document is processed. Abort if it fails or is still
           not processed after the last attempt.
        4. Run a fixed list of queries through ``test_search()``.
    """
    print("Testing LightRAG with entity extraction workaround...")

    # Step 1: Check server health
    if not test_health():
        print("Server is not healthy, exiting...")
        return

    # Step 2: Upload OCR PDF
    print("\n--- Uploading OCR PDF ---")
    track_id = upload_ocr_pdf()
    if not track_id:
        print("Failed to upload OCR PDF")
        return
    print(f"Uploaded with track_id: {track_id}")

    # Step 3: Monitor processing status
    print("\n--- Monitoring processing status ---")
    max_attempts = 30
    poll_interval = 10  # seconds between status checks
    for attempt in range(max_attempts):
        status = check_document_status(track_id)
        # Compare case-insensitively so "processed" and "PROCESSED" both match.
        normalized = status.upper() if isinstance(status, str) else status

        if normalized == "PROCESSED":
            print("Document processing completed successfully!")
            break
        if normalized == "FAILED":
            print("Document processing failed!")
            return

        if normalized in ("PENDING", "PROCESSING"):
            print(f"Processing... attempt {attempt + 1}/{max_attempts}")
        else:
            print(f"Unknown status: {status}")

        # No point sleeping after the final attempt.
        if attempt + 1 < max_attempts:
            time.sleep(poll_interval)
    else:
        # The loop ended without ever seeing PROCESSED: do not run searches
        # against a document that was never indexed.
        print(f"Document was not processed after {max_attempts} status checks, exiting...")
        return

    # Step 4: Test search functionality
    print("\n--- Testing search functionality ---")

    # Test queries based on OCR content
    test_queries = [
        "Windows Server",
        "system requirements",
        "installation guide",
        "hardware specifications"
    ]
    for query in test_queries:
        print(f"\nTesting query: '{query}'")
        test_search(query)
        time.sleep(2)  # Small delay between queries

    print("\n--- Test completed ---")


if __name__ == "__main__":
    main()