"""
|
|
Optimized Document Processor with Persistent Classifier
|
|
Uses GPU acceleration for both PaddleOCR and OpenCLIP with complete dependency isolation
|
|
"""
import asyncio
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Make the vendored LightRAG package importable.
sys.path.insert(0, "LightRAG-main")

# Module-wide logging: timestamped, level-tagged messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class OptimizedDocumentProcessor:
    """Optimized document processor with GPU acceleration and dependency isolation.

    Strategy: extract text directly (PyPDF2 / python-docx / plain read) first,
    then extract embedded images, OCR them, and classify an image with OpenCLIP
    only when OCR found no text in it.  GPU OCR is mandatory; the persistent
    classifier is optional and disabled when unavailable.
    """

    def __init__(self):
        # A component left as None means that capability is unavailable.
        self.ocr_processor = None
        self.classifier_client = None
        self._initialize_components()

    def _initialize_components(self):
        """Initialize OCR and classifier components with dependency isolation.

        Raises:
            RuntimeError: if the GPU-backed PaddleOCR processor cannot be
                created (OCR is a hard requirement; classification is not).
        """
        logger.info("=== INITIALIZING OPTIMIZED DOCUMENT PROCESSOR ===")

        # OCR processor (PaddleOCR with GPU REQUIRED - no fallback).
        try:
            from simple_ocr_processor import SimpleOCRProcessor
            self.ocr_processor = SimpleOCRProcessor()
            logger.info("✅ OCR processor initialized with GPU")
        except Exception as e:
            logger.error(f"❌ OCR processor GPU initialization failed: {e}")
            raise RuntimeError(f"PaddleOCR GPU is required but failed to initialize: {e}")

        # Persistent classifier client (OpenCLIP with GPU) -- optional feature.
        try:
            from persistent_classifier_client import PersistentClassifierClient
            self.classifier_client = PersistentClassifierClient()
            if self.classifier_client.available:
                logger.info("✅ Persistent classifier client initialized")
            else:
                logger.warning("⚠️ Persistent classifier not available, image classification disabled")
                self.classifier_client = None
        except Exception as e:
            logger.error(f"❌ Classifier client initialization failed: {e}")
            self.classifier_client = None

    async def process_document(self, file_path: str) -> Dict[str, Any]:
        """Process a document with text-first extraction and image classification.

        Args:
            file_path: Path to the document (.txt / .pdf / .docx / image file).

        Returns:
            Dict with keys ``success``, ``file_path``, ``text_content``,
            ``images`` and ``metadata``.  Never raises: failures are reported
            via ``success=False`` plus ``metadata["error"]``.
        """
        logger.info(f"📄 Processing document: {file_path}")

        result = {
            "success": False,
            "file_path": file_path,
            "text_content": "",
            "images": [],
            "metadata": {
                "file_type": Path(file_path).suffix.lower(),
                "processing_time": 0,
                # Pre-seed these so consumers never hit KeyError when
                # processing fails before they are recomputed below.
                "text_extracted": False,
                "images_processed": 0,
                "ocr_used": False,
                "classification_used": False,
                "gpu_accelerated": False
            }
        }

        # We are inside a coroutine, so a running loop is guaranteed; its
        # monotonic clock is appropriate for elapsed-time measurement.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Step 1: Extract text first for all file types.
            text_content = await self._extract_text(file_path)
            result["text_content"] = text_content
            result["metadata"]["text_extracted"] = bool(text_content.strip())

            # Step 2: Extract and process embedded images.
            images = await self._extract_images(file_path)
            if images:
                processed_images = await self._process_images(images)
                result["images"] = processed_images
                result["metadata"]["images_processed"] = len(images)

                # Step 2.5: Fold classification labels into the indexable text.
                classification_text = self._build_classification_text(processed_images)
                if classification_text:
                    result["text_content"] += "\n\n" + classification_text
                    logger.info("📸 Added image classification metadata to text content")

            # Step 3: Update metadata.
            processing_time = loop.time() - start_time
            result["metadata"]["processing_time"] = processing_time
            result["metadata"]["ocr_used"] = self.ocr_processor is not None
            result["metadata"]["classification_used"] = self.classifier_client is not None
            result["metadata"]["gpu_accelerated"] = True  # Both use GPU when available

            result["success"] = True
            logger.info(f"✅ Document processing completed in {processing_time:.2f}s")

        except Exception as e:
            result["success"] = False
            result["metadata"]["error"] = str(e)
            logger.error(f"❌ Document processing failed: {e}")

        return result

    async def _extract_text(self, file_path: str) -> str:
        """Extract text from a document using the appropriate direct method.

        Returns "" when direct extraction is impossible or fails, so that
        image OCR can pick up the slack downstream.
        """
        file_ext = Path(file_path).suffix.lower()

        if file_ext == '.txt':
            # Simple text file.
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()

        elif file_ext == '.pdf':
            # PDF file - read the embedded text layer directly.
            try:
                import PyPDF2
                with open(file_path, 'rb') as f:
                    pdf_reader = PyPDF2.PdfReader(f)
                    # extract_text() may return None for image-only pages;
                    # coalesce to "" so the concatenation cannot raise.
                    return "".join((page.extract_text() or "") + "\n"
                                   for page in pdf_reader.pages)
            except Exception as e:
                logger.warning(f"PDF text extraction failed, will use OCR: {e}")
                return ""

        elif file_ext == '.docx':
            # Word document - read paragraph text directly.
            try:
                from docx import Document
                doc = Document(file_path)
                return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)
            except Exception as e:
                logger.warning(f"DOCX text extraction failed, will use OCR: {e}")
                return ""

        else:
            # Unknown file type: defer to OCR on extracted images.
            logger.info(f"Unknown file type {file_ext}, using OCR")
            return ""

    async def _extract_images(self, file_path: str) -> List[str]:
        """Extract images from a document into ``extracted_images/``.

        Returns a (possibly empty) list of image file paths.  A document that
        is itself an image is returned as-is without copying.
        """
        file_ext = Path(file_path).suffix.lower()
        output_dir = "extracted_images"
        os.makedirs(output_dir, exist_ok=True)

        if file_ext == '.pdf':
            # Rasterize each PDF page to a PNG.
            try:
                from pdf2image import convert_from_path
                images = convert_from_path(file_path)
                image_paths = []
                for i, image in enumerate(images):
                    img_path = os.path.join(output_dir, f"pdf_page_{i+1}.png")
                    image.save(img_path, 'PNG')
                    image_paths.append(img_path)
                return image_paths
            except Exception as e:
                logger.warning(f"PDF image extraction failed: {e}")
                return []

        elif file_ext == '.docx':
            # Pull embedded images out of the Word archive.
            try:
                from word_image_extractor import extract_images_from_docx
                return extract_images_from_docx(file_path, output_dir)
            except Exception as e:
                logger.warning(f"DOCX image extraction failed: {e}")
                return []

        elif file_ext in ('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff'):
            # Single image file ('.tif' accepted alongside '.tiff').
            return [file_path]

        else:
            # No images for other file types.
            return []

    async def _process_images(self, image_paths: List[str]) -> List[Dict[str, Any]]:
        """Process images with conditional classification: OCR first,
        OpenCLIP classification only when OCR finds no text."""
        processed_images = []

        for img_path in image_paths:
            image_data = {
                "path": img_path,
                "ocr_text": "",
                "ocr_confidence": 0.0,
                "classification": [],
                "processing_notes": ""
            }

            # Step 1: OCR text extraction (GPU mode), off the event-loop thread.
            if self.ocr_processor:
                try:
                    ocr_result = await asyncio.to_thread(self.ocr_processor.extract_text_from_image, img_path)
                    if ocr_result.get("text", "").strip():
                        image_data["ocr_text"] = ocr_result.get("text", "")
                        image_data["ocr_confidence"] = ocr_result.get("confidence", 0.0)
                        image_data["processing_notes"] = "Text extracted via OCR"
                        logger.info(f"📝 OCR extracted {len(image_data['ocr_text'])} chars from {os.path.basename(img_path)}")
                    else:
                        # No text detected - proceed to classification.
                        image_data["processing_notes"] = "No text detected, proceeding to classification"
                        logger.info(f"🖼️ No text in {os.path.basename(img_path)}, using classification")
                except Exception as e:
                    logger.warning(f"OCR failed for {img_path}: {e}")
                    image_data["processing_notes"] = f"OCR failed: {str(e)}"

            # Step 2: Image classification (GPU mode) - ONLY if no text was detected.
            if self.classifier_client and not image_data["ocr_text"].strip():
                try:
                    classification_results = self.classifier_client.classify_image(img_path)
                    image_data["classification"] = classification_results
                    if classification_results:
                        image_data["processing_notes"] += " | Classified via OpenCLIP"
                        logger.info(f"🔍 Classified {os.path.basename(img_path)}: {classification_results[0]['label']}")
                except Exception as e:
                    logger.warning(f"Classification failed for {img_path}: {e}")
                    image_data["processing_notes"] += f" | Classification failed: {str(e)}"

            processed_images.append(image_data)

        return processed_images

    def _build_classification_text(self, processed_images: List[Dict[str, Any]]) -> str:
        """Build a text representation of classification results for indexing.

        Returns "" when no image carries classification results.
        """
        classification_lines = []

        for i, img_data in enumerate(processed_images):
            if img_data.get("classification"):
                # Highest-confidence label first.
                top_classification = img_data["classification"][0]
                label = top_classification["label"]
                confidence = top_classification["confidence"]

                classification_lines.append(f"Image {i+1}: {label} (confidence: {confidence:.1%})")

                # Include up to three runner-up labels above 10% confidence.
                for cls in img_data["classification"][1:4]:
                    if cls["confidence"] > 0.1:
                        classification_lines.append(f"  Also: {cls['label']} (confidence: {cls['confidence']:.1%})")

        if classification_lines:
            return "Image Classifications:\n" + "\n".join(classification_lines)
        return ""
