# Python source file (219 lines, 6.4 KiB)
import requests
|
|
import json
|
|
import time
|
|
import os
|
|
|
|
# Configuration
# Root URL of the server under test; every endpoint path is appended to it.
BASE_URL = "http://localhost:3015"
# Base headers for JSON requests; callers copy this and add Authorization.
HEADERS = {"Content-Type": "application/json"}

def get_auth_token():
    """Log in to the server and return a bearer token.

    Posts form-encoded credentials to ``{BASE_URL}/login``. The username and
    password default to the values this script has always used and can be
    overridden with the ``LIGHTRAG_USERNAME`` / ``LIGHTRAG_PASSWORD``
    environment variables.

    Returns:
        The ``access_token`` field of the JSON login response, or ``None``
        when the login is rejected, the field is absent, or the request
        fails for any reason. Failures are printed, never raised.
    """
    # NOTE(review): the fallback credentials are kept only so existing runs
    # keep working; real credentials should not live in source control.
    credentials = {
        "username": os.environ.get("LIGHTRAG_USERNAME", "jleu3482"),
        "password": os.environ.get("LIGHTRAG_PASSWORD", "jleu1212"),
    }

    try:
        response = requests.post(
            f"{BASE_URL}/login",
            data=credentials,
            # Without a timeout requests waits forever on an unresponsive
            # server. The value bounds the connect and each read, not the
            # total request time.
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Login failed: {response.text}")
            return None

        token = response.json().get('access_token')
        if not token:
            # Callers treat a falsy token as "not logged in" without printing
            # anything themselves, so say why here.
            print("Login failed: response did not contain an access_token")
        return token
    except Exception as e:
        # Best-effort by design: the script reports the problem and lets the
        # caller decide what to do with a missing token.
        print(f"Login error: {e}")
        return None

def test_health():
    """Check that the server answers ``GET /health`` with HTTP 200.

    A fresh token is obtained through ``get_auth_token()`` and sent as a
    bearer ``Authorization`` header.

    Returns:
        ``True`` when the endpoint responds with status 200; ``False`` when
        no token could be obtained, the status differs, or the request
        raises. Failures are printed, never raised.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return False

        headers = {**HEADERS, "Authorization": f"Bearer {auth_token}"}

        response = requests.get(
            f"{BASE_URL}/health",
            headers=headers,
            # A health probe must fail fast: without a timeout a hung server
            # would block the script instead of being reported as unhealthy.
            timeout=10,
        )
        print(f"Health check: {response.status_code}")
        if response.status_code == 200:
            print("Server is healthy")
            return True

        print(f"Health check failed: {response.text}")
        return False
    except Exception as e:
        # Best-effort by design: any failure simply means "not healthy".
        print(f"Health check error: {e}")
        return False

def upload_ocr_pdf():
    """Upload ``ocr.pdf`` to ``POST /documents/upload``.

    The file is looked up relative to the current working directory and sent
    as a multipart form field named ``file`` with MIME type
    ``application/pdf``.

    Returns:
        The ``track_id`` field of the JSON response on HTTP 200, or ``None``
        when the file does not exist, no token could be obtained, the server
        answers with another status, or the request raises. Failures are
        printed, never raised.
    """
    file_path = "ocr.pdf"

    if not os.path.exists(file_path):
        print(f"File {file_path} not found")
        return None

    try:
        auth_token = get_auth_token()
        if not auth_token:
            return None

        # HEADERS is deliberately not used here: it pins Content-Type to JSON,
        # while requests has to generate the multipart Content-Type (with its
        # boundary) itself.
        upload_headers = {"Authorization": f"Bearer {auth_token}"}

        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f, 'application/pdf')}
            response = requests.post(
                f"{BASE_URL}/documents/upload",
                files=files,
                headers=upload_headers,
                # Bound the wait so a stalled server cannot hang the script.
                timeout=60,
            )

        # The response body is already fully read at this point, so the file
        # handle can be released before the result is inspected.
        print(f"Upload response: {response.status_code}")
        if response.status_code != 200:
            print(f"Upload failed: {response.text}")
            return None

        result = response.json()
        print(f"Upload successful: {result}")
        return result.get('track_id')

    except Exception as e:
        # Best-effort by design: report the problem and signal failure with None.
        print(f"Upload error: {e}")
        return None

def check_document_status(track_id):
    """Look up the processing status of an uploaded document.

    Fetches ``GET /documents`` and searches the returned records for the one
    whose ``track_id`` equals *track_id*, printing its status, file path and
    chunk count.

    Args:
        track_id: Identifier returned by the upload endpoint.

    Returns:
        The matching record's ``status`` value exactly as the server sent it,
        or ``None`` when no token could be obtained, the request fails, the
        server answers with a non-200 status, or no record matches. Failures
        are printed, never raised.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return None

        headers = {**HEADERS, "Authorization": f"Bearer {auth_token}"}

        response = requests.get(
            f"{BASE_URL}/documents",
            headers=headers,
            # Bound the wait so one stalled poll cannot hang the whole run.
            timeout=30,
        )

        if response.status_code != 200:
            print(f"Status check failed: {response.text}")
            return None

        docs = _flatten_documents(response.json())
        print(f"Total documents: {len(docs)}")

        # Find our document
        for doc in docs:
            if doc.get('track_id') == track_id:
                print(f"Document status: {doc.get('status')}")
                print(f"File path: {doc.get('file_path')}")
                print(f"Chunks count: {doc.get('chunks_count')}")
                return doc.get('status')

        print("Document not found in status list")
        return None

    except Exception as e:
        # Best-effort by design: report the problem and signal "unknown" with None.
        print(f"Status check error: {e}")
        return None


def _flatten_documents(payload):
    """Return the document records of a ``/documents`` payload as a flat list.

    Accepts a plain list of records as well as a dict that groups records by
    status, optionally nested under a ``"statuses"`` key. Entries that are
    not dicts are dropped so callers can use ``.get`` on every element.
    """
    # NOTE(review): LightRAG's API documents this endpoint as returning
    # {"statuses": {<status>: [records...]}} -- confirm against the deployed
    # server. Iterating such a dict directly yields its string keys, which is
    # why the flat-list assumption alone never finds the document.
    if isinstance(payload, dict):
        grouped = payload.get("statuses", payload)
        if isinstance(grouped, dict):
            records = []
            for group in grouped.values():
                if isinstance(group, list):
                    records.extend(group)
        else:
            records = grouped
    else:
        records = payload

    if not isinstance(records, list):
        return []
    return [record for record in records if isinstance(record, dict)]

def test_search(query):
    """Run one query against ``POST /query`` and print the result.

    The request asks for ``hybrid`` mode with ``top_k`` of 5. On success the
    ``response`` text, the number of ``sources`` and the first 100 characters
    of each source's ``content`` are printed.

    Args:
        query: The search text to send.

    Returns:
        ``True`` when the server answers with HTTP 200; ``False`` when no
        token could be obtained, the status differs, or the request raises.
        Failures are printed, never raised.
    """
    try:
        auth_token = get_auth_token()
        if not auth_token:
            return False

        headers = {**HEADERS, "Authorization": f"Bearer {auth_token}"}
        payload = {
            "query": query,
            "mode": "hybrid",
            "top_k": 5,
        }

        response = requests.post(
            f"{BASE_URL}/query",
            json=payload,
            headers=headers,
            # Generous because answering a query can be slow, but still
            # bounded so a stalled server cannot hang the script forever.
            timeout=300,
        )

        if response.status_code != 200:
            print(f"Search failed: {response.text}")
            return False

        result = response.json()
        # `or []` / `or ''` also cover an explicit JSON null, which a plain
        # .get() default does not; otherwise a successful query would be
        # reported as a failure after its success line was already printed.
        sources = result.get('sources') or []

        print(f"Search successful for query: '{query}'")
        print(f"Response: {result.get('response', 'No response')}")
        print(f"Sources: {len(sources)}")

        # Print sources
        for i, source in enumerate(sources, start=1):
            content = source.get('content') or ''
            print(f"Source {i}: {content[:100]}...")

        return True

    except Exception as e:
        # Best-effort by design: report the problem and signal failure.
        print(f"Search error: {e}")
        return False

def main():
    """Run the end-to-end check: health, upload, wait for processing, search.

    Stops early, after printing the reason, when the server is unhealthy,
    the upload fails, processing reports failure, or the document is still
    not processed once all polling attempts are used up.
    """
    print("Testing LightRAG with entity extraction workaround...")

    # Step 1: Check server health
    if not test_health():
        print("Server is not healthy, exiting...")
        return

    # Step 2: Upload OCR PDF
    print("\n--- Uploading OCR PDF ---")
    track_id = upload_ocr_pdf()
    if not track_id:
        print("Failed to upload OCR PDF")
        return

    print(f"Uploaded with track_id: {track_id}")

    # Step 3: Monitor processing status
    print("\n--- Monitoring processing status ---")
    max_attempts = 30
    poll_interval = 10  # seconds between status checks
    for attempt in range(1, max_attempts + 1):
        status = check_document_status(track_id)
        # Compare case-insensitively so a server that reports "processed"
        # instead of "PROCESSED" is not treated as an unknown status.
        normalized = status.upper() if isinstance(status, str) else status

        if normalized == "PROCESSED":
            print("Document processing completed successfully!")
            break
        if normalized == "FAILED":
            print("Document processing failed!")
            return

        if normalized in ("PENDING", "PROCESSING"):
            print(f"Processing... attempt {attempt}/{max_attempts}")
        else:
            print(f"Unknown status: {status}")

        # No point in waiting after the final attempt.
        if attempt < max_attempts:
            time.sleep(poll_interval)
    else:
        # The loop ended without `break`: the document never reached
        # PROCESSED, so searching it would only produce misleading results.
        print(f"Document was not processed after {max_attempts} attempts, exiting...")
        return

    # Step 4: Test search functionality
    print("\n--- Testing search functionality ---")

    # Test queries based on OCR content
    test_queries = [
        "Windows Server",
        "system requirements",
        "installation guide",
        "hardware specifications",
    ]

    for query in test_queries:
        print(f"\nTesting query: '{query}'")
        test_search(query)
        time.sleep(2)  # Small delay between queries

    print("\n--- Test completed ---")

# Run the end-to-end check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()