# Files
# railseek6/test_full_pipeline_workspace_isolation.py
# 2026-01-12 22:31:11 +08:00
#
# 418 lines
# 16 KiB
# Python
#!/usr/bin/env python3
"""
Comprehensive test for workspace isolation issues:
1. Upload/indexing performance bottlenecks
2. Workspace isolation in retrieval
3. Data persistence after deletion
4. Refresh of Uploaded documents subscreen after workspace switch
"""
import asyncio
import os
import sys
import tempfile
import time
import json
import shutil
from pathlib import Path
import httpx
import subprocess
import threading
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
def start_server():
    """Start the LightRAG server in a subprocess and wait until it responds.

    Returns:
        subprocess.Popen: handle to the running server process.

    Raises:
        RuntimeError: if the server does not answer within ~30 seconds
            (the child process is terminated before raising).
    """
    cwd = Path(__file__).parent / "LightRAG-main"
    env = os.environ.copy()
    # Prepend the server directory so its local packages are importable.
    env['PYTHONPATH'] = str(cwd) + (os.pathsep + env.get('PYTHONPATH', ''))
    # Use a different port to avoid conflicts
    cmd = [sys.executable, 'lightrag_server.py', '--port', '8001']
    proc = subprocess.Popen(
        cmd,
        cwd=cwd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    )
    # Poll until the server answers; any status < 500 counts as "up".
    for _ in range(30):
        try:
            response = httpx.get('http://localhost:8001', timeout=1)
            if response.status_code < 500:
                print("Server started successfully")
                return proc
        except httpx.HTTPError:
            # BUG FIX: was a bare `except:` (also caught KeyboardInterrupt).
            pass
        # BUG FIX: sleep on every failed attempt — the original slept only in
        # the except branch, busy-looping when the server returned 5xx.
        time.sleep(1)
    # BUG FIX: reap the child instead of leaking it before failing.
    proc.terminate()
    raise RuntimeError("Server failed to start")
async def test_upload_indexing_performance():
    """Upload a small document and time upload + indexing end to end.

    Returns:
        bool: True if the upload succeeded and indexing completed within
        the 60-second polling window; False otherwise.
    """
    print("\n=== Testing Upload/Indexing Performance ===")
    # Create a throwaway document to upload (removed in the finally block).
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
        f.write("This is a test document for workspace 1. It contains some sample text for indexing performance testing.")
        test_file = f.name
    try:
        start_time = time.time()
        async with httpx.AsyncClient(timeout=30.0) as client:
            with open(test_file, 'rb') as file:
                files = {'file': ('test.txt', file, 'text/plain')}
                headers = {'X-Workspace': 'workspace1'}
                response = await client.post('http://localhost:8001/documents/upload', files=files, headers=headers)
            upload_time = time.time() - start_time
            print(f"Upload time: {upload_time:.2f} seconds")
            if response.status_code != 200:
                print(f"Upload failed: {response.status_code} - {response.text}")
                return False
            result = response.json()
            track_id = result.get('track_id')
            print(f"Upload successful, track_id: {track_id}")
            # Poll the tracking endpoint until the document reaches a
            # terminal status.
            print("Waiting for indexing to complete...")
            for i in range(60):  # Wait up to 60 seconds
                await asyncio.sleep(1)
                status_response = await client.get(f'http://localhost:8001/documents/track_status/{track_id}', headers=headers)
                if status_response.status_code == 200:
                    status_data = status_response.json()
                    documents = status_data.get('documents', [])
                    if documents:
                        doc_status = documents[0].get('status')
                        if doc_status == 'PROCESSED':
                            total_time = time.time() - start_time
                            print(f"Indexing completed in {total_time:.2f} seconds")
                            break
                        elif doc_status == 'FAILED':
                            print(f"Indexing failed: {documents[0].get('error_msg')}")
                            return False
                if i % 10 == 0:
                    print(f"Still waiting... ({i+1}s)")
            else:
                # BUG FIX: the original fell through after 60 seconds and
                # still reported success; a timeout is a failure.
                print("Indexing did not complete within 60 seconds")
                return False
            # Surface pipeline state for performance diagnostics.
            pipeline_response = await client.get('http://localhost:8001/documents/pipeline_status', headers=headers)
            if pipeline_response.status_code == 200:
                pipeline_data = pipeline_response.json()
                print(f"Pipeline busy: {pipeline_data.get('busy')}")
                print(f"Latest message: {pipeline_data.get('latest_message')}")
        return True
    finally:
        os.unlink(test_file)
