#!/usr/bin/env python3
"""
Selenium test to verify UI refresh after workspace switch using existing workspaces.

The script uploads a document to one workspace through the HTTP API, waits
for indexing, then drives the web UI with headless Chrome to check that the
document table changes when the workspace selector is switched back and forth.
"""
import os
import sys
import time
import json
import urllib.request
import urllib.error
from pathlib import Path

import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException

# Configuration
SERVER_URL = 'http://localhost:3015'
API_BASE = SERVER_URL
# NOTE(review): the fallback value is a credential committed to source control.
# It is kept so existing runs behave the same; prefer setting LIGHTRAG_API_KEY.
API_KEY = os.environ.get('LIGHTRAG_API_KEY', 'jleu1212')
TEST_FILE = 'test/tir.docx'
WAIT_INDEXING_TIMEOUT = 120
POLL_INTERVAL = 2
# Per-request timeout (seconds) so a stalled server cannot hang the test forever.
REQUEST_TIMEOUT = 60


def is_server_running(url=SERVER_URL, timeout=5):
    """Return True if an HTTP server answers at *url* with a non-5xx status.

    Args:
        url: Address to probe.
        timeout: Seconds to wait for the connection/response.

    Returns:
        True when the server responds with a status below 500 (including
        4xx responses such as 401/404), False when it is unreachable or
        answers with a 5xx status.
    """
    try:
        # The context manager closes the underlying socket.
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status < 500
    except urllib.error.HTTPError as err:
        # urlopen raises for 4xx/5xx; a 4xx still proves the server is up.
        try:
            return err.code < 500
        finally:
            err.close()
    except Exception:
        # Probe boundary: any other failure (refused connection, DNS error,
        # malformed URL, protocol error) means "not running".
        return False


def api_request(method, endpoint, workspace='', data=None, files=None,
                timeout=REQUEST_TIMEOUT):
    """Send an authenticated request to the server API.

    Args:
        method: HTTP method name, e.g. 'GET' or 'POST'.
        endpoint: Path appended to API_BASE.
        workspace: When non-empty, sent as the X-Workspace header.
        data: Request payload; JSON-encoded when given without *files*.
        files: Multipart file mapping passed through to requests.
        timeout: Seconds before requests gives up on the server.

    Returns:
        The requests.Response object; the status code is not checked here.
    """
    url = API_BASE + endpoint
    headers = {'X-API-Key': API_KEY}
    if workspace:
        headers['X-Workspace'] = workspace
    if data and not files:
        headers['Content-Type'] = 'application/json'
        data = json.dumps(data)
    return requests.request(
        method, url, headers=headers, data=data, files=files, timeout=timeout
    )


def upload_file_via_api(file_path, workspace=''):
    """Upload *file_path* to /documents/upload and return its track id.

    Args:
        file_path: Path of the document to upload (sent with the .docx MIME type).
        workspace: Target workspace name; empty means no X-Workspace header.

    Returns:
        The 'track_id' value from the JSON response (None if the key is absent).

    Raises:
        RuntimeError: If the server answers with a status other than 200.
    """
    with open(file_path, 'rb') as f:
        files = {
            'file': (
                os.path.basename(file_path),
                f,
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            )
        }
        # The request must be sent while the file handle is still open.
        resp = api_request('POST', '/documents/upload', workspace=workspace, files=files)
    if resp.status_code != 200:
        raise RuntimeError(f"Upload failed: {resp.text}")
    result = resp.json()
    track_id = result.get('track_id')
    print(f"Uploaded {file_path}, track_id: {track_id}")
    return track_id


def wait_for_indexing_complete(track_id, workspace='', timeout=WAIT_INDEXING_TIMEOUT):
    """Poll the track-status endpoint until the tracked documents are indexed.

    Args:
        track_id: Identifier returned by the upload call.
        workspace: Workspace the upload was made in.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True once the status response lists at least one document and none
        of them is still pending or in progress.

    Raises:
        TimeoutError: If that state is not reached within *timeout* seconds.
    """
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = api_request('GET', f'/documents/track_status/{track_id}', workspace=workspace)
        if resp.status_code == 200:
            data = resp.json()
            total = data.get('total_count', 0)
            summary = data.get('status_summary') or {}
            processed = summary.get('PROCESSED', 0)
            failed = summary.get('FAILED', 0)
            pending = summary.get('PENDING', 0)
            # NOTE(review): assumes in-progress documents are reported under
            # a 'PROCESSING' key - confirm against the server's response.
            processing = summary.get('PROCESSING', 0)
            print(f"Indexing status: total={total}, processed={processed}, failed={failed}, pending={pending}")
            # Require total > 0: an empty summary (track not registered yet)
            # also has pending == 0 and must not count as "done".
            if total > 0 and pending == 0 and processing == 0:
                print("Indexing completed.")
                return True
        time.sleep(POLL_INTERVAL)
    raise TimeoutError(f"Indexing not completed within {timeout} seconds")


