"""
Optimized Document Processor with Persistent Classifier
Uses GPU acceleration for both PaddleOCR and OpenCLIP with complete dependency isolation
"""

import os
import sys
import asyncio
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
import json

# Add paths
sys.path.insert(0, "LightRAG-main")

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class OptimizedDocumentProcessor:
    """Optimized document processor with GPU acceleration and dependency isolation.

    Attributes:
        ocr_processor: ``SimpleOCRProcessor`` instance; construction of this
            class fails if it cannot be created.
        classifier_client: ``PersistentClassifierClient`` instance, or ``None``
            when the classifier is unavailable or failed to initialize.
    """

    def __init__(self):
        self.ocr_processor = None
        self.classifier_client = None
        self._initialize_components()

    def _initialize_components(self):
        """Initialize OCR and classifier components with dependency isolation.

        The heavy dependencies are imported lazily inside this method so that
        importing this module does not pull them in.

        Raises:
            RuntimeError: If the OCR processor cannot be imported or
                constructed. The original exception is attached as
                ``__cause__``.
        """
        logger.info("=== INITIALIZING OPTIMIZED DOCUMENT PROCESSOR ===")

        # Initialize OCR processor (PaddleOCR with GPU REQUIRED - no fallback)
        try:
            from simple_ocr_processor import SimpleOCRProcessor
            self.ocr_processor = SimpleOCRProcessor()
            logger.info("✅ OCR processor initialized with GPU")
        except Exception as e:
            logger.error(f"❌ OCR processor GPU initialization failed: {e}")
            # Chain the cause so the underlying traceback is preserved.
            raise RuntimeError(
                f"PaddleOCR GPU is required but failed to initialize: {e}"
            ) from e

        # Initialize persistent classifier client (OpenCLIP with GPU).
        # Unlike OCR, the classifier is optional: any failure disables image
        # classification instead of aborting construction.
        try:
            from persistent_classifier_client import PersistentClassifierClient
            self.classifier_client = PersistentClassifierClient()
            if self.classifier_client.available:
                logger.info("✅ Persistent classifier client initialized")
            else:
                logger.warning("⚠️ Persistent classifier not available, image classification disabled")
                self.classifier_client = None
        except Exception as e:
            logger.error(f"❌ Classifier client initialization failed: {e}")
            self.classifier_client = None

    async def process_document(self, file_path: str) -> Dict[str, Any]:
        """Process document with text-first extraction and image classification.

        Args:
            file_path: Path of the document to process.

        Returns:
            A dict with keys ``success``, ``file_path``, ``text_content``,
            ``images`` and ``metadata``. ``metadata`` always contains
            ``file_type``, ``processing_time``, ``text_extracted``,
            ``images_processed``, ``ocr_used``, ``classification_used`` and
            ``gpu_accelerated``; on failure it additionally contains ``error``.
            Exceptions raised during processing are caught and reported through
            ``success=False`` rather than propagated.
        """
        logger.info(f"📄 Processing document: {file_path}")

        result: Dict[str, Any] = {
            "success": False,
            "file_path": file_path,
            "text_content": "",
            "images": [],
            "metadata": {
                "file_type": Path(file_path).suffix.lower(),
                "processing_time": 0,
                # Defaults are set up front so consumers can read these keys
                # even when processing fails part-way through.
                "text_extracted": False,
                "images_processed": 0,
                "ocr_used": False,
                "classification_used": False,
                "gpu_accelerated": False
            }
        }
        metadata = result["metadata"]

        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Step 1: Extract text first for all file types
            text_content = await self._extract_text(file_path)
            result["text_content"] = text_content
            metadata["text_extracted"] = bool(text_content.strip())

            # Step 2: Extract and process images
            images = await self._extract_images(file_path)
            if images:
                processed_images = await self._process_images(images)
                result["images"] = processed_images
                metadata["images_processed"] = len(images)

                # Step 2.5: Append image classification results to text content.
                # NOTE(review): per-image OCR text stays under result["images"];
                # only classification labels are merged into text_content here.
                classification_text = self._build_classification_text(processed_images)
                if classification_text:
                    result["text_content"] += "\n\n" + classification_text
                    logger.info("📸 Added image classification metadata to text content")

            # Step 3: Update metadata
            processing_time = loop.time() - start_time
            metadata["processing_time"] = processing_time
            metadata["ocr_used"] = self.ocr_processor is not None
            metadata["classification_used"] = self.classifier_client is not None
            metadata["gpu_accelerated"] = True  # Both use GPU when available

            result["success"] = True
            logger.info(f"✅ Document processing completed in {processing_time:.2f}s")

        except Exception as e:
            result["success"] = False
            metadata["error"] = str(e)
            logger.error(f"❌ Document processing failed: {e}")

        return result

    async def _extract_text(self, file_path: str) -> str:
        """Extract text from document using appropriate method.

        Args:
            file_path: Path of the document; the lower-cased suffix selects
                the extraction method.

        Returns:
            The extracted text. An empty string is returned for unsupported
            file types and when PDF/DOCX extraction fails.

        Raises:
            OSError, UnicodeDecodeError: For ``.txt`` files that cannot be
                opened or decoded as UTF-8 (not caught here).
        """
        file_ext = Path(file_path).suffix.lower()

        if file_ext in ['.txt']:
            # Simple text file
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()

        elif file_ext in ['.pdf']:
            # PDF file - extract text directly
            try:
                import PyPDF2
                with open(file_path, 'rb') as f:
                    pdf_reader = PyPDF2.PdfReader(f)
                    # `or ""` guards against a page yielding None, which would
                    # otherwise discard the text of the whole document.
                    return "".join(
                        (page.extract_text() or "") + "\n"
                        for page in pdf_reader.pages
                    )
            except Exception as e:
                logger.warning(f"PDF text extraction failed, will use OCR: {e}")
                return ""

        elif file_ext in ['.docx']:
            # Word document - extract text directly
            try:
                from docx import Document
                doc = Document(file_path)
                return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)
            except Exception as e:
                logger.warning(f"DOCX text extraction failed, will use OCR: {e}")
                return ""

        else:
            # Unknown file type, try OCR
            logger.info(f"Unknown file type {file_ext}, using OCR")
            return ""

    async def _extract_images(self, file_path: str) -> List[str]:
        """Extract images from document.

        PDF pages are rendered to PNG files and DOCX images are extracted into
        the ``extracted_images`` directory; a standalone image file is returned
        as-is.

        Args:
            file_path: Path of the document.

        Returns:
            A list of image file paths; empty when the file type has no images
            or extraction fails.
        """
        file_ext = Path(file_path).suffix.lower()
        output_dir = "extracted_images"

        if file_ext in ['.pdf']:
            # Create the output directory only for types that write into it.
            os.makedirs(output_dir, exist_ok=True)
            # Extract images from PDF
            try:
                from pdf2image import convert_from_path
                pages = convert_from_path(file_path)
                image_paths = []
                for page_number, page_image in enumerate(pages, start=1):
                    img_path = os.path.join(output_dir, f"pdf_page_{page_number}.png")
                    page_image.save(img_path, 'PNG')
                    image_paths.append(img_path)
                return image_paths
            except Exception as e:
                logger.warning(f"PDF image extraction failed: {e}")
                return []

        elif file_ext in ['.docx']:
            os.makedirs(output_dir, exist_ok=True)
            # Extract images from Word document
            try:
                from word_image_extractor import extract_images_from_docx
                return extract_images_from_docx(file_path, output_dir)
            except Exception as e:
                logger.warning(f"DOCX image extraction failed: {e}")
                return []

        elif file_ext in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff']:
            # Single image file
            return [file_path]

        else:
            # No images for other file types
            return []

    async def _process_images(self, image_paths: List[str]) -> List[Dict[str, Any]]:
        """Process images with conditional classification: OCR-first, classification only if no text.

        Args:
            image_paths: Paths of the images to process, in order.

        Returns:
            One dict per input image with keys ``path``, ``ocr_text``,
            ``ocr_confidence``, ``classification`` and ``processing_notes``.
            OCR and classification failures are recorded in
            ``processing_notes`` instead of being raised.
        """
        processed_images = []

        for img_path in image_paths:
            image_data: Dict[str, Any] = {
                "path": img_path,
                "ocr_text": "",
                "ocr_confidence": 0.0,
                "classification": [],
                "processing_notes": ""
            }
            image_name = os.path.basename(img_path)

            # Step 1: OCR text extraction (GPU mode)
            if self.ocr_processor:
                try:
                    # Run the blocking OCR call in a worker thread so the
                    # event loop is not stalled.
                    ocr_result = await asyncio.to_thread(
                        self.ocr_processor.extract_text_from_image, img_path
                    )
                    ocr_text = ocr_result.get("text", "")
                    if ocr_text.strip():
                        image_data["ocr_text"] = ocr_text
                        image_data["ocr_confidence"] = ocr_result.get("confidence", 0.0)
                        image_data["processing_notes"] = "Text extracted via OCR"
                        logger.info(f"📝 OCR extracted {len(image_data['ocr_text'])} chars from {image_name}")
                    else:
                        # No text detected - proceed to classification
                        image_data["processing_notes"] = "No text detected, proceeding to classification"
                        logger.info(f"🖼️ No text in {image_name}, using classification")
                except Exception as e:
                    logger.warning(f"OCR failed for {img_path}: {e}")
                    image_data["processing_notes"] = f"OCR failed: {str(e)}"

            # Step 2: Image classification (GPU mode) - ONLY if no text was detected
            # NOTE(review): this call runs synchronously on the event loop,
            # unlike the OCR call above — confirm whether that is intended.
            if self.classifier_client and not image_data["ocr_text"].strip():
                try:
                    classification_results = self.classifier_client.classify_image(img_path)
                    image_data["classification"] = classification_results
                    if classification_results:
                        image_data["processing_notes"] += " | Classified via OpenCLIP"
                        logger.info(f"🔍 Classified {image_name}: {classification_results[0]['label']}")
                except Exception as e:
                    logger.warning(f"Classification failed for {img_path}: {e}")
                    image_data["processing_notes"] += f" | Classification failed: {str(e)}"

            processed_images.append(image_data)

        return processed_images

    def _build_classification_text(self, processed_images: List[Dict[str, Any]]) -> str:
        """Build text representation of image classification results for indexing.

        Args:
            processed_images: Per-image dicts as produced by
                ``_process_images``. Images are numbered by their 1-based
                position in this list, including images without any
                classification.

        Returns:
            A block starting with ``"Image Classifications:"`` followed by one
            line per classified image (plus up to three runner-up labels whose
            confidence exceeds 10%), or an empty string when no image has
            classification results.
        """
        classification_lines = []

        for image_number, img_data in enumerate(processed_images, start=1):
            classifications = img_data.get("classification")
            if not classifications:
                continue

            # Add image classification labels to the text content
            top_classification = classifications[0]  # Get highest confidence result
            label = top_classification["label"]
            confidence = top_classification["confidence"]
            classification_lines.append(f"Image {image_number}: {label} (confidence: {confidence:.1%})")

            # Add additional classifications if they have high confidence
            for cls in classifications[1:4]:  # Next top 3
                if cls["confidence"] > 0.1:  # Only include if confidence > 10%
                    classification_lines.append(f" Also: {cls['label']} (confidence: {cls['confidence']:.1%})")

        if classification_lines:
            return "Image Classifications:\n" + "\n".join(classification_lines)
        return ""


