""" Workspace manager for LightRAG server. Provides isolation between different workspaces by managing separate LightRAG instances per workspace. """ import os import logging import asyncio from pathlib import Path from typing import Dict, List, Optional, Tuple from lightrag import LightRAG from lightrag.api.routers.document_routes import DocumentManager from lightrag.api.config import global_args logger = logging.getLogger(__name__) class WorkspaceManager: """Manages multiple workspaces, each with its own LightRAG instance and DocumentManager.""" def __init__(self, args, lightrag_factory=None): self.args = args self.base_working_dir = Path(args.working_dir) self.base_input_dir = Path(args.input_dir) self.lightrag_factory = lightrag_factory # Cache of LightRAG instances per workspace self._rag_instances: Dict[str, LightRAG] = {} # Cache of DocumentManager instances per workspace self._doc_managers: Dict[str, DocumentManager] = {} # Ensure base directories exist self.base_working_dir.mkdir(parents=True, exist_ok=True) self.base_input_dir.mkdir(parents=True, exist_ok=True) def list_workspaces(self) -> List[str]: """List all existing workspaces by scanning the working directory.""" workspaces = [] for item in self.base_working_dir.iterdir(): if item.is_dir(): # Exclude special directories if item.name.startswith("__") and item.name.endswith("__"): continue # Check if it's a valid workspace (has at least one storage file) # For simplicity, we consider any subdirectory as a workspace workspaces.append(item.name) return sorted(workspaces) def create_workspace(self, name: str) -> bool: """Create a new workspace directory.""" if not name or not name.strip(): raise ValueError("Workspace name cannot be empty") name = name.strip() # Validate name (alphanumeric, underscore, hyphen) if not all(c.isalnum() or c in ('_', '-') for c in name): raise ValueError("Workspace name can only contain alphanumeric characters, underscores, and hyphens") workspace_dir = self.base_working_dir / name input_subdir = self.base_input_dir / name try: workspace_dir.mkdir(exist_ok=True) input_subdir.mkdir(exist_ok=True) logger.info(f"Created workspace '{name}' with directories {workspace_dir}, {input_subdir}") return True except Exception as e: logger.error(f"Failed to create workspace '{name}': {e}") raise async def rename_workspace(self, old_name: str, new_name: str) -> bool: """Rename a workspace directory and update cached instances.""" if not old_name or not old_name.strip(): raise ValueError("Old workspace name cannot be empty") if not new_name or not new_name.strip(): raise ValueError("New workspace name cannot be empty") old_name = old_name.strip() new_name = new_name.strip() # Validate new name (alphanumeric, underscore, hyphen) if not all(c.isalnum() or c in ('_', '-') for c in new_name): raise ValueError("New workspace name can only contain alphanumeric characters, underscores, and hyphens") # Check if old workspace exists old_workspace_dir = self.base_working_dir / old_name old_input_subdir = self.base_input_dir / old_name if not old_workspace_dir.exists() and not old_input_subdir.exists(): raise ValueError(f"Workspace '{old_name}' does not exist") # Check if new workspace already exists new_workspace_dir = self.base_working_dir / new_name new_input_subdir = self.base_input_dir / new_name if new_workspace_dir.exists() or new_input_subdir.exists(): raise ValueError(f"Workspace '{new_name}' already exists") # Move directories import shutil try: if old_workspace_dir.exists(): shutil.move(str(old_workspace_dir), str(new_workspace_dir)) logger.info(f"Moved workspace directory from {old_workspace_dir} to {new_workspace_dir}") if old_input_subdir.exists(): shutil.move(str(old_input_subdir), str(new_input_subdir)) logger.info(f"Moved input directory from {old_input_subdir} to {new_input_subdir}") except Exception as e: logger.error(f"Failed to rename workspace '{old_name}' to '{new_name}': {e}") raise # Update cached instances if old_name in self._rag_instances: self._rag_instances[new_name] = self._rag_instances.pop(old_name) # Optionally update the workspace name inside the LightRAG instance? Not needed as path changed. if old_name in self._doc_managers: self._doc_managers[new_name] = self._doc_managers.pop(old_name) # Update the workspace name in DocumentManager? It uses workspace parameter. # For simplicity, we'll just keep the cached instance; but the workspace attribute may be wrong. # Better to discard and let it be recreated when needed. del self._doc_managers[new_name] # discard, will be recreated with new workspace name logger.info(f"Renamed workspace '{old_name}' to '{new_name}' successfully") return True async def delete_workspace(self, name: str) -> bool: """Delete a workspace directory and all its data.""" import shutil from pathlib import Path # Validate name if not name or not name.strip(): raise ValueError("Workspace name cannot be empty") name = name.strip() # Check if workspace exists workspace_dir = self.base_working_dir / name input_subdir = self.base_input_dir / name if not workspace_dir.exists() and not input_subdir.exists(): raise ValueError(f"Workspace '{name}' does not exist") # Remove cached instances if name in self._rag_instances: del self._rag_instances[name] if name in self._doc_managers: del self._doc_managers[name] # Delete workspace data from storage (vector DB, KV, graph, etc.) try: if self.lightrag_factory: # Create a temporary LightRAG instance for this workspace to delete its data rag = self.lightrag_factory(str(self.base_working_dir), name) # Call async delete_workspace_data await rag.adelete_workspace_data() logger.info(f"Deleted workspace data for '{name}' from storage") else: logger.warning("No lightrag_factory provided; workspace data may remain in storage") except Exception as e: logger.warning(f"Failed to delete workspace data for '{name}': {e}. Proceeding with directory deletion.") # Delete directories recursively try: if workspace_dir.exists(): shutil.rmtree(workspace_dir) logger.info(f"Deleted workspace directory: {workspace_dir}") if input_subdir.exists(): shutil.rmtree(input_subdir) logger.info(f"Deleted workspace input directory: {input_subdir}") except Exception as e: logger.error(f"Failed to delete workspace '{name}': {e}") raise return True def get_rag(self, workspace: str = "") -> LightRAG: """Get or create a LightRAG instance for the given workspace.""" if not workspace: workspace = self.args.workspace # default workspace from args if workspace not in self._rag_instances: if self.lightrag_factory: # The factory is a function, not an object with .create() method rag = self.lightrag_factory(str(self.base_working_dir), workspace) else: # Fallback: create a simple LightRAG instance with default config # This is not ideal but works for testing from lightrag import LightRAG from lightrag.utils import EmbeddingFunc # We need to import the same configuration as used in create_app # For now, raise error raise NotImplementedError("LightRAG factory not provided") self._rag_instances[workspace] = rag return self._rag_instances[workspace] def get_document_manager(self, workspace: str = "") -> DocumentManager: """Get or create a DocumentManager instance for the given workspace.""" if not workspace: workspace = self.args.workspace if workspace not in self._doc_managers: # Create a new DocumentManager with workspace-specific input directory input_dir = self.base_input_dir / workspace if workspace else self.base_input_dir self._doc_managers[workspace] = DocumentManager(str(input_dir), workspace=workspace) return self._doc_managers[workspace] def workspace_exists(self, name: str) -> bool: """Check if a workspace exists.""" return (self.base_working_dir / name).exists()