# File listing metadata: 80 lines, 3.0 KiB, Python.
import json
import os
import time

import requests
|
|
|
|
# Base URL of the API server under test. Set WORKSPACE_TEST_BASE_URL to point
# the script at another server. A trailing slash is dropped because every
# endpoint is built as f"{BASE_URL}/...".
BASE_URL = (
    os.environ.get("WORKSPACE_TEST_BASE_URL") or "http://localhost:3015"
).rstrip("/")

# API key sent in the X-API-Key header. Set WORKSPACE_TEST_API_KEY to supply a
# key without committing it to the source; the literal below is only the
# fallback that keeps the script's previous behaviour.
API_KEY = os.environ.get("WORKSPACE_TEST_API_KEY") or "jleu1212"
|
|
|
|
def make_headers(workspace=None):
    """Build the HTTP headers sent with each request in this script.

    Args:
        workspace: Value for the ``X-Workspace`` header. When ``None``
            (the default) that header is left out entirely.

    Returns:
        A new dict holding ``X-API-Key`` (the module-level ``API_KEY``),
        a JSON ``Content-Type``, and ``X-Workspace`` when one was given.
    """
    # Only a real workspace value contributes a header; None adds nothing.
    workspace_header = {} if workspace is None else {"X-Workspace": workspace}
    return {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
        **workspace_header,
    }
|
|
|
|
def test_delete_workspace():
    """Request deletion of a workspace and print the server's answer.

    Sends ``DELETE {BASE_URL}/workspaces/test_delete_workspace`` with the
    same workspace name in the ``X-Workspace`` header. Nothing is uploaded
    beforehand, so the call targets a workspace this script never populated.

    The status code is always printed, followed by the raw body when the
    status is not 200 or the decoded JSON body when it is. Nothing is
    asserted and nothing is returned.
    """
    workspace = "test_delete_workspace"

    # TODO: upload a file first so the workspace is known to exist; for now
    # the delete is issued directly (the intent is that an empty workspace
    # can still be deleted).
    delete_url = f"{BASE_URL}/workspaces/{workspace}"

    # requests applies no timeout by default, so bound the wait explicitly;
    # otherwise an unresponsive server would hang the script forever.
    response = requests.delete(
        delete_url,
        headers=make_headers(workspace),
        timeout=30,
    )
    print(f"Delete workspace {workspace}: status {response.status_code}")
    if response.status_code != 200:
        print(f"Error: {response.text}")
    else:
        print(f"Response: {response.json()}")
|
|
|
|
def test_clear_documents():
    """Insert a text document, clear the workspace's documents, report counts.

    Steps, all against the ``test_clear`` workspace:

    1. ``POST /documents/text`` with a sample document. If the status is
       not 200 the error body is printed and the function returns early.
    2. Print the returned ``track_id``, then sleep two seconds to give the
       server time to process the document.
    3. Print the document count reported by ``POST /documents/paginated``.
    4. ``DELETE /documents`` and print the outcome.
    5. Print the document count again.
    6. ``DELETE /workspaces/test_clear`` to clean up.

    Step 6 runs in a ``finally`` clause, so the workspace is removed even if
    one of steps 2-5 raises (connection error, unexpected response shape).
    Every request carries a timeout because requests has none by default.
    Nothing is asserted and nothing is returned.
    """
    workspace = "test_clear"
    request_timeout = 30  # seconds allowed for each HTTP call

    # Insert a small text document so there is something to clear.
    response = requests.post(
        f"{BASE_URL}/documents/text",
        headers=make_headers(workspace),
        json={
            "text": "Sample document for testing clear",
            "file_source": "test.txt",
        },
        timeout=request_timeout,
    )
    print(f"Insert text: status {response.status_code}")
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return

    try:
        track_id = response.json().get("track_id")
        print(f"Track ID: {track_id}")

        # Fixed pause to let the server process the inserted document.
        time.sleep(2)

        _print_document_count(
            workspace, "Documents before clear", request_timeout
        )

        # Clear every document in the workspace.
        response = requests.delete(
            f"{BASE_URL}/documents",
            headers=make_headers(workspace),
            timeout=request_timeout,
        )
        print(f"Clear documents: status {response.status_code}")
        if response.status_code != 200:
            print(f"Error: {response.text}")
        else:
            print(f"Response: {response.json()}")

        # Re-query to show the effect of the clear.
        _print_document_count(
            workspace, "Documents after clear", request_timeout
        )
    finally:
        # Always remove the workspace this run created, even after a failure.
        response = requests.delete(
            f"{BASE_URL}/workspaces/{workspace}",
            headers=make_headers(workspace),
            timeout=request_timeout,
        )
        print(f"Delete workspace: status {response.status_code}")


def _print_document_count(workspace, label, timeout):
    """Print *label* followed by the workspace's total document count.

    Queries ``POST /documents/paginated`` (page 1, page size 10) and reads
    ``pagination.total_count`` from the JSON body. On a non-200 status the
    response body is printed instead.
    """
    response = requests.post(
        f"{BASE_URL}/documents/paginated",
        headers=make_headers(workspace),
        json={"page": 1, "page_size": 10},
        timeout=timeout,
    )
    if response.status_code == 200:
        data = response.json()
        print(f"{label}: {data['pagination']['total_count']}")
    else:
        print(f"Failed to get documents: {response.text}")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: announce the run, then execute each check in order.
    print("Testing workspace operations...")
    for run_check in (test_delete_workspace, test_clear_documents):
        run_check()