def _option_label(option):
    """Return the text of the option's first <span>, or the option's own text."""
    spans = option.find_elements(By.CSS_SELECTOR, 'span')
    return spans[0].text if spans else option.text


def select_workspace_in_ui(driver, wait, workspace_name):
    """Click workspace combobox and select the workspace with given name.

    An option whose label equals *workspace_name* exactly is preferred, so
    that e.g. "test1" does not pick "test10"; if there is none, the first
    option whose label contains the name is used.

    Args:
        driver: The Selenium WebDriver (kept for interface compatibility).
        wait: WebDriverWait used for all explicit waits.
        workspace_name: Name of the workspace to select.

    Raises:
        RuntimeError: If no option in the dropdown matches *workspace_name*.
    """
    open_listbox = (By.CSS_SELECTOR, '[role="listbox"][data-state="open"]')

    # Find combobox
    combobox = wait.until(
        EC.element_to_be_clickable((By.CSS_SELECTOR, '[role="combobox"].w-48'))
    )
    combobox.click()

    # Wait for dropdown open
    dropdown = wait.until(EC.presence_of_element_located(open_listbox))

    # Read every option's label once, then pick exact match before substring.
    options = dropdown.find_elements(By.CSS_SELECTOR, '[role="option"]')
    labelled = [(opt, _option_label(opt)) for opt in options]
    chosen = next(
        (opt for opt, label in labelled if label.strip() == workspace_name), None
    )
    if chosen is None:
        chosen = next(
            (opt for opt, label in labelled if workspace_name in label), None
        )
    if chosen is None:
        raise RuntimeError(f"Workspace {workspace_name} not found in dropdown")

    chosen.click()
    # Wait for dropdown to close
    wait.until(EC.invisibility_of_element_located(open_listbox))
    time.sleep(1)  # allow UI to update


def test_ui_refresh():
    """Verify the document list refreshes when switching workspaces in the UI.

    Prints a message and returns without testing when the server is not
    reachable. Any failure is printed with its traceback and re-raised; the
    browser is always closed.
    """
    if not is_server_running():
        print(f"LightRAG server not running on {SERVER_URL}. Skipping Selenium test.")
        return

    driver = None
    workspace_a = "isolated_ws2"
    workspace_b = "test1"

    try:
        # 1. Upload a document to workspace A via API
        track_id = upload_file_via_api(TEST_FILE, workspace=workspace_a)

        # 2. Wait for indexing to complete
        wait_for_indexing_complete(track_id, workspace=workspace_a)

        # 3. Initialize Selenium driver
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=options)
        driver.implicitly_wait(5)
        wait = WebDriverWait(driver, 10)

        # 4. Open UI
        driver.get(SERVER_URL)
        print("Page loaded")

        # 5. Select workspace A via UI
        select_workspace_in_ui(driver, wait, workspace_a)
        print(f"Selected workspace {workspace_a}")

        # 6. Wait for document table to appear and verify at least one row
        try:
            rows = wait.until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'table tbody tr'))
            )
            assert len(rows) > 0, "No documents in table"
            print(f"✓ Uploaded document appears in UI for workspace A (rows: {len(rows)})")
        except (NoSuchElementException, TimeoutException) as e:
            # Best effort: the hard check happens in step 9.
            print(f"Warning: Could not find document table: {e}")

        # 7. Switch to workspace B via UI
        select_workspace_in_ui(driver, wait, workspace_b)
        print(f"Selected workspace {workspace_b}")

        # 8. Verify Uploaded documents subscreen is empty (or shows different documents)
        try:
            time.sleep(2)  # wait for UI refresh
            # Look for empty state message
            empty_card = driver.find_elements(By.CSS_SELECTOR, '.EmptyCard')
            if empty_card:
                print("✓ Workspace B document list is empty (EmptyCard present)")
            else:
                rows = driver.find_elements(By.CSS_SELECTOR, 'table tbody tr')
                if len(rows) == 0:
                    print("✓ Workspace B document list is empty (no rows)")
                else:
                    print(f"Warning: Workspace B document list contains {len(rows)} rows (maybe from other workspaces)")
        except Exception as e:
            # Deliberately non-fatal: workspace B may legitimately hold documents.
            print(f"Warning: Could not verify empty state: {e}")

        # 9. Switch back to workspace A and verify document still appears
        select_workspace_in_ui(driver, wait, workspace_a)
        print(f"Switched back to workspace {workspace_a}")
        rows = wait.until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'table tbody tr'))
        )
        assert len(rows) > 0, "Document disappeared after switching back"
        print(f"✓ Workspace A document list restored (rows: {len(rows)})")

        print("\n✅ UI refresh after workspace switch verified!")

    except Exception as e:
        print(f"Test failed with error: {e}")
        import traceback
        traceback.print_exc()
        raise
    finally:
        if driver:
            driver.quit()
        # Note: we do not delete the uploaded document; it's fine to leave in workspace.


if __name__ == "__main__":
    test_ui_refresh()