async def test_workspace_isolation():
    """Verify that search results do not leak between workspaces."""
    print("\n=== Testing Workspace Isolation in Retrieval ===")
    # Distinct content per workspace so any leakage is detectable.
    doc1_content = "Workspace 1 contains information about artificial intelligence and machine learning."
    doc2_content = "Workspace 2 contains information about data science and statistical analysis."
    ws1 = {'X-Workspace': 'workspace1'}
    ws2 = {'X-Workspace': 'workspace2'}
    async with httpx.AsyncClient(timeout=30.0) as http:
        # Seed workspace1.
        upload1 = await http.post(
            'http://localhost:8001/documents/text',
            json={'text': doc1_content, 'file_source': 'workspace1_doc.txt'},
            headers=ws1,
        )
        if upload1.status_code != 200:
            print(f"Failed to upload to workspace1: {upload1.text}")
            return False
        # Seed workspace2.
        upload2 = await http.post(
            'http://localhost:8001/documents/text',
            json={'text': doc2_content, 'file_source': 'workspace2_doc.txt'},
            headers=ws2,
        )
        if upload2.status_code != 200:
            print(f"Failed to upload to workspace2: {upload2.text}")
            return False
        # Give the pipeline a moment to index both documents.
        await asyncio.sleep(5)
        # Each workspace should be able to find its own content.
        hits1 = await http.post(
            'http://localhost:8001/search',
            json={'query': 'artificial intelligence', 'top_k': 10},
            headers=ws1,
        )
        if hits1.status_code != 200:
            print(f"Search in workspace1 failed: {hits1.text}")
            return False
        data1 = hits1.json()
        print(f"Workspace1 search results: {data1.get('total_results')} items")
        hits2 = await http.post(
            'http://localhost:8001/search',
            json={'query': 'data science', 'top_k': 10},
            headers=ws2,
        )
        if hits2.status_code != 200:
            print(f"Search in workspace2 failed: {hits2.text}")
            return False
        data2 = hits2.json()
        print(f"Workspace2 search results: {data2.get('total_results')} items")
        # Probe workspace1 for workspace2's content to detect contamination.
        cross = await http.post(
            'http://localhost:8001/search',
            json={'query': 'data science statistical analysis', 'top_k': 10},
            headers=ws1,  # Searching in workspace1
        )
        if cross.status_code != 200:
            print(f"Cross-workspace search failed: {cross.text}")
            return False
        cross_data = cross.json()
        print(f"Cross-workspace search in workspace1: {cross_data.get('total_results')} items")
        # Any hit here means retrieval is not isolated.
        if cross_data.get('total_results', 0) > 0:
            print("⚠️ BUG DETECTED: Retrieval is not isolated between workspaces!")
            print(" Workspace1 can see documents from workspace2")
            return False
        print("✓ Workspace isolation in retrieval is working correctly")
        return True
async def test_workspace_deletion():
    """Check whether workspace data is actually removed after deletion."""
    print("\n=== Testing Workspace Deletion ===")
    workspace_name = "temp_workspace_delete_test"
    headers = {'X-Workspace': workspace_name}
    async with httpx.AsyncClient(timeout=30.0) as http:
        # Seed the throwaway workspace with one document. There may be no
        # explicit workspace-creation endpoint, so uploading doubles as both.
        seed = await http.post(
            'http://localhost:8001/documents/text',
            json={'text': 'This document should be deleted with the workspace.', 'file_source': 'delete_test.txt'},
            headers=headers,
        )
        if seed.status_code != 200:
            print(f"Failed to create test data: {seed.text}")
            return False
        print(f"Created test data in workspace '{workspace_name}'")
        # Let indexing settle before probing.
        await asyncio.sleep(3)
        # Confirm the document is searchable before deletion.
        probe = await http.post(
            'http://localhost:8001/search',
            json={'query': 'document deleted workspace', 'top_k': 5},
            headers=headers,
        )
        if probe.status_code == 200:
            probe_data = probe.json()
            print(f"Data exists in workspace: {probe_data.get('total_results')} items found")
        else:
            print(f"Search failed: {probe.text}")
        # Best-effort cleanup via the documents API — the server may not
        # expose a dedicated workspace-deletion endpoint.
        try:
            wipe = await http.delete('http://localhost:8001/documents', headers=headers)
            if wipe.status_code == 200:
                print("Cleared documents from workspace")
            else:
                print(f"Document clear failed: {wipe.text}")
        except Exception as e:
            print(f"Note: Could not clear documents via API: {e}")
    # Point at the on-disk directories a manual audit should inspect.
    base_dir = Path(__file__).parent / "LightRAG-main"
    workspace_dir = base_dir / "working" / workspace_name
    input_dir = base_dir / "inputs" / workspace_name
    print(f"Workspace directory: {workspace_dir}")
    print(f"Input directory: {input_dir}")
    # Residual data after deletion is the suspected bug.
    print("\n⚠️ Manual check needed: After workspace deletion, check if:")
    print(" 1. Directories are properly removed")
    print(" 2. Vector database entries are cleared")
    print(" 3. Search still returns results (it shouldn't)")
    return True
