558 lines
21 KiB
Python
558 lines
21 KiB
Python
"""
|
|
Optimized Document Processor with Async Pipeline and Batch OCR
|
|
Replaces the sequential processing with parallel pipeline stages
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import asyncio
|
|
import concurrent.futures
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
import tempfile
|
|
from pathlib import Path
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
# Import required libraries
|
|
import fitz # PyMuPDF
|
|
import docx
|
|
import openpyxl
|
|
from pptx import Presentation
|
|
from bs4 import BeautifulSoup
|
|
import pandas as pd
|
|
|
|
from .optimized_ocr_processor import OptimizedOCRProcessor, BatchOCRResult
|
|
from .production_config import get_config
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ProcessingResult:
    """Result of document processing.

    Produced by AsyncDocumentProcessor handlers. ``tables`` and ``images``
    default to ``None`` (not an empty list) when a handler does not supply
    them, so callers should treat ``None`` and ``[]`` alike.
    """

    # True when processing completed without a fatal error.
    success: bool
    # Extracted textual content of the document.
    content: str
    # File-type specific metadata (page/sheet/slide counts, error text, flags).
    metadata: Dict[str, Any]
    # Human-readable error message; set when success is False.
    error: Optional[str] = None
    # Extracted tables, each a dict with "data"/"rows"/"columns" keys.
    tables: Optional[List[Dict[str, Any]]] = None
    # Extracted image metadata (path, index, OCR text/confidence).
    images: Optional[List[Dict[str, Any]]] = None
    # Wall-clock seconds spent processing (set by process_document).
    processing_time: float = 0.0
|
|
|
|
|
|
class AsyncDocumentProcessor:
|
|
"""
|
|
Async document processor with parallel pipeline stages and batch OCR
|
|
"""
|
|
|
|
def __init__(self, batch_size: int = 4, max_workers: int = 2):
|
|
"""
|
|
Initialize async document processor
|
|
|
|
Args:
|
|
batch_size: Number of images to process in each OCR batch
|
|
max_workers: Maximum number of parallel workers
|
|
"""
|
|
self.config = get_config()
|
|
self.batch_size = batch_size
|
|
self.max_workers = max_workers
|
|
|
|
# Initialize optimized OCR processor
|
|
self.ocr_processor = OptimizedOCRProcessor(
|
|
use_gpu=self.config.performance.USE_GPU,
|
|
languages=self.config.document_processing.OCR_LANGUAGES,
|
|
batch_size=batch_size,
|
|
max_workers=max_workers
|
|
)
|
|
|
|
self.supported_extensions = self.config.document_processing.SUPPORTED_EXTENSIONS
|
|
|
|
# Initialize image classifier if available
|
|
self.image_classifier = None
|
|
try:
|
|
# Add the workspace directory to path where fast_image_classifier.py is located
|
|
import sys
|
|
workspace_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
if workspace_dir not in sys.path:
|
|
sys.path.insert(0, workspace_dir)
|
|
from fast_image_classifier import get_image_classifier
|
|
self.image_classifier = get_image_classifier()
|
|
logger.info("Image classifier initialized successfully")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to initialize image classifier: {e}")
|
|
|
|
# Performance metrics
|
|
self.metrics = {
|
|
"documents_processed": 0,
|
|
"total_processing_time": 0.0,
|
|
"pages_processed": 0,
|
|
"images_processed": 0,
|
|
"ocr_batches_processed": 0
|
|
}
|
|
|
|
logger.info(f"Async document processor initialized (batch_size: {batch_size}, workers: {max_workers})")
|
|
|
|
async def process_document(self, file_path: str) -> ProcessingResult:
|
|
"""
|
|
Process document with async pipeline
|
|
|
|
Args:
|
|
file_path: Path to document file
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
start_time = time.time()
|
|
file_path = Path(file_path)
|
|
|
|
if not file_path.exists():
|
|
return ProcessingResult(
|
|
success=False,
|
|
content="",
|
|
metadata={"error": "File not found"},
|
|
error="File not found",
|
|
processing_time=time.time() - start_time
|
|
)
|
|
|
|
# Determine file type and process accordingly
|
|
extension = file_path.suffix.lower()
|
|
|
|
try:
|
|
if extension in ['.pdf']:
|
|
result = await self._process_pdf_async(file_path)
|
|
elif extension in ['.doc', '.docx']:
|
|
result = await self._process_word_async(file_path)
|
|
elif extension in ['.xls', '.xlsx']:
|
|
result = await self._process_excel_async(file_path)
|
|
elif extension in ['.ppt', '.pptx']:
|
|
result = await self._process_powerpoint_async(file_path)
|
|
elif extension in ['.txt', '.csv', '.html']:
|
|
result = await self._process_text_async(file_path)
|
|
elif extension in ['.jpg', '.jpeg', '.png', '.tiff', '.bmp', '.gif']:
|
|
result = await self._process_image_async(file_path)
|
|
else:
|
|
result = ProcessingResult(
|
|
success=False,
|
|
content="",
|
|
metadata={"error": f"Unsupported file type: {extension}"},
|
|
error=f"Unsupported file type: {extension}",
|
|
processing_time=time.time() - start_time
|
|
)
|
|
|
|
# Update metrics
|
|
processing_time = time.time() - start_time
|
|
result.processing_time = processing_time
|
|
self.metrics["documents_processed"] += 1
|
|
self.metrics["total_processing_time"] += processing_time
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing {file_path}: {e}")
|
|
processing_time = time.time() - start_time
|
|
return ProcessingResult(
|
|
success=False,
|
|
content="",
|
|
metadata={"error": str(e)},
|
|
error=str(e),
|
|
processing_time=processing_time
|
|
)
|
|
|
|
async def _process_pdf_async(self, file_path: Path) -> ProcessingResult:
|
|
"""
|
|
Process PDF files with async pipeline
|
|
|
|
Args:
|
|
file_path: Path to PDF file
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
pdf_document = None
|
|
try:
|
|
# Open PDF
|
|
pdf_document = fitz.open(str(file_path))
|
|
total_pages = len(pdf_document)
|
|
|
|
# Create async tasks for each page
|
|
page_tasks = []
|
|
for page_num in range(total_pages):
|
|
task = self._process_pdf_page_async(pdf_document[page_num], page_num)
|
|
page_tasks.append(task)
|
|
|
|
# Process pages in parallel
|
|
page_results = await asyncio.gather(*page_tasks, return_exceptions=True)
|
|
|
|
# Combine results
|
|
content_parts = []
|
|
tables = []
|
|
images = []
|
|
processed_with_ocr = False
|
|
|
|
for i, result in enumerate(page_results):
|
|
if isinstance(result, Exception):
|
|
logger.error(f"Error processing page {i}: {result}")
|
|
content_parts.append(f"Page {i + 1}: [Processing error: {str(result)}]")
|
|
else:
|
|
page_content, page_tables, page_images, used_ocr = result
|
|
content_parts.append(page_content)
|
|
tables.extend(page_tables)
|
|
images.extend(page_images)
|
|
if used_ocr:
|
|
processed_with_ocr = True
|
|
|
|
full_content = "\n\n".join(content_parts)
|
|
|
|
# Update metrics
|
|
self.metrics["pages_processed"] += total_pages
|
|
|
|
return ProcessingResult(
|
|
success=True,
|
|
content=full_content,
|
|
metadata={
|
|
"pages": total_pages,
|
|
"file_type": "pdf",
|
|
"processed_with_ocr": processed_with_ocr
|
|
},
|
|
tables=tables,
|
|
images=images
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"PDF processing failed: {e}")
|
|
raise
|
|
finally:
|
|
if pdf_document:
|
|
pdf_document.close()
|
|
|
|
    async def _process_pdf_page_async(self, page, page_num: int) -> Tuple[str, List, List, bool]:
        """
        Process a single PDF page asynchronously.

        Tries the native text layer first; when the page has none (a scanned
        page), renders it at 2x resolution and runs it through the batch OCR
        backend.

        Args:
            page: PDF page object (PyMuPDF/fitz page)
            page_num: Page number (0-indexed)

        Returns:
            Tuple of (content, tables, images, used_ocr)
        """
        try:
            # Try text extraction first
            text = page.get_text()
            if text.strip():
                return f"Page {page_num + 1}:\n{text}", [], [], False

            # Fall back to OCR for scanned pages
            logger.info(f"Page {page_num + 1} has no text, using high-resolution OCR")

            # Use higher resolution for better OCR accuracy
            mat = fitz.Matrix(2, 2)  # 2x resolution
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")

            # Save temporary image for OCR. delete=False so the OCR backend
            # can reopen the file by path; cleaned up in the finally below.
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                temp_file.write(img_data)
                temp_path = temp_file.name

            try:
                if self.ocr_processor.ocr_available:
                    # Use async OCR (single-image batch)
                    ocr_results = await self.ocr_processor.extract_text_from_images_batch_async([temp_path])

                    if ocr_results and ocr_results[0].text.strip():
                        content = f"Page {page_num + 1} (OCR):\n{ocr_results[0].text}"

                        # Extract tables from OCR
                        ocr_tables = self.ocr_processor.extract_tables_from_image(temp_path)

                        # Create image metadata.
                        # NOTE(review): "path" refers to temp_path, which is
                        # unlinked in the finally below, so callers receive a
                        # dangling path — confirm consumers only read the
                        # ocr_text/ocr_confidence fields.
                        images = [{
                            "path": temp_path,
                            "index": page_num,
                            "ocr_text": ocr_results[0].text,
                            "ocr_confidence": ocr_results[0].confidence
                        }]

                        return content, ocr_tables, images, True
                    else:
                        # OCR ran but produced no text: still flag used_ocr.
                        return f"Page {page_num + 1}: [Scanned content - no text detected by OCR]", [], [], True
                else:
                    return f"Page {page_num + 1}: [Image content - OCR not available]", [], [], False

            finally:
                # Always remove the temporary render, even if OCR raised.
                os.unlink(temp_path)

        except Exception as e:
            # Degrade to a placeholder string so the caller can still
            # assemble the rest of the document.
            logger.error(f"Error processing PDF page {page_num + 1}: {e}")
            return f"Page {page_num + 1}: [Processing error: {str(e)}]", [], [], False
|
|
|
|
async def _process_word_async(self, file_path: Path) -> ProcessingResult:
|
|
"""
|
|
Process Word documents asynchronously
|
|
|
|
Args:
|
|
file_path: Path to Word document
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
try:
|
|
doc = docx.Document(str(file_path))
|
|
|
|
# Extract text from paragraphs
|
|
content_parts = []
|
|
for para in doc.paragraphs:
|
|
if para.text.strip():
|
|
content_parts.append(para.text)
|
|
|
|
# Extract tables
|
|
tables = []
|
|
for table in doc.tables:
|
|
table_data = []
|
|
for row in table.rows:
|
|
row_data = [cell.text for cell in row.cells]
|
|
table_data.append(row_data)
|
|
|
|
if table_data:
|
|
tables.append({
|
|
"data": table_data,
|
|
"rows": len(table_data),
|
|
"columns": max(len(row) for row in table_data) if table_data else 0
|
|
})
|
|
|
|
# Extract and process images asynchronously
|
|
images = await self._extract_word_images_async(file_path)
|
|
|
|
# Add image content to text
|
|
for img in images:
|
|
if "ocr_text" in img:
|
|
content_parts.append(f"[Image {img['index'] + 1} OCR Text]: {img['ocr_text']}")
|
|
elif "primary_classification" in img:
|
|
content_parts.append(f"[Image {img['index'] + 1} Classification]: {img['primary_classification']}")
|
|
|
|
full_content = "\n".join(content_parts)
|
|
|
|
return ProcessingResult(
|
|
success=True,
|
|
content=full_content,
|
|
metadata={
|
|
"file_type": "word",
|
|
"paragraphs": len([p for p in content_parts if not p.startswith('[')]),
|
|
"tables_count": len(tables),
|
|
"images_count": len(images)
|
|
},
|
|
tables=tables,
|
|
images=images
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Word document processing failed: {e}")
|
|
raise
|
|
|
|
async def _extract_word_images_async(self, file_path: Path) -> List[Dict[str, Any]]:
|
|
"""
|
|
Extract and process images from Word document asynchronously
|
|
|
|
Args:
|
|
file_path: Path to Word document
|
|
|
|
Returns:
|
|
List of image metadata dictionaries
|
|
"""
|
|
images = []
|
|
|
|
try:
|
|
import zipfile
|
|
import os
|
|
|
|
# Create temporary directory for extracted images
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# Extract images from docx using zipfile
|
|
with zipfile.ZipFile(str(file_path), 'r') as zip_ref:
|
|
image_files = []
|
|
for file_info in zip_ref.filelist:
|
|
if file_info.filename.startswith('word/media/'):
|
|
# Extract the image
|
|
image_filename = os.path.basename(file_info.filename)
|
|
image_path = os.path.join(temp_dir, image_filename)
|
|
|
|
# Extract and save
|
|
with zip_ref.open(file_info.filename) as source, open(image_path, 'wb') as target:
|
|
target.write(source.read())
|
|
|
|
image_files.append((len(image_files), image_path))
|
|
logger.info(f"Extracted image: {image_path}")
|
|
|
|
if image_files:
|
|
logger.info(f"Found {len(image_files)} images in Word document")
|
|
|
|
# Process images in batches
|
|
for batch_start in range(0, len(image_files), self.batch_size):
|
|
batch = image_files[batch_start:batch_start + self.batch_size]
|
|
|
|
# Prepare batch for OCR
|
|
image_paths = [path for _, path in batch]
|
|
indices = [idx for idx, _ in batch]
|
|
|
|
# Process batch with OCR
|
|
if self.ocr_processor.ocr_available:
|
|
ocr_results = await self.ocr_processor.extract_text_from_images_batch_async(image_paths)
|
|
|
|
for i, (idx, image_path) in enumerate(batch):
|
|
if i < len(ocr_results):
|
|
ocr_result = ocr_results[i]
|
|
image_metadata = {
|
|
"path": image_path,
|
|
"index": idx,
|
|
"ocr_text": ocr_result.text,
|
|
"ocr_confidence": ocr_result.confidence
|
|
}
|
|
|
|
# Only classify if OCR found no text
|
|
if not ocr_result.text.strip() and self.image_classifier:
|
|
try:
|
|
classification_results = self.image_classifier.classify_image(image_path, top_k=3)
|
|
image_metadata["classification"] = classification_results
|
|
if classification_results:
|
|
image_metadata["primary_classification"] = classification_results[0]["label"]
|
|
except Exception as classify_error:
|
|
logger.error(f"Image classification failed: {classify_error}")
|
|
|
|
images.append(image_metadata)
|
|
|
|
# Update metrics
|
|
self.metrics["images_processed"] += len(batch)
|
|
self.metrics["ocr_batches_processed"] += 1
|
|
|
|
return images
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Image extraction from Word document failed: {e}")
|
|
return images
|
|
|
|
async def _process_excel_async(self, file_path: Path) -> ProcessingResult:
|
|
"""
|
|
Process Excel files asynchronously
|
|
|
|
Args:
|
|
file_path: Path to Excel file
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
try:
|
|
workbook = openpyxl.load_workbook(str(file_path))
|
|
content_parts = []
|
|
tables = []
|
|
|
|
for sheet_name in workbook.sheetnames:
|
|
sheet = workbook[sheet_name]
|
|
content_parts.append(f"Sheet: {sheet_name}")
|
|
|
|
# Extract data from cells
|
|
sheet_data = []
|
|
for row in sheet.iter_rows(values_only=True):
|
|
if any(cell is not None for cell in row):
|
|
sheet_data.append([str(cell) if cell is not None else "" for cell in row])
|
|
|
|
if sheet_data:
|
|
tables.append({
|
|
"data": sheet_data,
|
|
"sheet": sheet_name,
|
|
"rows": len(sheet_data),
|
|
"columns": max(len(row) for row in sheet_data) if sheet_data else 0
|
|
})
|
|
|
|
# Add sample content (first few rows)
|
|
sample_rows = min(5, len(sheet_data))
|
|
for i in range(sample_rows):
|
|
content_parts.append(" | ".join(sheet_data[i]))
|
|
|
|
workbook.close()
|
|
full_content = "\n".join(content_parts)
|
|
|
|
return ProcessingResult(
|
|
success=True,
|
|
content=full_content,
|
|
metadata={
|
|
"file_type": "excel",
|
|
"sheets": len(workbook.sheetnames),
|
|
"tables_count": len(tables)
|
|
},
|
|
tables=tables
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Excel processing failed: {e}")
|
|
raise
|
|
|
|
async def _process_powerpoint_async(self, file_path: Path) -> ProcessingResult:
|
|
"""
|
|
Process PowerPoint presentations asynchronously
|
|
|
|
Args:
|
|
file_path: Path to PowerPoint file
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
try:
|
|
presentation = Presentation(str(file_path))
|
|
content_parts = []
|
|
|
|
for i, slide in enumerate(presentation.slides):
|
|
content_parts.append(f"Slide {i + 1}:")
|
|
|
|
# Extract text from slide shapes
|
|
slide_text = []
|
|
for shape in slide.shapes:
|
|
if hasattr(shape, "text") and shape.text.strip():
|
|
slide_text.append(shape.text)
|
|
|
|
if slide_text:
|
|
content_parts.extend(slide_text)
|
|
content_parts.append("") # Empty line between slides
|
|
|
|
full_content = "\n".join(content_parts)
|
|
|
|
return ProcessingResult(
|
|
success=True,
|
|
content=full_content,
|
|
metadata={
|
|
"file_type": "powerpoint",
|
|
"slides": len(presentation.slides)
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"PowerPoint processing failed: {e}")
|
|
raise
|
|
|
|
async def _process_text_async(self, file_path: Path) -> ProcessingResult:
|
|
"""
|
|
Process text-based files asynchronously
|
|
|
|
Args:
|
|
file_path: Path to text file
|
|
|
|
Returns:
|
|
ProcessingResult object
|
|
"""
|
|
try:
|
|
extension = file_path.suffix.lower()
|
|
|
|
if extension == '.csv':
|
|
# Process CSV with pandas
|
|
df = pd.read_csv(file_path)
|
|
content = df.to_string(index=False)
|
|
tables = [{
|
|
"data": df.values.tolist(),
|
|
"columns": df.columns.tolist(),
|
|
"rows": len(df),
|
|
"columns_count": len(df.columns)
|
|
}]
|
|
|
|
return ProcessingResult
|