""" LightRAG Optimizations Integration Module This module integrates the three optimizations into the main LightRAG system: 1. Optimized entity extraction with proper LLM formatting 2. Batch graph operations for merging stage 3. Performance monitoring for merging stage phase 2 Each optimization can be enabled/disabled via configuration. """ import os import asyncio import time from typing import Dict, Any, Optional from functools import wraps import logging logger = logging.getLogger(__name__) class LightRAGOptimizations: """Integration class for LightRAG performance optimizations""" def __init__(self, enable_entity_extraction_opt: bool = True, enable_graph_operations_opt: bool = True, enable_performance_monitoring: bool = True): self.enable_entity_extraction_opt = enable_entity_extraction_opt self.enable_graph_operations_opt = enable_graph_operations_opt self.enable_performance_monitoring = enable_performance_monitoring # Import optimization modules self.entity_extraction_optimizer = None self.graph_operations_optimizer = None self.performance_monitor = None self._load_optimizations() def _load_optimizations(self): """Load optimization modules based on configuration""" try: if self.enable_entity_extraction_opt: from optimize_entity_extraction import EntityExtractionOptimizer self.entity_extraction_optimizer = EntityExtractionOptimizer() logger.info("✅ Entity extraction optimization loaded") if self.enable_graph_operations_opt: from optimize_graph_operations import GraphOperationsOptimizer self.graph_operations_optimizer = GraphOperationsOptimizer() logger.info("✅ Graph operations optimization loaded") if self.enable_performance_monitoring: from optimize_performance_monitoring import PerformanceMonitor self.performance_monitor = PerformanceMonitor() logger.info("✅ Performance monitoring loaded") except ImportError as e: logger.warning(f"Failed to load optimization modules: {e}") logger.warning("Continuing without optimizations") async def optimize_extract_entities(self, chunk: Dict[str, Any], global_config: Dict[str, Any], pipeline_status=None, pipeline_status_lock=None, llm_response_cache=None, text_chunks_storage=None): """Optimized entity extraction with proper LLM formatting""" if self.entity_extraction_optimizer: try: return await self.entity_extraction_optimizer.optimize_extraction_formatting( chunk, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, text_chunks_storage ) except Exception as e: logger.warning(f"Entity extraction optimization failed, falling back to original: {e}") # Fall back to original implementation from lightrag.operate import extract_entities return await extract_entities( chunk, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, text_chunks_storage ) else: # Use original implementation from lightrag.operate import extract_entities return await extract_entities( chunk, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, text_chunks_storage ) async def optimize_merge_nodes_and_edges(self, chunk_results: list, knowledge_graph_inst, entity_vdb, relationships_vdb, global_config: Dict[str, Any], full_entities_storage, full_relations_storage, doc_id: str, pipeline_status=None, pipeline_status_lock=None, llm_response_cache=None, current_file_number: int = 1, total_files: int = 1, file_path: str = ""): """Optimized merging stage with batch graph operations and performance monitoring""" # Start performance monitoring if enabled performance_metrics = None if self.performance_monitor: performance_metrics = await self.performance_monitor.start_merging_stage_phase2( doc_id, current_file_number, total_files, file_path ) try: if self.graph_operations_optimizer: # Use optimized batch processing result = await self.graph_operations_optimizer.batch_merge_nodes_and_edges( chunk_results, knowledge_graph_inst, entity_vdb, relationships_vdb, global_config, full_entities_storage, full_relations_storage, doc_id, pipeline_status, pipeline_status_lock, llm_response_cache, current_file_number, total_files, file_path ) else: # Use original implementation from lightrag.operate import merge_nodes_and_edges result = await merge_nodes_and_edges( chunk_results, knowledge_graph_inst, entity_vdb, relationships_vdb, global_config, full_entities_storage, full_relations_storage, doc_id, pipeline_status, pipeline_status_lock, llm_response_cache, current_file_number, total_files, file_path ) # Record successful completion if self.performance_monitor and performance_metrics: await self.performance_monitor.record_success(performance_metrics) return result except Exception as e: # Record failure if self.performance_monitor and performance_metrics: await self.performance_monitor.record_failure(performance_metrics, str(e)) raise def get_configuration(self) -> Dict[str, Any]: """Get current optimization configuration""" return { "entity_extraction_optimization": self.enable_entity_extraction_opt, "graph_operations_optimization": self.enable_graph_operations_opt, "performance_monitoring": self.enable_performance_monitoring, "optimizations_loaded": { "entity_extraction": self.entity_extraction_optimizer is not None, "graph_operations": self.graph_operations_optimizer is not None, "performance_monitoring": self.performance_monitor is not None } } class OptimizedLightRAG: """ Wrapper class that adds optimization capabilities to LightRAG This can be used as a drop-in replacement for LightRAG """ def __init__(self, lightrag_instance, **optimization_kwargs): self.lightrag = lightrag_instance self.optimizations = LightRAGOptimizations(**optimization_kwargs) def __getattr__(self, name): """Delegate all other attributes to the underlying LightRAG instance""" return getattr(self.lightrag, name) async def _process_extract_entities_optimized(self, chunk: Dict[str, Any], pipeline_status=None, pipeline_status_lock=None): """Optimized version of _process_extract_entities""" return await self.optimizations.optimize_extract_entities( chunk, global_config=self.lightrag.get_global_config(), pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock, llm_response_cache=self.lightrag.llm_response_cache, text_chunks_storage=self.lightrag.text_chunks ) async def ainsert_optimized(self, input, split_by_character=None, split_by_character_only=False, ids=None, file_paths=None, track_id=None): """Optimized async insert with all optimizations enabled""" # Use original enqueue logic if track_id is None: from lightrag.utils import generate_track_id track_id = generate_track_id("insert") await self.lightrag.apipeline_enqueue_documents(input, ids, file_paths, track_id) # Use optimized processing await self.apipeline_process_enqueue_documents_optimized( split_by_character, split_by_character_only ) return track_id async def apipeline_process_enqueue_documents_optimized(self, split_by_character=None, split_by_character_only=False): """Optimized document processing pipeline""" # This would replace the original apipeline_process_enqueue_documents method # with optimized versions of entity extraction and merging # Get pipeline status shared data and lock from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock pipeline_status = await get_namespace_data("pipeline_status") pipeline_status_lock = get_pipeline_status_lock() async with pipeline_status_lock: if not pipeline_status.get("busy", False): # Get documents to process (same logic as original) from lightrag.base import DocStatus processing_docs, failed_docs, pending_docs = await asyncio.gather( self.lightrag.doc_status.get_docs_by_status(DocStatus.PROCESSING), self.lightrag.doc_status.get_docs_by_status(DocStatus.FAILED), self.lightrag.doc_status.get_docs_by_status(DocStatus.PENDING), ) to_process_docs = {} to_process_docs.update(processing_docs) to_process_docs.update(failed_docs) to_process_docs.update(pending_docs) if not to_process_docs: logger.info("No documents to process") return # Set pipeline status (same as original) from datetime import datetime, timezone pipeline_status.update({ "busy": True, "job_name": "Optimized Job", "job_start": datetime.now(timezone.utc).isoformat(), "docs": len(to_process_docs), "batchs": len(to_process_docs), "cur_batch": 0, "request_pending": False, "latest_message": "", "history_messages": [], }) else: pipeline_status["request_pending"] = True logger.info("Another process is already processing the document queue. Request queued.") return try: # Process documents with optimizations await self._process_documents_optimized( to_process_docs, split_by_character, split_by_character_only, pipeline_status, pipeline_status_lock ) finally: async with pipeline_status_lock: pipeline_status["busy"] = False async def _process_documents_optimized(self, to_process_docs, split_by_character, split_by_character_only, pipeline_status, pipeline_status_lock): """Optimized document processing with all optimizations""" # This would contain the optimized processing logic # For now, we'll use the original processing but with optimized entity extraction and merging # In a full implementation, this would replace the original processing logic logger.info(f"Processing {len(to_process_docs)} documents with optimizations") # The actual implementation would go here, replacing the original processing # with calls to our optimized methods # For now, we'll just log that we're using optimizations async with pipeline_status_lock: pipeline_status["latest_message"] = "Using optimized processing pipeline" pipeline_status["history_messages"].append("Optimized processing pipeline activated") # Factory function to create optimized LightRAG instance def create_optimized_lightrag(lightrag_instance, **optimization_kwargs): """ Create an optimized LightRAG instance with all performance optimizations Args: lightrag_instance: Original LightRAG instance **optimization_kwargs: Optimization configuration options Returns: OptimizedLightRAG instance """ return OptimizedLightRAG(lightrag_instance, **optimization_kwargs) # Configuration helper def get_optimization_config_from_env(): """Get optimization configuration from environment variables""" return { "enable_entity_extraction_opt": os.getenv("OPTIMIZE_ENTITY_EXTRACTION", "true").lower() == "true", "enable_graph_operations_opt": os.getenv("OPTIMIZE_GRAPH_OPERATIONS", "true").lower() == "true", "enable_performance_monitoring": os.getenv("ENABLE_PERFORMANCE_MONITORING", "true").lower() == "true", } # Example usage: """ # In your application code: from lightrag import LightRAG from lightrag_optimizations_integration import create_optimized_lightrag # Create original LightRAG instance rag = LightRAG() # Create optimized version optimized_rag = create_optimized_lightrag(rag) # Use optimized instance track_id = await optimized_rag.ainsert_optimized(["Your document content"]) """