async def test_uploaded_documents_refresh():
    """Exercise the per-workspace document listings behind the Uploaded
    documents subscreen; the UI refresh itself needs manual verification."""
    print("\n=== Testing Uploaded Documents Refresh ===")
    ws1 = {'X-Workspace': 'refresh_test_1'}
    ws2 = {'X-Workspace': 'refresh_test_2'}
    async with httpx.AsyncClient(timeout=30.0) as http:
        # One document per workspace.
        up1 = await http.post(
            'http://localhost:8001/documents/text',
            json={'text': 'Document for refresh test workspace 1', 'file_source': 'refresh1.txt'},
            headers=ws1,
        )
        up2 = await http.post(
            'http://localhost:8001/documents/text',
            json={'text': 'Document for refresh test workspace 2', 'file_source': 'refresh2.txt'},
            headers=ws2,
        )
        if up1.status_code != 200 or up2.status_code != 200:
            print("Failed to create test documents")
            return False
        # Let indexing settle.
        await asyncio.sleep(3)
        # Count documents per workspace across all status buckets.
        listing1 = await http.get('http://localhost:8001/documents', headers=ws1)
        if listing1.status_code == 200:
            count1 = sum(len(entries) for entries in listing1.json().get('statuses', {}).values())
            print(f"Workspace 1 has {count1} documents")
        listing2 = await http.get('http://localhost:8001/documents', headers=ws2)
        if listing2.status_code == 200:
            count2 = sum(len(entries) for entries in listing2.json().get('statuses', {}).values())
            print(f"Workspace 2 has {count2} documents")
    # The remaining verification is manual: the web UI must re-query on switch.
    print("\n⚠️ UI Test needed: Manual verification required for:")
    print(" 1. Switch workspace in UI")
    print(" 2. Check if Uploaded documents subscreen refreshes")
    print(" 3. Verify only documents from current workspace are shown")
    return True
async def main():
    """Run all workspace-isolation tests against a freshly started server.

    Returns:
        bool: True when every test passed.
    """
    print("Starting comprehensive workspace isolation tests")
    print("=" * 60)
    server_proc = None
    try:
        print("Starting LightRAG server on port 8001...")
        server_proc = start_server()
        # Give server time to fully initialize beyond the first HTTP reply.
        await asyncio.sleep(5)
        # Run each test in isolation so one crash doesn't abort the rest
        # (replaces four copy-pasted try/except blocks).
        tests = [
            test_upload_indexing_performance,
            test_workspace_isolation,
            test_workspace_deletion,
            test_uploaded_documents_refresh,
        ]
        total_tests = len(tests)
        tests_passed = 0
        for num, test in enumerate(tests, start=1):
            try:
                if await test():
                    tests_passed += 1
            except Exception as e:
                print(f"Test {num} failed with error: {e}")
        print("\n" + "=" * 60)
        print(f"Test Results: {tests_passed}/{total_tests} tests passed")
        if tests_passed < total_tests:
            print("\n⚠️ Issues found:")
            print(" 1. Upload/indexing may have performance bottlenecks")
            print(" 2. Retrieval may not be properly isolated between workspaces")
            print(" 3. Data may persist after workspace deletion")
            print(" 4. UI may not refresh uploaded documents after workspace switch")
        return tests_passed == total_tests
    finally:
        # Cleanup: stop the server, then remove per-workspace directories.
        if server_proc:
            print("\nStopping server...")
            server_proc.terminate()
            try:
                server_proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # BUG FIX: a hung server previously raised TimeoutExpired out
                # of the finally block and skipped directory cleanup;
                # escalate to SIGKILL instead.
                server_proc.kill()
                server_proc.wait()
        base_dir = Path(__file__).parent / "LightRAG-main"
        workspaces = [
            "workspace1",
            "workspace2",
            "temp_workspace_delete_test",
            "refresh_test_1",
            "refresh_test_2",
        ]
        # Both the working and input trees get a directory per workspace.
        test_dirs = [base_dir / kind / ws for kind in ("working", "inputs") for ws in workspaces]
        for dir_path in test_dirs:
            if dir_path.exists():
                try:
                    shutil.rmtree(dir_path)
                    print(f"Cleaned up: {dir_path}")
                except Exception as e:
                    print(f"Failed to clean up {dir_path}: {e}")
if __name__ == "__main__":
    # Exit status mirrors the overall test outcome.
    outcome = asyncio.run(main())
    raise SystemExit(0 if outcome else 1)