async def test_optimized_processor():
    """Test the optimized document processor.

    Processes ``test.docx`` from the current working directory and prints a
    human-readable report.

    Returns:
        The result dict from ``process_document``, or ``None`` when the test
        file does not exist.
    """
    print("🧪 TESTING OPTIMIZED DOCUMENT PROCESSOR")
    print("=" * 50)

    processor = OptimizedDocumentProcessor()

    # Test with test.docx
    test_file = "test.docx"
    if not os.path.exists(test_file):
        print(f"❌ Test file not found: {test_file}")
        return

    print(f"📄 Processing: {test_file}")
    result = await processor.process_document(test_file)
    metadata = result["metadata"]
    images_processed = metadata.get('images_processed', 0)

    print("\n📊 PROCESSING RESULTS:")
    print(f" Success: {'✅' if result['success'] else '❌'}")
    print(f" Processing Time: {metadata['processing_time']:.2f}s")
    # .get keeps the report from crashing when processing failed early.
    print(f" Text Extracted: {metadata.get('text_extracted', False)}")
    print(f" Images Processed: {images_processed}")
    print(f" OCR Used: {metadata['ocr_used']}")
    print(f" Classification Used: {metadata['classification_used']}")
    print(f" GPU Accelerated: {metadata['gpu_accelerated']}")

    text_content = result["text_content"]
    if text_content:
        print("\n📝 TEXT CONTENT (first 500 chars):")
        preview = text_content[:500] + "..." if len(text_content) > 500 else text_content
        print(preview)

    if result["images"]:
        print("\n🖼️ IMAGE PROCESSING RESULTS:")
        for image_number, img in enumerate(result["images"], start=1):
            print(f" Image {image_number}:")
            if img["ocr_text"]:
                print(f" OCR: {len(img['ocr_text'])} chars, confidence: {img['ocr_confidence']:.3f}")
            if img["classification"]:
                top_result = img["classification"][0]
                print(f" Classification: {top_result['label']} (confidence: {top_result['confidence']:.3f})")

                # Check for bee detection
                if "bee" in top_result["label"].lower():
                    print(f" 🎯 BEE DETECTED WITH {top_result['confidence']:.1%} CONFIDENCE!")

    # Performance summary
    print("\n⚡ PERFORMANCE SUMMARY:")
    print(f" Total processing time: {metadata['processing_time']:.2f}s")
    if images_processed > 0:
        per_image_time = metadata['processing_time'] / images_processed
        print(f" Per image processing time: {per_image_time:.3f}s")

    return result


if __name__ == "__main__":
    asyncio.run(test_optimized_processor())