async def test_optimized_processor():
    """Smoke-test the optimized document processor against ``test.docx``.

    Returns:
        The processing result dict, or None when the test file is missing.
    """
    print("🧪 TESTING OPTIMIZED DOCUMENT PROCESSOR")
    print("=" * 50)

    processor = OptimizedDocumentProcessor()

    # Test with test.docx
    test_file = "test.docx"
    if not os.path.exists(test_file):
        print(f"❌ Test file not found: {test_file}")
        return

    print(f"📄 Processing: {test_file}")
    result = await processor.process_document(test_file)

    meta = result["metadata"]
    print("\n📊 PROCESSING RESULTS:")
    print(f" Success: {'✅' if result['success'] else '❌'}")
    print(f" Processing Time: {meta['processing_time']:.2f}s")
    # .get() guards: these keys may be absent when processing failed early.
    print(f" Text Extracted: {meta.get('text_extracted', False)}")
    print(f" Images Processed: {meta.get('images_processed', 0)}")
    print(f" OCR Used: {meta['ocr_used']}")
    print(f" Classification Used: {meta['classification_used']}")
    print(f" GPU Accelerated: {meta['gpu_accelerated']}")

    if result["text_content"]:
        print("\n📝 TEXT CONTENT (first 500 chars):")
        print(result["text_content"][:500] + "..." if len(result["text_content"]) > 500 else result["text_content"])

    if result["images"]:
        print("\n🖼️ IMAGE PROCESSING RESULTS:")
        for i, img in enumerate(result["images"]):
            print(f" Image {i+1}:")
            if img["ocr_text"]:
                print(f" OCR: {len(img['ocr_text'])} chars, confidence: {img['ocr_confidence']:.3f}")
            if img["classification"]:
                top_result = img["classification"][0]
                print(f" Classification: {top_result['label']} (confidence: {top_result['confidence']:.3f})")

                # Check for bee detection
                if "bee" in top_result["label"].lower():
                    print(f" 🎯 BEE DETECTED WITH {top_result['confidence']:.1%} CONFIDENCE!")

    # Performance summary
    print("\n⚡ PERFORMANCE SUMMARY:")
    print(f" Total processing time: {meta['processing_time']:.2f}s")
    if meta.get('images_processed', 0) > 0:
        per_image_time = meta['processing_time'] / meta['images_processed']
        print(f" Per image processing time: {per_image_time:.3f}s")

    return result
# Script entry point: run the end-to-end smoke test when executed directly.
if __name__ == "__main__":
    asyncio.run(test_optimized_processor())