Files
railseek6/test_workspace_isolation_api.py

97 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""
Test workspace isolation via API.
Assumes LightRAG server is running on http://localhost:3015.
"""
import requests
import json
import sys
# Root URL of the LightRAG server this test talks to; every endpoint path
# used in this file is appended to it.
BASE_URL = "http://localhost:3015"
# Seconds to wait for any single HTTP call. Without an explicit timeout
# `requests` waits indefinitely when the server stops responding.
_REQUEST_TIMEOUT = 30


def _create_workspace(label, name):
    """Create the workspace *name* and return its id, or None on failure.

    *label* ("A" or "B") is only used in progress and error messages.
    """
    resp = requests.post(
        f"{BASE_URL}/workspaces/",
        json={"name": name},
        timeout=_REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        print(f"Failed to create workspace {label}: {resp.status_code} {resp.text}")
        return None
    workspace_id = resp.json().get("id")
    if workspace_id is None:
        # Without an id every later request would target "workspace=None",
        # which would make the isolation checks meaningless.
        print(f"Failed to create workspace {label}: no 'id' in response {resp.text}")
        return None
    print(f"Created workspace {label}: {workspace_id}")
    return workspace_id


def _upload_document(label, workspace_id, filename, content):
    """Upload *content* as a text file to a workspace.

    Returns the document id reported by the server, or None on failure.
    """
    resp = requests.post(
        f"{BASE_URL}/documents/upload",
        params={"workspace": workspace_id},
        files={"file": (filename, content, "text/plain")},
        timeout=_REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        print(f"Failed to upload to workspace {label}: {resp.status_code} {resp.text}")
        return None
    doc_id = resp.json().get("id")
    if doc_id is None:
        # A missing id would make the cross-workspace membership checks
        # compare against None instead of a real document.
        print(f"Failed to upload to workspace {label}: no 'id' in response {resp.text}")
        return None
    print(f"Uploaded document to workspace {label}: {doc_id}")
    return doc_id


def _list_document_ids(label, workspace_id):
    """Return the ids of the documents listed for a workspace, or None on failure."""
    resp = requests.get(
        f"{BASE_URL}/documents",
        params={"workspace": workspace_id},
        timeout=_REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        print(f"Failed to list documents for workspace {label}: {resp.status_code} {resp.text}")
        return None
    docs = resp.json()
    if not isinstance(docs, list):
        # The checks below expect a JSON array of document objects.
        print(
            f"Failed to list documents for workspace {label}: "
            f"expected a JSON list, got {type(docs).__name__}"
        )
        return None
    doc_ids = [doc.get("id") for doc in docs]
    print(f"Documents in workspace {label}: {doc_ids}")
    return doc_ids


def _delete_workspace(label, workspace_id):
    """Best-effort deletion of one workspace; reports the outcome, never raises."""
    try:
        resp = requests.delete(
            f"{BASE_URL}/workspaces/{workspace_id}",
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Cleanup runs from a `finally`; raising here would mask the error
        # (or the result) of the test itself.
        print(f"Failed to delete workspace {label}: {exc}")
        return
    print(f"Deleted workspace {label}: {resp.status_code}")


def test_workspace_isolation():
    """Check that documents uploaded to one workspace are not listed in another.

    Creates two workspaces, uploads one document to each, lists the documents
    of each workspace and fails if the other workspace's document shows up.
    A query is then issued against workspace A; its result is only printed and
    a non-200 answer is tolerated. Every workspace that was created is deleted
    again, whether the checks pass, fail or raise.

    Returns:
        True if all checks passed, False otherwise (the reason is printed).

    Raises:
        requests.RequestException: if the server cannot be reached or a call
            exceeds the timeout while running the checks.
    """
    workspace_ids = {}  # label -> id of every workspace created so far
    try:
        # 1-2. Create workspaces A and B.
        for label, name in (("A", "workspace_a"), ("B", "workspace_b")):
            workspace_id = _create_workspace(label, name)
            if workspace_id is None:
                return False
            workspace_ids[label] = workspace_id

        # 3-4. Upload a different document to each workspace.
        uploads = (
            ("A", "test.txt", b"Content for workspace A"),
            ("B", "test2.txt", b"Content for workspace B"),
        )
        doc_ids = {}
        for label, filename, content in uploads:
            doc_id = _upload_document(label, workspace_ids[label], filename, content)
            if doc_id is None:
                return False
            doc_ids[label] = doc_id

        # 5-6. Each workspace's listing must not contain the other's document.
        for label, other in (("A", "B"), ("B", "A")):
            listed_ids = _list_document_ids(label, workspace_ids[label])
            if listed_ids is None:
                return False
            if doc_ids[other] in listed_ids:
                print(f"ERROR: Document from workspace {other} appears in workspace {label}!")
                return False

        # 7. Search in workspace A (should only find content from doc A).
        resp = requests.post(
            f"{BASE_URL}/query",
            params={"workspace": workspace_ids["A"]},
            json={"query": "Content"},
            timeout=_REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            print(f"Failed to search in workspace A: {resp.status_code} {resp.text}")
            # Search may fail due to no indexed content, but that's okay.
            print("Search returned non-200, but that's acceptable for this test.")
        else:
            print(f"Search result in workspace A: {resp.json()}")
    finally:
        # 8. Clean up on every exit path so a failed run does not leave
        # workspaces behind on the server.
        for label, workspace_id in workspace_ids.items():
            _delete_workspace(label, workspace_id)

    print("✓ Workspace isolation API test passed!")
    return True
if __name__ == "__main__":
    # Script entry point: exit status 0 when the isolation test reports
    # success, 1 otherwise.
    sys.exit(0 if test_workspace_isolation() else 1)